tulios / kafkajs

A modern Apache Kafka client for node.js
https://kafka.js.org
MIT License

Committing offset #384

Open AlexKonstantinov1991 opened 5 years ago

AlexKonstantinov1991 commented 5 years ago

Hello, I want to commit an offset for a consumer, but I'm getting an error: KafkaJSNonRetriableError: Must provide transactional id for transactional producer

I would also like the consumer to have a method, something like commit(offset: number).

My code looks something like this:

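import { Kafka, Consumer, Producer, Transaction } from 'kafkajs';

class MyController {
  kafka!: Kafka;
  consumer!: Consumer;
  producer!: Producer;
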
  async created() {
    this.kafka = new Kafka({
      clientId: 'my-app',
      brokers: ['localhost:9092']
    });
    this.consumer = this.kafka.consumer({ groupId: 'my-group'});
    this.producer = this.kafka.producer({
      idempotent: true,
      maxInFlightRequests: 1,
    });
    await this.consumer.connect();
    await this.producer.connect();
    await this.consumer.subscribe({
      topic: 'my-topic',
      fromBeginning: true,
    });
    await this.consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
        console.log({
          partition,
          offset: message.offset,
          value: message.value.toString(),
        });

        let transaction: Transaction | undefined;

        try {
          transaction = await this.producer.transaction();
          await transaction.sendOffsets({
            consumerGroupId: 'my-group',
            topics: [{
              topic,
              partitions: [{
                partition,
                // Kafka commits the offset of the next message to consume
                offset: (Number(message.offset) + 1).toString(),
              }]
            }],
          });
          await transaction.commit();
        } catch (e) {
          console.log(e);
          if (transaction) {
            await transaction.abort();
          }
        }
      },
      autoCommit: false,
    });
  }
}
JaapRood commented 5 years ago

The error "KafkaJSNonRetriableError: Must provide transactional id for transactional producer" is thrown because every producer that uses transactions must be assigned a transactionalId:

this.producer = this.kafka.producer({
  idempotent: true,
  transactionalId: 'some-id-you-determine-yourself',
  maxInFlightRequests: 1,
});
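The transaction part of the question's eachMessage handler can be sketched as below, with two corrections: the partition comes from the handler's arguments instead of being hard-coded to 0, and the committed offset is the next offset to read (the message offset plus one), which is what Kafka expects a committed offset to be. To keep the sketch runnable without a broker, it is typed against a hypothetical `OffsetTransaction` interface that mirrors only the `sendOffsets`, `commit` and `abort` calls used in the question; the object returned by `producer.transaction()` is assumed to fit it structurally.

```typescript
// Hypothetical minimal interface covering only the transaction calls used in
// the question; a KafkaJS Transaction is assumed to satisfy it structurally.
interface OffsetTransaction {
  sendOffsets(args: {
    consumerGroupId: string;
    topics: { topic: string; partitions: { partition: number; offset: string }[] }[];
  }): Promise<void>;
  commit(): Promise<void>;
  abort(): Promise<void>;
}

// Kafka stores the offset of the next message to read, so commit offset + 1.
// KafkaJS offsets are strings; Number is exact up to 2^53.
function nextOffset(offset: string): string {
  return (Number(offset) + 1).toString();
}

// Commits the consumer offset for one message inside its own transaction.
async function commitInTransaction(
  startTransaction: () => Promise<OffsetTransaction>,
  consumerGroupId: string,
  topic: string,
  partition: number,
  offset: string,
): Promise<void> {
  const transaction = await startTransaction();
  try {
    await transaction.sendOffsets({
      consumerGroupId,
      topics: [{ topic, partitions: [{ partition, offset: nextOffset(offset) }] }],
    });
    await transaction.commit();
  } catch (e) {
    // Abort so the offsets from this transaction are not committed, then
    // rethrow so the caller does not treat the message as processed.
    await transaction.abort();
    throw e;
  }
}
```

Inside eachMessage this would be called as `await commitInTransaction(() => this.producer.transaction(), 'my-group', topic, partition, message.offset)`. Rethrowing instead of swallowing the error lets the consumer retry the message rather than silently moving on.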

The purpose of this ID is basically fault detection, which Kafka calls fencing: when another producer with the same transactionalId initializes, any transaction the previous producer left open is aborted and the old producer can no longer write. This makes sure that if an app crashes with open transactions, then when it restarts, all previous uncommitted messages are aborted and processing can be retried safely.
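To make the fencing idea concrete, here is a toy model of it; it is not KafkaJS or broker code, and the `ToyCoordinator` class is hypothetical. It tracks an epoch and an open-transaction flag per transactionalId: registering a producer under an existing ID bumps the epoch and aborts any transaction the previous holder left open, and any later call carrying the stale epoch is rejected. In real Kafka the transaction coordinator does this when a producer initializes its producer ID.

```typescript
type TxnState = { epoch: number; openTransaction: boolean };

// Toy model only: names and structure are illustrative, not Kafka's internals.
class ToyCoordinator {
  private states = new Map<string, TxnState>();
  // IDs whose open transaction was aborted because a new producer registered.
  readonly abortedOnRegister: string[] = [];

  // Returns the epoch the new producer must present on later calls.
  register(transactionalId: string): number {
    const prev = this.states.get(transactionalId);
    if (prev && prev.openTransaction) {
      // The previous holder crashed (or is a zombie) mid-transaction: abort it.
      this.abortedOnRegister.push(transactionalId);
    }
    const epoch = prev ? prev.epoch + 1 : 0;
    this.states.set(transactionalId, { epoch, openTransaction: false });
    return epoch;
  }

  begin(transactionalId: string, epoch: number): void {
    this.current(transactionalId, epoch).openTransaction = true;
  }

  commit(transactionalId: string, epoch: number): void {
    this.current(transactionalId, epoch).openTransaction = false;
  }

  // Rejects producers holding a stale epoch: they have been fenced.
  private current(transactionalId: string, epoch: number): TxnState {
    const state = this.states.get(transactionalId);
    if (!state || state.epoch !== epoch) {
      throw new Error(`producer fenced: ${transactionalId} epoch ${epoch}`);
    }
    return state;
  }
}
```

The restarted app registers under the same ID, so the crashed run's half-finished transaction is aborted and a lingering old process cannot commit on top of the new one.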

Given your example and your use of eachMessage, it might be useful to set the transactionalId to a combination of the group and topic names, e.g. 'my-group--my-topic'. That way, if your app crashes and restarts, the previous transaction is aborted. Note that this does mean you can only run a single instance of this app at any given time. Harder to set up, but the safest way would be to have a producer per partition that you're processing and use the partition as part of the transactionalId, as the partition is the unit of concurrency within Kafka. It depends on your needs!
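The two naming schemes above can be sketched as hypothetical helper functions; the `--` separator only follows the 'my-group--my-topic' example and is not something KafkaJS requires.

```typescript
// One ID per group and topic: a restarted app fences its previous run,
// but only one instance of the app may run at a time.
function groupTopicTransactionalId(groupId: string, topic: string): string {
  return `${groupId}--${topic}`;
}

// One ID per consumed partition: instances can split the partitions, and
// whichever instance picks up a partition fences its previous owner.
function partitionTransactionalId(groupId: string, topic: string, partition: number): string {
  return `${groupId}--${topic}--${partition}`;
}
```

For example, `kafka.producer({ transactionalId: partitionTransactionalId('my-group', 'my-topic', 0), idempotent: true, maxInFlightRequests: 1 })` would create the producer for input partition 0.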

For committing directly with the consumer, have a look at #378, as I think it proposes what you're after.
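For comparison, committing without a producer: KafkaJS consumers have a `commitOffsets` method that takes an array of `{ topic, partition, offset }` entries and is typically used together with `autoCommit: false`. The sketch assumes a KafkaJS version that includes it; the `toCommitPayload` helper is hypothetical and just turns the last processed offset per partition into that array, committing the next offset to read.

```typescript
// Shape of one entry passed to KafkaJS's consumer.commitOffsets().
interface TopicPartitionOffset {
  topic: string;
  partition: number;
  offset: string;
}

// Hypothetical helper: given the last processed offset per partition, build
// the commitOffsets() argument. Kafka expects the *next* offset to consume.
function toCommitPayload(
  topic: string,
  lastProcessed: Map<number, string>,
): TopicPartitionOffset[] {
  const payload: TopicPartitionOffset[] = [];
  lastProcessed.forEach((offset, partition) => {
    // Number is exact up to 2^53, well beyond realistic offsets.
    payload.push({ topic, partition, offset: (Number(offset) + 1).toString() });
  });
  return payload;
}
```

A consumer could record `message.offset` per partition in eachMessage and periodically call `await consumer.commitOffsets(toCommitPayload('my-topic', lastProcessed))`, which is close to the `commit(offset)` method asked for in the question.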

JaapRood commented 5 years ago

Reviewing the documentation, there is actually nothing to explain the above! I might have some time this week to fix that.

AlexKonstantinov1991 commented 5 years ago

Hello!

There is still no information about this in the documentation. Just a reminder.

JaapRood commented 5 years ago

Fair enough, too! Unfortunately it went down my list of priorities before I could get it done, also because there seem to be other issues with exactly-once producing that should probably be fixed first.

eliw00d commented 5 years ago

@JaapRood What are the other issues with exactly-once producing? We are thinking of switching to transactions but if it won't work as intended we might hold off.

JaapRood commented 5 years ago

@eliw00d I only tried it once, at a time when there were also some other bugs in our own processing framework and in kafkajs (since resolved), so I could never pin it down to something specific I could reproduce. That said, I never tried again, as priorities went elsewhere. This thread on the KafkaJS Slack has what few details there are:

Basically, all of our tests for consumption are timing out as soon as we create a producer with idempotent: true and maxInFlightRequests set to 1.

mariojunior commented 3 years ago

Hello guys. Any conclusion here? I'm getting the "Must provide transactional id for transactional producer" error after moving from the producer.send method to transaction.send. Then I saw @JaapRood's answer about the "transactionalId" argument when creating a new producer. Having done that, I now get the error message "Transactional Id authorization failed".

Is this a client configuration issue, or is it related to the broker configuration?

Thanks.

mariojunior commented 3 years ago

Never mind, it works now. My transactional ID had to start with the resource name configured in the Kafka ACL definition. I also needed a slight change to the resource's permission for the "Write" operation, adding "type: transactionalId". After that, my producer was able to send and commit messages to the broker.
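To illustrate why the ID had to start with the configured resource name: Kafka ACLs on the TransactionalId resource type can use a `LITERAL` or a `PREFIXED` resource pattern, and a prefixed ACL covers every transactional ID that begins with the resource name. The sketch below is a simplified, hypothetical model of that matching rule (the `TransactionalIdAcl` type and `canUseTransactions` function are made up, not Kafka's authorizer), assuming the ACL here was prefixed, since the comment does not say. It only checks Write on the transactional ID; a real setup also needs topic and consumer-group ACLs, which are left out.

```typescript
type PatternType = "LITERAL" | "PREFIXED";

// Illustrative model of an ACL on the TransactionalId resource type.
interface TransactionalIdAcl {
  resourceName: string;
  patternType: PatternType;
  operation: "Write" | "Describe";
}

function matches(acl: TransactionalIdAcl, transactionalId: string): boolean {
  if (acl.patternType === "PREFIXED") {
    // A prefixed ACL covers every ID starting with the resource name.
    return transactionalId.startsWith(acl.resourceName);
  }
  // A literal ACL matches the exact name; "*" is Kafka's wildcard.
  return acl.resourceName === "*" || acl.resourceName === transactionalId;
}

// In this model, a transactional producer needs Write on its transactionalId.
function canUseTransactions(acls: TransactionalIdAcl[], transactionalId: string): boolean {
  return acls.some((acl) => acl.operation === "Write" && matches(acl, transactionalId));
}
```

With a prefixed ACL on `orders-`, the ID `orders-my-group--my-topic` is authorized while `my-group--my-topic` is rejected, which matches the "Transactional Id authorization failed" symptom described above.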

Thanks.

megakoresh commented 2 years ago

This is still not documented, but this issue can be closed in favor of https://github.com/tulios/kafkajs/issues/789

eliw00d commented 2 years ago

@JaapRood Came back to this as we're looking into transactions again. You mentioned:

Harder to set up, but the safest way would be to have a producer per partition that you're processing and use the partition as part of the transactionalId, as this is the unit of concurrency within Kafka.

Since the recommendation seems to be to use the partition, should there be a function you can pass to the producer that builds the transactional ID from information the producer has, such as the partition? Otherwise, it seems you would need a custom partitioner or something similar to know which partition the producer is writing to, correct?

JaapRood commented 2 years ago

If I remember correctly (and this was a while ago), we would set up a producer for every partition that was assigned to a consumer following a rebalance, so we didn't need any of the mechanisms you describe. The partition, I believe, refers to the partition of the topic we were consuming from, not producing to. But, to be honest, it's been nearly two years since I've touched or had to live with that codebase, so I'm fuzzy on the details.
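A minimal sketch of that setup, under stated assumptions: a hypothetical `PartitionProducers` registry keeps one transactional producer per assigned input partition and derives each transactionalId from group, topic and partition. Producer creation is injected through a hypothetical `createProducer` function so the sketch runs without a broker; with KafkaJS it would wrap `kafka.producer({ transactionalId, idempotent: true, maxInFlightRequests: 1 })` plus `connect()`. How the new assignment is detected after a rebalance is left out.

```typescript
// Minimal producer interface used by the sketch (hypothetical).
interface ManagedProducer {
  transactionalId: string;
  disconnect(): Promise<void>;
}

type ProducerFactory = (transactionalId: string) => Promise<ManagedProducer>;

class PartitionProducers {
  private producers = new Map<number, ManagedProducer>();

  constructor(
    private readonly groupId: string,
    private readonly topic: string,
    private readonly createProducer: ProducerFactory,
  ) {}

  // Call after each rebalance with the input partitions now assigned here.
  async onAssignment(assigned: number[]): Promise<void> {
    const wanted = new Set(assigned);
    // Release producers for partitions that moved to another instance.
    for (const [partition, producer] of Array.from(this.producers)) {
      if (!wanted.has(partition)) {
        await producer.disconnect();
        this.producers.delete(partition);
      }
    }
    // Create producers for newly assigned partitions. Reusing the same
    // transactionalId lets the broker fence whichever instance held it before.
    for (const partition of assigned) {
      if (!this.producers.has(partition)) {
        const id = `${this.groupId}--${this.topic}--${partition}`;
        this.producers.set(partition, await this.createProducer(id));
      }
    }
  }

  // Producer to use for messages consumed from the given input partition.
  producerFor(partition: number): ManagedProducer {
    const producer = this.producers.get(partition);
    if (!producer) throw new Error(`partition ${partition} is not assigned`);
    return producer;
  }
}
```

Because the ID is keyed on the consumed partition, no custom partitioner is needed on the producing side: eachMessage simply looks up `producerFor(partition)`.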