Blizzard / node-rdkafka

Node.js bindings for librdkafka
MIT License
2.1k stars 390 forks source link

Transactions: ReadyNotAcked error #1003

Open BryanDollery opened 1 year ago

BryanDollery commented 1 year ago

I'm trying to work with transactions so that I can consume a message and publish new messages in response, all in a single tx. I get an error when I try to init the transaction, and an error when I try to start the tx. Unfortunately, there isn't much in the way of useful info in the error messages, other than:

{
  "message":"Operation not valid in state ReadyNotAcked",
  "code":-172,
  "errno":-172,
  "origin":"kafka"
}

Environment Information

code Sorry, this is quite long:

import { v4 as uuid } from "uuid";
import Kafka from "node-rdkafka";
import { log } from "@bryandollery/simple-logging";

const BusUrl = "localhost:9092";
const Group = `tenants.router.1`;

log.setCtx('test');

const main = async _ => {
  try {
    const producer = await producerConnect();
    const consumer = await consumerConnect();
    await consume(consumer, producer);
  } catch (e) {
    log.error('Event adaptor: main', { cause: e });
  }
};

const handle = async (message, consumer, producer) => handleEvent(JSON.parse(message.value.toString()), consumer, producer);

const handleEvent = async (event, consumer, producer) => {
  const cloudEventData = event.data;

  try {
    producer.beginTransaction(err => log.debug("Started transaction", err));

    publishResults([], [], cloudEventData);
    producer.sendOffsetsToTransaction(event.offset, consumer, 500, err => log.debug("Sent offsets to transaction error", err));
    producer.commitTransaction(500, err => log.debug("Committed transaction", err));
  } catch (e) {
    log.error("Error handling event", event, { ...e });
    producer.abortTransaction(500, err => log.debug("Aborted transaction error", err));
  }
};

const publishResults = async (event, commands, cloudEvent) => {
  for (const thing of [...event, ...commands]) {
    const newEvent = {
      specversion: "1.0",
      datacontenttype: "application/json",
      tenantId: cloudEvent.tenantId,
      type: thing.type,
      subtype: thing.subtype,
      source: `tenants`,
      subject: thing.id,
      time: new Date().toISOString(),
      correlationId: cloudEvent?.correlationId ? `${cloudEvent.correlationId}-${uuid()}` : uuid(),
      data: thing
    };

    await producer.produce(`${`${cloudEvent.tenantId}-tenants`}`, null, Buffer.from(JSON.stringify(newEvent)));
  }
};

const getName = tenant => `${tenant}-tenants`;

const consumerConnect = _ => {
  const consumer = new Kafka.KafkaConsumer({
    'metadata.broker.list': BusUrl,
    'group.id': Group,
    'enable.auto.commit': false
  });

  consumer.on('event.log', logEntry => {
    log.debug('Consumer: log', logEntry);
  });

  consumer.connect();

  return new Promise((resolve, reject) => {
    consumer.on('ready', function (arg) {
      log.debug('Consumer: ready', arg);
      resolve(consumer);
    });

    consumer.on('event.error', function (err) {
      log.error('Consumer: error', err);
      reject(err);
    });
  });

};

const consume = async (consumer, producer) => {
  consumer.subscribe(["tenants-1000"]);
  consumer.on("data", async message => await handle(message, consumer, producer));
  consumer.on("warning", warning => log.warn("Consumer warning", warning));
  consumer.on("disconnected", event => log.info("Consumer disconnected", event));
  consumer.on("error", error => log.error("Consumer error", error));

  await consumer.consume();
  log.debug(`Consuming`, (await consumer.getMetadata()) || "no metadata");
};

const producerConnect = async _ => {
  try {
    const producer = new Kafka.Producer({
      "bootstrap.servers": BusUrl,
      "transactional.id": uuid(),
      'debug': 'all'
    });

    producer.connect();

    return new Promise((resolve, reject) => {
      producer.on("ready", () => {
        log.debug(`Producer: connected`);
        producer.initTransactions(500, err => log.error("Producer: initTransactions error", err));
        resolve(producer);
      });

      producer.on('event.error', function (err) {
        log.error('Producer: Error', err);
        reject(err);
      });
    });
  } catch (e) {
    const err = { message: `FATAL ERROR: Event adaptor Producer connection failed`, type: "error", subtype: "fatal", source: "producerConnect", cause: e };
    log.error(err);
    process.kill(process.pid, "SIGINT");
  }
};

main();

My package.json only has dependencies on node-rdkafka, uuid, and "@bryandollery/simple-logging": "^2.1.0" and, I run it with yarn test.

I use the vscode plugin "Tools for Apache Kafka" to test this with the following file:

PRODUCER keyed-message
topic: tenants-1000
key: 100000
{
  "specversion": "1.0",
  "datacontenttype": "application/json",
  "TenantId": "1000",
  "type": "event",
  "name": "orderPlaced",
  "entity": "tenant",
  "source": "test",
  "subject": 100000,
  "time": "2021-03-03T14:00:00.000Z",
  "correlationId": 10,
  "data": {
    "id": 100000,
    "name": "Tenant 1000",
    "description": "Tenant 1000",
    "created": "2021-03-03T14:00:00.000Z",
    "updated": "2021-03-03T14:00:00.000Z"
  }
}

Startup gives me this output:

{"message":"Producer: connected","ctx":"test","level":"DEBUG","date":1674702576574}
{"message":"Consumer: ready","data":[{"name":"rdkafka#consumer-2"}],"ctx":"test","level":"DEBUG","date":1674702576581}
{"message":"Consuming","data":["no metadata"],"ctx":"test","level":"DEBUG","date":1674702576582}
{"message":"Producer: initTransactions error","data":[null],"ctx":"test","level":"ERROR","date":1674702576583}

And, sending the message adds this output:

{"message":"Error handling event","data":[{"specversion":"1.0","datacontenttype":"application/json","TenantId":"1000","type":"event","name":"orderPlaced","entity":"tenant","source":"test","subject":100000,"time":"2021-03-03T14:00:00.000Z","correlationId":10,"data":{"id":100000,"name":"Tenant 1000","description":"Tenant 1000","created":"2021-03-03T14:00:00.000Z","updated":"2021-03-03T14:00:00.000Z"}},{}],"ctx":"test","level":"ERROR","date":1674701794946}
{"message":"Started transaction","data":[{"message":"Operation not valid in state ReadyNotAcked","code":-172,"errno":-172,"origin":"kafka"}],"ctx":"test","level":"DEBUG","date":1674701794948}
{"message":"Aborted transaction","data":[{"message":"Operation not valid in state ReadyNotAcked","code":-172,"errno":-172,"origin":"kafka","isFatal":false,"isRetriable":false,"isTxnRequiresAbort":false}],"ctx":"test","level":"DEBUG","date":1674701794949}
raman20 commented 1 year ago

I have encountered the same issue, possibly there is some issue in your producer/consumer config. In my case given topic was not created.

one query => are producer/consumer methods work async or sync?

BryanDollery commented 1 year ago

My methods are all async, but the lib uses callbacks. With this test code, I create the topic manually, otherwise I get a different error.