tulios / kafkajs

A modern Apache Kafka client for node.js
https://kafka.js.org
MIT License

Retry on CONCURRENT_TRANSACTIONS error for an idempotent transactional producer? #296

Closed · plameniv closed this issue 5 years ago

plameniv commented 5 years ago

We are in the process of testing different edge cases of the EOS implementation. One edge case we came across: what happens if a service crashes before committing or aborting a transaction and comes back up, creating a new transaction before the previous transaction's timeout has elapsed? We currently see a CONCURRENT_TRANSACTIONS error, which crashes the service.

What would be the best way to handle this scenario?

  1. the producer exposes a way to abort the previous transaction so it doesn't have to wait out the timeout period,
  2. kafkajs automatically aborts the previous transaction and retries the new transaction,
  3. kafkajs doesn't abort but retries the new transaction until it succeeds, or
  4. the current behavior, where the error bubbles up out of the producer and the caller of the producer has to retry (or handle the error in other ways); a caller-side sketch of this approach follows below.
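
For reference, option 4 is roughly what we do today. A minimal caller-side sketch, reusing the doWrite and sleep helpers from the reproducer below; retryWrite and its parameters are our own names, and we assume the protocol error surfaces with a type of "CONCURRENT_TRANSACTIONS":

async function retryWrite(maxAttempts: number, backoffMs: number) {
    for (let attempt = 1; attempt <= maxAttempts; attempt++) {
        try {
            await doWrite(false);
            return;
        } catch (e) {
            // Only retry while the coordinator still reports the previous
            // transaction as in progress; rethrow anything else.
            if (e.type !== "CONCURRENT_TRANSACTIONS" || attempt === maxAttempts) {
                throw e;
            }
            await sleep(backoffMs);
        }
    }
}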

Reproducer:

import * as ip from "ip";
import * as kafkajs from "kafkajs";
function sleep(ms: number) { return new Promise((resolve) => setTimeout(resolve, ms)); }

const waitTimeInSeconds = 60; // MODIFY THIS (60+ seconds required for the transaction to time out)

const id: number = Date.now();
const transactionalId = "transactional-id";
const host = process.env.HOST_IP || ip.address();
const broker: string = `${host}:9092`;
const topic = `test-topic-${id}`;
const acks = -1;
const maxInFlightRequests = 1;
const connectionTimeout = 1000;
const key = `sink-1-${id}`;
const payload1 = "1001";
const payload2 = "1002";
const messages = [
    { key, value: payload1 },
    { key, value: payload2 },
];
let adminInitialized = false;

async function doWrite(noCommit: boolean) {
    const client = new kafkajs.Kafka({
        clientId: "kafka-sink",
        brokers: [broker],
        connectionTimeout,
        maxInFlightRequests,
    });

    if (!adminInitialized) {
        const admin = client.admin();
        try {
            await admin.connect();
            await admin.createTopics({
                waitForLeaders: true,
                topics: [{ topic, numPartitions: 4, configEntries: [] }],
            });
        } finally {
            await admin.disconnect();
        }
        adminInitialized = true;
    }

    const producer = client.producer({
        // An idempotent producer enforces EoS messaging
        idempotent: true,
        transactionalId,
    });
    await producer.connect();

    const transaction: kafkajs.Transaction = await producer.transaction();
    try {
        await transaction.send({
            acks,
            topic,
            messages,
        });
    } catch (e) {
        // Abort the transaction on a failed send; if the abort itself
        // fails, that error propagates instead of the send error.
        await transaction.abort();
        throw e;
    }

    // When noCommit is true, the transaction is intentionally left open and
    // the producer connected, simulating a crash before commit/abort.
    if (!noCommit) {
        await transaction.commit();
        await producer.disconnect();
    }
}

async function doAll() {
    console.log("First doWrite");
    await doWrite(true);
    console.log(`Wait time of ${waitTimeInSeconds} seconds started: `);
    for (let ii = 0; ii < waitTimeInSeconds; ii++) {
        await sleep(1000);
        if ((ii + 1) % 5 === 0) {
            console.log(`${ii + 1} seconds passed`);
        }
    }
    console.log("Second doWrite");
    try {
        await doWrite(false);
    } catch (e) {
        console.log("ERROR:\n", e);
    }
    console.log("Done with doWrites");
}

doAll().catch(console.error);

tulios commented 5 years ago

Hi @plameniv, thanks for the great report. Of the options you presented, which one would be best for your use case?

sklose commented 5 years ago

I think, from our point of view, option 2 (kafkajs automatically aborts the previous transaction and retries the new transaction) makes the most sense. I can't think of a scenario where the incomplete transaction could be continued, so automatically aborting and then retrying the new transaction seems sensible to me. Curious to hear your thoughts on it; maybe I am missing some use case.

tulios commented 5 years ago

Hi @sklose, @plameniv. I took some time to talk with other library developers, and we agree that transactions are short-lived entities: KafkaJS should handle the error, abort the stale transaction, and automatically create a new one, basically option 2.

I think this is the last piece of our 1.5.0 release; I'll get this done and get back to you.
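
To make the intended behavior concrete: when starting a new transaction fails with CONCURRENT_TRANSACTIONS (the coordinator is still finishing off the stale transaction), the client should back off and retry instead of surfacing the error. A rough sketch of what this looks like from the outside, not the actual implementation; transactionWithRetry, maxRetries, and backoffMs are illustrative names:

async function transactionWithRetry(
    producer: kafkajs.Producer,
    maxRetries: number = 5,
    backoffMs: number = 200,
): Promise<kafkajs.Transaction> {
    for (let attempt = 0; ; attempt++) {
        try {
            return await producer.transaction();
        } catch (e) {
            // Illustrative: retry only on CONCURRENT_TRANSACTIONS, up to
            // maxRetries attempts, with a fixed backoff between attempts.
            if (e.type !== "CONCURRENT_TRANSACTIONS" || attempt >= maxRetries) {
                throw e;
            }
            await new Promise((resolve) => setTimeout(resolve, backoffMs));
        }
    }
}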