karafka / rdkafka-ruby

Modern and performant Kafka client library for Ruby based on librdkafka
https://karafka.io

OAUTHBEARER OIDC support is not working since v0.16.0 #505

Closed by bachmanity1 1 month ago

bachmanity1 commented 1 month ago

The OAUTHBEARER OIDC support introduced in https://github.com/confluentinc/librdkafka/pull/3560 works in versions up to and including v0.15.0, but has been broken since v0.16.0.

I'm using the code shown below:

gem 'rdkafka', '0.15.0'
require 'rdkafka'

# credentials & endpoints not shown for security reasons

config = {
  :"bootstrap.servers" => "localhost:10992",
  :"security.protocol" => "SASL_SSL",
  :"sasl.mechanism" => "OAUTHBEARER",
  :"sasl.oauthbearer.method" => "OIDC",
  :"sasl.oauthbearer.token.endpoint.url" => "token-endpoint",
  :"sasl.oauthbearer.scope" => "scope",
  :"sasl.oauthbearer.client.id" => "client-id",
  :"sasl.oauthbearer.client.secret" => "client-secret",
  :"ssl.ca.location" => "ca.crt",
  :"message.max.bytes" => 20971520,
  :"acks" => "all",
  :"debug" => "broker,topic,msg,conf",
}

producer = Rdkafka::Config.new(config).producer
topic = "test"

puts "Starting to produce messages..."
5.times do |i|
  message = "Test message from ruby #{i}"
  puts "Producing message: #{message}"
  producer.produce(
    topic:   topic,
    payload: message,
    key:     "key-#{i}"
  ).wait
end
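
Because the script enables the `broker` debug context, the output records every broker state transition as well as the result of SASL authentication. One way to compare a working run with a failing one is to reduce each run to its per-broker state sequence. The following is a minimal sketch under stated assumptions: `summarize_broker_states` is a hypothetical helper, not part of rdkafka, and its regular expressions assume the log line format produced by this script.

```ruby
# Hypothetical helper (not part of rdkafka): summarize, per broker, the
# connection states seen in debug output and whether OAUTHBEARER
# authentication succeeded. Assumes the line format shown in this issue.
STATE_CHANGE = /\]: (?<broker>\S+): Broker changed state (?<from>\w+) -> (?<to>\w+)/
AUTH_OK      = /\]: (?<broker>\S+): SASL OAUTHBEARER authentication successful/

def summarize_broker_states(log_text)
  # Each broker gets its own entry the first time it appears in the log.
  summary = Hash.new { |hash, broker| hash[broker] = { states: [], authenticated: false } }

  log_text.each_line do |line|
    if (match = STATE_CHANGE.match(line))
      entry = summary[match[:broker]]
      # Record the initial state once, then append each new state.
      entry[:states] << match[:from] if entry[:states].empty?
      entry[:states] << match[:to]
    elsif (match = AUTH_OK.match(line))
      summary[match[:broker]][:authenticated] = true
    end
  end

  summary
end

# Usage: pass the path of a captured log file on the command line.
if __FILE__ == $PROGRAM_NAME && ARGV.any?
  summarize_broker_states(File.read(ARGV.first)).each do |broker, info|
    status = info[:authenticated] ? "authenticated" : "NOT authenticated"
    puts "#{broker}: #{info[:states].join(' -> ')} (#{status})"
  end
end
```

Running this against the output of both gem versions should show the state at which the failing version stops progressing, assuming both runs use the same `debug` setting.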
Logs:
D, [2024-09-08T23:42:21.749531 #30000] DEBUG -- : rdkafka: [thrd::0/internal]: :0/internal: Enter main broker thread
D, [2024-09-08T23:42:21.749567 #30000] DEBUG -- : rdkafka: [thrd:app]: sasl_ssl://kafka-external-test.localhost:10992/bootstrap: Added new broker with NodeId -1
D, [2024-09-08T23:42:21.749578 #30000] DEBUG -- : rdkafka: [thrd:app]: sasl_ssl://kafka-external-test.localhost:10992/bootstrap: Selected for cluster connection: bootstrap servers added (broker has 0 connection attempt(s))
D, [2024-09-08T23:42:21.749587 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-external-test.localhost]: sasl_ssl://kafka-external-test.localhost:10992/bootstrap: Enter main broker thread
D, [2024-09-08T23:42:21.749595 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-external-test.localhost]: sasl_ssl://kafka-external-test.localhost:10992/bootstrap: Received CONNECT op
D, [2024-09-08T23:42:21.749603 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-external-test.localhost]: sasl_ssl://kafka-external-test.localhost:10992/bootstrap: Broker changed state INIT -> TRY_CONNECT
D, [2024-09-08T23:42:21.749619 #30000] DEBUG -- : rdkafka: [thrd:app]: librdkafka v2.3.0 (0x20300ff) rdkafka#producer-1 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,sasl_oauthbearer,http,oidc, GCC GXX OSXLD LIBDL PLUGINS ZLIB SSL SASL_CYRUS CURL HDRHISTOGRAM SYSLOG SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER OAUTHBEARER_OIDC, debug 0x40046)
D, [2024-09-08T23:42:21.749628 #30000] DEBUG -- : rdkafka: [thrd:app]: Client configuration:
D, [2024-09-08T23:42:21.749636 #30000] DEBUG -- : rdkafka: [thrd:app]:   client.software.version = 2.3.0
D, [2024-09-08T23:42:21.749643 #30000] DEBUG -- : rdkafka: [thrd:app]:   metadata.broker.list = kafka-external-test.localhost:10992
D, [2024-09-08T23:42:21.749652 #30000] DEBUG -- : rdkafka: [thrd:app]:   message.max.bytes = 20971520
D, [2024-09-08T23:42:21.749659 #30000] DEBUG -- : rdkafka: [thrd:app]:   debug = broker,topic,msg,conf
D, [2024-09-08T23:42:21.749666 #30000] DEBUG -- : rdkafka: [thrd:app]:   error_cb = 0x1027941f0
D, [2024-09-08T23:42:21.749673 #30000] DEBUG -- : rdkafka: [thrd:app]:   stats_cb = 0x1027941e0
D, [2024-09-08T23:42:21.749680 #30000] DEBUG -- : rdkafka: [thrd:app]:   log_cb = 0x1027941d0
D, [2024-09-08T23:42:21.749687 #30000] DEBUG -- : rdkafka: [thrd:app]:   log.queue = true
D, [2024-09-08T23:42:21.749694 #30000] DEBUG -- : rdkafka: [thrd:app]:   opaque = 0x8c
D, [2024-09-08T23:42:21.749704 #30000] DEBUG -- : rdkafka: [thrd:app]:   default_topic_conf = 0x11dfb8840
D, [2024-09-08T23:42:21.749724 #30000] DEBUG -- : rdkafka: [thrd:app]:   api.version.request = true
D, [2024-09-08T23:42:21.749734 #30000] DEBUG -- : rdkafka: [thrd:app]:   security.protocol = sasl_ssl
D, [2024-09-08T23:42:21.749742 #30000] DEBUG -- : rdkafka: [thrd:app]:   ssl.ca.location = ca.crt
D, [2024-09-08T23:42:21.749751 #30000] DEBUG -- : rdkafka: [thrd:app]:   sasl.mechanisms = OAUTHBEARER
D, [2024-09-08T23:42:21.749762 #30000] DEBUG -- : rdkafka: [thrd:app]:   oauthbearer_token_refresh_cb = 0x11e101090
D, [2024-09-08T23:42:21.749773 #30000] DEBUG -- : rdkafka: [thrd:app]:   sasl.oauthbearer.method = oidc
D, [2024-09-08T23:42:21.749789 #30000] DEBUG -- : rdkafka: [thrd:app]:   sasl.oauthbearer.client.id = kafka-test.broker
D, [2024-09-08T23:42:21.749799 #30000] DEBUG -- : rdkafka: [thrd:app]:   sasl.oauthbearer.client.secret = v=S1;d=kafka-test;n=broker;
D, [2024-09-08T23:42:21.749822 #30000] DEBUG -- : rdkafka: [thrd:app]:   sasl.oauthbearer.token.endpoint.url = https://otoken.com/zts/v1/oauth2/token
D, [2024-09-08T23:42:21.749829 #30000] DEBUG -- : rdkafka: [thrd:app]:   dr_msg_cb = 0x102794810
D, [2024-09-08T23:42:21.749837 #30000] DEBUG -- : rdkafka: [thrd:app]: Default topic configuration:
D, [2024-09-08T23:42:21.749844 #30000] DEBUG -- : rdkafka: [thrd:app]:   request.required.acks = -1
Starting to produce messages...
Producing message: Test message from ruby 0
D, [2024-09-08T23:42:21.749932 #30000] DEBUG -- : rdkafka: [thrd:app]: New local topic: test
D, [2024-09-08T23:42:21.749959 #30000] DEBUG -- : rdkafka: [thrd:app]: NEW test [-1] 0x13e00c800 refcnt 0x13e00c890 (at rd_kafka_topic_new0:472)
D, [2024-09-08T23:42:21.749967 #30000] DEBUG -- : rdkafka: [thrd:app]: Topic "test" configuration (default_topic_conf):
D, [2024-09-08T23:42:21.749974 #30000] DEBUG -- : rdkafka: [thrd:app]:   request.required.acks = -1
D, [2024-09-08T23:42:21.787335 #30000] DEBUG -- : rdkafka: [thrd:background]: Wake-up sent to 1 broker thread in state >= TRY_CONNECT: OAUTHBEARER token update
D, [2024-09-08T23:42:21.787365 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-external-test.localhost]: sasl_ssl://kafka-external-test.localhost:10992/bootstrap: broker in state TRY_CONNECT connecting
D, [2024-09-08T23:42:21.787373 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-external-test.localhost]: sasl_ssl://kafka-external-test.localhost:10992/bootstrap: Broker changed state TRY_CONNECT -> CONNECT
D, [2024-09-08T23:42:21.793871 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-external-test.localhost]: sasl_ssl://kafka-external-test.localhost:10992/bootstrap: Connecting to ipv4#10.105.98.234:10992 (sasl_ssl) with socket 6
D, [2024-09-08T23:42:21.798319 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-external-test.localhost]: sasl_ssl://kafka-external-test.localhost:10992/bootstrap: Connected to ipv4#10.105.98.234:10992
D, [2024-09-08T23:42:21.798329 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-external-test.localhost]: sasl_ssl://kafka-external-test.localhost:10992/bootstrap: Broker changed state CONNECT -> SSL_HANDSHAKE
D, [2024-09-08T23:42:21.814587 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-external-test.localhost]: sasl_ssl://kafka-external-test.localhost:10992/bootstrap: Connected (#1)
D, [2024-09-08T23:42:21.814606 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-external-test.localhost]: sasl_ssl://kafka-external-test.localhost:10992/bootstrap: Updated enabled protocol features +ApiVersion to ApiVersion
D, [2024-09-08T23:42:21.814620 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-external-test.localhost]: sasl_ssl://kafka-external-test.localhost:10992/bootstrap: Broker changed state SSL_HANDSHAKE -> APIVERSION_QUERY
D, [2024-09-08T23:42:21.819267 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-external-test.localhost]: sasl_ssl://kafka-external-test.localhost:10992/bootstrap: Updated enabled protocol features to MsgVer1,ApiVersion,BrokerBalancedConsumer,ThrottleTime,Sasl,SaslHandshake,BrokerGroupCoordinator,LZ4,OffsetTime,MsgVer2,IdempotentProducer,ZSTD,SaslAuthReq
D, [2024-09-08T23:42:21.819284 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-external-test.localhost]: sasl_ssl://kafka-external-test.localhost:10992/bootstrap: Auth in state APIVERSION_QUERY (handshake supported)
D, [2024-09-08T23:42:21.819311 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-external-test.localhost]: sasl_ssl://kafka-external-test.localhost:10992/bootstrap: Broker changed state APIVERSION_QUERY -> AUTH_HANDSHAKE
D, [2024-09-08T23:42:21.823437 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-external-test.localhost]: sasl_ssl://kafka-external-test.localhost:10992/bootstrap: Broker supported SASL mechanisms: OAUTHBEARER
D, [2024-09-08T23:42:21.823451 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-external-test.localhost]: sasl_ssl://kafka-external-test.localhost:10992/bootstrap: Auth in state AUTH_HANDSHAKE (handshake supported)
D, [2024-09-08T23:42:21.823470 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-external-test.localhost]: sasl_ssl://kafka-external-test.localhost:10992/bootstrap: Broker changed state AUTH_HANDSHAKE -> AUTH_REQ
D, [2024-09-08T23:42:21.873991 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-external-test.localhost]: sasl_ssl://kafka-external-test.localhost:10992/bootstrap: SASL OAUTHBEARER authentication successful (principal=kafka-test.broker)
D, [2024-09-08T23:42:21.874008 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-external-test.localhost]: sasl_ssl://kafka-external-test.localhost:10992/bootstrap: Broker changed state AUTH_REQ -> UP
D, [2024-09-08T23:42:21.878639 #30000] DEBUG -- : rdkafka: [thrd:main]: sasl_ssl://kafka-broker-test-0.localhost:19000/0: Added new broker with NodeId 0
D, [2024-09-08T23:42:21.878654 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-0.localhost]: sasl_ssl://kafka-broker-test-0.localhost:19000/0: Enter main broker thread
D, [2024-09-08T23:42:21.878664 #30000] DEBUG -- : rdkafka: [thrd:main]: sasl_ssl://kafka-broker-test-1.localhost:19001/1: Added new broker with NodeId 1
D, [2024-09-08T23:42:21.878675 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-1.localhost]: sasl_ssl://kafka-broker-test-1.localhost:19001/1: Enter main broker thread
D, [2024-09-08T23:42:21.878684 #30000] DEBUG -- : rdkafka: [thrd:main]: sasl_ssl://kafka-broker-test-2.localhost:19002/2: Added new broker with NodeId 2
D, [2024-09-08T23:42:21.878696 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-2.localhost]: sasl_ssl://kafka-broker-test-2.localhost:19002/2: Enter main broker thread
D, [2024-09-08T23:42:21.878709 #30000] DEBUG -- : rdkafka: [thrd:main]: sasl_ssl://kafka-broker-test-3.localhost:19003/3: Added new broker with NodeId 3
D, [2024-09-08T23:42:21.878718 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-3.localhost]: sasl_ssl://kafka-broker-test-3.localhost:19003/3: Enter main broker thread
D, [2024-09-08T23:42:21.878734 #30000] DEBUG -- : rdkafka: [thrd:main]: sasl_ssl://kafka-broker-test-4.localhost:19004/4: Added new broker with NodeId 4
D, [2024-09-08T23:42:21.878744 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-4.localhost]: sasl_ssl://kafka-broker-test-4.localhost:19004/4: Enter main broker thread
D, [2024-09-08T23:42:21.878761 #30000] DEBUG -- : rdkafka: [thrd:main]: Topic test changed state unknown -> exists
D, [2024-09-08T23:42:21.878801 #30000] DEBUG -- : rdkafka: [thrd:main]: Topic test partition count changed from 0 to 12
D, [2024-09-08T23:42:21.878810 #30000] DEBUG -- : rdkafka: [thrd:main]: NEW test [0] 0x12d0a1800 refcnt 0x12d0a1890 (at rd_kafka_topic_partition_cnt_update:912)
D, [2024-09-08T23:42:21.878820 #30000] DEBUG -- : rdkafka: [thrd:main]: NEW test [1] 0x12d0a5200 refcnt 0x12d0a5290 (at rd_kafka_topic_partition_cnt_update:912)
D, [2024-09-08T23:42:21.878828 #30000] DEBUG -- : rdkafka: [thrd:main]: NEW test [2] 0x12d0ae200 refcnt 0x12d0ae290 (at rd_kafka_topic_partition_cnt_update:912)
D, [2024-09-08T23:42:21.878835 #30000] DEBUG -- : rdkafka: [thrd:main]: NEW test [3] 0x12d0b4e00 refcnt 0x12d0b4e90 (at rd_kafka_topic_partition_cnt_update:912)
D, [2024-09-08T23:42:21.878842 #30000] DEBUG -- : rdkafka: [thrd:main]: NEW test [4] 0x12d0b5400 refcnt 0x12d0b5490 (at rd_kafka_topic_partition_cnt_update:912)
D, [2024-09-08T23:42:21.878850 #30000] DEBUG -- : rdkafka: [thrd:main]: NEW test [5] 0x12d0b5a00 refcnt 0x12d0b5a90 (at rd_kafka_topic_partition_cnt_update:912)
D, [2024-09-08T23:42:21.878857 #30000] DEBUG -- : rdkafka: [thrd:main]: NEW test [6] 0x12d0b6000 refcnt 0x12d0b6090 (at rd_kafka_topic_partition_cnt_update:912)
D, [2024-09-08T23:42:21.878864 #30000] DEBUG -- : rdkafka: [thrd:main]: NEW test [7] 0x12d0b6600 refcnt 0x12d0b6690 (at rd_kafka_topic_partition_cnt_update:912)
D, [2024-09-08T23:42:21.878871 #30000] DEBUG -- : rdkafka: [thrd:main]: NEW test [8] 0x12d0b6c00 refcnt 0x12d0b6c90 (at rd_kafka_topic_partition_cnt_update:912)
D, [2024-09-08T23:42:21.878878 #30000] DEBUG -- : rdkafka: [thrd:main]: NEW test [9] 0x12d0b7200 refcnt 0x12d0b7290 (at rd_kafka_topic_partition_cnt_update:912)
D, [2024-09-08T23:42:21.878885 #30000] DEBUG -- : rdkafka: [thrd:main]: NEW test [10] 0x12d0b7800 refcnt 0x12d0b7890 (at rd_kafka_topic_partition_cnt_update:912)
D, [2024-09-08T23:42:21.878892 #30000] DEBUG -- : rdkafka: [thrd:main]: NEW test [11] 0x12d0b7e00 refcnt 0x12d0b7e90 (at rd_kafka_topic_partition_cnt_update:912)
D, [2024-09-08T23:42:21.878899 #30000] DEBUG -- : rdkafka: [thrd:main]:   Topic test partition 0 Leader 1 Epoch 1
D, [2024-09-08T23:42:21.878907 #30000] DEBUG -- : rdkafka: [thrd:main]: test [0]: leader -1 epoch -1 -> leader 1 epoch 1
D, [2024-09-08T23:42:21.878914 #30000] DEBUG -- : rdkafka: [thrd:main]: test [0]: delegate to broker sasl_ssl://kafka-broker-test-1.localhost:19001/1 (rktp 0x12d0a1800, term 0, ref 3)
D, [2024-09-08T23:42:21.878921 #30000] DEBUG -- : rdkafka: [thrd:main]: test [0]: delegating to broker sasl_ssl://kafka-broker-test-1.localhost:19001/1 for partition with 0 messages (0 bytes) queued
D, [2024-09-08T23:42:21.878929 #30000] DEBUG -- : rdkafka: [thrd:main]: Migrating topic test [0] 0x12d0a1800 from (none) to sasl_ssl://kafka-broker-test-1.localhost:19001/1 (sending PARTITION_JOIN to sasl_ssl://kafka-broker-test-1.localhost:19001/1)
D, [2024-09-08T23:42:21.878936 #30000] DEBUG -- : rdkafka: [thrd:main]:   Topic test partition 1 Leader 1 Epoch 1
D, [2024-09-08T23:42:21.878944 #30000] DEBUG -- : rdkafka: [thrd:main]: test [1]: leader -1 epoch -1 -> leader 1 epoch 1
D, [2024-09-08T23:42:21.878951 #30000] DEBUG -- : rdkafka: [thrd:main]: test [1]: delegate to broker sasl_ssl://kafka-broker-test-1.localhost:19001/1 (rktp 0x12d0a5200, term 0, ref 3)
D, [2024-09-08T23:42:21.878958 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-1.localhost]: sasl_ssl://kafka-broker-test-1.localhost:19001/1: Topic test [0]: joining broker (rktp 0x12d0a1800, 0 message(s) queued)
D, [2024-09-08T23:42:21.878965 #30000] DEBUG -- : rdkafka: [thrd:main]: test [1]: delegating to broker sasl_ssl://kafka-broker-test-1.localhost:19001/1 for partition with 0 messages (0 bytes) queued
D, [2024-09-08T23:42:21.878972 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-1.localhost]: sasl_ssl://kafka-broker-test-1.localhost:19001/1: Added test [0] to active list (1 entries, opv 0, 0 messages queued): joining
D, [2024-09-08T23:42:21.878988 #30000] DEBUG -- : rdkafka: [thrd:main]: Migrating topic test [1] 0x12d0a5200 from (none) to sasl_ssl://kafka-broker-test-1.localhost:19001/1 (sending PARTITION_JOIN to sasl_ssl://kafka-broker-test-1.localhost:19001/1)
D, [2024-09-08T23:42:21.878991 #30000] DEBUG -- : rdkafka: [thrd:main]:   Topic test partition 2 Leader 3 Epoch 2
D, [2024-09-08T23:42:21.878993 #30000] DEBUG -- : rdkafka: [thrd:main]: test [2]: leader -1 epoch -1 -> leader 3 epoch 2
D, [2024-09-08T23:42:21.878995 #30000] DEBUG -- : rdkafka: [thrd:main]: test [2]: delegate to broker sasl_ssl://kafka-broker-test-3.localhost:19003/3 (rktp 0x12d0ae200, term 0, ref 3)
D, [2024-09-08T23:42:21.878997 #30000] DEBUG -- : rdkafka: [thrd:main]: test [2]: delegating to broker sasl_ssl://kafka-broker-test-3.localhost:19003/3 for partition with 0 messages (0 bytes) queued
D, [2024-09-08T23:42:21.878999 #30000] DEBUG -- : rdkafka: [thrd:main]: Migrating topic test [2] 0x12d0ae200 from (none) to sasl_ssl://kafka-broker-test-3.localhost:19003/3 (sending PARTITION_JOIN to sasl_ssl://kafka-broker-test-3.localhost:19003/3)
D, [2024-09-08T23:42:21.879001 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-1.localhost]: sasl_ssl://kafka-broker-test-1.localhost:19001/1: Topic test [1]: joining broker (rktp 0x12d0a5200, 0 message(s) queued)
D, [2024-09-08T23:42:21.879003 #30000] DEBUG -- : rdkafka: [thrd:main]:   Topic test partition 3 Leader 2 Epoch 1
D, [2024-09-08T23:42:21.879004 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-1.localhost]: sasl_ssl://kafka-broker-test-1.localhost:19001/1: Added test [1] to active list (2 entries, opv 0, 0 messages queued): joining
D, [2024-09-08T23:42:21.879006 #30000] DEBUG -- : rdkafka: [thrd:main]: test [3]: leader -1 epoch -1 -> leader 2 epoch 1
D, [2024-09-08T23:42:21.879008 #30000] DEBUG -- : rdkafka: [thrd:main]: test [3]: delegate to broker sasl_ssl://kafka-broker-test-2.localhost:19002/2 (rktp 0x12d0b4e00, term 0, ref 3)
D, [2024-09-08T23:42:21.879016 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-3.localhost]: sasl_ssl://kafka-broker-test-3.localhost:19003/3: Topic test [2]: joining broker (rktp 0x12d0ae200, 0 message(s) queued)
D, [2024-09-08T23:42:21.879023 #30000] DEBUG -- : rdkafka: [thrd:main]: test [3]: delegating to broker sasl_ssl://kafka-broker-test-2.localhost:19002/2 for partition with 0 messages (0 bytes) queued
D, [2024-09-08T23:42:21.879030 #30000] DEBUG -- : rdkafka: [thrd:main]: Migrating topic test [3] 0x12d0b4e00 from (none) to sasl_ssl://kafka-broker-test-2.localhost:19002/2 (sending PARTITION_JOIN to sasl_ssl://kafka-broker-test-2.localhost:19002/2)
D, [2024-09-08T23:42:21.879037 #30000] DEBUG -- : rdkafka: [thrd:main]:   Topic test partition 4 Leader 1 Epoch 1
D, [2024-09-08T23:42:21.879044 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-3.localhost]: sasl_ssl://kafka-broker-test-3.localhost:19003/3: Added test [2] to active list (1 entries, opv 0, 0 messages queued): joining
D, [2024-09-08T23:42:21.879056 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-2.localhost]: sasl_ssl://kafka-broker-test-2.localhost:19002/2: Topic test [3]: joining broker (rktp 0x12d0b4e00, 0 message(s) queued)
D, [2024-09-08T23:42:21.879063 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-2.localhost]: sasl_ssl://kafka-broker-test-2.localhost:19002/2: Added test [3] to active list (1 entries, opv 0, 0 messages queued): joining
D, [2024-09-08T23:42:21.879070 #30000] DEBUG -- : rdkafka: [thrd:main]: test [4]: leader -1 epoch -1 -> leader 1 epoch 1
D, [2024-09-08T23:42:21.879077 #30000] DEBUG -- : rdkafka: [thrd:main]: test [4]: delegate to broker sasl_ssl://kafka-broker-test-1.localhost:19001/1 (rktp 0x12d0b5400, term 0, ref 3)
D, [2024-09-08T23:42:21.879085 #30000] DEBUG -- : rdkafka: [thrd:main]: test [4]: delegating to broker sasl_ssl://kafka-broker-test-1.localhost:19001/1 for partition with 0 messages (0 bytes) queued
D, [2024-09-08T23:42:21.879092 #30000] DEBUG -- : rdkafka: [thrd:main]: Migrating topic test [4] 0x12d0b5400 from (none) to sasl_ssl://kafka-broker-test-1.localhost:19001/1 (sending PARTITION_JOIN to sasl_ssl://kafka-broker-test-1.localhost:19001/1)
D, [2024-09-08T23:42:21.879099 #30000] DEBUG -- : rdkafka: [thrd:main]:   Topic test partition 5 Leader 0 Epoch 0
D, [2024-09-08T23:42:21.879106 #30000] DEBUG -- : rdkafka: [thrd:main]: test [5]: leader -1 epoch -1 -> leader 0 epoch 0
D, [2024-09-08T23:42:21.879114 #30000] DEBUG -- : rdkafka: [thrd:main]: test [5]: delegate to broker sasl_ssl://kafka-broker-test-0.localhost:19000/0 (rktp 0x12d0b5a00, term 0, ref 3)
D, [2024-09-08T23:42:21.879122 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-1.localhost]: sasl_ssl://kafka-broker-test-1.localhost:19001/1: Topic test [4]: joining broker (rktp 0x12d0b5400, 0 message(s) queued)
D, [2024-09-08T23:42:21.879129 #30000] DEBUG -- : rdkafka: [thrd:main]: test [5]: delegating to broker sasl_ssl://kafka-broker-test-0.localhost:19000/0 for partition with 0 messages (0 bytes) queued
D, [2024-09-08T23:42:21.879136 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-1.localhost]: sasl_ssl://kafka-broker-test-1.localhost:19001/1: Added test [4] to active list (3 entries, opv 0, 0 messages queued): joining
D, [2024-09-08T23:42:21.879143 #30000] DEBUG -- : rdkafka: [thrd:main]: Migrating topic test [5] 0x12d0b5a00 from (none) to sasl_ssl://kafka-broker-test-0.localhost:19000/0 (sending PARTITION_JOIN to sasl_ssl://kafka-broker-test-0.localhost:19000/0)
D, [2024-09-08T23:42:21.879150 #30000] DEBUG -- : rdkafka: [thrd:main]:   Topic test partition 6 Leader 0 Epoch 0
D, [2024-09-08T23:42:21.879157 #30000] DEBUG -- : rdkafka: [thrd:main]: test [6]: leader -1 epoch -1 -> leader 0 epoch 0
D, [2024-09-08T23:42:21.879166 #30000] DEBUG -- : rdkafka: [thrd:main]: test [6]: delegate to broker sasl_ssl://kafka-broker-test-0.localhost:19000/0 (rktp 0x12d0b6000, term 0, ref 3)
D, [2024-09-08T23:42:21.879175 #30000] DEBUG -- : rdkafka: [thrd:main]: test [6]: delegating to broker sasl_ssl://kafka-broker-test-0.localhost:19000/0 for partition with 0 messages (0 bytes) queued
D, [2024-09-08T23:42:21.879184 #30000] DEBUG -- : rdkafka: [thrd:main]: Migrating topic test [6] 0x12d0b6000 from (none) to sasl_ssl://kafka-broker-test-0.localhost:19000/0 (sending PARTITION_JOIN to sasl_ssl://kafka-broker-test-0.localhost:19000/0)
D, [2024-09-08T23:42:21.879194 #30000] DEBUG -- : rdkafka: [thrd:main]:   Topic test partition 7 Leader 3 Epoch 1
D, [2024-09-08T23:42:21.879202 #30000] DEBUG -- : rdkafka: [thrd:main]: test [7]: leader -1 epoch -1 -> leader 3 epoch 1
D, [2024-09-08T23:42:21.879216 #30000] DEBUG -- : rdkafka: [thrd:main]: test [7]: delegate to broker sasl_ssl://kafka-broker-test-3.localhost:19003/3 (rktp 0x12d0b6600, term 0, ref 3)
D, [2024-09-08T23:42:21.879218 #30000] DEBUG -- : rdkafka: [thrd:main]: test [7]: delegating to broker sasl_ssl://kafka-broker-test-3.localhost:19003/3 for partition with 0 messages (0 bytes) queued
D, [2024-09-08T23:42:21.879220 #30000] DEBUG -- : rdkafka: [thrd:main]: Migrating topic test [7] 0x12d0b6600 from (none) to sasl_ssl://kafka-broker-test-3.localhost:19003/3 (sending PARTITION_JOIN to sasl_ssl://kafka-broker-test-3.localhost:19003/3)
D, [2024-09-08T23:42:21.879222 #30000] DEBUG -- : rdkafka: [thrd:main]:   Topic test partition 8 Leader 2 Epoch 1
D, [2024-09-08T23:42:21.879224 #30000] DEBUG -- : rdkafka: [thrd:main]: test [8]: leader -1 epoch -1 -> leader 2 epoch 1
D, [2024-09-08T23:42:21.879226 #30000] DEBUG -- : rdkafka: [thrd:main]: test [8]: delegate to broker sasl_ssl://kafka-broker-test-2.localhost:19002/2 (rktp 0x12d0b6c00, term 0, ref 3)
D, [2024-09-08T23:42:21.879228 #30000] DEBUG -- : rdkafka: [thrd:main]: test [8]: delegating to broker sasl_ssl://kafka-broker-test-2.localhost:19002/2 for partition with 0 messages (0 bytes) queued
D, [2024-09-08T23:42:21.879229 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-0.localhost]: sasl_ssl://kafka-broker-test-0.localhost:19000/0: Topic test [5]: joining broker (rktp 0x12d0b5a00, 0 message(s) queued)
D, [2024-09-08T23:42:21.879232 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-0.localhost]: sasl_ssl://kafka-broker-test-0.localhost:19000/0: Added test [5] to active list (1 entries, opv 0, 0 messages queued): joining
D, [2024-09-08T23:42:21.879241 #30000] DEBUG -- : rdkafka: [thrd:main]: Migrating topic test [8] 0x12d0b6c00 from (none) to sasl_ssl://kafka-broker-test-2.localhost:19002/2 (sending PARTITION_JOIN to sasl_ssl://kafka-broker-test-2.localhost:19002/2)
D, [2024-09-08T23:42:21.879249 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-3.localhost]: sasl_ssl://kafka-broker-test-3.localhost:19003/3: Topic test [7]: joining broker (rktp 0x12d0b6600, 0 message(s) queued)
D, [2024-09-08T23:42:21.879256 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-0.localhost]: sasl_ssl://kafka-broker-test-0.localhost:19000/0: Topic test [6]: joining broker (rktp 0x12d0b6000, 0 message(s) queued)
D, [2024-09-08T23:42:21.879264 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-3.localhost]: sasl_ssl://kafka-broker-test-3.localhost:19003/3: Added test [7] to active list (2 entries, opv 0, 0 messages queued): joining
D, [2024-09-08T23:42:21.879272 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-0.localhost]: sasl_ssl://kafka-broker-test-0.localhost:19000/0: Added test [6] to active list (2 entries, opv 0, 0 messages queued): joining
D, [2024-09-08T23:42:21.879279 #30000] DEBUG -- : rdkafka: [thrd:main]:   Topic test partition 9 Leader 0 Epoch 1
D, [2024-09-08T23:42:21.879286 #30000] DEBUG -- : rdkafka: [thrd:main]: test [9]: leader -1 epoch -1 -> leader 0 epoch 1
D, [2024-09-08T23:42:21.879293 #30000] DEBUG -- : rdkafka: [thrd:main]: test [9]: delegate to broker sasl_ssl://kafka-broker-test-0.localhost:19000/0 (rktp 0x12d0b7200, term 0, ref 3)
D, [2024-09-08T23:42:21.879301 #30000] DEBUG -- : rdkafka: [thrd:main]: test [9]: delegating to broker sasl_ssl://kafka-broker-test-0.localhost:19000/0 for partition with 0 messages (0 bytes) queued
D, [2024-09-08T23:42:21.879309 #30000] DEBUG -- : rdkafka: [thrd:main]: Migrating topic test [9] 0x12d0b7200 from (none) to sasl_ssl://kafka-broker-test-0.localhost:19000/0 (sending PARTITION_JOIN to sasl_ssl://kafka-broker-test-0.localhost:19000/0)
D, [2024-09-08T23:42:21.879316 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-2.localhost]: sasl_ssl://kafka-broker-test-2.localhost:19002/2: Topic test [8]: joining broker (rktp 0x12d0b6c00, 0 message(s) queued)
D, [2024-09-08T23:42:21.879326 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-2.localhost]: sasl_ssl://kafka-broker-test-2.localhost:19002/2: Added test [8] to active list (2 entries, opv 0, 0 messages queued): joining
D, [2024-09-08T23:42:21.879338 #30000] DEBUG -- : rdkafka: [thrd:main]:   Topic test partition 10 Leader 3 Epoch 1
D, [2024-09-08T23:42:21.879347 #30000] DEBUG -- : rdkafka: [thrd:main]: test [10]: leader -1 epoch -1 -> leader 3 epoch 1
D, [2024-09-08T23:42:21.879355 #30000] DEBUG -- : rdkafka: [thrd:main]: test [10]: delegate to broker sasl_ssl://kafka-broker-test-3.localhost:19003/3 (rktp 0x12d0b7800, term 0, ref 3)
D, [2024-09-08T23:42:21.879362 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-0.localhost]: sasl_ssl://kafka-broker-test-0.localhost:19000/0: Topic test [9]: joining broker (rktp 0x12d0b7200, 0 message(s) queued)
D, [2024-09-08T23:42:21.879370 #30000] DEBUG -- : rdkafka: [thrd:main]: test [10]: delegating to broker sasl_ssl://kafka-broker-test-3.localhost:19003/3 for partition with 0 messages (0 bytes) queued
D, [2024-09-08T23:42:21.879378 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-0.localhost]: sasl_ssl://kafka-broker-test-0.localhost:19000/0: Added test [9] to active list (3 entries, opv 0, 0 messages queued): joining
D, [2024-09-08T23:42:21.879386 #30000] DEBUG -- : rdkafka: [thrd:main]: Migrating topic test [10] 0x12d0b7800 from (none) to sasl_ssl://kafka-broker-test-3.localhost:19003/3 (sending PARTITION_JOIN to sasl_ssl://kafka-broker-test-3.localhost:19003/3)
D, [2024-09-08T23:42:21.879394 #30000] DEBUG -- : rdkafka: [thrd:main]:   Topic test partition 11 Leader 2 Epoch 1
D, [2024-09-08T23:42:21.879403 #30000] DEBUG -- : rdkafka: [thrd:main]: test [11]: leader -1 epoch -1 -> leader 2 epoch 1
D, [2024-09-08T23:42:21.879412 #30000] DEBUG -- : rdkafka: [thrd:main]: test [11]: delegate to broker sasl_ssl://kafka-broker-test-2.localhost:19002/2 (rktp 0x12d0b7e00, term 0, ref 3)
D, [2024-09-08T23:42:21.879420 #30000] DEBUG -- : rdkafka: [thrd:main]: test [11]: delegating to broker sasl_ssl://kafka-broker-test-2.localhost:19002/2 for partition with 0 messages (0 bytes) queued
D, [2024-09-08T23:42:21.879428 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-3.localhost]: sasl_ssl://kafka-broker-test-3.localhost:19003/3: Topic test [10]: joining broker (rktp 0x12d0b7800, 0 message(s) queued)
D, [2024-09-08T23:42:21.879435 #30000] DEBUG -- : rdkafka: [thrd:main]: Migrating topic test [11] 0x12d0b7e00 from (none) to sasl_ssl://kafka-broker-test-2.localhost:19002/2 (sending PARTITION_JOIN to sasl_ssl://kafka-broker-test-2.localhost:19002/2)
D, [2024-09-08T23:42:21.879453 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-2.localhost]: sasl_ssl://kafka-broker-test-2.localhost:19002/2: Topic test [11]: joining broker (rktp 0x12d0b7e00, 0 message(s) queued)
D, [2024-09-08T23:42:21.879455 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-2.localhost]: sasl_ssl://kafka-broker-test-2.localhost:19002/2: Added test [11] to active list (3 entries, opv 0, 0 messages queued): joining
D, [2024-09-08T23:42:21.879457 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-3.localhost]: sasl_ssl://kafka-broker-test-3.localhost:19003/3: Added test [10] to active list (3 entries, opv 0, 0 messages queued): joining
D, [2024-09-08T23:42:21.879459 #30000] DEBUG -- : rdkafka: [thrd:main]: Partitioning 1 unassigned messages in topic test to 12 partitions
D, [2024-09-08T23:42:21.879461 #30000] DEBUG -- : rdkafka: [thrd:main]: 1/1 messages were partitioned in topic test
D, [2024-09-08T23:42:21.879463 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-1.localhost]: sasl_ssl://kafka-broker-test-1.localhost:19001/1: test [0] 1 message(s) queued but broker not up
D, [2024-09-08T23:42:21.879465 #30000] DEBUG -- : rdkafka: [thrd:main]: sasl_ssl://kafka-external-test.localhost:10992/bootstrap: 1/1 requested topic(s) seen in metadata
D, [2024-09-08T23:42:21.879467 #30000] DEBUG -- : rdkafka: [thrd:main]: sasl_ssl://kafka-external-test.localhost:10992/bootstrap: ClusterId update "" -> "ZWVjMDM5OWUtZWIzNS00Mg=="
D, [2024-09-08T23:42:21.879469 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-1.localhost]: sasl_ssl://kafka-broker-test-1.localhost:19001/1: Broker changed state INIT -> TRY_CONNECT
D, [2024-09-08T23:42:21.879470 #30000] DEBUG -- : rdkafka: [thrd:main]: sasl_ssl://kafka-external-test.localhost:10992/bootstrap: ControllerId update -1 -> 4
D, [2024-09-08T23:42:21.879472 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-1.localhost]: sasl_ssl://kafka-broker-test-1.localhost:19001/1: broker in state TRY_CONNECT connecting
D, [2024-09-08T23:42:21.879474 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-1.localhost]: sasl_ssl://kafka-broker-test-1.localhost:19001/1: Broker changed state TRY_CONNECT -> CONNECT
D, [2024-09-08T23:42:21.884748 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-1.localhost]: sasl_ssl://kafka-broker-test-1.localhost:19001/1: Connecting to ipv4#10.105.98.234:19001 (sasl_ssl) with socket 23
D, [2024-09-08T23:42:21.884787 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-1.localhost]: sasl_ssl://kafka-broker-test-1.localhost:19001/1: test [0] 1 message(s) queued but broker not up
D, [2024-09-08T23:42:21.884799 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-1.localhost]: sasl_ssl://kafka-broker-test-1.localhost:19001/1: test [0] 1 message(s) queued but broker not up
D, [2024-09-08T23:42:21.888906 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-1.localhost]: sasl_ssl://kafka-broker-test-1.localhost:19001/1: Connected to ipv4#10.105.98.234:19001
D, [2024-09-08T23:42:21.888919 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-1.localhost]: sasl_ssl://kafka-broker-test-1.localhost:19001/1: Broker changed state CONNECT -> SSL_HANDSHAKE
D, [2024-09-08T23:42:21.888990 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-1.localhost]: sasl_ssl://kafka-broker-test-1.localhost:19001/1: test [0] 1 message(s) queued but broker not up
D, [2024-09-08T23:42:21.897869 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-1.localhost]: sasl_ssl://kafka-broker-test-1.localhost:19001/1: test [0] 1 message(s) queued but broker not up
D, [2024-09-08T23:42:21.898159 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-1.localhost]: sasl_ssl://kafka-broker-test-1.localhost:19001/1: test [0] 1 message(s) queued but broker not up
D, [2024-09-08T23:42:21.904047 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-1.localhost]: sasl_ssl://kafka-broker-test-1.localhost:19001/1: Connected (#1)
D, [2024-09-08T23:42:21.904158 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-1.localhost]: sasl_ssl://kafka-broker-test-1.localhost:19001/1: Updated enabled protocol features +ApiVersion to ApiVersion
D, [2024-09-08T23:42:21.904201 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-1.localhost]: sasl_ssl://kafka-broker-test-1.localhost:19001/1: Broker changed state SSL_HANDSHAKE -> APIVERSION_QUERY
D, [2024-09-08T23:42:21.904226 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-1.localhost]: sasl_ssl://kafka-broker-test-1.localhost:19001/1: test [0] 1 message(s) queued but broker not up
D, [2024-09-08T23:42:21.908114 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-1.localhost]: sasl_ssl://kafka-broker-test-1.localhost:19001/1: Updated enabled protocol features to MsgVer1,ApiVersion,BrokerBalancedConsumer,ThrottleTime,Sasl,SaslHandshake,BrokerGroupCoordinator,LZ4,OffsetTime,MsgVer2,IdempotentProducer,ZSTD,SaslAuthReq
D, [2024-09-08T23:42:21.908186 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-1.localhost]: sasl_ssl://kafka-broker-test-1.localhost:19001/1: Auth in state APIVERSION_QUERY (handshake supported)
D, [2024-09-08T23:42:21.908206 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-1.localhost]: sasl_ssl://kafka-broker-test-1.localhost:19001/1: Broker changed state APIVERSION_QUERY -> AUTH_HANDSHAKE
D, [2024-09-08T23:42:21.908221 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-1.localhost]: sasl_ssl://kafka-broker-test-1.localhost:19001/1: test [0] 1 message(s) queued but broker not up
D, [2024-09-08T23:42:21.912274 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-1.localhost]: sasl_ssl://kafka-broker-test-1.localhost:19001/1: Broker supported SASL mechanisms: OAUTHBEARER
D, [2024-09-08T23:42:21.912317 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-1.localhost]: sasl_ssl://kafka-broker-test-1.localhost:19001/1: Auth in state AUTH_HANDSHAKE (handshake supported)
D, [2024-09-08T23:42:21.912343 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-1.localhost]: sasl_ssl://kafka-broker-test-1.localhost:19001/1: Broker changed state AUTH_HANDSHAKE -> AUTH_REQ
D, [2024-09-08T23:42:21.912410 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-1.localhost]: sasl_ssl://kafka-broker-test-1.localhost:19001/1: test [0] 1 message(s) queued but broker not up
D, [2024-09-08T23:42:21.961665 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-1.localhost]: sasl_ssl://kafka-broker-test-1.localhost:19001/1: SASL OAUTHBEARER authentication successful (principal=kafka-test.broker)
D, [2024-09-08T23:42:21.961828 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-1.localhost]: sasl_ssl://kafka-broker-test-1.localhost:19001/1: Broker changed state AUTH_REQ -> UP
D, [2024-09-08T23:42:21.961967 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-1.localhost]: sasl_ssl://kafka-broker-test-1.localhost:19001/1: test [0]: Produce MessageSet with 1 message(s) (97 bytes, ApiVersion 7, MsgVersion 2, MsgId 0, BaseSeq -1, PID{Invalid}, uncompressed)
D, [2024-09-08T23:42:21.967516 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-1.localhost]: sasl_ssl://kafka-broker-test-1.localhost:19001/1: test [0]: MessageSet with 1 message(s) (MsgId 0, BaseSeq -1) delivered
Producing message: Test message from ruby 1
D, [2024-09-08T23:42:22.060199 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-3.localhost]: sasl_ssl://kafka-broker-test-3.localhost:19003/3: test [2] 1 message(s) queued but broker not up
D, [2024-09-08T23:42:22.060360 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-3.localhost]: sasl_ssl://kafka-broker-test-3.localhost:19003/3: Broker changed state INIT -> TRY_CONNECT
D, [2024-09-08T23:42:22.060456 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-3.localhost]: sasl_ssl://kafka-broker-test-3.localhost:19003/3: broker in state TRY_CONNECT connecting
D, [2024-09-08T23:42:22.060519 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-3.localhost]: sasl_ssl://kafka-broker-test-3.localhost:19003/3: Broker changed state TRY_CONNECT -> CONNECT
D, [2024-09-08T23:42:22.068275 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-3.localhost]: sasl_ssl://kafka-broker-test-3.localhost:19003/3: Connecting to ipv4#10.105.98.234:19003 (sasl_ssl) with socket 24
D, [2024-09-08T23:42:22.068445 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-3.localhost]: sasl_ssl://kafka-broker-test-3.localhost:19003/3: test [2] 1 message(s) queued but broker not up
D, [2024-09-08T23:42:22.068532 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-3.localhost]: sasl_ssl://kafka-broker-test-3.localhost:19003/3: test [2] 1 message(s) queued but broker not up
D, [2024-09-08T23:42:22.072933 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-3.localhost]: sasl_ssl://kafka-broker-test-3.localhost:19003/3: Connected to ipv4#10.105.98.234:19003
D, [2024-09-08T23:42:22.073055 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-3.localhost]: sasl_ssl://kafka-broker-test-3.localhost:19003/3: Broker changed state CONNECT -> SSL_HANDSHAKE
D, [2024-09-08T23:42:22.073603 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-3.localhost]: sasl_ssl://kafka-broker-test-3.localhost:19003/3: test [2] 1 message(s) queued but broker not up
D, [2024-09-08T23:42:22.085812 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-3.localhost]: sasl_ssl://kafka-broker-test-3.localhost:19003/3: test [2] 1 message(s) queued but broker not up
D, [2024-09-08T23:42:22.091731 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-3.localhost]: sasl_ssl://kafka-broker-test-3.localhost:19003/3: Connected (#1)
D, [2024-09-08T23:42:22.091802 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-3.localhost]: sasl_ssl://kafka-broker-test-3.localhost:19003/3: Updated enabled protocol features +ApiVersion to ApiVersion
D, [2024-09-08T23:42:22.091848 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-3.localhost]: sasl_ssl://kafka-broker-test-3.localhost:19003/3: Broker changed state SSL_HANDSHAKE -> APIVERSION_QUERY
D, [2024-09-08T23:42:22.091895 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-3.localhost]: sasl_ssl://kafka-broker-test-3.localhost:19003/3: test [2] 1 message(s) queued but broker not up
D, [2024-09-08T23:42:22.096157 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-3.localhost]: sasl_ssl://kafka-broker-test-3.localhost:19003/3: Updated enabled protocol features to MsgVer1,ApiVersion,BrokerBalancedConsumer,ThrottleTime,Sasl,SaslHandshake,BrokerGroupCoordinator,LZ4,OffsetTime,MsgVer2,IdempotentProducer,ZSTD,SaslAuthReq
D, [2024-09-08T23:42:22.096259 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-3.localhost]: sasl_ssl://kafka-broker-test-3.localhost:19003/3: Auth in state APIVERSION_QUERY (handshake supported)
D, [2024-09-08T23:42:22.096349 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-3.localhost]: sasl_ssl://kafka-broker-test-3.localhost:19003/3: Broker changed state APIVERSION_QUERY -> AUTH_HANDSHAKE
D, [2024-09-08T23:42:22.096424 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-3.localhost]: sasl_ssl://kafka-broker-test-3.localhost:19003/3: test [2] 1 message(s) queued but broker not up
D, [2024-09-08T23:42:22.100363 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-3.localhost]: sasl_ssl://kafka-broker-test-3.localhost:19003/3: Broker supported SASL mechanisms: OAUTHBEARER
D, [2024-09-08T23:42:22.100495 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-3.localhost]: sasl_ssl://kafka-broker-test-3.localhost:19003/3: Auth in state AUTH_HANDSHAKE (handshake supported)
D, [2024-09-08T23:42:22.100588 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-3.localhost]: sasl_ssl://kafka-broker-test-3.localhost:19003/3: Broker changed state AUTH_HANDSHAKE -> AUTH_REQ
D, [2024-09-08T23:42:22.100666 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-3.localhost]: sasl_ssl://kafka-broker-test-3.localhost:19003/3: test [2] 1 message(s) queued but broker not up
D, [2024-09-08T23:42:22.150201 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-3.localhost]: sasl_ssl://kafka-broker-test-3.localhost:19003/3: SASL OAUTHBEARER authentication successful (principal=kafka-test.broker)
D, [2024-09-08T23:42:22.150485 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-3.localhost]: sasl_ssl://kafka-broker-test-3.localhost:19003/3: Broker changed state AUTH_REQ -> UP
D, [2024-09-08T23:42:22.150517 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-3.localhost]: sasl_ssl://kafka-broker-test-3.localhost:19003/3: test [2]: Produce MessageSet with 1 message(s) (97 bytes, ApiVersion 7, MsgVersion 2, MsgId 0, BaseSeq -1, PID{Invalid}, uncompressed)
D, [2024-09-08T23:42:22.156281 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-3.localhost]: sasl_ssl://kafka-broker-test-3.localhost:19003/3: test [2]: MessageSet with 1 message(s) (MsgId 0, BaseSeq -1) delivered
Producing message: Test message from ruby 2
D, [2024-09-08T23:42:22.170807 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-1.localhost]: sasl_ssl://kafka-broker-test-1.localhost:19001/1: test [0]: Produce MessageSet with 1 message(s) (97 bytes, ApiVersion 7, MsgVersion 2, MsgId 0, BaseSeq -1, PID{Invalid}, uncompressed)
D, [2024-09-08T23:42:22.177236 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-1.localhost]: sasl_ssl://kafka-broker-test-1.localhost:19001/1: test [0]: MessageSet with 1 message(s) (MsgId 0, BaseSeq -1) delivered
Producing message: Test message from ruby 3
D, [2024-09-08T23:42:22.274303 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-3.localhost]: sasl_ssl://kafka-broker-test-3.localhost:19003/3: test [2]: Produce MessageSet with 1 message(s) (97 bytes, ApiVersion 7, MsgVersion 2, MsgId 0, BaseSeq -1, PID{Invalid}, uncompressed)
D, [2024-09-08T23:42:22.280828 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-3.localhost]: sasl_ssl://kafka-broker-test-3.localhost:19003/3: test [2]: MessageSet with 1 message(s) (MsgId 0, BaseSeq -1) delivered
Producing message: Test message from ruby 4
D, [2024-09-08T23:42:22.379558 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-1.localhost]: sasl_ssl://kafka-broker-test-1.localhost:19001/1: test [1]: Produce MessageSet with 1 message(s) (97 bytes, ApiVersion 7, MsgVersion 2, MsgId 0, BaseSeq -1, PID{Invalid}, uncompressed)
D, [2024-09-08T23:42:22.386001 #30000] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-broker-test-1.localhost]: sasl_ssl://kafka-broker-test-1.localhost:19001/1: test [1]: MessageSet with 1 message(s) (MsgId 0, BaseSeq -1) delivered

When exactly the same code is used with rdkafka v0.16.0+, it doesn't work.

D, [2024-09-08T23:52:13.643850 #32998] DEBUG -- : rdkafka: [thrd::0/internal]: :0/internal: Enter main broker thread
D, [2024-09-08T23:52:13.643882 #32998] DEBUG -- : rdkafka: [thrd:app]: sasl_ssl://kafka-external-test.localhost:10992/bootstrap: Added new broker with NodeId -1
D, [2024-09-08T23:52:13.643900 #32998] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-external-test.n3r-project-kafka-test.sv]: sasl_ssl://kafka-external-test.localhost:10992/bootstrap: Enter main broker thread
D, [2024-09-08T23:52:13.643926 #32998] DEBUG -- : rdkafka: [thrd:app]: sasl_ssl://kafka-external-test.localhost:10992/bootstrap: Selected for cluster connection: bootstrap servers added (broker has 0 connection attempt(s))
D, [2024-09-08T23:52:13.643941 #32998] DEBUG -- : rdkafka: [thrd:app]: librdkafka v2.3.0 (0x20300ff) rdkafka#producer-1 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,sasl_oauthbearer,http,oidc, GCC GXX OSXLD LIBDL PLUGINS ZLIB SSL SASL_CYRUS CURL HDRHISTOGRAM SYSLOG SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER OAUTHBEARER_OIDC, debug 0x40046)
D, [2024-09-08T23:52:13.643963 #32998] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-external-test.n3r-project-kafka-test.sv]: sasl_ssl://kafka-external-test.localhost:10992/bootstrap: Received CONNECT op
D, [2024-09-08T23:52:13.643980 #32998] DEBUG -- : rdkafka: [thrd:sasl_ssl://kafka-external-test.n3r-project-kafka-test.sv]: sasl_ssl://kafka-external-test.localhost:10992/bootstrap: Broker changed state INIT -> TRY_CONNECT
D, [2024-09-08T23:52:13.643996 #32998] DEBUG -- : rdkafka: [thrd:app]: Client configuration:
D, [2024-09-08T23:52:13.644010 #32998] DEBUG -- : rdkafka: [thrd:app]:   client.software.version = 2.3.0
D, [2024-09-08T23:52:13.644018 #32998] DEBUG -- : rdkafka: [thrd:app]:   metadata.broker.list = kafka-external-test.localhost:10992
D, [2024-09-08T23:52:13.644026 #32998] DEBUG -- : rdkafka: [thrd:app]:   message.max.bytes = 20971520
D, [2024-09-08T23:52:13.644034 #32998] DEBUG -- : rdkafka: [thrd:app]:   debug = broker,topic,msg,conf
D, [2024-09-08T23:52:13.644042 #32998] DEBUG -- : rdkafka: [thrd:app]:   error_cb = 0x1024d4380
D, [2024-09-08T23:52:13.644049 #32998] DEBUG -- : rdkafka: [thrd:app]:   stats_cb = 0x1024d4370
D, [2024-09-08T23:52:13.644056 #32998] DEBUG -- : rdkafka: [thrd:app]:   log_cb = 0x1024d4360
D, [2024-09-08T23:52:13.644066 #32998] DEBUG -- : rdkafka: [thrd:app]:   log.queue = true
D, [2024-09-08T23:52:13.644073 #32998] DEBUG -- : rdkafka: [thrd:app]:   opaque = 0x8c
D, [2024-09-08T23:52:13.644080 #32998] DEBUG -- : rdkafka: [thrd:app]:   default_topic_conf = 0x102b3e090
D, [2024-09-08T23:52:13.644089 #32998] DEBUG -- : rdkafka: [thrd:app]:   api.version.request = true
D, [2024-09-08T23:52:13.644101 #32998] DEBUG -- : rdkafka: [thrd:app]:   security.protocol = sasl_ssl
D, [2024-09-08T23:52:13.644115 #32998] DEBUG -- : rdkafka: [thrd:app]:   ssl.ca.location = ca.crt
D, [2024-09-08T23:52:13.644128 #32998] DEBUG -- : rdkafka: [thrd:app]:   sasl.mechanisms = OAUTHBEARER
D, [2024-09-08T23:52:13.644138 #32998] DEBUG -- : rdkafka: [thrd:app]:   oauthbearer_token_refresh_cb = 0x1024d4390
D, [2024-09-08T23:52:13.644148 #32998] DEBUG -- : rdkafka: [thrd:app]:   sasl.oauthbearer.method = oidc
D, [2024-09-08T23:52:13.644161 #32998] DEBUG -- : rdkafka: [thrd:app]:   sasl.oauthbearer.client.id = kafka-test.broker
D, [2024-09-08T23:52:13.644172 #32998] DEBUG -- : rdkafka: [thrd:app]:   sasl.oauthbearer.client.secret = v=S1;d=kafka-test;n=broker;
D, [2024-09-08T23:52:13.644186 #32998] DEBUG -- : rdkafka: [thrd:app]:   sasl.oauthbearer.scope = kafka-test:domain
D, [2024-09-08T23:52:13.644195 #32998] DEBUG -- : rdkafka: [thrd:app]:   sasl.oauthbearer.token.endpoint.url = https://otoken.com/zts/v1/oauth2/token
D, [2024-09-08T23:52:13.644202 #32998] DEBUG -- : rdkafka: [thrd:app]:   dr_msg_cb = 0x1024d49d0
D, [2024-09-08T23:52:13.644225 #32998] DEBUG -- : rdkafka: [thrd:app]: Default topic configuration:
D, [2024-09-08T23:52:13.644236 #32998] DEBUG -- : rdkafka: [thrd:app]:   request.required.acks = -1
Starting to produce messages...
Producing message: Test message from ruby 0
D, [2024-09-08T23:52:13.644357 #32998] DEBUG -- : rdkafka: [thrd:app]: New local topic: test
D, [2024-09-08T23:52:13.644365 #32998] DEBUG -- : rdkafka: [thrd:app]: NEW test [-1] 0x12300e200 refcnt 0x12300e290 (at rd_kafka_topic_new0:472)
D, [2024-09-08T23:52:13.644373 #32998] DEBUG -- : rdkafka: [thrd:app]: Topic "test" configuration (default_topic_conf):
D, [2024-09-08T23:52:13.644381 #32998] DEBUG -- : rdkafka: [thrd:app]:   request.required.acks = -1
D, [2024-09-08T23:52:13.644392 #32998] DEBUG -- : rdkafka: [thrd:app]: Not selecting any broker for cluster connection: still suppressed for 49ms: leader query
D, [2024-09-08T23:52:14.647170 #32998] DEBUG -- : rdkafka: [thrd:main]: Topic test metadata information unknown
D, [2024-09-08T23:52:14.647408 #32998] DEBUG -- : rdkafka: [thrd:main]: Topic test partition count is zero: should refresh metadata
D, [2024-09-08T23:52:14.647565 #32998] DEBUG -- : rdkafka: [thrd:main]: Cluster connection already in progress: refresh unavailable topics
D, [2024-09-08T23:52:14.647633 #32998] DEBUG -- : rdkafka: [thrd:main]: Not selecting any broker for cluster connection: still suppressed for 49ms: no cluster connection
D, [2024-09-08T23:52:15.651181 #32998] DEBUG -- : rdkafka: [thrd:main]: Topic test metadata information unknown
D, [2024-09-08T23:52:15.651368 #32998] DEBUG -- : rdkafka: [thrd:main]: Topic test partition count is zero: should refresh metadata
D, [2024-09-08T23:52:15.651501 #32998] DEBUG -- : rdkafka: [thrd:main]: Cluster connection already in progress: refresh unavailable topics
D, [2024-09-08T23:52:15.651642 #32998] DEBUG -- : rdkafka: [thrd:main]: Not selecting any broker for cluster connection: still suppressed for 49ms: no cluster connection
D, [2024-09-08T23:52:16.653914 #32998] DEBUG -- : rdkafka: [thrd:main]: Topic test metadata information unknown
D, [2024-09-08T23:52:16.654123 #32998] DEBUG -- : rdkafka: [thrd:main]: Topic test partition count is zero: should refresh metadata
D, [2024-09-08T23:52:16.654255 #32998] DEBUG -- : rdkafka: [thrd:main]: Cluster connection already in progress: refresh unavailable topics
D, [2024-09-08T23:52:16.654362 #32998] DEBUG -- : rdkafka: [thrd:main]: Not selecting any broker for cluster connection: still suppressed for 49ms: no cluster connection
D, [2024-09-08T23:52:17.654659 #32998] DEBUG -- : rdkafka: [thrd:main]: Topic test metadata information unknown
D, [2024-09-08T23:52:17.654821 #32998] DEBUG -- : rdkafka: [thrd:main]: Topic test partition count is zero: should refresh metadata
D, [2024-09-08T23:52:17.654882 #32998] DEBUG -- : rdkafka: [thrd:main]: Cluster connection already in progress: refresh unavailable topics
D, [2024-09-08T23:52:17.654952 #32998] DEBUG -- : rdkafka: [thrd:main]: Not selecting any broker for cluster connection: still suppressed for 49ms: no cluster connection
D, [2024-09-08T23:52:18.654754 #32998] DEBUG -- : rdkafka: [thrd:main]: Topic test metadata information unknown
D, [2024-09-08T23:52:18.654967 #32998] DEBUG -- : rdkafka: [thrd:main]: Topic test partition count is zero: should refresh metadata
D, [2024-09-08T23:52:18.655077 #32998] DEBUG -- : rdkafka: [thrd:main]: Cluster connection already in progress: refresh unavailable topics
D, [2024-09-08T23:52:18.655171 #32998] DEBUG -- : rdkafka: [thrd:main]: Not selecting any broker for cluster connection: still suppressed for 49ms: no cluster connection
D, [2024-09-08T23:52:19.657448 #32998] DEBUG -- : rdkafka: [thrd:main]: Topic test metadata information unknown
D, [2024-09-08T23:52:19.657636 #32998] DEBUG -- : rdkafka: [thrd:main]: Topic test partition count is zero: should refresh metadata
D, [2024-09-08T23:52:19.657753 #32998] DEBUG -- : rdkafka: [thrd:main]: Cluster connection already in progress: refresh unavailable topics
D, [2024-09-08T23:52:19.657820 #32998] DEBUG -- : rdkafka: [thrd:main]: Not selecting any broker for cluster connection: still suppressed for 49ms: no cluster connection
D, [2024-09-08T23:52:20.661586 #32998] DEBUG -- : rdkafka: [thrd:main]: Topic test metadata information unknown
D, [2024-09-08T23:52:20.661778 #32998] DEBUG -- : rdkafka: [thrd:main]: Topic test partition count is zero: should refresh metadata
D, [2024-09-08T23:52:20.661884 #32998] DEBUG -- : rdkafka: [thrd:main]: Cluster connection already in progress: refresh unavailable topics
D, [2024-09-08T23:52:20.662121 #32998] DEBUG -- : rdkafka: [thrd:main]: Not selecting any broker for cluster connection: still suppressed for 49ms: no cluster connection
continued with same logs ...

As you can see from the logs, both versions v0.15.0 and v0.16.0 use the same version of librdkafka (v2.3.0), so this issue doesn’t seem to stem from the C code. I’ve also tested the same configuration with librdkafka v2.3.0 and confirmed that it works fine. My guess is that this issue is related to the token refresh callbacks. Looking at the logs, it seems like the default callbacks provided by the C code are never called starting from version v0.16.0. Also, there are no logs from the background thread in v0.16.0 (those that start with [thrd:background]). When I checked the diff between v0.15.0 and v0.16.0, it looks like #410 touched the OAuthBearer-related code, so maybe something got broken there.

mensfeld commented 1 month ago

The background thread is probably missing because the oauthbearer token refresh callback now needs to be handled explicitly; refresh requests arrive periodically (at 80% of the token validity time).

It might have worked before because, without an explicit callback, librdkafka would apply its own logic for it and not the one from the user (which allows for explicit token refreshes).

I would not consider it a bug. You can try implementing your own oauthbearer_token_refresh_callback and see if it is called, and if it is, just follow the code docs to implement your own flow for this.

If you can prove that this does not work with explicit callback usage, I will consider it a bug, but of low priority unless more people are affected. Maybe @bruce-szalwinski-he can check it out if he wants, but this is not something I am willing to invest time in at the moment, as the oauthbearer flows I use work and I have had no reports about them since they were released.
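For readers following along, here is a minimal sketch of what such an explicit callback could look like. It assumes the callback is invoked with `(config, client_name)` and that clients expose `oauthbearer_set_token` and `oauthbearer_set_token_failure`, which is how I read the rdkafka-ruby code docs; `build_refresh_handler`, the `clients` registry, the `fetcher`, and `FakeClient` are hypothetical names used only for illustration. The stand-in client replaces a real producer so the snippet runs without a broker.

```ruby
# Builds a callable suitable for use as a token refresh callback.
# clients: hypothetical registry mapping client names to client instances.
# fetcher: hypothetical callable returning
#          { token:, lifetime_ms:, principal_name: } from your token provider.
def build_refresh_handler(clients, fetcher)
  lambda do |_config, client_name|
    client = clients[client_name]
    next unless client # unknown client name: nothing to refresh

    begin
      # Hand the freshly obtained token to the client that asked for it.
      client.oauthbearer_set_token(**fetcher.call)
    rescue StandardError => e
      # Report the failure so the client knows the refresh did not succeed.
      client.oauthbearer_set_token_failure(e.message)
    end
  end
end

# Stand-in for a real producer/consumer (assumption: same method names as the gem).
class FakeClient
  attr_reader :tokens, :failures

  def initialize
    @tokens = []
    @failures = []
  end

  def oauthbearer_set_token(token:, lifetime_ms:, principal_name:, extensions: nil)
    @tokens << { token: token, lifetime_ms: lifetime_ms, principal_name: principal_name }
  end

  def oauthbearer_set_token_failure(reason)
    @failures << reason
  end
end

# With the real gem the wiring would presumably look like this (not executed here):
#   Rdkafka::Config.oauthbearer_token_refresh_callback = handler
#   clients[producer.name] = producer
```

Keeping the token fetching behind a plain callable makes it easy to swap providers without touching the callback itself.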

If you consider this critical to your software you can support me and then I can prioritize it. If not, you will have to wait.

mensfeld commented 1 month ago

Update to above: I looked at your code and most likely you are relying on an undocumented behavior of implicit callback execution. This will not work, since in order to support all flows an explicit one had to be included. Also, your example does not show that your version works after the token validity time expires.

bachmanity1 commented 1 month ago

I looked at your code and most likely you are relying on an undocumented behavior of implicit callback execution.

Yes, you're right. My code relies on implicit callback execution.

This will not work, since in order to support all flows an explicit one had to be included.

This seems valid, but can you explain a bit more about this? Is it possible to change the code to fall back to implicit callbacks when explicit callbacks aren't passed? I think this would make users' lives a lot easier because implicit callbacks would probably work with most OIDC providers (as per KIP-768).

Also, your example does not show that your version works after the token validity time expires.

Yeah, you're right. My example doesn't show that it works after the token validity time expires, but I don't think we need to worry about that. The pull request https://github.com/confluentinc/librdkafka/pull/3560 already includes logic to refresh the token before it expires. All the other wrappers around librdkafka rely on this logic too.
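As an illustration of the KIP-768 style flow referred to above, the sketch below builds (but does not send) an OAuth 2.0 client credentials request against a token endpoint. This is my understanding of the general shape of such a request, not a copy of what librdkafka sends; `build_oidc_token_request` and the example endpoint, client ID, secret, and scope are hypothetical.

```ruby
require "net/http"
require "uri"

# Builds a client-credentials token request (hypothetical helper).
# The client ID and secret go into an HTTP Basic Authorization header,
# the grant type and optional scope into a form-encoded body.
def build_oidc_token_request(endpoint_url:, client_id:, client_secret:, scope: nil)
  uri = URI.parse(endpoint_url)
  request = Net::HTTP::Post.new(uri)
  request.basic_auth(client_id, client_secret)

  form = { "grant_type" => "client_credentials" }
  form["scope"] = scope if scope
  request.set_form_data(form)
  request
end

request = build_oidc_token_request(
  endpoint_url: "https://idp.example.com/oauth2/token", # placeholder endpoint
  client_id: "client-id",
  client_secret: "client-secret",
  scope: "scope"
)
puts request.body
```

Sending the request (for example with `Net::HTTP.start`) and parsing the JSON response for `access_token` is left out, since the response details depend on the provider.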

mensfeld commented 1 month ago

This seems valid, but can you explain a bit more about this?

This change was made to allow any oauth flow possible, including ones based on IAM, etc. It allows for time-sensitive token regeneration with external provider code.

Is it possible to change the code to fall back to implicit callbacks when explicit callbacks aren't passed?

Probably yes, but I am not willing to investigate or work on this. I may accept a PR if someone else is willing. The current (new) solution is explicit and handles several more cases for many vendors.

I think this would make users' lives a lot easier because implicit callbacks would probably work with most OIDC providers (as per KIP-768).

Not true. The whole reason why we replaced the implicit callback with the explicit one was to support librdkafka's explicit flow for any token flow.

Yeah, you're right. My example doesn't show that it works after the token validity time expires, but I don't think we need to worry about that.

This is exactly what we need to worry about. The implicit trigger on the initial token set does not revalidate nor re-update it.

The librdkafka PR flow clearly states:

Retrieve jwt from token provider and forward it to the broker

Meaning there is an external action happening that has to be in sync with librdkafka. It may sound easy but it is not. You would have to set up a timer for tracking, plus a refresh flow and an update flow. The update flow would have to happen when no polling is happening (or an event queue split would have to be done). It is absolutely not user-friendly compared to the explicit token refresh flow that can be implemented in rdkafka and that is part of karafka and waterdrop.

With the explicit flow, there is no need to manage any timing or other details beyond token obtaining and updating, as stated in the docs.
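To make the timing details concrete: whoever sets the token still has to tell the client when it expires, and the 80% figure mentioned earlier in the thread is simple arithmetic on that lifetime. The sketch below assumes the token is a JWT carrying an `exp` claim in seconds and that the expiry is expressed in milliseconds since the epoch; `jwt_claims`, `token_lifetime_ms`, and `refresh_delay_ms` are hypothetical helpers, and the payload is decoded without any signature verification.

```ruby
require "json"

# Decodes the payload segment of a JWT (no signature verification!).
def jwt_claims(jwt)
  payload = jwt.split(".")[1]
  # Convert base64url to standard base64 and restore padding.
  padded = payload.tr("-_", "+/")
  padded += "=" * ((4 - padded.length % 4) % 4)
  JSON.parse(padded.unpack1("m"))
end

# Expiry of the token in milliseconds since the epoch, taken from `exp`.
def token_lifetime_ms(jwt)
  jwt_claims(jwt).fetch("exp") * 1000
end

# Delay (in ms) after which a refresh would be due at the given percentage
# of the remaining validity; 80 mirrors the figure quoted in this thread.
def refresh_delay_ms(now_ms, expires_ms, percent = 80)
  (expires_ms - now_ms) * percent / 100
end

# Example with a token valid for one hour from "now".
now_ms = (Time.now.to_f * 1000).to_i
puts refresh_delay_ms(now_ms, now_ms + 3_600_000)
```

With the explicit callback none of the scheduling has to live in user code; only the expiry value needs to be passed along with the token.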

bachmanity1 commented 1 month ago

With the explicit flow, there is no need to manage any timing or other details beyond token obtaining and updating, as stated in the docs.

I'm not trying to be rude, but I don't really understand what you're getting at. From what I understand, it seems like you think librdkafka can only retrieve the token during initialization and that users need to explicitly refresh the token when it expires. However, librdkafka actually supports automatic token refresh out of the box, so users don't need to handle the timing manually. You can check the code here: link.

mensfeld commented 1 month ago

You are not rude. Your understanding is correct and I did simplify things a bit. Some flows do have token refresh, and librdkafka invokes the callback that we (rdkafka) use at 80% of the token validity time. You can also do it fully manually.

The flow that we have allows for invocation of token retrieval logic that is fully independent from librdkafka, allowing for third-party token management that is unknown to librdkafka, such as https://github.com/bruce-szalwinski-he/aws-msk-iam-sasl-signer-ruby

The explicit invocation of the callback makes it possible to support any token management as long as it is time sensitive. This allows for support of things like the one I linked.

mensfeld commented 1 month ago

FYI, there are several ways to manage token refreshes with regard to polling and event polling in librdkafka. Combined with explicit callbacks, they make things like independent event handling possible when a refresh is needed during long processing (aka bypassing max poll interval), along with several other things.

As said, I am not willing to invest my time into simplifying this flow unless there is a bigger community pressure of people wanting to deal with this stuff. My time is fairly limited and I do my best to pick up things to work on that will benefit the community the most.

mensfeld commented 1 month ago

I am going to close it. As said, if you are willing to rework this to support the old case that would work for some in an automatic way (which should not be hard I think because it only requires a callback deregistration / lack of registration to default) I am willing to accept a PR :)
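A rough sketch of the "lack of registration" idea, for anyone considering a PR: decide from the user's config whether the explicit refresh callback should be registered at all, and skip it when the built-in OIDC method is requested. This is a hypothetical illustration, not rdkafka-ruby's code; `builtin_oidc?` and `register_refresh_callback?` are invented names, and it assumes (as the thread suggests) that librdkafka falls back to its own OIDC refresh logic when no callback is registered.

```ruby
OAUTHBEARER_METHOD_KEY = "sasl.oauthbearer.method"

# True when the config asks librdkafka to use its built-in OIDC flow.
# Accepts both symbol and string keys; the value is matched case-insensitively.
def builtin_oidc?(config)
  method = config[OAUTHBEARER_METHOD_KEY.to_sym] || config[OAUTHBEARER_METHOD_KEY]
  method.to_s.casecmp?("oidc")
end

# Hypothetical decision point: only register the explicit refresh callback
# when the built-in OIDC flow is not in use.
def register_refresh_callback?(config)
  !builtin_oidc?(config)
end

puts register_refresh_callback?(:"sasl.oauthbearer.method" => "OIDC")
puts register_refresh_callback?(:"sasl.mechanism" => "OAUTHBEARER")
```

Where exactly such a check would sit in the native config setup is something a PR would need to work out against the actual codebase.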