streamnative / kop

Kafka-on-Pulsar - A protocol handler that brings native Kafka protocol to Apache Pulsar
https://streamnative.io/docs/kop
Apache License 2.0

[BUG] librdkafka fails when the enable.idempotence option is enabled #2008

Open NiuBlibing opened 9 months ago

NiuBlibing commented 9 months ago

Describe the bug
When enable.idempotence is set to true, librdkafka fails with the following fatal error:

% Running producer loop. Press Ctrl-C to exit
%0|1695637496.333|FATAL|rdkafka#producer-1| [thrd:main]: Fatal error: Local: Required feature not supported by broker: Idempotent producer not supported by any of the 1 connected broker(s): requires Apache Kafka broker version >= 0.11.0
% Failed to produce to topic persistent://test/test-ns/testtopic: Local: Fatal error
% Flushing outstanding messages..
% Error: _FATAL: Fatal error: Local: Required feature not supported by broker: Idempotent producer not supported by any of the 1 connected broker(s): requires Apache Kafka broker version >= 0.11.0
% FATAL ERROR: _UNSUPPORTED_FEATURE: Idempotent producer not supported by any of the 1 connected broker(s): requires Apache Kafka broker version >= 0.11.0
% Terminating on fatal error
% Message delivery failed: Local: Purged in queue
% 1 message(s) produced, 0 delivered, 1 failed

To Reproduce
Steps to reproduce the behavior:

  1. Build librdkafka's example idempotent_producer.c with the SASL user name and password added (I use SASL for KoP; see the patch below).
  2. Run idempotent_producer against the KoP broker.

Expected behavior
KoP should support the enable.idempotence option.

Additional context
pulsar version: 3.0.0
kop version: 3.0.0.4
librdkafka: v1.9.2
broker.conf:

messagingProtocols=kafka
protocolHandlerDirectory=./protocols
allowAutoTopicCreationType=partitioned
kafkaListeners=PLAINTEXT://localhost:9092
kafkaAdvertisedListeners=PLAINTEXT://localhost:9092
brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
brokerDeleteInactiveTopicsEnabled=false
kafkaTransactionCoordinatorEnabled=true
brokerDeduplicationEnabled=true
kafkaBrokerId=1
saslAllowedMechanisms=PLAIN

idempotent_producer.patch

diff --git a/examples/idempotent_producer.c b/examples/idempotent_producer.c
index 1e799eaf..7c14fb6f 100644
--- a/examples/idempotent_producer.c
+++ b/examples/idempotent_producer.c
@@ -197,6 +197,35 @@ int main(int argc, char **argv) {
                 return 1;
         }

+        if (rd_kafka_conf_set(conf, "security.protocol", "SASL_PLAINTEXT",
+                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
+        {
+                fprintf(stderr, "%s\n", errstr);
+                rd_kafka_conf_destroy(conf);
+                return 1;
+        }
+        if (rd_kafka_conf_set(conf, "sasl.mechanisms", "PLAIN",
+                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
+        {
+                fprintf(stderr, "%s\n", errstr);
+                rd_kafka_conf_destroy(conf);
+                return 1;
+        }
+        if (rd_kafka_conf_set(conf, "sasl.username", "test/test-ns",
+                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
+        {
+                fprintf(stderr, "%s\n", errstr);
+                rd_kafka_conf_destroy(conf);
+                return 1;
+        }
+        if (rd_kafka_conf_set(conf, "sasl.password", "token:123456",
+                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
+        {
+                fprintf(stderr, "%s\n", errstr);
+                rd_kafka_conf_destroy(conf);
+                return 1;
+        }
+
         /* Set the delivery report callback.
          * This callback will be called once per message to inform
          * the application if delivery succeeded or failed.

BewareMyPower commented 8 months ago

@Demogorgon314 Could you help with this issue?