Open Anshshah-1609 opened 2 years ago
const { Kafka } = require('kafkajs'); const client = new Kafka({ clientId: "transactional_client", brokers: ["localhost:9092", "localhost:9093", "localhost:9094"], retry: { initialRetryTime: 100, retries: 8 } }); const producer = client.producer({ transactionalId: 'my-transactional-producer', maxInFlightRequests: 1, idempotent: true }); const transactionFunction = async () => { const transaction = await producer.transaction() try { setInterval(async () => { await transaction.send({ topic: 'transaction_topic', messages: [{ key: 'my_transaction', value: 'this is my first kafka transaction' }] }); await transaction.commit() }, 2000); } catch (e) { await transaction.abort(); } } transactionFunction();
When I am running kafka server locally and trying to run this file with nodemon filename it shows producer disconnected after 2 3 seconds.
const { Kafka } = require('kafkajs'); const client = new Kafka({ clientId: "transactional_client", brokers: ["localhost:9092", "localhost:9093", "localhost:9094"], retry: { initialRetryTime: 100, retries: 8 } }); const producer = client.producer({ transactionalId: 'my-transactional-producer', maxInFlightRequests: 1, idempotent: true }); const transactionFunction = async () => { const transaction = await producer.transaction() try { setInterval(async () => { await transaction.send({ topic: 'transaction_topic', messages: [{ key: 'my_transaction', value: 'this is my first kafka transaction' }] }); await transaction.commit() }, 2000); } catch (e) { await transaction.abort(); } } transactionFunction();