confluentinc / librdkafka

The Apache Kafka C/C++ library

cooperative-sticky `performReassignments` can get stuck in an infinite loop #4783

Open mensfeld opened 1 month ago

mensfeld commented 1 month ago

Description

I do not know why, but the cooperative-sticky assignor can get stuck in an infinite loop, causing 100% CPU usage and hanging.

How to reproduce

At the moment I am able to reproduce it reliably.

I subscribe a few consumers from the same consumer group to two topics; each consumer is subscribed to one of the two topics. I start with one consumer instance per topic and then, every 5 seconds, I add one more consumer instance. Everything is fine until I start adding consumers subscribed to the second topic.

Reproduction:

When client.id is not set (or is set to the same value for all consumers), the described behaviour occurs. If I randomize it, it does not.

require 'rdkafka'
require 'securerandom'

CONFIG = {
  'bootstrap.servers': '127.0.0.1:9092',
  "partition.assignment.strategy": "cooperative-sticky",
  "group.id": SecureRandom.uuid
}.freeze

def config
  if ARGV.empty?
    CONFIG
  else
    CONFIG.merge(
      'client.id': "karafka_#{Time.now.to_f}_#{rand}"
    )
  end
end

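# Ten consumers subscribed to the first topic, started right away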
10.times do |listener|
  Thread.new do
    consumer = ::Rdkafka::Config.new(config).consumer
    consumer.subscribe('workflows_states')

    loop do
      consumer.poll(100)
    end
  end
end

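# Every 5 seconds, add one more consumer subscribed to the second topic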
15.times do |listener|
  Thread.new do
    consumer = ::Rdkafka::Config.new(config).consumer
    consumer.subscribe('workflows_activities')

    loop do
      consumer.poll(100)
    end
  end

  sleep(5)
end

sleep

Topic configurations:

config(
  partitions: 10,
  'cleanup.policy': 'compact',
  'min.cleanable.dirty.ratio': 0.01,
  'segment.ms': 1000,
  'delete.retention.ms': 1000,
  'max.compaction.lag.ms': 1000,
  'min.compaction.lag.ms': 0,
  'file.delete.delay.ms': 1000
)

config(
  partitions: 30,
  'cleanup.policy': 'compact',
  'min.cleanable.dirty.ratio': 0.01,
  'segment.ms': 1000,
  'delete.retention.ms': 1000,
  'max.compaction.lag.ms': 1000,
  'min.compaction.lag.ms': 0,
  'file.delete.delay.ms': 1000
)

Checklist

IMPORTANT: We will close issues where the checklist has not been completed.

Please provide the following information:

Full librdkafka config

{:"allow.auto.create.topics"=>"true", :"bootstrap.servers"=>"127.0.0.1:9092", :"client.id"=>"constant4-d57cffac-3cda-4a59-b4e0-ed4632f8e405", :"partition.assignment.strategy"=>"cooperative-sticky", :"statistics.interval.ms"=>5000, :"topic.metadata.refresh.interval.ms"=>5000, :"group.id"=>"6c727ccb-1414-4aea-aa2c-131de394e516", :"auto.offset.reset"=>"latest", :"enable.auto.offset.store"=>false}

Broker logs

Since I'm running a multi-threaded setup with several consumer instances in one Ruby process to simulate this, and they join and rejoin with some randomness, there are some logs, but they do not differ from when the instance is not stuck.

kafka  | [2024-07-16 15:46:39,908] INFO [GroupCoordinator 1]: Dynamic member with unknown member id joins group testme in Stable state. Created a new member id constant-0bb86e08-233c-410e-ae04-b6e3bb473014 and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
kafka  | [2024-07-16 15:46:39,908] INFO [GroupCoordinator 1]: Preparing to rebalance group testme in state PreparingRebalance with old generation 599 (__consumer_offsets-38) (reason: Adding new member constant-0bb86e08-233c-410e-ae04-b6e3bb473014 with group instance id None; client reason: not provided) (kafka.coordinator.group.GroupCoordinator)
kafka  | [2024-07-16 15:46:42,754] INFO [GroupCoordinator 1]: Stabilized group testme generation 600 (__consumer_offsets-38) with 6 members (kafka.coordinator.group.GroupCoordinator)
kafka  | [2024-07-16 15:46:42,754] INFO [GroupCoordinator 1]: Assignment received from leader constant-effb8d15-3d87-40a2-800a-fcd0ca5065ed for group testme for generation 600. The group has 6 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
kafka  | [2024-07-16 15:46:42,755] INFO [GroupCoordinator 1]: Preparing to rebalance group testme in state PreparingRebalance with old generation 600 (__consumer_offsets-38) (reason: Updating metadata for member constant-7cabf7cd-72bb-4c59-bbf5-cc489abf960c during Stable; client reason: not provided) (kafka.coordinator.group.GroupCoordinator)
kafka  | [2024-07-16 15:46:45,757] INFO [GroupCoordinator 1]: Stabilized group testme generation 601 (__consumer_offsets-38) with 6 members (kafka.coordinator.group.GroupCoordinator)
kafka  | [2024-07-16 15:46:45,757] INFO [GroupCoordinator 1]: Assignment received from leader constant-effb8d15-3d87-40a2-800a-fcd0ca5065ed for group testme for generation 601. The group has 6 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)

Additional info

`watch ps -T -p <pid>` output showing extreme CPU usage of an rdk:main thread:

    PID    SPID TTY          TIME CMD
2043439 2043439 pts/1    00:00:02 ruby
2043439 2043467 pts/1    00:00:00 ruby
2043439 2043474 pts/1    00:00:00 FFI Callback Di
2043439 2043526 pts/1    00:00:00 karafka.web.pro
2043439 2043527 pts/1    00:00:01 karafka.web.tra
2043439 2043528 pts/1    00:00:01 karafka.listene
2043439 2043529 pts/1    00:00:00 karafka.listene
2043439 2043530 pts/1    00:00:01 karafka.worker#
2043439 2043531 pts/1    00:00:01 karafka.worker#
2043439 2043532 pts/1    00:00:01 karafka.worker#
2043439 2043533 pts/1    00:00:01 karafka.worker#
2043439 2043534 pts/1    00:00:01 karafka.worker#
2043439 2043535 pts/1    00:00:01 karafka.worker#
2043439 2043536 pts/1    00:00:00 karafka.worker#
2043439 2043537 pts/1    00:00:00 karafka.worker#
2043439 2043538 pts/1    00:00:00 karafka.worker#
2043439 2043539 pts/1    00:00:00 karafka.worker#
2043439 2043572 pts/1    00:00:00 rdk:main
2043439 2043573 pts/1    00:00:00 rdk:broker-1
2043439 2043574 pts/1    00:00:00 rdk:broker1
2043439 2043575 pts/1    00:00:01 rdkafka.native_
2043439 2043576 pts/1    00:00:00 rdk:broker-1
2043439 2043577 pts/1    00:30:07 rdk:main
2043439 2043578 pts/1    00:00:00 rdk:broker-1
2043439 2043579 pts/1    00:00:00 rdk:broker1
2043439 2043580 pts/1    00:00:00 rdk:broker-1
2043439 2043581 pts/1    00:00:00 rdk:main
2043439 2043582 pts/1    00:00:00 rdk:broker-1
2043439 2043583 pts/1    00:00:00 rdk:broker1
2043439 2043584 pts/1    00:00:00 rdk:broker-1
2043439 2043585 pts/1    00:00:00 rdk:main
2043439 2043586 pts/1    00:00:00 rdk:broker-1
2043439 2043587 pts/1    00:00:00 rdk:broker1
2043439 2046393 pts/1    00:00:00 karafka.listene
2043439 2046395 pts/1    00:00:00 rdk:broker-1
2043439 2046396 pts/1    00:00:00 rdk:main
2043439 2046397 pts/1    00:00:00 rdk:broker-1
2043439 2046398 pts/1    00:00:00 rdk:broker1
2043439 2049456 pts/1    00:00:00 karafka.listene
2043439 2049457 pts/1    00:00:00 rdk:broker-1
2043439 2049458 pts/1    00:00:00 rdk:main
2043439 2049459 pts/1    00:00:00 rdk:broker-1
2043439 2049460 pts/1    00:00:00 rdk:broker1
2043439 2052518 pts/1    00:00:00 karafka.listene
2043439 2052519 pts/1    00:00:00 rdk:broker-1
2043439 2052520 pts/1    00:00:00 rdk:main
2043439 2052521 pts/1    00:00:00 rdk:broker-1
2043439 2052522 pts/1    00:00:00 rdk:broker1
2043439 2054831 pts/1    00:00:00 karafka.listene
2043439 2054832 pts/1    00:00:00 rdk:broker-1
2043439 2054833 pts/1    00:00:00 rdk:main
2043439 2054834 pts/1    00:00:00 rdk:broker-1
2043439 2054835 pts/1    00:00:00 rdk:broker1
2043439 2064446 pts/1    00:00:00 config.rb:42
2043439 2120740 pts/1    00:00:00 karafka.listene
2043439 2120741 pts/1    00:00:00 rdk:broker-1
2043439 2120742 pts/1    00:00:00 rdk:main
2043439 2120743 pts/1    00:00:00 rdk:broker-1

GDB backtrace of this thread:

[Switching to thread 23 (Thread 0x7fe1b13f6640 (LWP 2043577))]
#0  0x00007fe1ccbc1479 in rd_string_hash (str=<optimized out>, len=-1) at rdstring.c:267
267 rdstring.c: No such file or directory.
(gdb) bt
#0  0x00007fe1ccbc1479 in rd_string_hash (str=<optimized out>, len=-1) at rdstring.c:267
#1  0x00007fe1ccc340c1 in rd_map_get (rmap=rmap@entry=0x7fe1b13f5370, key=key@entry=0x7fe15c020aa2) at rdmap.c:114
#2  0x00007fe1ccbb79af in isBalanced (partition2AllPotentialConsumers=0x7fe1b13f5430, consumer2AllPotentialPartitions=0x7fe1b13f5490, sortedCurrentSubscriptions=0x7fe1b13f52e0, 
    currentAssignment=<optimized out>, rk=0x7fe19c56c040) at rdkafka_sticky_assignor.c:851
#3  performReassignments (rk=rk@entry=0x7fe19c56c040, partitionMovements=partitionMovements@entry=0x7fe1b13f5670, reassignablePartitions=reassignablePartitions@entry=0x7fe15c013f80, 
    currentAssignment=currentAssignment@entry=0x7fe1b13f5370, prevAssignment=prevAssignment@entry=0x7fe1b13f53d0, sortedCurrentSubscriptions=sortedCurrentSubscriptions@entry=0x7fe1b13f52e0, 
    consumer2AllPotentialPartitions=0x7fe1b13f5490, partition2AllPotentialConsumers=0x7fe1b13f5430, currentPartitionConsumer=0x7fe1b13f54f0, rkri=0x0) at rdkafka_sticky_assignor.c:911
#4  0x00007fe1ccbb976c in balance (rkri=0x0, revocationRequired=0 '\000', currentPartitionConsumer=0x7fe1b13f54f0, partition2AllPotentialConsumers=0x7fe1b13f5430, 
    consumer2AllPotentialPartitions=0x7fe1b13f5490, sortedCurrentSubscriptions=0x7fe1b13f52e0, unassignedPartitions=0x7fe15c014080, sortedPartitions=0x7fe15c013f80, prevAssignment=0x7fe1b13f53d0, 
    currentAssignment=0x7fe1b13f5370, partitionMovements=0x7fe1b13f5670, rk=0x7fe19c56c040) at rdkafka_sticky_assignor.c:1288
#5  rd_kafka_sticky_assignor_assign_cb (rk=0x7fe19c56c040, rkas=<optimized out>, member_id=<optimized out>, metadata=<optimized out>, members=0x7fe15c00b500, member_cnt=<optimized out>, 
    eligible_topics=0x7fe15c008860, eligible_topic_cnt=2, errstr=0x7fe1b13f58e0 "", errstr_size=512, opaque=0x0) at rdkafka_sticky_assignor.c:2038
#6  0x00007fe1ccba0bbd in rd_kafka_assignor_run (rkcg=rkcg@entry=0x7fe19c569180, rkas=rkas@entry=0x7fe19c56d490, metadata=0x7fe15c013930, members=members@entry=0x7fe15c00b500, member_cnt=member_cnt@entry=5, 
    errstr=errstr@entry=0x7fe1b13f58e0 "", errstr_size=512) at rdkafka_assignor.c:377
#7  0x00007fe1ccb82171 in rd_kafka_cgrp_assignor_run (rkcg=0x7fe19c569180, rkas=0x7fe19c56d490, err=<optimized out>, metadata=<optimized out>, members=0x7fe15c00b500, member_cnt=5) at rdkafka_cgrp.c:1846
#8  0x00007fe1ccb827c6 in rd_kafka_cgrp_assignor_handle_Metadata_op (rk=<optimized out>, rkq=<optimized out>, rko=<optimized out>) at rdkafka_cgrp.c:1913
#9  0x00007fe1ccb53cb9 in rd_kafka_op_call (rk=<optimized out>, rkq=<optimized out>, rko=0x7fe15c01fd40) at rdkafka_op.c:744
#10 0x00007fe1ccb54120 in rd_kafka_op_handle (rk=0x7fe19c56c040, rkq=0x7fe1b13f5be0, rko=0x7fe15c01fd40, cb_type=RD_KAFKA_Q_CB_CALLBACK, opaque=0x7fe19c569180, callback=0x7fe1ccb8e8e0 <rd_kafka_cgrp_op_serve>)
    at rdkafka_op.c:915
#11 0x00007fe1ccb50a71 in rd_kafka_q_serve (rkq=0x7fe19c56d1a0, timeout_ms=<optimized out>, max_cnt=max_cnt@entry=0, cb_type=cb_type@entry=RD_KAFKA_Q_CB_CALLBACK, callback=callback@entry=0x0, 
    opaque=opaque@entry=0x0) at rdkafka_queue.c:578
#12 0x00007fe1ccb1f415 in rd_kafka_thread_main (arg=0x7fe19c56c040) at rdkafka.c:2136
#13 0x00007fe1e85dd935 in start_thread (arg=<optimized out>) at ./nptl/pthread_create.c:439
#14 0x00007fe1e866f850 in clone3 () at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:81

It looks like this code never exits:

https://github.com/confluentinc/librdkafka/blob/6eaf89fb124c421b66b43b195879d458a3a31f86/src/rdkafka_sticky_assignor.c#L899

I suspect (though I have not confirmed it yet) that it may be caused by a consumer instance leaving during the sticky rebalance, or something similar, causing this to run forever.

I tried stopping it with GDB several times and, tracking the backtrace, it is always in the same place.

frame details:

Stack level 3, frame at 0x7fe1b13f51e0:
 rip = 0x7fe1ccbb79af in performReassignments (rdkafka_sticky_assignor.c:911); saved rip = 0x7fe1ccbb976c
 called by frame at 0x7fe1b13f5780, caller of frame at 0x7fe1b13f51e0
 source language c.
 Arglist at 0x7fe1b13f5098, args: rk=rk@entry=0x7fe19c56c040, partitionMovements=partitionMovements@entry=0x7fe1b13f5670, reassignablePartitions=reassignablePartitions@entry=0x7fe15c013f80, 
    currentAssignment=currentAssignment@entry=0x7fe1b13f5370, prevAssignment=prevAssignment@entry=0x7fe1b13f53d0, sortedCurrentSubscriptions=sortedCurrentSubscriptions@entry=0x7fe1b13f52e0, 
    consumer2AllPotentialPartitions=0x7fe1b13f5490, partition2AllPotentialConsumers=0x7fe1b13f5430, currentPartitionConsumer=0x7fe1b13f54f0, rkri=0x0
 Locals at 0x7fe1b13f5098, Previous frame's sp is 0x7fe1b13f51e0
 Saved registers:

locals for this frame

(gdb) info locals
i = 3
reassignmentPerformed = 1 '\001'
modified = 1 '\001'
saveIsBalanced = 0 '\000'
iterations = <optimized out>
__PRETTY_FUNCTION__ = "performReassignments"

Forcing a rebalance by keeping GDB attached beyond max.poll.interval.ms does not trigger an exit from this loop. It keeps running forever.

assignor logs from the moment it happens:

rdkafka: [thrd:app]: librdkafka v2.4.0 (0x20400ff) constant2#consumer-8 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer,http,oidc, GCC GXX PKGCONFIG INSTALL GNULD LDS C11THREADS LIBDL PLUGINS ZLIB SSL SASL_CYRUS ZSTD CURL HDRHISTOGRAM LZ4_EXT SYSLOG SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER OAUTHBEARER_OIDC CRC32C_HW, debug 0x20000)
[19e71f6f5ef7] Polling messages...
[a673678b51f5] Polled 0 messages in 1000.12ms
[a673678b51f5] Polling messages...
[c91b9abea810] Polled 0 messages in 1000.11ms
[c91b9abea810] Polling messages...
rdkafka: [thrd:main]: Group "testme4" running cooperative-sticky assignor for 5 member(s) and 2 eligible subscribed topic(s):
rdkafka: [thrd:main]:  Member "constant2-1475e4df-b4d3-4639-a178-9de089030968" with 1 owned partition(s) and 1 subscribed topic(s):
rdkafka: [thrd:main]:   workflows_states [-1]
rdkafka: [thrd:main]:  Member "constant2-8b9eaecb-16b1-441d-8fd4-c5c896030f33" with 0 owned partition(s) and 1 subscribed topic(s):
rdkafka: [thrd:main]:   workflows_activities [-1]
rdkafka: [thrd:main]:  Member "constant2-df71cfbf-e314-40ff-92fd-e821995ab761" with 2 owned partition(s) and 1 subscribed topic(s):
rdkafka: [thrd:main]:   workflows_activities [-1]
rdkafka: [thrd:main]:  Member "constant2-fe6625a3-cd85-449f-8230-3c09c603180d" with 1 owned partition(s) and 1 subscribed topic(s):
rdkafka: [thrd:main]:   workflows_states [-1]
rdkafka: [thrd:main]:  Member "constant2-6884b88a-b45f-4b3c-8f8c-26fec65e7804" (me) with 3 owned partition(s) and 1 subscribed topic(s):
rdkafka: [thrd:main]:   workflows_activities [-1]
rdkafka: [thrd:main]: Sort 7 partitions in existing assignment
rdkafka: [thrd:main]: Prepared balanced reassignment for 5 consumers, 7 available partition(s) where of 0 are unassigned (initializing=false, revocationRequired=false, 0 fixed assignments)
rdkafka: [thrd:main]: workflows_activities [0] can be moved from consumer constant2-df71cfbf-e314-40ff-92fd-e821995ab761 (2 partition(s)) to consumer constant2-8b9eaecb-16b1-441d-8fd4-c5c896030f33 (0 partition(s)) for a more balanced assignment
rdkafka: [thrd:main]: workflows_activities [0] can be moved from consumer constant2-df71cfbf-e314-40ff-92fd-e821995ab761 (2 partition(s)) to consumer constant2-8b9eaecb-16b1-441d-8fd4-c5c896030f33 (0 partition(s)) for a more balanced assignment
rdkafka: [thrd:main]: workflows_activities [0] can be moved from consumer constant2-df71cfbf-e314-40ff-92fd-e821995ab761 (2 partition(s)) to consumer constant2-8b9eaecb-16b1-441d-8fd4-c5c896030f33 (0 partition(s)) for a more balanced assignment
rdkafka: [thrd:main]: workflows_activities [0] reassigned to constant2-8b9eaecb-16b1-441d-8fd4-c5c896030f33 (from constant2-df71cfbf-e314-40ff-92fd-e821995ab761)

More detailed logs (assignor,generic,broker,cgrp,interceptor):

rdkafka: [thrd:app]: Group "testme4": updating member id "(not-set)" -> ""
rdkafka: [thrd:app]: GroupCoordinator: Added new broker with NodeId -1
rdkafka: [thrd:GroupCoordinator]: GroupCoordinator: Enter main broker thread
rdkafka: [thrd:main]: Group "testme4" changed state init -> query-coord (join-state init)
rdkafka: [thrd:main]: Broadcasting state change
rdkafka: [thrd:main]: Group "testme4": no broker available for coordinator query: intervaled in state query-coord
rdkafka: [thrd::0/internal]: :0/internal: Enter main broker thread
rdkafka: [thrd:app]: 127.0.0.1:9092/bootstrap: Added new broker with NodeId -1
rdkafka: [thrd:app]: librdkafka v2.4.0 (0x20400ff) constant3#consumer-8 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer,http,oidc, GCC GXX PKGCONFIG INSTALL GNULD LDS C11THREADS LIBDL PLUGINS ZLIB SSL SASL_CYRUS ZSTD CURL HDRHISTOGRAM LZ4_EXT SYSLOG SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER OAUTHBEARER_OIDC CRC32C_HW, debug 0x20903)
rdkafka: [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Enter main broker thread
rdkafka: [thrd:main]: Group "testme4" received op SUBSCRIBE in state query-coord (join-state init)
rdkafka: [thrd:main]: Group "testme4": subscribe to new subscription of 1 topics (join-state init)
rdkafka: [thrd:main]: 127.0.0.1:9092/bootstrap: Selected for cluster connection: coordinator query (broker has 0 connection attempt(s))
[c564110fbf63] Polling messages...
rdkafka: [thrd:main]: Group "testme4": no broker available for coordinator query: intervaled in state query-coord
rdkafka: [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Received CONNECT op
rdkafka: [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Broker changed state INIT -> TRY_CONNECT
rdkafka: [thrd:127.0.0.1:9092/bootstrap]: Broadcasting state change
rdkafka: [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: broker in state TRY_CONNECT connecting
rdkafka: [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Broker changed state TRY_CONNECT -> CONNECT
rdkafka: [thrd:127.0.0.1:9092/bootstrap]: Broadcasting state change
rdkafka: [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Connecting to ipv4#127.0.0.1:9092 (plaintext) with socket 58
rdkafka: [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Connected to ipv4#127.0.0.1:9092
rdkafka: [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Connected (#1)
rdkafka: [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Updated enabled protocol features +ApiVersion to ApiVersion
rdkafka: [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Broker changed state CONNECT -> APIVERSION_QUERY
rdkafka: [thrd:127.0.0.1:9092/bootstrap]: Broadcasting state change
rdkafka: [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Updated enabled protocol features to MsgVer1,ApiVersion,BrokerBalancedConsumer,ThrottleTime,Sasl,SaslHandshake,BrokerGroupCoordinator,LZ4,OffsetTime,MsgVer2,IdempotentProducer,ZSTD,SaslAuthReq
rdkafka: [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Broker changed state APIVERSION_QUERY -> UP
rdkafka: [thrd:127.0.0.1:9092/bootstrap]: Broadcasting state change
rdkafka: [thrd:main]: 127.0.0.1:9092/bootstrap: Group "testme4": querying for coordinator: intervaled in state query-coord
rdkafka: [thrd:main]: Group "testme4" changed state query-coord -> wait-coord (join-state init)
rdkafka: [thrd:main]: Broadcasting state change
[4585b7d5fd06] Polled 0 messages in 1000.12ms
[4585b7d5fd06] Polling messages...
rdkafka: [thrd:main]: Topic workflows_states [0]: stored offset INVALID (leader epoch -1), committed offset 1102437 (leader epoch -1): not including in commit
rdkafka: [thrd:main]: Assignment dump (started_cnt=1, wait_stop_cnt=0)
rdkafka: [thrd:main]: List with 1 partition(s):
rdkafka: [thrd:main]:  workflows_states [0] offset STORED
rdkafka: [thrd:main]: List with 0 partition(s):
rdkafka: [thrd:main]: List with 0 partition(s):
rdkafka: [thrd:main]: List with 0 partition(s):
rdkafka: [thrd:main]: Group "testme4": assignment operations done in join-state steady (rebalance rejoin=false)
rdkafka: [thrd:main]: GroupCoordinator/1: Heartbeat for group "testme4" generation id 37
rdkafka: [thrd:main]: Group "testme4" heartbeat error response in state up (join-state steady, 1 partition(s) assigned): Broker: Group rebalance in progress
rdkafka: [thrd:main]: Group "testme4": Rejoining group with 1 owned partition(s): Group is rebalancing
rdkafka: [thrd:main]: Group "testme4" changed join state steady -> init (state up)
rdkafka: [thrd:main]: Group "testme4": join with 1 subscribed topic(s)
rdkafka: [thrd:main]: consumer join: metadata for subscription is up to date (4981ms old)
rdkafka: [thrd:main]: 127.0.0.1:9092/1: Joining group "testme4" with 1 subscribed topic(s) and member id "constant3-af7909dc-adc4-40a2-934b-3c7db00e46c0"
rdkafka: [thrd:main]: Group "testme4" changed join state init -> wait-join (state up)
rdkafka: [thrd:GroupCoordinator]: Broadcasting state change
rdkafka: [thrd:main]: JoinGroup response: GenerationId 38, Protocol cooperative-sticky, LeaderId constant3-7fab2ad5-4fde-4076-9675-75643afa957c, my MemberId constant3-af7909dc-adc4-40a2-934b-3c7db00e46c0, member metadata count 0: (no error)
rdkafka: [thrd:main]: Group "testme4" changed join state wait-join -> wait-sync (state up)
rdkafka: [thrd:GroupCoordinator]: Broadcasting state change
[7a91ec18d905] Polled 0 messages in 1000.11ms
[7a91ec18d905] Polling messages...
rdkafka: [thrd:main]: Topic workflows_activities [0]: stored offset INVALID (leader epoch -1), committed offset 80406 (leader epoch -1): not including in commit
rdkafka: [thrd:main]: Topic workflows_activities [1]: stored offset INVALID (leader epoch -1), committed offset 79966 (leader epoch -1): not including in commit
rdkafka: [thrd:main]: Assignment dump (started_cnt=2, wait_stop_cnt=0)
rdkafka: [thrd:main]: List with 2 partition(s):
rdkafka: [thrd:main]:  workflows_activities [0] offset STORED
rdkafka: [thrd:main]:  workflows_activities [1] offset STORED
rdkafka: [thrd:main]: List with 0 partition(s):
rdkafka: [thrd:main]: List with 0 partition(s):
rdkafka: [thrd:main]: List with 0 partition(s):
rdkafka: [thrd:main]: Group "testme4": assignment operations done in join-state steady (rebalance rejoin=false)
rdkafka: [thrd:main]: GroupCoordinator/1: Heartbeat for group "testme4" generation id 37
rdkafka: [thrd:main]: Group "testme4" heartbeat error response in state up (join-state steady, 2 partition(s) assigned): Broker: Group rebalance in progress
rdkafka: [thrd:main]: Group "testme4": Rejoining group with 2 owned partition(s): Group is rebalancing
rdkafka: [thrd:main]: Group "testme4" changed join state steady -> init (state up)
rdkafka: [thrd:main]: Group "testme4": join with 1 subscribed topic(s)
rdkafka: [thrd:main]: consumer join: metadata for subscription is up to date (4982ms old)
rdkafka: [thrd:main]: 127.0.0.1:9092/1: Joining group "testme4" with 1 subscribed topic(s) and member id "constant3-827aea58-05f4-4cf1-a98c-1b098320bce9"
rdkafka: [thrd:main]: Group "testme4" changed join state init -> wait-join (state up)
rdkafka: [thrd:GroupCoordinator]: Broadcasting state change
rdkafka: [thrd:main]: JoinGroup response: GenerationId 38, Protocol cooperative-sticky, LeaderId constant3-7fab2ad5-4fde-4076-9675-75643afa957c, my MemberId constant3-827aea58-05f4-4cf1-a98c-1b098320bce9, member metadata count 0: (no error)
rdkafka: [thrd:main]: Group "testme4" changed join state wait-join -> wait-sync (state up)
rdkafka: [thrd:GroupCoordinator]: Broadcasting state change
[4bcddb16978c] Polled 0 messages in 1000.09ms
[4bcddb16978c] Polling messages...
rdkafka: [thrd:main]: Topic workflows_states [1]: stored offset INVALID (leader epoch -1), committed offset 1098836 (leader epoch -1): not including in commit
rdkafka: [thrd:main]: Assignment dump (started_cnt=1, wait_stop_cnt=0)
rdkafka: [thrd:main]: List with 1 partition(s):
rdkafka: [thrd:main]:  workflows_states [1] offset STORED
rdkafka: [thrd:main]: List with 0 partition(s):
rdkafka: [thrd:main]: List with 0 partition(s):
rdkafka: [thrd:main]: List with 0 partition(s):
rdkafka: [thrd:main]: Group "testme4": assignment operations done in join-state steady (rebalance rejoin=false)
rdkafka: [thrd:main]: GroupCoordinator/1: Heartbeat for group "testme4" generation id 37
rdkafka: [thrd:main]: Group "testme4" heartbeat error response in state up (join-state steady, 1 partition(s) assigned): Broker: Group rebalance in progress
rdkafka: [thrd:main]: Group "testme4": Rejoining group with 1 owned partition(s): Group is rebalancing
rdkafka: [thrd:main]: Group "testme4" changed join state steady -> init (state up)
rdkafka: [thrd:main]: Group "testme4": join with 1 subscribed topic(s)
rdkafka: [thrd:main]: consumer join: metadata for subscription is up to date (4961ms old)
rdkafka: [thrd:main]: 127.0.0.1:9092/1: Joining group "testme4" with 1 subscribed topic(s) and member id "constant3-087f5a75-942b-4dd6-97eb-8bde922dab34"
rdkafka: [thrd:main]: Group "testme4" changed join state init -> wait-join (state up)
rdkafka: [thrd:GroupCoordinator]: Broadcasting state change
rdkafka: [thrd:main]: JoinGroup response: GenerationId 38, Protocol cooperative-sticky, LeaderId constant3-7fab2ad5-4fde-4076-9675-75643afa957c, my MemberId constant3-087f5a75-942b-4dd6-97eb-8bde922dab34, member metadata count 0: (no error)
rdkafka: [thrd:main]: Group "testme4" changed join state wait-join -> wait-sync (state up)
rdkafka: [thrd:GroupCoordinator]: Broadcasting state change
[fd6e389a6f0a] Polled 0 messages in 1000.11ms
[fd6e389a6f0a] Polling messages...
rdkafka: [thrd:main]: Topic workflows_activities [2]: stored offset INVALID (leader epoch -1), committed offset 80326 (leader epoch -1): not including in commit
rdkafka: [thrd:main]: Topic workflows_activities [3]: stored offset INVALID (leader epoch -1), committed offset 80112 (leader epoch -1): not including in commit
rdkafka: [thrd:main]: Topic workflows_activities [4]: stored offset INVALID (leader epoch -1), committed offset 80529 (leader epoch -1): not including in commit
rdkafka: [thrd:main]: Assignment dump (started_cnt=3, wait_stop_cnt=0)
rdkafka: [thrd:main]: List with 3 partition(s):
rdkafka: [thrd:main]:  workflows_activities [2] offset STORED
rdkafka: [thrd:main]:  workflows_activities [3] offset STORED
rdkafka: [thrd:main]:  workflows_activities [4] offset STORED
rdkafka: [thrd:main]: List with 0 partition(s):
rdkafka: [thrd:main]: List with 0 partition(s):
rdkafka: [thrd:main]: List with 0 partition(s):
rdkafka: [thrd:main]: Group "testme4": assignment operations done in join-state steady (rebalance rejoin=false)
rdkafka: [thrd:main]: GroupCoordinator/1: Heartbeat for group "testme4" generation id 37
rdkafka: [thrd:main]: Group "testme4" heartbeat error response in state up (join-state steady, 3 partition(s) assigned): Broker: Group rebalance in progress
rdkafka: [thrd:main]: Group "testme4": Rejoining group with 3 owned partition(s): Group is rebalancing
rdkafka: [thrd:main]: Group "testme4" changed join state steady -> init (state up)
rdkafka: [thrd:main]: Group "testme4": join with 1 subscribed topic(s)
rdkafka: [thrd:main]: consumer join: metadata for subscription is up to date (4961ms old)
rdkafka: [thrd:main]: 127.0.0.1:9092/1: Joining group "testme4" with 1 subscribed topic(s) and member id "constant3-7fab2ad5-4fde-4076-9675-75643afa957c"
rdkafka: [thrd:main]: Group "testme4" changed join state init -> wait-join (state up)
rdkafka: [thrd:GroupCoordinator]: Broadcasting state change
rdkafka: [thrd:main]: JoinGroup response: GenerationId 38, Protocol cooperative-sticky, LeaderId constant3-7fab2ad5-4fde-4076-9675-75643afa957c (me), my MemberId constant3-7fab2ad5-4fde-4076-9675-75643afa957c, member metadata count 5: (no error)
rdkafka: [thrd:main]: I am elected leader for group "testme4" with 5 member(s)
rdkafka: [thrd:main]: Group "testme4": resetting group leader info: JoinGroup response clean-up
rdkafka: [thrd:main]: Group "testme4" changed join state wait-join -> wait-metadata (state up)
rdkafka: [thrd:main]: Group "testme4" running cooperative-sticky assignor for 5 member(s) and 2 eligible subscribed topic(s):
rdkafka: [thrd:main]:  Member "constant3-7fab2ad5-4fde-4076-9675-75643afa957c" (me) with 3 owned partition(s) and 1 subscribed topic(s):
rdkafka: [thrd:main]:   workflows_activities [-1]
rdkafka: [thrd:main]:  Member "constant3-53802b00-adeb-4a3b-983d-c8b7e97e42e8" with 0 owned partition(s) and 1 subscribed topic(s):
rdkafka: [thrd:main]:   workflows_activities [-1]
rdkafka: [thrd:main]:  Member "constant3-af7909dc-adc4-40a2-934b-3c7db00e46c0" with 1 owned partition(s) and 1 subscribed topic(s):
rdkafka: [thrd:main]:   workflows_states [-1]
rdkafka: [thrd:main]:  Member "constant3-827aea58-05f4-4cf1-a98c-1b098320bce9" with 2 owned partition(s) and 1 subscribed topic(s):
rdkafka: [thrd:main]:   workflows_activities [-1]
rdkafka: [thrd:main]:  Member "constant3-087f5a75-942b-4dd6-97eb-8bde922dab34" with 1 owned partition(s) and 1 subscribed topic(s):
rdkafka: [thrd:main]:   workflows_states [-1]
rdkafka: [thrd:main]: Sort 7 partitions in existing assignment
rdkafka: [thrd:main]: Prepared balanced reassignment for 5 consumers, 7 available partition(s) where of 0 are unassigned (initializing=false, revocationRequired=false, 0 fixed assignments)
rdkafka: [thrd:main]: workflows_activities [0] can be moved from consumer constant3-827aea58-05f4-4cf1-a98c-1b098320bce9 (2 partition(s)) to consumer constant3-53802b00-adeb-4a3b-983d-c8b7e97e42e8 (0 partition(s)) for a more balanced assignment
rdkafka: [thrd:main]: workflows_activities [0] can be moved from consumer constant3-827aea58-05f4-4cf1-a98c-1b098320bce9 (2 partition(s)) to consumer constant3-53802b00-adeb-4a3b-983d-c8b7e97e42e8 (0 partition(s)) for a more balanced assignment
rdkafka: [thrd:main]: workflows_activities [0] can be moved from consumer constant3-827aea58-05f4-4cf1-a98c-1b098320bce9 (2 partition(s)) to consumer constant3-53802b00-adeb-4a3b-983d-c8b7e97e42e8 (0 partition(s)) for a more balanced assignment
rdkafka: [thrd:main]: workflows_activities [0] reassigned to constant3-53802b00-adeb-4a3b-983d-c8b7e97e42e8 (from constant3-827aea58-05f4-4cf1-a98c-1b098320bce9)
rdkafka: [thrd:main]: 127.0.0.1:9092/bootstrap: Group "testme4" coordinator is 127.0.0.1:9092 id 1
rdkafka: [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: NodeId changed from -1 to 1
rdkafka: [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/1: Name changed from 127.0.0.1:9092/bootstrap to 127.0.0.1:9092/1
rdkafka: [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/1: Mapped 0 partition(s) to broker
rdkafka: [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/1: Broker changed state UP -> UPDATE
rdkafka: [thrd:127.0.0.1:9092/bootstrap]: Broadcasting state change
rdkafka: [thrd:127.0.0.1:9092/bootstrap]: Broadcasting state change
rdkafka: [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/1: Broker changed state UPDATE -> UP
rdkafka: [thrd:127.0.0.1:9092/bootstrap]: Broadcasting state change
rdkafka: [thrd:main]: Group "testme4" changing coordinator -1 -> 1
rdkafka: [thrd:main]: Group "testme4" coordinator set to broker 127.0.0.1:9092/1
rdkafka: [thrd:main]: Group "testme4" changed state wait-coord -> wait-broker-transport (join-state init)
rdkafka: [thrd:main]: Broadcasting state change
rdkafka: [thrd:main]: GroupCoordinator: Broker nodename changed from "" to "127.0.0.1:9092"
rdkafka: [thrd:main]: GroupCoordinator: Broker nodeid changed from -1 to 1
rdkafka: [thrd:main]: 127.0.0.1:9092/1: Group "testme4": querying for coordinator: intervaled in state wait-broker-transport
rdkafka: [thrd:GroupCoordinator]: GroupCoordinator/1: Received CONNECT op
rdkafka: [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state INIT -> TRY_CONNECT
rdkafka: [thrd:GroupCoordinator]: Broadcasting state change
rdkafka: [thrd:GroupCoordinator]: GroupCoordinator/1: broker in state TRY_CONNECT connecting
rdkafka: [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state TRY_CONNECT -> CONNECT
rdkafka: [thrd:GroupCoordinator]: Broadcasting state change
rdkafka: [thrd:GroupCoordinator]: GroupCoordinator/1: Connecting to ipv4#127.0.0.1:9092 (plaintext) with socket 59
rdkafka: [thrd:GroupCoordinator]: GroupCoordinator/1: Connected to ipv4#127.0.0.1:9092
rdkafka: [thrd:GroupCoordinator]: GroupCoordinator/1: Connected (#1)
rdkafka: [thrd:GroupCoordinator]: GroupCoordinator/1: Updated enabled protocol features +ApiVersion to ApiVersion
rdkafka: [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state CONNECT -> APIVERSION_QUERY
rdkafka: [thrd:GroupCoordinator]: Broadcasting state change
rdkafka: [thrd:main]: 127.0.0.1:9092/1: Group "testme4" coordinator is 127.0.0.1:9092 id 1
rdkafka: [thrd:GroupCoordinator]: GroupCoordinator/1: Updated enabled protocol features to MsgVer1,ApiVersion,BrokerBalancedConsumer,ThrottleTime,Sasl,SaslHandshake,BrokerGroupCoordinator,LZ4,OffsetTime,MsgVer2,IdempotentProducer,ZSTD,SaslAuthReq
rdkafka: [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state APIVERSION_QUERY -> UP
rdkafka: [thrd:GroupCoordinator]: Broadcasting state change
rdkafka: [thrd:main]: Group "testme4" changed state wait-broker-transport -> up (join-state init)
[c564110fbf63] Polled 0 messages in 1001.53ms
rdkafka: [thrd:main]: Broadcasting state change
[c564110fbf63] Polling messages...
rdkafka: [thrd:main]: Group "testme4": join with 0 subscribed topic(s)
rdkafka: [thrd:main]: consumer join: metadata for subscription only available for 0/1 topics (-1ms old)
rdkafka: [thrd:main]: Group "testme4": postponing join until up-to-date metadata is available
rdkafka: [thrd:main]: Group "testme4" changed join state init -> wait-metadata (state up)
rdkafka: [thrd:main]: Assignment dump (started_cnt=0, wait_stop_cnt=0)
rdkafka: [thrd:main]: List with 0 partition(s):
rdkafka: [thrd:main]: List with 0 partition(s):
rdkafka: [thrd:main]: List with 0 partition(s):
rdkafka: [thrd:main]: List with 0 partition(s):
rdkafka: [thrd:main]: Group "testme4": assignment operations done in join-state wait-metadata (rebalance rejoin=false)
rdkafka: [thrd:main]: 127.0.0.1:9092/1: ClusterId update "" -> "kafka-docker-cluster-w"
rdkafka: [thrd:main]: 127.0.0.1:9092/1: ControllerId update -1 -> 1
rdkafka: [thrd:main]: Broadcasting state change
rdkafka: [thrd:main]: Group "testme4": effective subscription list changed from 0 to 1 topic(s):
rdkafka: [thrd:main]:  Topic workflows_activities with 5 partition(s)
rdkafka: [thrd:main]: Group "testme4": subscription updated from metadata change: rejoining group in state wait-metadata
rdkafka: [thrd:main]: Group "testme4": resetting group leader info: group (re)join
rdkafka: [thrd:main]: Group "testme4" (re)joining in join-state wait-metadata with 0 assigned partition(s): Metadata for subscribed topic(s) has changed
rdkafka: [thrd:main]: Group "testme4" initiating rebalance (NONE) in state up (join-state wait-metadata) with 0 assigned partition(s): Metadata for subscribed topic(s) has changed
rdkafka: [thrd:main]: Group "testme4": Rejoining group without an assignment: Metadata for subscribed topic(s) has changed
rdkafka: [thrd:main]: Group "testme4" changed join state wait-metadata -> init (state up)
rdkafka: [thrd:main]: Group "testme4": join with 1 subscribed topic(s)
rdkafka: [thrd:main]: consumer join: metadata for subscription is up to date (0ms old)
rdkafka: [thrd:main]: 127.0.0.1:9092/1: Joining group "testme4" with 1 subscribed topic(s) and member id ""
rdkafka: [thrd:main]: Group "testme4" changed join state init -> wait-join (state up)
rdkafka: [thrd:GroupCoordinator]: Broadcasting state change
rdkafka: [thrd:main]: JoinGroup response: GenerationId -1, Protocol , LeaderId , my MemberId constant3-53802b00-adeb-4a3b-983d-c8b7e97e42e8, member metadata count 0: Broker: Group member needs a valid member ID
rdkafka: [thrd:main]: GroupCoordinator/1: JoinGroupRequest failed: Broker: Group member needs a valid member ID: explicit actions Ignore
rdkafka: [thrd:main]: Group "testme4": updating member id "" -> "constant3-53802b00-adeb-4a3b-983d-c8b7e97e42e8"
rdkafka: [thrd:main]: Group "testme4": Rejoining group without an assignment: JoinGroup error: Broker: Group member needs a valid member ID
rdkafka: [thrd:main]: Group "testme4" changed join state wait-join -> init (state up)
rdkafka: [thrd:main]: Group "testme4": join with 1 subscribed topic(s)
rdkafka: [thrd:main]: consumer join: metadata for subscription is up to date (0ms old)
rdkafka: [thrd:main]: 127.0.0.1:9092/1: Joining group "testme4" with 1 subscribed topic(s) and member id "constant3-53802b00-adeb-4a3b-983d-c8b7e97e42e8"
rdkafka: [thrd:main]: Group "testme4" changed join state init -> wait-join (state up)
rdkafka: [thrd:GroupCoordinator]: Broadcasting state change
rdkafka: [thrd:main]: JoinGroup response: GenerationId 38, Protocol cooperative-sticky, LeaderId constant3-7fab2ad5-4fde-4076-9675-75643afa957c, my MemberId constant3-53802b00-adeb-4a3b-983d-c8b7e97e42e8, member metadata count 0: (no error)
rdkafka: [thrd:main]: Group "testme4" changed join state wait-join -> wait-sync (state up)
rdkafka: [thrd:GroupCoordinator]: Broadcasting state change
[4585b7d5fd06] Polled 0 messages in 1001.3ms
[4585b7d5fd06] Polling messages...
[7a91ec18d905] Polled 0 messages in 1001.29ms

Once in a while, when I try to shut down such a hanging consumer, I get:

rdkafka: [thrd:main]: Group "a81a480d-1651-4f57-b0d9-5c264cf51bc0": unexpected instruction to revoke current assignment and rebalance (terminating=0, assignment_lost=0, LEAVE_ON_UNASSIGN_DONE=0)
mensfeld commented 1 month ago

Ok, I can reproduce it. As long as I have consumers in the same consumer group subscribed to different topics and try to upscale, the assignor gets stuck in this loop. When I make the topics use different consumer groups, it no longer hangs.

I am not sure at the moment whether this is a multi-process issue or one that only affects a single-process setup (some shared state? a name collision?) with multiple librdkafka consumers subscribed to multiple topics within the same consumer group. I will continue the investigation.

mensfeld commented 1 month ago

Here are some extensive debug logs. The moment I see the 100% CPU spike, I `kill -9` the whole process. I cannot, however, pinpoint (yet) exactly where in the logs it starts.

details.txt

What is also interesting is that the consumer is still responsive (I can poll data), but it hangs when I attempt to close it.

mensfeld commented 1 month ago

Kafka logs from the moment of the hanging assignment:

kafka  | [2024-07-17 12:00:52,235] INFO [GroupCoordinator 1]: Dynamic member with unknown member id joins group eabcf19b-efb8-4f5f-8598-faf7cacb4dd1 in PreparingRebalance state. Created a new member id karafka_production-87f747f2-f652-4df6-b05c-51c380f2f4ec and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
kafka  | [2024-07-17 12:00:56,238] INFO [GroupCoordinator 1]: Stabilized group eabcf19b-efb8-4f5f-8598-faf7cacb4dd1 generation 1 (__consumer_offsets-5) with 12 members (kafka.coordinator.group.GroupCoordinator)
kafka  | [2024-07-17 12:00:56,240] INFO [GroupCoordinator 1]: Assignment received from leader karafka_production-20dfb758-1b0f-49ba-aa40-f3cfe97c1593 for group eabcf19b-efb8-4f5f-8598-faf7cacb4dd1 for generation 1. The group has 12 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
kafka  | [2024-07-17 12:00:57,236] INFO [GroupCoordinator 1]: Dynamic member with unknown member id joins group eabcf19b-efb8-4f5f-8598-faf7cacb4dd1 in Stable state. Created a new member id karafka_production-299aaadc-80d4-4852-a022-e9370c1abf9f and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
kafka  | [2024-07-17 12:00:57,236] INFO [GroupCoordinator 1]: Preparing to rebalance group eabcf19b-efb8-4f5f-8598-faf7cacb4dd1 in state PreparingRebalance with old generation 1 (__consumer_offsets-5) (reason: Adding new member karafka_production-299aaadc-80d4-4852-a022-e9370c1abf9f with group instance id None; client reason: not provided) (kafka.coordinator.group.GroupCoordinator)
kafka  | [2024-07-17 12:00:59,243] INFO [GroupCoordinator 1]: Stabilized group eabcf19b-efb8-4f5f-8598-faf7cacb4dd1 generation 2 (__consumer_offsets-5) with 13 members (kafka.coordinator.group.GroupCoordinator)
kafka  | [2024-07-17 12:00:59,244] INFO [GroupCoordinator 1]: Assignment received from leader karafka_production-20dfb758-1b0f-49ba-aa40-f3cfe97c1593 for group eabcf19b-efb8-4f5f-8598-faf7cacb4dd1 for generation 2. The group has 13 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
kafka  | [2024-07-17 12:00:59,245] INFO [GroupCoordinator 1]: Preparing to rebalance group eabcf19b-efb8-4f5f-8598-faf7cacb4dd1 in state PreparingRebalance with old generation 2 (__consumer_offsets-5) (reason: Updating metadata for member karafka_production-06a7ceb2-c734-4904-b002-acbbdfcbcf68 during Stable; client reason: not provided) (kafka.coordinator.group.GroupCoordinator)
kafka  | [2024-07-17 12:01:02,237] INFO [GroupCoordinator 1]: Dynamic member with unknown member id joins group eabcf19b-efb8-4f5f-8598-faf7cacb4dd1 in PreparingRebalance state. Created a new member id karafka_production-befd7019-b4e1-495d-9449-430659f20326 and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
kafka  | [2024-07-17 12:01:02,246] INFO [GroupCoordinator 1]: Stabilized group eabcf19b-efb8-4f5f-8598-faf7cacb4dd1 generation 3 (__consumer_offsets-5) with 14 members (kafka.coordinator.group.GroupCoordinator)
kafka  | [2024-07-17 12:01:07,239] INFO [GroupCoordinator 1]: Dynamic member with unknown member id joins group eabcf19b-efb8-4f5f-8598-faf7cacb4dd1 in CompletingRebalance state. Created a new member id karafka_production-f47116b2-d13c-4466-a6c2-83c49edf6cbc and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
kafka  | [2024-07-17 12:01:07,239] INFO [GroupCoordinator 1]: Preparing to rebalance group eabcf19b-efb8-4f5f-8598-faf7cacb4dd1 in state PreparingRebalance with old generation 3 (__consumer_offsets-5) (reason: Adding new member karafka_production-f47116b2-d13c-4466-a6c2-83c49edf6cbc with group instance id None; client reason: not provided) (kafka.coordinator.group.GroupCoordinator)

What is crazy is that when I use a random, time-based client.id, it does not hang. So I suspect it is somehow related to metadata requests. I cannot always reproduce it within Karafka.

mensfeld commented 1 month ago

What is SUPER interesting to me is the fact that this does not happen if I set client.id as follows (timestamps):

"1721229018.5395756"
"1721229018.539616"
"1721229018.539644"
"1721229018.5396647"
"1721229018.5396793"
"1721229018.5397322"
"1721229023.5395145"
"1721229028.5397005"
"1721229033.5398915"
"1721229038.5401068"
"1721229043.5417292"
"1721229048.5419412"
"1721229053.5420904"
"1721229058.5422208"
"1721229063.5424514"
"1721229068.5425892"
"1721229073.5427814"
"1721229078.5429163"
"1721229083.54326"
"1721229088.5432706"
"1721229093.5435174"
"1721229098.5436523"
"1721229103.5438313"
"1721229108.544021"
"1721229113.5441961"
"1721229118.5444007"
"1721229123.5445986"
"1721229128.5449526"
"1721229133.5449734"
"1721229138.5451102"

but when I use a constant client.id or a random string (like a UUID), it fails :man_shrugging:

mensfeld commented 1 month ago

One more update: I wanted to check whether this might be a Kafka issue with how it (incorrectly) caches some of the metadata responses, but librdkafka presents the same behaviour with RedPanda, and the same mitigation works.

emasab commented 1 month ago

Thanks a lot @mensfeld! I could reproduce it and found the cause. It's caused by reusing the same variable `i` in a nested loop here: https://github.com/confluentinc/librdkafka/blob/6eaf89fb124c421b66b43b195879d458a3a31f86/src/rdkafka_sticky_assignor.c#L821

mensfeld commented 1 month ago

@emasab you are welcome. Can you explain to me why it was mitigated by client.id randomization?

emasab commented 1 month ago

Not in all cases: I could reproduce it in Python even with `"client.id": str(time.time())` or with `"client.id": str(random.randint(1, 1000000))`. It happens when the number of potential partitions in the inner loop is less than the number of members in the outer loop minus one, and the members past the potential partition count don't satisfy `consumerPartitions->cnt == potentialTopicPartitions->cnt` (the condition that causes a `continue` and leaves the index untouched), so the shared index never advances past them.
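
To illustrate that pattern in isolation, here is a minimal standalone C sketch (a hypothetical example with made-up counts, not the actual assignor code): the inner scan reuses the outer index, so once the potential partition count is smaller than the member count the outer loop can never reach its end.

/*
 * Hypothetical illustration of the shared-index bug described above
 * (not the actual librdkafka source). The outer loop walks the members
 * with index i; for members that do not hit the early `continue`, an
 * inner loop over the potential partitions reuses i and leaves it at
 * potential_partition_cnt, so the outer index can never advance past
 * that value and the same members are revisited forever. A guard
 * counter makes this demo terminate and report the problem.
 */
#include <stdio.h>

int main(void) {
        const int member_cnt              = 5;
        const int potential_partition_cnt = 2; /* < member_cnt - 1 */
        int i;
        int guard = 0;

        for (i = 0; i < member_cnt; i++) {
                if (++guard > 50) {
                        printf("stuck: still at member %d after %d outer "
                               "iterations\n", i, guard);
                        return 1;
                }

                /* Member 0 stands in for a consumer that already owns
                 * everything it could get: skip the inner scan, the
                 * index stays intact. */
                if (i == 0)
                        continue;

                /* BUG: the inner scan reuses i, resetting the outer
                 * loop's position. */
                for (i = 0; i < potential_partition_cnt; i++) {
                        /* inspect potential partition i */
                }
        }

        printf("outer loop completed\n");
        return 0;
}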

mensfeld commented 1 month ago

@emasab do you maybe have an ETA for this? Even just a PR would help me, because I could temporarily cherry-pick it and publish a special release for people affected on my side.

mensfeld commented 1 month ago

This seems to work:

--- rdkafka_sticky_assignor.c   2024-07-08 09:47:43.000000000 +0200
+++ rdkafka_sticky_assignor_m.c 2024-07-30 09:44:38.529759640 +0200
@@ -769,7 +769,7 @@
         const rd_kafka_topic_partition_list_t *partitions;
         const char *consumer;
         const rd_map_elem_t *elem;
-        int i;
+        int i, j;

         /* The assignment is balanced if minimum and maximum numbers of
          * partitions assigned to consumers differ by at most one. */
@@ -836,9 +836,9 @@

                 /* Otherwise make sure it can't get any more partitions */

-                for (i = 0; i < potentialTopicPartitions->cnt; i++) {
+                for (j = 0; j < potentialTopicPartitions->cnt; j++) {
                         const rd_kafka_topic_partition_t *partition =
-                            &potentialTopicPartitions->elems[i];
+                            &potentialTopicPartitions->elems[j];
                         const char *otherConsumer;
                         int otherConsumerPartitionCount;
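
With the extra index, the inner loop over potentialTopicPartitions no longer clobbers `i`, which the enclosing loop relies on, so the traversal can actually finish.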