nodefluent / sequelize-kafka-connect

:gem: nodejs kafka connect connector for MySQL, Postgres, SQLite and MSSQL
MIT License
39 stars 10 forks source link

Unable to connect from consumer to DB2 #71

Open workingMids opened 4 years ago

workingMids commented 4 years ago

Hi, I am using this connector to read messages from a consumer and write them into DB2 with some checks. I am calling the runSinkConnector function, passing the config, and have implemented the etl function, but it's not being called.


// Settings object passed to runSinkConnector below. It groups the Kafka
// client options, the topic to read, the connector (database) settings,
// the HTTP port and the batching parameters.
var config = {
  kafka: {
    //zkConStr: "localhost:2181/",
    kafkaHost: brokaerlist,
    logger: null,
    groupId: "kafka",
    clientName: "kc-sequelize-test-name",
    workerPerPartition: 1,
    options: {
      sessionTimeout: 8000,
      protocol: ["roundrobin"],
      fromOffset: "earliest", //latest
      fetchMaxBytes: 100 * 1024,
      fetchMinBytes: 1,
      fetchMaxWaitMs: 10,
      heartbeatInterval: 250,
      retryMinTimeout: 250,
      requireAcks: 1,
      //ackTimeoutMs: 100,
      //partitionerType: 3
    },
  },
  topic,
  partitions: 1,
  maxTasks: 1,
  pollInterval: 2000,
  produceKeyed: true,
  produceCompressionType: 0,
  connector: {
    options: {
      host: "XXXX",
      port: 50001,
      // NOTE(review): dialect is "sqlite" with a null storage while host and
      // port are also set; confirm this matches the intended target database.
      dialect: "sqlite",
      pool: {
        max: 5,
        min: 0,
        idle: 10000,
      },
      storage: null,
    },
    database: "XXX",
    user: "XXXX",
    password: "XXXX",
    maxPollCount: 50,
    table: "table nameXXXX",
    incrementingColumnName: "id",
  },
  http: {
    port: 8000,
    middlewares: [],
  },
  enableMetrics: true,
  batch: {
    batchSize: 100,
    commitEveryNBatch: 1,
    concurrency: 1,
    commitSync: true,
  },
};
/**
 * ETL step handed to the sink schema converter: turns a message value into
 * the object to store, or signals an error through the callback.
 *
 * @param {object} messageValue - message value; `type` selects the action and
 *   `payload.id` / `payload.name` are read for "publish" messages.
 * @param {Function} callback - node-style callback `(error, value)`.
 * @returns {*} whatever `callback` returns for "publish" and "unpublish";
 *   `undefined` for any other type.
 */
const etlFunc = (messageValue, callback) => {
  // type is an example json format field
  switch (messageValue.type) {
    case "publish": {
      console.log(messageValue);
      const row = {
        id: messageValue.payload.id,
        name: messageValue.payload.name
      };
      return callback(null, row);
    }

    case "unpublish":
      // null value will cause deletion
      return callback(null, null);

    default:
      // Any other type is reported as an error; nothing is returned here.
      callback(new Error("unknown messageValue.type"));
  }
};

// Build the sink converter from the table schema and the ETL function, then
// start the sink connector with it.
const converter = ConverterFactory.createSinkSchemaConverter(tableSchema, etlFunc);

runSinkConnector(config, [converter], onError)
  .then((runningConfig) => {
    // Parameter renamed so it no longer shadows the outer `config` object.
    //runs forever until: runningConfig.stop();
  })
  // Report a rejected start-up through the same error callback instead of
  // leaving the promise without a rejection handler.
  // NOTE(review): assumes onError accepts a single error argument; confirm.
  .catch(onError);
// Kafka consumer with SASL_SSL/PLAIN credentials read from config1.
// Auto-commit is disabled, so offsets are committed explicitly elsewhere.
var consumer = new Kafka.KafkaConsumer({
  'group.id': 'kafka',
  'metadata.broker.list': config1.broker_List,
  'security.protocol': 'SASL_SSL',
  'ssl.endpoint.identification.algorithm': 'https',
  'sasl.mechanism': 'PLAIN',
  'sasl.username': config1.sasl_username,
  'sasl.password': config1.sasl_password,
  'enable.auto.commit': false,
  // Rebalance callback: `this` is used as the consumer, so this must stay a
  // regular function rather than an arrow function.
  'rebalance_cb': function (err, assignment) {

    if (err.code === Kafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
      // Note: this can throw when you are disconnected. Take care and wrap it in
      // a try catch if that matters to you
      this.assign(assignment);
    } else if (err.code === Kafka.CODES.ERRORS.ERR__REVOKE_PARTITIONS) {
      // Same as above
      this.unassign();
    } else {
      // We had a real error
      console.error(err);
    }

  }
}, {
});
// Handle each consumed message: pause the consumer, persist the message,
// commit, then resume. Errors are logged and not rethrown.
// NOTE(review): if processMessage rejects (or pause/commit throws), the
// resume call is skipped and nothing here resumes the consumer afterwards.
consumer.on('data', async (message) => {
  // Ignore empty/falsy events.
  if (!message) {
    return;
  }

  try {
    consumer.pause([topic]);
    // function to persist the data
    await processMessage(message, message.offset);
    consumer.commit();
    consumer.resume([topic]);
  } catch (err) {
    console.log('data consuming error', err);
  }
});
```
processMessage is a function to store records into DB2 with some checks. This function is being called, but the query is not executing.
Am I using this connector in the right way?
I don't understand how to use the connector for my task.
Could you write in more detail about using a custom broker and describe your test example?