doctor3030 / winstonKafka

MIT License

Getting the error `throw new KafkaJSError('The producer is disconnected')` when trying to publish logs to Kafka #2

Open EggsyOnCode opened 3 months ago

EggsyOnCode commented 3 months ago

Stack Trace for reference:

        throw new KafkaJSError('The producer is disconnected')
              ^
KafkaJSError: The producer is disconnected
    at validateConnectionStatus (/home/xen/Desktop/code/dist-tx-activity/node_modules/kafkajs/src/producer/messageProducer.js:31:15)
    at sendBatch (/home/xen/Desktop/code/dist-tx-activity/node_modules/kafkajs/src/producer/messageProducer.js:82:5)
    at Object.send (/home/xen/Desktop/code/dist-tx-activity/node_modules/kafkajs/src/producer/messageProducer.js:120:12)
    at KafkaTransport.logToKafka (/home/xen/Desktop/code/dist-tx-activity/node_modules/winston-logger-kafka/lib/logger.js:133:35)
    at KafkaTransport.log (/home/xen/Desktop/code/dist-tx-activity/node_modules/winston-logger-kafka/lib/logger.js:139:14)
    at KafkaTransport._write (/home/xen/Desktop/code/dist-tx-activity/node_modules/winston-transport/modern.js:82:19)
    at doWrite (/home/xen/Desktop/code/dist-tx-activity/node_modules/readable-stream/lib/_stream_writable.js:390:139)
    at writeOrBuffer (/home/xen/Desktop/code/dist-tx-activity/node_modules/readable-stream/lib/_stream_writable.js:381:5)
    at KafkaTransport.Writable.write (/home/xen/Desktop/code/dist-tx-activity/node_modules/readable-stream/lib/_stream_writable.js:302:11)
    at DerivedLogger.ondata (/home/xen/Desktop/code/dist-tx-activity/node_modules/readable-stream/lib/_stream_readable.js:629:20) {
  retriable: true,
  helpUrl: undefined
}

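The error seems to be an initialization issue in logger.js, inside the constructor of KafkaTransport. The constructor calls connect() but does not wait for it to finish, so a log call made before the connection completes would reach send() while the producer is still disconnected: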

class KafkaTransport extends Transport {
    constructor(kafkaConfig) {
        super(Transport);
        if (!(kafkaConfig === null || kafkaConfig === void 0 ? void 0 : kafkaConfig.clientConfig)) {
            throw new Error(`[winston kafka transport] kafka client config required.`);
        }
        if (!(kafkaConfig === null || kafkaConfig === void 0 ? void 0 : kafkaConfig.sinkTopic)) {
            throw new Error(`[winston kafka transport] sink topic required.`);
        }
        // if (kafkaConfig && kafkaConfig.clientConfig && kafkaConfig.sinkTopic) {
        this._config = kafkaConfig;
        this._kafkaProducer = new kafkajs.Kafka(kafkaConfig.clientConfig).producer(kafkaConfig.producerConfig);
        this._kafkaProducer.connect().then((_) => {
            console.log('Logger connected to Kafka.');
        });
        this._sinkTopic = kafkaConfig.sinkTopic;
        // }
        // else {
        //     this._kafkaProducer = new kafkajs.Kafka({brokers: ['localhost:9092']}).producer();
        //     this._kafkaProducer.connect().then((_) => {
        //         console.log('Logger connected to Kafka.');
        //     });
        //     this._sinkTopic = 'test_topic';
        // }
    }
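
To make the suspected race concrete, here is a minimal sketch. It does not use kafkajs or this library: `FakeProducer` is a hypothetical stand-in whose `connect()` resolves after a short delay and whose `send()` rejects while disconnected, and `ReadyAwareSender` is a hypothetical transport-like class that keeps the `connect()` promise and waits on it before sending. It illustrates one possible approach under those assumptions and is not the library's actual code.

```javascript
// Hypothetical stand-in for a Kafka producer: connect() resolves a little
// later, and send() rejects while the producer is not yet connected.
class FakeProducer {
    constructor() {
        this.connected = false;
        this.sent = [];
    }
    connect() {
        return new Promise((resolve) => {
            setTimeout(() => {
                this.connected = true;
                resolve();
            }, 20);
        });
    }
    async send(record) {
        if (!this.connected) {
            throw new Error('The producer is disconnected');
        }
        this.sent.push(record);
    }
}

// Hypothetical transport-like class: it stores the connect() promise
// instead of discarding it, and every log() waits for that promise first.
class ReadyAwareSender {
    constructor(producer, sinkTopic) {
        this._producer = producer;
        this._sinkTopic = sinkTopic;
        this._ready = producer.connect();
    }
    async log(message) {
        await this._ready; // resolves immediately once connected
        await this._producer.send({
            topic: this._sinkTopic,
            messages: [{ value: message }],
        });
    }
}

async function demo() {
    // Fire-and-forget connect followed by an immediate send: the race.
    const racing = new FakeProducer();
    racing.connect();
    const early = await racing
        .send({ topic: 'logs', messages: [] })
        .then(() => 'sent', (err) => err.message);

    // Same kind of producer, but the sender waits for the connection.
    const sender = new ReadyAwareSender(new FakeProducer(), 'logs');
    await sender.log('hello');
    return { early, delivered: sender._producer.sent.length };
}

demo().then((result) => console.log(result));
```

In this sketch the first send fails with the same message as in the stack trace, while the second one is delivered because `log()` waits on the stored promise.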

doctor3030 commented 3 months ago

See my answer to the previous issue. Note the "sleep" there, which gives the Kafka producer time to connect and disconnect gracefully.
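
The answer referred to is not reproduced in this thread, so the following is only a sketch of what a "sleep" workaround could look like. `sleep` and `runWithGracePeriods` are hypothetical helpers, `makeLogger` stands for whatever code builds the winston logger with the Kafka transport, and the delay is an assumed value rather than a recommended one.

```javascript
// Promise-based sleep helper (hypothetical, not part of the library).
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Hypothetical wrapper: build the logger, pause so the producer can finish
// connecting, run the logging work, then pause again so pending sends can
// complete before the process exits.
async function runWithGracePeriods(makeLogger, work, graceMs = 1000) {
    const logger = makeLogger();
    await sleep(graceMs); // time for connect() to complete
    await work(logger);
    await sleep(graceMs); // time for in-flight messages to be sent
}
```

A fixed delay only makes the race less likely. It does not guarantee that the producer is connected, so the right value depends on the broker and network.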