Everything works well for non-transactional producers, and almost everything works as expected for transactional ones, with one exception. Using a custom set of settings via `rd_kafka_topic_conf_new` + `rd_kafka_topic_conf_set` works until the producer is transactional and `acks` is set to a value other than `all`, which causes a crash.
This makes sense, since a transactional producer requires idempotence. When I try to create one without the correct `acks`, I get the expected error:
`acks` must be set to `all` when `enable.idempotence`
However, when I provide `acks` via `rd_kafka_topic_conf_set`, pass the reference from `rd_kafka_topic_conf_new` to `rd_kafka_topic_new`, and then pass the topic reference via:
I get the following crash (part of trace removed for readability):
How to reproduce
Create a per-topic config and use it as described above
Try to send a message
I can reproduce it consistently.
Checklist
IMPORTANT: We will close issues where the checklist has not been completed.
Please provide the following information:
[x] librdkafka version (release number or git tag): 2.3.0
[x] Apache Kafka version: confluentinc/cp-kafka:7.6.1 with KRaft
[x] librdkafka client configuration: defaults
[x] Operating system: Linux 5.15.0-105-generic #115-Ubuntu SMP Mon Apr 15 09:52:04 UTC 2024 x86_64 x86_64 x86_64 GNU/Linux
[x] Provide logs (with debug=.. as necessary) from librdkafka
[x] Provide broker log excerpts
[x] Critical issue: NO. I can work around it by implementing validations at a higher level
Broker log is pretty chill about this:
kafka | [2024-05-07 12:30:49,792] INFO [Broker id=1] Leader 07328bbb-53f3-4773-aec5-2e267c5ee7ef-0 with topic id Some(JORL-9WgRFuoJ_G7d_kzwQ) starts at leader epoch 0 from offset 0 with partition epoch 0, high watermark 0, ISR [1], adding replicas [] and removing replicas [] . Previous leader None and previous leader epoch was -1. (state.change.logger)
kafka | [2024-05-07 12:30:50,864] INFO [TransactionCoordinator id=1] Initialized transactionalId 9d01cf3c-ea98-43b1-be24-b73e52eaa07d with producerId 340 and producer epoch 0 on partition __transaction_state-9 (kafka.coordinator.transaction.TransactionCoordinator)
Proposed solution
This should be validated in the context of the librdkafka producer instance: changing `acks` should not be allowed for transactional producers.
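Until something like this exists in librdkafka itself, the higher-level validation mentioned in the checklist could look roughly like the sketch below. It is only an illustration: `topic_acks_allowed` is a hypothetical helper name, not a librdkafka function, and it assumes the caller already knows whether the producer is transactional and what `acks` value (if any) is about to be passed to `rd_kafka_topic_conf_set`. It treats `-1` as equivalent to `all`, which is how librdkafka documents the property.

```c
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/*
 * Hypothetical helper (not part of librdkafka).
 *
 * Returns true when a per-topic `acks` override is safe to apply for the
 * given producer type. This sketch only guards the transactional case
 * described in this issue.
 */
static bool topic_acks_allowed(bool transactional, const char *acks) {
    /* This sketch places no restriction on non-transactional producers. */
    if (!transactional)
        return true;

    /* NULL means the topic config does not override `acks` at all. */
    if (acks == NULL)
        return true;

    /* Transactional producers need acks=all; "-1" is its numeric form. */
    return strcmp(acks, "all") == 0 || strcmp(acks, "-1") == 0;
}
```

A binding would call this before `rd_kafka_topic_conf_set` and raise its own error instead of letting the value reach `rd_kafka_topic_new`.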
For context, I ran into this while implementing support for per-topic configuration properties for the producer (https://karafka.io/docs/Librdkafka-Configuration/#topic-configuration-properties) in the rdkafka Ruby bindings: https://github.com/karafka/rdkafka-ruby/pull/449