ballerina-platform / ballerina-library

The Ballerina Library
https://ballerina.io/learn/api-docs/ballerina/
Apache License 2.0

Improve kafka transactions #2854

Open dilanSachi opened 2 years ago

dilanSachi commented 2 years ago

Description: Currently, the Kafka module supports transactions in the following way.

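transaction {
    // pollPayload returns an error union, so the result needs `check`.
    string[] messages = check consumer->pollPayload(1);
    check producer->send({topic: topic, value: messages[0], partition: 0});
    // The consumer offsets are committed separately from the producer transaction.
    check consumer->'commit();
    check commit;
}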

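We need to support atomic read-process-write behaviour within a transaction by using the Java client API kafkaProducer.sendOffsetsToTransaction(), which commits the consumed offsets as part of the producer's transaction instead of through a separate consumer commit.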

I need to look into this further and will update this issue with more information.

Refs:
https://www.baeldung.com/kafka-exactly-once
https://www.confluent.io/blog/transactions-apache-kafka/

dilanSachi commented 2 years ago

In the current implementation, for a case like this,

transaction {
    // pollPayload returns an error union, so the result needs `check`.
    string[] messages = check consumer->pollPayload(1);
    check producer->send({topic: topic, value: messages[0], partition: 0});
    check commit;
}

we create a KafkaTransactionContext when the kafka:Producer object is created, based on the transactionId config of the producer. At that point we have no access to the kafka:Consumer, so we cannot obtain the offsets it has consumed. As a result, we are not able to use the Java transaction API kafkaProducer.sendOffsetsToTransaction() to execute the read-process-write scenario atomically.
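
To make the requirement above concrete, here is a minimal in-memory sketch in Java of what an atomic read-process-write needs from a transaction context: the produced records and the consumed offsets are staged together and become visible together on commit, or are both discarded on abort. This is only a toy model under stated assumptions. ToyTransactionContext and its methods are hypothetical names invented for illustration; it is not the KafkaTransactionContext in the module and it does not call the Kafka client. In the real Java client, the corresponding calls would be beginTransaction(), send(), sendOffsetsToTransaction(), commitTransaction() and abortTransaction() on KafkaProducer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, in-memory stand-in for a transaction context that is shared
 * by a producer and a consumer. Not part of the Kafka or Ballerina APIs.
 */
class ToyTransactionContext {
    // Committed state: what downstream readers and a restarted consumer would see.
    final List<String> committedRecords = new ArrayList<>();
    final Map<Integer, Long> committedOffsets = new HashMap<>(); // partition -> next offset to read

    // Pending state: staged inside the open transaction only.
    private final List<String> pendingRecords = new ArrayList<>();
    private final Map<Integer, Long> pendingOffsets = new HashMap<>();
    private boolean open = false;

    void begin() {
        if (open) {
            throw new IllegalStateException("transaction already open");
        }
        open = true;
    }

    /** Stages an output record (plays the role of producer.send()). */
    void send(String value) {
        requireOpen();
        pendingRecords.add(value);
    }

    /** Stages the consumer's offsets (plays the role of sendOffsetsToTransaction()). */
    void sendOffsets(Map<Integer, Long> offsets) {
        requireOpen();
        pendingOffsets.putAll(offsets);
    }

    /** Publishes records and offsets in one step, so neither is visible without the other. */
    void commit() {
        requireOpen();
        committedRecords.addAll(pendingRecords);
        committedOffsets.putAll(pendingOffsets);
        clearPending();
    }

    /** Discards both the staged records and the staged offsets. */
    void abort() {
        requireOpen();
        clearPending();
    }

    private void requireOpen() {
        if (!open) {
            throw new IllegalStateException("no open transaction");
        }
    }

    private void clearPending() {
        pendingRecords.clear();
        pendingOffsets.clear();
        open = false;
    }
}
```

The point of the sketch is the sendOffsets step: the context has to receive the consumer's offsets before commit. A context that is built only from the producer's configuration, as described above, has no way to receive them, so one possible direction would be to let the consumer (or the transaction block) hand its offsets to the same context the producer uses.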