nodefluent / kafka-streams

equivalent to kafka-streams :octopus: for nodejs :sparkles::turtle::rocket::sparkles:
https://nodefluent.github.io/kafka-streams/
MIT License

Confluent / Avro support? #131

Open codeburke opened 5 years ago

codeburke commented 5 years ago

Apologies if I missed it somewhere, but are there any plans to implement support for the Confluent protocol using Avro SerDes? I looked in both node-sinek and this library and didn't see any mention of it.

yarncraft commented 5 years ago

I had the same question. I tried to open the SSL/SASL example at https://github.com/nodefluent/node-sinek/tree/master/sasl-ssl-example, but it is down. Could you add an example of how to use this framework with Confluent Cloud (not a locally deployed cluster)?

elimenko commented 4 years ago

This issue is still relevant; it blocks us from establishing a connection to Confluent Cloud. Are there any updates on how to connect using SSL/SASL? Searching through the source code didn't help.

yarncraft commented 4 years ago

I managed to connect to Confluent Cloud in the following way:

config.js

const debug = require("debug");
const path = require("path");

const logger = {
  debug: debug("sinek:debug"),
  info: debug("sinek:info"),
  warn: debug("sinek:warn"),
  error: debug("sinek:error")
};

const consumerConfig = {
  logger,
  noptions: {
    debug: "all",
    "metadata.broker.list": "***.gcp.confluent.cloud:9092",
    "group.id": "example-group",
    "enable.auto.commit": false,
    event_cb: true,
    "compression.codec": "none",
    "retry.backoff.ms": 200,
    "message.send.max.retries": 10,
    "socket.keepalive.enable": true,
    "queue.buffering.max.messages": 100000,
    "queue.buffering.max.ms": 1000,
    "batch.num.messages": 1000000,

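    // Confluent Cloud authentication: SASL_SSL with the PLAIN mechanism (API key/secret pair)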
    "security.protocol": "sasl_ssl",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": "***",
    "sasl.password": "***",
    "api.version.request": true
  },
  tconf: {
    "auto.offset.reset": "latest"
  }
};

const producerConfig = {
  logger,
  noptions: {
    //"debug": "all",
    "metadata.broker.list": "***.gcp.confluent.cloud:9092",
    "client.id": "example-client",
    event_cb: true,
    "compression.codec": "none",
    "retry.backoff.ms": 200,
    "message.send.max.retries": 10,
    "socket.keepalive.enable": true,
    "queue.buffering.max.messages": 100000,
    "queue.buffering.max.ms": 1000,
    "batch.num.messages": 1000000,

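    // same SASL_SSL credentials as the consumer config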
    "security.protocol": "sasl_ssl",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": "***",
    "sasl.password": "***",
    "api.version.request": true
  },
  tconf: {
    "request.required.acks": 1
  }
};

module.exports = {
  consumerConfig,
  producerConfig
};
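
(Note: `noptions` is handed straight to the underlying librdkafka client as native configuration, while `tconf` holds the topic-level configuration.)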

consumer.js

const { KafkaStreams } = require("../kafkastreams.js");
const { consumerConfig: config } = require("../config.js");

const kafkaStreams = new KafkaStreams(config);
const kafkaTopicName = "***";
const stream = kafkaStreams.getKStream(kafkaTopicName);

stream.forEach(message => console.log(message)); // log each consumed message

stream.start(); // connect the consumer and begin streaming

Here each *** is replaced with the correct details.
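For the producer side the wiring is analogous. This is only a minimal sketch under the same assumptions (*** placeholders, same local file layout); `getKStream(null)` opens a produce-only stream and `writeToStream` pushes messages into it:

producer.js

const { KafkaStreams } = require("../kafkastreams.js");
const { producerConfig: config } = require("../config.js");

const kafkaStreams = new KafkaStreams(config);
const stream = kafkaStreams.getKStream(null); // no input topic, produce only
stream.to("***"); // target topic

stream.start().then(() => {
  stream.writeToStream("hello from Confluent Cloud");
});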

yarncraft commented 4 years ago

Furthermore, for Avro deserialization in Node.js you can write your own deserializer from the Avro schemas using https://github.com/mtth/avsc

Until this feature is built in, you might just want to add this as a step in your streaming pipeline.
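A minimal sketch of such a step (the schema and field names are placeholders; in practice you would fetch the writer schema from the Schema Registry using the ID embedded in each message):

const avro = require("avsc");

// Placeholder schema; real code would look it up in the Schema Registry.
const type = avro.Type.forSchema({
  type: "record",
  name: "Example",
  fields: [{ name: "value", type: "string" }]
});

// Confluent wire format: 1 magic byte (0) + 4-byte big-endian schema ID + Avro payload.
function decodeConfluentAvro(buffer) {
  if (buffer.readUInt8(0) !== 0) {
    throw new Error("not a Confluent-framed Avro message");
  }
  const schemaId = buffer.readInt32BE(1); // use this ID to resolve the schema
  return type.fromBuffer(buffer.slice(5));
}

// Plugged into the pipeline as an ordinary map step:
stream
  .map(message => decodeConfluentAvro(message.value))
  .forEach(decoded => console.log(decoded));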