confluentinc / librdkafka

The Apache Kafka C/C++ library

Question about rd_kafka_cgrp_op_serve: assert: rktp->rktp_assigned #1983

Closed: zenonparker closed this issue 3 years ago

zenonparker commented 6 years ago

Description

My application aborts on the assertion shown below (rd_kafka_cgrp_op_serve: assert: rktp->rktp_assigned), but I'm not sure what, if anything, I'm doing wrong.

The consumer I create never calls subscribe(); it only assigns partitions to itself with assign(). Unfortunately I can't provide the entire code, but it essentially amounts to this:

// Note: my_callback looks exactly like the example in the documentation for rd_kafka_conf_set_rebalance_cb
rd_kafka_conf_set_rebalance_cb(config_handle, &my_callback);
rd_kafka_t* ptr = rd_kafka_new(RD_KAFKA_CONSUMER, ...);
rd_kafka_poll_set_consumer(ptr);
rd_kafka_query_watermark_offsets(...);

// Assign myself the offset of the last message so I can consume it.
rd_kafka_assign(...);

// Consume last message.
rd_kafka_consumer_poll(...);

// Reassign offsets to myself. In this particular example I set the offset to that of the last message again.
rd_kafka_assign(...);

// Close my consumer.
rd_kafka_consumer_close(...);

Other notes:

Error output from librdkafka:

*** rdkafka_cgrp.c:2808:rd_kafka_cgrp_op_serve: assert: rktp->rktp_assigned ***
rd_kafka_t 0x7f13380042f0: rdkafka#consumer-1
 producer.msg_cnt 0 (0 bytes)
 rk_rep reply queue: 0 ops
 brokers:
 rd_kafka_broker_t 0x7f1338005770: :0/internal NodeId -1 in state UP (for 0.501s)
  refcnt 2
  outbuf_cnt: 0 waitresp_cnt: 0
  0 messages sent, 0 bytes, 0 errors, 0 timeouts
  0 messages received, 0 bytes, 0 errors
  0 messageset transmissions were retried
  0 toppars:
 rd_kafka_broker_t 0x7f13380062d0: localhost:9092/bootstrap NodeId -1 in state UP (for 0.501s)
  refcnt 2
  outbuf_cnt: 0 waitresp_cnt: 0
  4 messages sent, 110 bytes, 0 errors, 0 timeouts
  4 messages received, 455 bytes, 0 errors
  0 messageset transmissions were retried
  0 toppars:
 rd_kafka_broker_t 0x7f1334000a50: mydev.mydomain.com:9092/0 NodeId 0 in state UP (for 0.500s)
  refcnt 6
  outbuf_cnt: 0 waitresp_cnt: 0
  6 messages sent, 363 bytes, 0 errors, 0 timeouts
  6 messages received, 821 bytes, 0 errors
  0 messageset transmissions were retried
  1 toppars:
   mytopic-test-1 [0] leader mydev.mydomain.com:9092/0
    refcnt 5
    msgq:      0 messages
    xmit_msgq: 0 messages
    total:     0 messages, 0 bytes
 cgrp:
  my_group in state up, flags 0x1
   coord_id 0, managing broker mydev.mydomain.com:9092/0
  toppars:
 topics:
  mytopic-test-1 with 1 partitions, state exists, refcnt 3
   mytopic-test-1 [-1] leader none
    refcnt 1
    msgq:      0 messages
    xmit_msgq: 0 messages
    total:     0 messages, 0 bytes
   desired partitions:

Metadata cache with 1 entries:
  mytopic-test-1 (inserted 1ms ago, expires in 899998ms, 1 partition(s), valid)

Stack trace:

Thread 6 (Thread 0x7f133e367700 (LWP 15185)):
#0  0x00000031ce80e5ba in __lll_unlock_wake () from /lib64/libpthread.so.0
#1  0x00000031ce80b20e in _L_unlock_713 () from /lib64/libpthread.so.0
#2  0x00000031ce80b180 in pthread_mutex_unlock () from /lib64/libpthread.so.0
#3  0x00007f13410e29f9 in mtx_unlock (mtx=mtx@entry=0x7f1334002250) at tinycthread.c:284
#4  0x00007f13410f1ce2 in rd_kafka_q_concat0 (do_lock=<optimized out>, srcq=<optimized out>, rkq=0x7f1334002250) at rdkafka_queue.h:495
#5  rd_kafka_msgset_reader_run (msetr=msetr@entry=0x7f133e364660) at rdkafka_msgset_reader.c:1093
#6  0x00007f13410f8e7e in rd_kafka_msgset_parse (rkbuf=rkbuf@entry=0x7f1328000b20, request=request@entry=0x7f1328001350, rktp=rktp@entry=0x7f1334001f00, tver=tver@entry=0x7f13280016f8) at rdkafka_msgset_reader.c:1135
#7  0x00007f134108b17b in rd_kafka_fetch_reply_handle (rkb=rkb@entry=0x7f1334000a50, rkbuf=0x7f1328000b20, request=0x7f1328001350) at rdkafka_broker.c:3121
#8  0x00007f134108cfa7 in rd_kafka_broker_fetch_reply (rk=<optimized out>, rkb=0x7f1334000a50, err=RD_KAFKA_RESP_ERR_NO_ERROR, reply=<optimized out>, request=<optimized out>, opaque=<optimized out>) at rdkafka_broker.c:3176
#9  0x00007f13410a96ce in rd_kafka_buf_callback (rk=<optimized out>, rkb=<optimized out>, err=RD_KAFKA_RESP_ERR_NO_ERROR, response=0x7f1328000b20, request=0x7f1328001350) at rdkafka_buf.c:444
#10 0x00007f134108fb2c in rd_kafka_req_response (rkbuf=0x7f1328000b20, rkb=0x7f1334000a50) at rdkafka_broker.c:1288
#11 rd_kafka_recv (rkb=rkb@entry=0x7f1334000a50) at rdkafka_broker.c:1400
#12 0x00007f13410a7918 in rd_kafka_transport_io_event (rktrans=0x7f1328000a50, events=1) at rdkafka_transport.c:1417
#13 0x00007f13410a831e in rd_kafka_transport_io_serve (rktrans=<optimized out>, timeout_ms=<optimized out>) at rdkafka_transport.c:1466
#14 0x00007f1341096167 in rd_kafka_broker_serve (rkb=rkb@entry=0x7f1334000a50, abs_timeout=33610124035464) at rdkafka_broker.c:2533
#15 0x00007f13410973f8 in rd_kafka_broker_consumer_serve (rkb=0x7f1334000a50) at rdkafka_broker.c:3439
#16 rd_kafka_broker_thread_main (arg=arg@entry=0x7f1334000a50) at rdkafka_broker.c:3551
#17 0x00007f13410e2907 in _thrd_wrapper_function (aArg=<optimized out>) at tinycthread.c:624
#18 0x00000031ce807ee5 in start_thread () from /lib64/libpthread.so.0
#19 0x00000031ce0f4b8d in clone () from /lib64/libc.so.6

Thread 5 (Thread 0x7f133f369700 (LWP 15183)):
#0  0x00000031ce80c049 in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1  0x00007f13410e2b4d in cnd_timedwait_ms (cnd=cnd@entry=0x7f1338006078, mtx=mtx@entry=0x7f1338006050, timeout_ms=timeout_ms@entry=1000) at tinycthread.c:501
#2  0x00007f13410ad748 in rd_kafka_q_pop_serve (rkq=0x7f1338006050, timeout_ms=1000, version=version@entry=0, cb_type=cb_type@entry=RD_KAFKA_Q_CB_RETURN, callback=callback@entry=0x0, opaque=opaque@entry=0x0) at rdkafka_queue.c:378
#3  0x00007f13410ad880 in rd_kafka_q_pop (rkq=<optimized out>, timeout_ms=<optimized out>, version=version@entry=0) at rdkafka_queue.c:409
#4  0x00007f134109607f in rd_kafka_broker_ops_serve (rkb=rkb@entry=0x7f1338005770, timeout_ms=<optimized out>) at rdkafka_broker.c:2488
#5  0x00007f134109612e in rd_kafka_broker_serve (rkb=rkb@entry=0x7f1338005770, abs_timeout=abs_timeout@entry=33610123534239) at rdkafka_broker.c:2510
#6  0x00007f134109664f in rd_kafka_broker_ua_idle (rkb=rkb@entry=0x7f1338005770, timeout_ms=<optimized out>, timeout_ms@entry=-1) at rdkafka_broker.c:2595
#7  0x00007f13410971a5 in rd_kafka_broker_thread_main (arg=arg@entry=0x7f1338005770) at rdkafka_broker.c:3547
#8  0x00007f13410e2907 in _thrd_wrapper_function (aArg=<optimized out>) at tinycthread.c:624
#9  0x00000031ce807ee5 in start_thread () from /lib64/libpthread.so.0
#10 0x00000031ce0f4b8d in clone () from /lib64/libc.so.6

Thread 4 (Thread 0x7f133eb68700 (LWP 15184)):
#0  0x00000031ce0ea71d in poll () from /lib64/libc.so.6
#1  0x00007f13410a82a9 in rd_kafka_transport_poll (rktrans=0x7f1330000a50, tmout=<optimized out>) at rdkafka_transport.c:1598
#2  0x00007f13410a82fb in rd_kafka_transport_io_serve (rktrans=0x7f1330000a50, timeout_ms=998) at rdkafka_transport.c:1461
#3  0x00007f1341096167 in rd_kafka_broker_serve (rkb=rkb@entry=0x7f13380062d0, abs_timeout=abs_timeout@entry=33610123534687) at rdkafka_broker.c:2533
#4  0x00007f134109664f in rd_kafka_broker_ua_idle (rkb=rkb@entry=0x7f13380062d0, timeout_ms=<optimized out>, timeout_ms@entry=-1) at rdkafka_broker.c:2595
#5  0x00007f13410971a5 in rd_kafka_broker_thread_main (arg=arg@entry=0x7f13380062d0) at rdkafka_broker.c:3547
#6  0x00007f13410e2907 in _thrd_wrapper_function (aArg=<optimized out>) at tinycthread.c:624
#7  0x00000031ce807ee5 in start_thread () from /lib64/libpthread.so.0
#8  0x00000031ce0f4b8d in clone () from /lib64/libc.so.6

Thread 3 (Thread 0x7f134036b700 (LWP 15181)):
#0  0x00000031ce80c049 in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1  0x00007f13410e2b4d in cnd_timedwait_ms (cnd=cnd@entry=0x7f1338006e78, mtx=mtx@entry=0x7f1338006e50, timeout_ms=timeout_ms@entry=2147483647) at tinycthread.c:501
#2  0x00007f13410ad748 in rd_kafka_q_pop_serve (rkq=rkq@entry=0x7f1338006e50, timeout_ms=2147483647, timeout_ms@entry=-1, version=version@entry=0, cb_type=cb_type@entry=RD_KAFKA_Q_CB_RETURN, callback=callback@entry=0x0, opaque=opaque@entry=0x0) at rdkafka_queue.c:378
#3  0x00007f13410ad880 in rd_kafka_q_pop (rkq=rkq@entry=0x7f1338006e50, timeout_ms=timeout_ms@entry=-1, version=version@entry=0) at rdkafka_queue.c:409
#4  0x00007f1341080acc in rd_kafka_consumer_close (rk=0x7f13380042f0) at rdkafka.c:2338
<my code...>

Thread 2 (Thread 0x7f1340ac5040 (LWP 14988)):
<my code...>

Thread 1 (Thread 0x7f133fb6a700 (LWP 15182)):
#0  0x00000031ce035877 in raise () from /lib64/libc.so.6
#1  0x00000031ce036f68 in abort () from /lib64/libc.so.6
#2  0x00007f134107b835 in rd_kafka_crash (file=<optimized out>, line=<optimized out>, function=<optimized out>, rk=0x7f13380042f0, reason=<optimized out>) at rdkafka.c:3367
#3  0x00007f13410cd19e in rd_kafka_cgrp_op_serve (rk=<optimized out>, rkq=<optimized out>, rko=0x7f1334002710, cb_type=<optimized out>, opaque=0x7f1338005020) at rdkafka_cgrp.c:2808
#4  0x00007f13410ada3f in rd_kafka_q_serve (rkq=0x7f1338004d80, timeout_ms=<optimized out>, max_cnt=max_cnt@entry=0, cb_type=cb_type@entry=RD_KAFKA_Q_CB_CALLBACK, callback=callback@entry=0x0, opaque=opaque@entry=0x0) at rdkafka_queue.c:481
#5  0x00007f1341085814 in rd_kafka_thread_main (arg=arg@entry=0x7f13380042f0) at rdkafka.c:1387
#6  0x00007f13410e2907 in _thrd_wrapper_function (aArg=<optimized out>) at tinycthread.c:624
#7  0x00000031ce807ee5 in start_thread () from /lib64/libpthread.so.0
#8  0x00000031ce0f4b8d in clone () from /lib64/libc.so.6

edenhill commented 6 years ago

Thanks for a great report!

Could you try reproducing this on latest master? There have been a couple of fixes related to consumer_close() behaviour and queue stability. If you can reproduce it, please do so with debug=all and provide the logs.

zenonparker commented 6 years ago

Thanks for the very prompt response. I will report back with a build from latest master ASAP.

zenonparker commented 6 years ago

Reproduced the issue on the tip of master (231f59cf84c796ce1dfe0ee1d3ec99f0ec056e72); see the debug logs below. Apologies for the wall of text, but I can't upload files from my corporate network at the moment!
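
Since the log is long, a small filter may help anyone reading it. The sketch below splits one debug line into its fields, assuming the layout visible in this output (`%level|timestamp|facility|client| [thrd:name]: message`). That layout is inferred from the lines in this report rather than taken from a formal specification, and `parse_log_line` and `is_cgrp_event` are hypothetical helper names, not part of librdkafka.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Fields of one debug log line, as laid out in this report (assumed layout). */
struct log_line {
    int level;            /* syslog-style level, e.g. 7 for debug */
    char timestamp[32];   /* "seconds.milliseconds" */
    char facility[32];    /* e.g. "CGRPSTATE" */
    char client[64];      /* e.g. "rdkafka#consumer-1" */
    char thread[128];     /* e.g. "main" or "localhost:9092/bootstrap" */
    const char *message;  /* points into the caller's input line */
};

/* Copy the bytes in [begin, end) into dst and NUL-terminate.
 * Returns 0 on success, -1 if the field does not fit. */
static int copy_field(char *dst, size_t dst_size,
                      const char *begin, const char *end) {
    size_t len = (size_t)(end - begin);
    assert(dst_size > 0);
    if (len >= dst_size)
        return -1;
    memcpy(dst, begin, len);
    dst[len] = '\0';
    return 0;
}

/* Copy the next '|'-terminated field into dst.
 * Returns a pointer just past the '|', or NULL on error. */
static const char *take_field(const char *p, char *dst, size_t dst_size) {
    const char *bar = strchr(p, '|');
    if (!bar || copy_field(dst, dst_size, p, bar) != 0)
        return NULL;
    return bar + 1;
}

/* Parse "%L|TS|FAC|CLIENT| [thrd:NAME]: MESSAGE".
 * Returns 0 on success, -1 if the line does not match. */
static int parse_log_line(const char *line, struct log_line *out) {
    static const char thrd_prefix[] = " [thrd:";
    char level[8];
    const char *p, *end;

    if (line[0] != '%')
        return -1;
    if (!(p = take_field(line + 1, level, sizeof(level))))
        return -1;
    if (sscanf(level, "%d", &out->level) != 1)
        return -1;
    if (!(p = take_field(p, out->timestamp, sizeof(out->timestamp))))
        return -1;
    if (!(p = take_field(p, out->facility, sizeof(out->facility))))
        return -1;
    if (!(p = take_field(p, out->client, sizeof(out->client))))
        return -1;
    if (strncmp(p, thrd_prefix, sizeof(thrd_prefix) - 1) != 0)
        return -1;
    p += sizeof(thrd_prefix) - 1;
    /* Thread names may contain ':' (e.g. ":0/internal"), so the name is
     * taken up to the first "]: " rather than the first ':'. */
    if (!(end = strstr(p, "]: ")))
        return -1;
    if (copy_field(out->thread, sizeof(out->thread), p, end) != 0)
        return -1;
    out->message = end + 3;
    return 0;
}

/* True for consumer-group facilities such as CGRPSTATE, CGRPQUERY, CGRPCOORD. */
static int is_cgrp_event(const struct log_line *ll) {
    return strncmp(ll->facility, "CGRP", 4) == 0;
}
```

Calling `parse_log_line` on each line and keeping only those where `is_cgrp_event` returns non-zero narrows the output to the group state transitions, which are probably the most relevant lines for an assert raised in rd_kafka_cgrp_op_serve.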

%7|1536093615.914|MEMBERID|rdkafka#consumer-1| [thrd:app]: Group "my_group": updating member id "(not-set)" -> ""
%7|1536093615.914|BRKREASSIGN|rdkafka#consumer-1| [thrd:main]: Group "my_group" management reassigned from broker (none) to :0/internal
%7|1536093615.914|WAKEUPFD|rdkafka#consumer-1| [thrd:app]: localhost:9092/bootstrap: Enabled low-latency partition queue wake-ups
%7|1536093615.914|WAKEUPFD|rdkafka#consumer-1| [thrd:app]: localhost:9092/bootstrap: Enabled low-latency ops queue wake-ups
%7|1536093615.914|CGRPSTATE|rdkafka#consumer-1| [thrd:main]: Group "my_group" changed state init -> wait-broker (v1, join-state init)
%7|1536093615.914|BRKMAIN|rdkafka#consumer-1| [thrd::0/internal]: :0/internal: Enter main broker thread
%7|1536093615.914|BROADCAST|rdkafka#consumer-1| [thrd:main]: Broadcasting state change
%7|1536093615.914|STATE|rdkafka#consumer-1| [thrd::0/internal]: :0/internal: Broker changed state INIT -> UP
%7|1536093615.914|BROADCAST|rdkafka#consumer-1| [thrd::0/internal]: Broadcasting state change
%7|1536093615.914|BRKASSIGN|rdkafka#consumer-1| [thrd:main]: Group "my_group" management assigned to broker :0/internal
%7|1536093615.914|BROKER|rdkafka#consumer-1| [thrd:app]: localhost:9092/bootstrap: Added new broker with NodeId -1
%7|1536093615.914|INIT|rdkafka#consumer-1| [thrd:app]: librdkafka v0.11.6-PRE2-20-g231f59 (0xb05ff) rdkafka#consumer-1 initialized (debug 0xffff)
%7|1536093615.914|BRKMAIN|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Enter main broker thread
%7|1536093615.914|CONNECT|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: broker in state INIT connecting
%7|1536093615.914|METADATA|rdkafka#consumer-1| [thrd:app]: Skipping metadata refresh of 1 topic(s): no usable brokers
%7|1536093615.914|CONNECT|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connecting to ipv4#127.0.0.1:9092 (plaintext) with socket 18
%7|1536093615.915|STATE|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker changed state INIT -> CONNECT
%7|1536093615.915|BROADCAST|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: Broadcasting state change
%7|1536093615.915|CONNECT|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connected to ipv4#127.0.0.1:9092
%7|1536093615.915|CONNECTED|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connected (#1)
%7|1536093615.915|FEATURE|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Updated enabled protocol features +ApiVersion to ApiVersion
%7|1536093615.915|STATE|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker changed state CONNECT -> APIVERSION_QUERY
%7|1536093615.915|BROADCAST|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: Broadcasting state change
%7|1536093615.915|SEND|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Sent ApiVersionRequest (v0, 25 bytes @ 0, CorrId 1)
%7|1536093615.915|RECV|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Received ApiVersionResponse (v0, 264 bytes, CorrId 1, rtt 0.28ms)
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker API support:
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:   ApiKey Produce (0) Versions 0..6
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:   ApiKey Fetch (1) Versions 0..8
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:   ApiKey Offset (2) Versions 0..3
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:   ApiKey Metadata (3) Versions 0..6
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:   ApiKey LeaderAndIsr (4) Versions 0..1
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:   ApiKey StopReplica (5) Versions 0..0
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:   ApiKey Unknown-6? (6) Versions 0..4
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:   ApiKey Unknown-7? (7) Versions 0..1
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:   ApiKey OffsetCommit (8) Versions 0..4
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:   ApiKey OffsetFetch (9) Versions 0..4
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:   ApiKey GroupCoordinator (10) Versions 0..2
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:   ApiKey JoinGroup (11) Versions 0..3
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:   ApiKey Heartbeat (12) Versions 0..2
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:   ApiKey LeaveGroup (13) Versions 0..2
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:   ApiKey SyncGroup (14) Versions 0..2
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:   ApiKey DescribeGroups (15) Versions 0..2
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:   ApiKey ListGroups (16) Versions 0..2
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:   ApiKey SaslHandshake (17) Versions 0..1
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:   ApiKey ApiVersion (18) Versions 0..2
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:   ApiKey CreateTopics (19) Versions 0..3
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:   ApiKey DeleteTopics (20) Versions 0..2
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:   ApiKey DeleteRecords (21) Versions 0..1
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:   ApiKey InitProducerId (22) Versions 0..1
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:   ApiKey OffsetForLeaderEpoch (23) Versions 0..1
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:   ApiKey AddPartitionsToTxn (24) Versions 0..1
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:   ApiKey AddOffsetsToTxn (25) Versions 0..1
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:   ApiKey EndTxn (26) Versions 0..1
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:   ApiKey WriteTxnMarkers (27) Versions 0..0
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:   ApiKey TxnOffsetCommit (28) Versions 0..1
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:   ApiKey DescribeAcls (29) Versions 0..1
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:   ApiKey CreateAcls (30) Versions 0..1
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:   ApiKey DeleteAcls (31) Versions 0..1
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:   ApiKey DescribeConfigs (32) Versions 0..2
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:   ApiKey AlterConfigs (33) Versions 0..1
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:   ApiKey AlterReplicaLogDirs (34) Versions 0..1
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:   ApiKey DescribeLogDirs (35) Versions 0..1
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:   ApiKey SaslAuthenticate (36) Versions 0..0
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:   ApiKey CreatePartitions (37) Versions 0..1
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:   ApiKey CreateDelegationToken (38) Versions 0..1
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:   ApiKey RenewDelegationToken (39) Versions 0..1
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:   ApiKey ExpireDelegationToken (40) Versions 0..1
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:   ApiKey DescribeDelegationToken (41) Versions 0..1
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:   ApiKey DeleteGroups (42) Versions 0..1
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:  Feature MsgVer1: Produce (2..2) supported by broker
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:  Feature MsgVer1: Fetch (2..2) supported by broker
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Enabling feature MsgVer1
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:  Feature MsgVer2: Produce (3..3) supported by broker
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:  Feature MsgVer2: Fetch (4..4) supported by broker
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Enabling feature MsgVer2
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:  Feature ApiVersion: ApiVersion (0..0) supported by broker
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Enabling feature ApiVersion
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:  Feature BrokerGroupCoordinator: GroupCoordinator (0..0) supported by broker
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Enabling feature BrokerGroupCoordinator
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:  Feature BrokerBalancedConsumer: GroupCoordinator (0..0) supported by broker
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:  Feature BrokerBalancedConsumer: OffsetCommit (1..2) supported by broker
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:  Feature BrokerBalancedConsumer: OffsetFetch (1..1) supported by broker
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:  Feature BrokerBalancedConsumer: JoinGroup (0..0) supported by broker
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:  Feature BrokerBalancedConsumer: SyncGroup (0..0) supported by broker
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:  Feature BrokerBalancedConsumer: Heartbeat (0..0) supported by broker
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:  Feature BrokerBalancedConsumer: LeaveGroup (0..0) supported by broker
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Enabling feature BrokerBalancedConsumer
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:  Feature ThrottleTime: Produce (1..2) supported by broker
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:  Feature ThrottleTime: Fetch (1..2) supported by broker
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Enabling feature ThrottleTime
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:  Feature Sasl: JoinGroup (0..0) supported by broker
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Enabling feature Sasl
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:  Feature SaslHandshake: SaslHandshake (0..0) supported by broker
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Enabling feature SaslHandshake
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:  Feature LZ4: GroupCoordinator (0..0) supported by broker
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Enabling feature LZ4
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:  Feature OffsetTime: Offset (1..1) supported by broker
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Enabling feature OffsetTime
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:  Feature : CreateTopics (0..0) supported by broker
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap:  Feature : DeleteTopics (0..0) supported by broker
%7|1536093615.915|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Enabling feature 
%7|1536093615.915|FEATURE|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Updated enabled protocol features to MsgVer1,ApiVersion,BrokerBalancedConsumer,ThrottleTime,Sasl,SaslHandshake,BrokerGroupCoordinator,LZ4,OffsetTime,MsgVer2
%7|1536093615.915|STATE|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker changed state APIVERSION_QUERY -> UP
%7|1536093615.915|BROADCAST|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: Broadcasting state change
%7|1536093615.915|METADATA|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Request metadata for brokers only: connected
%7|1536093615.915|SEND|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Sent MetadataRequest (v2, 25 bytes @ 0, CorrId 2)
%7|1536093615.915|RECV|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Received MetadataResponse (v2, 77 bytes, CorrId 2, rtt 0.16ms)
%7|1536093615.915|METADATA|rdkafka#consumer-1| [thrd:main]: localhost:9092/bootstrap: ===== Received metadata: connected =====
%7|1536093615.915|METADATA|rdkafka#consumer-1| [thrd:main]: localhost:9092/bootstrap: ClusterId: AZ-ZQCgEQmW8oulxbxD20g, ControllerId: 0
%7|1536093615.915|METADATA|rdkafka#consumer-1| [thrd:main]: localhost:9092/bootstrap: 1 brokers, 0 topics
%7|1536093615.915|METADATA|rdkafka#consumer-1| [thrd:main]: localhost:9092/bootstrap:   Broker #0/1: mydev.mydomain.com:9092 NodeId 0
%7|1536093615.915|WAKEUPFD|rdkafka#consumer-1| [thrd:main]: mydev.mydomain.com:9092/0: Enabled low-latency partition queue wake-ups
%7|1536093615.915|WAKEUPFD|rdkafka#consumer-1| [thrd:main]: mydev.mydomain.com:9092/0: Enabled low-latency ops queue wake-ups
%7|1536093615.915|BROKER|rdkafka#consumer-1| [thrd:main]: mydev.mydomain.com:9092/0: Added new broker with NodeId 0
%7|1536093615.915|CLUSTERID|rdkafka#consumer-1| [thrd:main]: localhost:9092/bootstrap: ClusterId update "" -> "AZ-ZQCgEQmW8oulxbxD20g"
%7|1536093615.915|CONTROLLERID|rdkafka#consumer-1| [thrd:main]: localhost:9092/bootstrap: ControllerId update -1 -> 0
%7|1536093615.915|BROADCAST|rdkafka#consumer-1| [thrd:main]: Broadcasting state change
%7|1536093615.915|BRKMAIN|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0: Enter main broker thread
%7|1536093615.915|CONNECT|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0: broker in state INIT connecting
%7|1536093615.915|BRKREASSIGN|rdkafka#consumer-1| [thrd:main]: Group "my_group" management reassigned from broker :0/internal to localhost:9092/bootstrap
%7|1536093615.915|BRKUNASSIGN|rdkafka#consumer-1| [thrd:main]: Group "my_group" management unassigned from broker handle :0/internal
%7|1536093615.915|BRKASSIGN|rdkafka#consumer-1| [thrd:main]: Group "my_group" management assigned to broker localhost:9092/bootstrap
%7|1536093615.915|CGRPQUERY|rdkafka#consumer-1| [thrd:main]: localhost:9092/bootstrap: Group "my_group": querying for coordinator: intervaled in state wait-broker
%7|1536093615.915|SEND|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Sent GroupCoordinatorRequest (v0, 30 bytes @ 0, CorrId 3)
%7|1536093615.915|CONNECT|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0: Connecting to ipv4#127.0.0.1:9092 (plaintext) with socket 21
%7|1536093615.915|STATE|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0: Broker changed state INIT -> CONNECT
%7|1536093615.916|BROADCAST|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: Broadcasting state change
%7|1536093615.916|CONNECT|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0: Connected to ipv4#127.0.0.1:9092
%7|1536093615.916|CONNECTED|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0: Connected (#1)
%7|1536093615.916|FEATURE|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0: Updated enabled protocol features +ApiVersion to ApiVersion
%7|1536093615.916|STATE|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0: Broker changed state CONNECT -> APIVERSION_QUERY
%7|1536093615.916|BROADCAST|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: Broadcasting state change
%7|1536093615.916|SEND|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0: Sent ApiVersionRequest (v0, 25 bytes @ 0, CorrId 1)
%7|1536093615.916|RECV|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Received GroupCoordinatorResponse (v0, 41 bytes, CorrId 3, rtt 0.26ms)
%7|1536093615.916|CGRPCOORD|rdkafka#consumer-1| [thrd:main]: localhost:9092/bootstrap: Group "my_group" coordinator is mydev.mydomain.com:9092 id 0
%7|1536093615.916|CGRPCOORD|rdkafka#consumer-1| [thrd:main]: Group "my_group" changing coordinator -1 -> 0
%7|1536093615.916|BRKREASSIGN|rdkafka#consumer-1| [thrd:main]: Group "my_group" management reassigned from broker localhost:9092/bootstrap to mydev.mydomain.com:9092/0
%7|1536093615.916|BRKUNASSIGN|rdkafka#consumer-1| [thrd:main]: Group "my_group" management unassigned from broker handle localhost:9092/bootstrap
%7|1536093615.916|BRKASSIGN|rdkafka#consumer-1| [thrd:main]: Group "my_group" management assigned to broker mydev.mydomain.com:9092/0
%7|1536093615.916|CGRPSTATE|rdkafka#consumer-1| [thrd:main]: Group "my_group" changed state wait-broker -> wait-broker-transport (v1, join-state init)
%7|1536093615.916|BROADCAST|rdkafka#consumer-1| [thrd:main]: Broadcasting state change
%7|1536093615.916|CGRPQUERY|rdkafka#consumer-1| [thrd:main]: localhost:9092/bootstrap: Group "my_group": querying for coordinator: intervaled in state wait-broker-transport
%7|1536093615.916|SEND|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Sent GroupCoordinatorRequest (v0, 30 bytes @ 0, CorrId 4)
%7|1536093615.916|RECV|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0: Received ApiVersionResponse (v0, 264 bytes, CorrId 1, rtt 0.20ms)
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0: Broker API support:
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:   ApiKey Produce (0) Versions 0..6
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:   ApiKey Fetch (1) Versions 0..8
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:   ApiKey Offset (2) Versions 0..3
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:   ApiKey Metadata (3) Versions 0..6
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:   ApiKey LeaderAndIsr (4) Versions 0..1
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:   ApiKey StopReplica (5) Versions 0..0
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:   ApiKey Unknown-6? (6) Versions 0..4
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:   ApiKey Unknown-7? (7) Versions 0..1
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:   ApiKey OffsetCommit (8) Versions 0..4
%7|1536093615.916|RECV|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Received GroupCoordinatorResponse (v0, 41 bytes, CorrId 4, rtt 0.19ms)
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:   ApiKey OffsetFetch (9) Versions 0..4
%7|1536093615.916|CGRPCOORD|rdkafka#consumer-1| [thrd:main]: localhost:9092/bootstrap: Group "my_group" coordinator is mydev.mydomain.com:9092 id 0
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:   ApiKey GroupCoordinator (10) Versions 0..2
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:   ApiKey JoinGroup (11) Versions 0..3
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:   ApiKey Heartbeat (12) Versions 0..2
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:   ApiKey LeaveGroup (13) Versions 0..2
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:   ApiKey SyncGroup (14) Versions 0..2
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:   ApiKey DescribeGroups (15) Versions 0..2
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:   ApiKey ListGroups (16) Versions 0..2
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:   ApiKey SaslHandshake (17) Versions 0..1
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:   ApiKey ApiVersion (18) Versions 0..2
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:   ApiKey CreateTopics (19) Versions 0..3
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:   ApiKey DeleteTopics (20) Versions 0..2
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:   ApiKey DeleteRecords (21) Versions 0..1
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:   ApiKey InitProducerId (22) Versions 0..1
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:   ApiKey OffsetForLeaderEpoch (23) Versions 0..1
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:   ApiKey AddPartitionsToTxn (24) Versions 0..1
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:   ApiKey AddOffsetsToTxn (25) Versions 0..1
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:   ApiKey EndTxn (26) Versions 0..1
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:   ApiKey WriteTxnMarkers (27) Versions 0..0
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:   ApiKey TxnOffsetCommit (28) Versions 0..1
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:   ApiKey DescribeAcls (29) Versions 0..1
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:   ApiKey CreateAcls (30) Versions 0..1
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:   ApiKey DeleteAcls (31) Versions 0..1
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:   ApiKey DescribeConfigs (32) Versions 0..2
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:   ApiKey AlterConfigs (33) Versions 0..1
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:   ApiKey AlterReplicaLogDirs (34) Versions 0..1
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:   ApiKey DescribeLogDirs (35) Versions 0..1
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:   ApiKey SaslAuthenticate (36) Versions 0..0
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:   ApiKey CreatePartitions (37) Versions 0..1
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:   ApiKey CreateDelegationToken (38) Versions 0..1
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:   ApiKey RenewDelegationToken (39) Versions 0..1
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:   ApiKey ExpireDelegationToken (40) Versions 0..1
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:   ApiKey DescribeDelegationToken (41) Versions 0..1
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:   ApiKey DeleteGroups (42) Versions 0..1
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:  Feature MsgVer1: Produce (2..2) supported by broker
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:  Feature MsgVer1: Fetch (2..2) supported by broker
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0: Enabling feature MsgVer1
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:  Feature MsgVer2: Produce (3..3) supported by broker
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:  Feature MsgVer2: Fetch (4..4) supported by broker
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0: Enabling feature MsgVer2
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:  Feature ApiVersion: ApiVersion (0..0) supported by broker
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0: Enabling feature ApiVersion
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:  Feature BrokerGroupCoordinator: GroupCoordinator (0..0) supported by broker
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0: Enabling feature BrokerGroupCoordinator
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:  Feature BrokerBalancedConsumer: GroupCoordinator (0..0) supported by broker
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:  Feature BrokerBalancedConsumer: OffsetCommit (1..2) supported by broker
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:  Feature BrokerBalancedConsumer: OffsetFetch (1..1) supported by broker
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:  Feature BrokerBalancedConsumer: JoinGroup (0..0) supported by broker
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:  Feature BrokerBalancedConsumer: SyncGroup (0..0) supported by broker
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:  Feature BrokerBalancedConsumer: Heartbeat (0..0) supported by broker
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:  Feature BrokerBalancedConsumer: LeaveGroup (0..0) supported by broker
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0: Enabling feature BrokerBalancedConsumer
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:  Feature ThrottleTime: Produce (1..2) supported by broker
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:  Feature ThrottleTime: Fetch (1..2) supported by broker
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0: Enabling feature ThrottleTime
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:  Feature Sasl: JoinGroup (0..0) supported by broker
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0: Enabling feature Sasl
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:  Feature SaslHandshake: SaslHandshake (0..0) supported by broker
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0: Enabling feature SaslHandshake
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:  Feature LZ4: GroupCoordinator (0..0) supported by broker
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0: Enabling feature LZ4
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:  Feature OffsetTime: Offset (1..1) supported by broker
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0: Enabling feature OffsetTime
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:  Feature : CreateTopics (0..0) supported by broker
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0:  Feature : DeleteTopics (0..0) supported by broker
%7|1536093615.916|APIVERSION|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0: Enabling feature 
%7|1536093615.916|FEATURE|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0: Updated enabled protocol features to MsgVer1,ApiVersion,BrokerBalancedConsumer,ThrottleTime,Sasl,SaslHandshake,BrokerGroupCoordinator,LZ4,OffsetTime,MsgVer2
%7|1536093615.916|STATE|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0: Broker changed state APIVERSION_QUERY -> UP
%7|1536093615.916|BROADCAST|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: Broadcasting state change
%7|1536093615.916|METADATA|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0: Request metadata for brokers only: connected
%7|1536093615.916|SEND|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0: Sent MetadataRequest (v2, 25 bytes @ 0, CorrId 2)
%7|1536093615.916|RECV|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0: Received MetadataResponse (v2, 77 bytes, CorrId 2, rtt 0.10ms)
%7|1536093615.916|METADATA|rdkafka#consumer-1| [thrd:main]: mydev.mydomain.com:9092/0: ===== Received metadata: connected =====
%7|1536093615.916|METADATA|rdkafka#consumer-1| [thrd:main]: mydev.mydomain.com:9092/0: ClusterId: AZ-ZQCgEQmW8oulxbxD20g, ControllerId: 0
%7|1536093615.916|METADATA|rdkafka#consumer-1| [thrd:main]: mydev.mydomain.com:9092/0: 1 brokers, 0 topics
%7|1536093615.916|METADATA|rdkafka#consumer-1| [thrd:main]: mydev.mydomain.com:9092/0:   Broker #0/1: mydev.mydomain.com:9092 NodeId 0
%7|1536093615.916|CGRPSTATE|rdkafka#consumer-1| [thrd:main]: Group "my_group" changed state wait-broker-transport -> up (v1, join-state init)
%7|1536093615.916|BROADCAST|rdkafka#consumer-1| [thrd:main]: Broadcasting state change
%7|1536093616.415|METADATA|rdkafka#consumer-1| [thrd:app]: Requesting metadata for 1/1 topics: query partition leaders
%7|1536093616.415|METADATA|rdkafka#consumer-1| [thrd:app]: localhost:9092/bootstrap: Request metadata for 1 topic(s): query partition leaders
%7|1536093616.415|SEND|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Sent MetadataRequest (v2, 57 bytes @ 0, CorrId 5)
%7|1536093616.415|RECV|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Received MetadataResponse (v2, 142 bytes, CorrId 5, rtt 0.40ms)
%7|1536093616.415|METADATA|rdkafka#consumer-1| [thrd:main]: localhost:9092/bootstrap: ===== Received metadata (for 1 requested topics): query partition leaders =====
%7|1536093616.415|METADATA|rdkafka#consumer-1| [thrd:main]: localhost:9092/bootstrap: ClusterId: AZ-ZQCgEQmW8oulxbxD20g, ControllerId: 0
%7|1536093616.415|METADATA|rdkafka#consumer-1| [thrd:main]: localhost:9092/bootstrap: 1 brokers, 1 topics
%7|1536093616.415|METADATA|rdkafka#consumer-1| [thrd:main]: localhost:9092/bootstrap:   Broker #0/1: mydev.mydomain.com:9092 NodeId 0
%7|1536093616.415|METADATA|rdkafka#consumer-1| [thrd:main]: localhost:9092/bootstrap:   Topic #0/1: mytopic-test-1 with 1 partitions
%7|1536093616.415|METADATA|rdkafka#consumer-1| [thrd:main]: localhost:9092/bootstrap: 1/1 requested topic(s) seen in metadata
%7|1536093616.415|OFFSET|rdkafka#consumer-1| [thrd:app]: mydev.mydomain.com:9092/0: OffsetRequest (v0, opv 0) for 1 topic(s) and 1 partition(s)
%7|1536093616.415|OFFSET|rdkafka#consumer-1| [thrd:app]: mydev.mydomain.com:9092/0: OffsetRequest (v0, opv 0) for 1 topic(s) and 1 partition(s)
%7|1536093616.415|SEND|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0: Sent OffsetRequest (v0, 81 bytes @ 0, CorrId 3)
%7|1536093616.415|SEND|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0: Sent OffsetRequest (v0, 81 bytes @ 0, CorrId 4)
%7|1536093616.416|RECV|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0: Received OffsetResponse (v0, 58 bytes, CorrId 3, rtt 0.50ms)
%7|1536093616.416|RECV|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0: Received OffsetResponse (v0, 58 bytes, CorrId 4, rtt 0.49ms)
%7|1536093616.416|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "my_group" received op ASSIGN (v0) in state up (join state init, v1 vs 0)
%7|1536093616.416|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "my_group": new assignment of 1 partition(s) in join state init
%7|1536093616.416|TOPIC|rdkafka#consumer-1| [thrd:main]: New local topic: mytopic-test-1
%7|1536093616.416|TOPPARNEW|rdkafka#consumer-1| [thrd:main]: NEW mytopic-test-1 [-1] 0x7f0ee40019e0 (at rd_kafka_topic_new0:362)
%7|1536093616.416|STATE|rdkafka#consumer-1| [thrd:main]: Topic mytopic-test-1 changed state unknown -> exists
%7|1536093616.416|PARTCNT|rdkafka#consumer-1| [thrd:main]: Topic mytopic-test-1 partition count changed from 0 to 1
%7|1536093616.416|TOPPARNEW|rdkafka#consumer-1| [thrd:main]: NEW mytopic-test-1 [0] 0x7f0ee4001eb0 (at rd_kafka_topic_partition_cnt_update:589)
%7|1536093616.416|METADATA|rdkafka#consumer-1| [thrd:main]:   Topic mytopic-test-1 partition 0 Leader 0
%7|1536093616.416|BRKDELGT|rdkafka#consumer-1| [thrd:main]: mytopic-test-1 [0]: delegate to broker mydev.mydomain.com:9092/0 (rktp 0x7f0ee4001eb0, term 0, ref 2, remove 0)
%7|1536093616.416|BRKDELGT|rdkafka#consumer-1| [thrd:main]: mytopic-test-1 [0]: broker mydev.mydomain.com:9092/0 is now leader for partition with 0 messages (0 bytes) queued
%7|1536093616.416|BRKMIGR|rdkafka#consumer-1| [thrd:main]: Migrating topic mytopic-test-1 [0] 0x7f0ee4001eb0 from (none) to mydev.mydomain.com:9092/0 (sending PARTITION_JOIN to mydev.mydomain.com:9092/0)
%7|1536093616.416|DESP|rdkafka#consumer-1| [thrd:main]: Setting topic mytopic-test-1 [0] partition as desired
%7|1536093616.416|BARRIER|rdkafka#consumer-1| [thrd:main]: Group "my_group": rd_kafka_cgrp_assign:2301: new version barrier v2
%7|1536093616.416|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "my_group": assigning 1 partition(s) in join state init
%7|1536093616.416|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "my_group" changed join state init -> assigned (v2, state up)
%7|1536093616.416|BARRIER|rdkafka#consumer-1| [thrd:main]: Group "my_group": rd_kafka_cgrp_partitions_fetch_start0:1641: new version barrier v3
%7|1536093616.416|FETCHSTART|rdkafka#consumer-1| [thrd:main]: Group "my_group": starting fetchers for 1 assigned partition(s) in join-state assigned (usable_offsets=no, v3, line 2346)
%7|1536093616.416|FETCHSTART|rdkafka#consumer-1| [thrd:main]: List with 1 partition(s):
%7|1536093616.416|FETCHSTART|rdkafka#consumer-1| [thrd:main]:  mytopic-test-1 [0] offset 1
%7|1536093616.416|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "my_group" changed join state assigned -> started (v3, state up)
%7|1536093616.416|BARRIER|rdkafka#consumer-1| [thrd:main]: mytopic-test-1 [0]: rd_kafka_toppar_op_fetch_start:2193: new version barrier v2
%7|1536093616.416|CONSUMER|rdkafka#consumer-1| [thrd:main]: Start consuming mytopic-test-1 [0] at offset 1 (v2)
%7|1536093616.416|OP|rdkafka#consumer-1| [thrd:main]: mytopic-test-1 [0] received op FETCH_START (v2) in fetch-state none (opv1)
%7|1536093616.416|FETCH|rdkafka#consumer-1| [thrd:main]: Start fetch for mytopic-test-1 [0] in state none at offset 1 (v2)
%7|1536093616.416|PARTSTATE|rdkafka#consumer-1| [thrd:main]: Partition mytopic-test-1 [0] changed fetch state none -> active
%7|1536093616.416|FETCH|rdkafka#consumer-1| [thrd:main]: Partition mytopic-test-1 [0] start fetching at offset 1
%7|1536093616.416|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "my_group" received op PARTITION_JOIN in state up (join state started, v3) for mytopic-test-1 [0]
%7|1536093616.416|PARTADD|rdkafka#consumer-1| [thrd:main]: Group "my_group": add mytopic-test-1 [0]
%7|1536093616.416|TOPBRK|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0: Topic mytopic-test-1 [0]: joining broker (rktp 0x7f0ee4001eb0)
%7|1536093616.416|BROADCAST|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: Broadcasting state change
%7|1536093617.417|FETCHDEC|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: Topic mytopic-test-1 [0]: fetch decide: updating to version 2 (was 0) at offset 1 (was 0)
%7|1536093617.417|FETCH|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0: Topic mytopic-test-1 [0] in state active at offset 1 (0/100000 msgs, 0/1048576 kb queued, opv 2) is fetchable: 
%7|1536093617.417|FETCHADD|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0: Added mytopic-test-1 [0] to fetch list (1 entries, opv 2)
%7|1536093617.417|FETCH|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0: Fetch topic mytopic-test-1 [0] at offset 1 (v2)
%7|1536093617.417|FETCH|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0: Fetch 1/1/1 toppar(s)
%7|1536093617.417|SEND|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0: Sent FetchRequest (v4, 94 bytes @ 0, CorrId 5)
%7|1536093617.418|RECV|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0: Received FetchResponse (v4, 174 bytes, CorrId 5, rtt 0.43ms)
%7|1536093617.418|FETCH|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0: Topic mytopic-test-1 [0] MessageSet size 100, error "Success", MaxOffset 2, Ver 2/2
%7|1536093617.418|CONSUME|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0: Enqueue 1 message(s) (32 bytes, 1 ops) on mytopic-test-1 [0] fetch queue (qlen 1, v2, last_offset 1)
%7|1536093617.418|FETCH|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0: Fetch topic mytopic-test-1 [0] at offset 2 (v2)
%7|1536093617.418|FETCH|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0: Fetch 1/1/1 toppar(s)
%7|1536093617.418|SEND|rdkafka#consumer-1| [thrd:mydev.mydomain.com:9092/0]: mydev.mydomain.com:9092/0: Sent FetchRequest (v4, 94 bytes @ 0, CorrId 6)
%7|1536093617.418|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "my_group" received op ASSIGN (v0) in state up (join state started, v3 vs 0)
%7|1536093617.418|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "my_group": new assignment of 1 partition(s) in join state started
%7|1536093617.418|BARRIER|rdkafka#consumer-1| [thrd:main]: Group "my_group": rd_kafka_cgrp_assign:2301: new version barrier v4
%7|1536093617.418|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "my_group" changed join state started -> wait-unassign (v4, state up)
%7|1536093617.418|BARRIER|rdkafka#consumer-1| [thrd:main]: Group "my_group": rd_kafka_cgrp_unassign:2216: new version barrier v5
%7|1536093617.418|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Group "my_group": unassigning 1 partition(s) (v5)
%7|1536093617.418|BARRIER|rdkafka#consumer-1| [thrd:main]: mytopic-test-1 [0]: rd_kafka_toppar_op_fetch_stop:2220: new version barrier v3
%7|1536093617.418|CONSUMER|rdkafka#consumer-1| [thrd:main]: Stop consuming mytopic-test-1 [0] (v3)
%7|1536093617.418|DESP|rdkafka#consumer-1| [thrd:main]: Removing (un)desired topic mytopic-test-1 [0]
%7|1536093617.418|RESUME|rdkafka#consumer-1| [thrd:main]: Library resuming 1 partition(s)
%7|1536093617.418|BARRIER|rdkafka#consumer-1| [thrd:main]: mytopic-test-1 [0]: rd_kafka_toppar_op_pause_resume:2278: new version barrier v4
%7|1536093617.418|RESUME|rdkafka#consumer-1| [thrd:main]: Resume mytopic-test-1 [0] (v4)
%7|1536093617.418|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Unassign not done yet (1 wait_unassign, 1 assigned, 0 wait commit): unassign
%7|1536093617.418|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "my_group": assigning 1 partition(s) in join state wait-unassign
%7|1536093617.418|DESIRED|rdkafka#consumer-1| [thrd:main]: mytopic-test-1 [0]: adding to DESIRED list
%7|1536093617.418|OP|rdkafka#consumer-1| [thrd:main]: mytopic-test-1 [0] received op FETCH_STOP (v3) in fetch-state active (opv2)
%7|1536093617.418|FETCH|rdkafka#consumer-1| [thrd:main]: Stopping fetch for mytopic-test-1 [0] in state active (v3)
%7|1536093617.418|PARTSTATE|rdkafka#consumer-1| [thrd:main]: Partition mytopic-test-1 [0] changed fetch state active -> stopping
%7|1536093617.418|STORETERM|rdkafka#consumer-1| [thrd:main]: mytopic-test-1 [0]: offset store terminating
%7|1536093617.418|CLOSE|rdkafka#consumer-1| [thrd:app]: Closing consumer
%7|1536093617.418|CLOSE|rdkafka#consumer-1| [thrd:app]: Waiting for close events
%7|1536093617.418|PARTSTATE|rdkafka#consumer-1| [thrd:main]: Partition mytopic-test-1 [0] changed fetch state stopping -> stopped
%7|1536093617.418|OP|rdkafka#consumer-1| [thrd:main]: mytopic-test-1 [0] received op PAUSE (v4) in fetch-state stopped (opv3)
%7|1536093617.418|RESUME|rdkafka#consumer-1| [thrd:main]: Not resuming stopped mytopic-test-1 [0]: at offset 1 (state stopped, v4)
%7|1536093617.418|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "my_group" received op TERMINATE (v0) in state up (join state wait-unassign, v5 vs 0)
%7|1536093617.418|CGRPTERM|rdkafka#consumer-1| [thrd:main]: Terminating group "my_group" in state up with 1 partition(s)
%7|1536093617.418|BARRIER|rdkafka#consumer-1| [thrd:main]: Group "my_group": rd_kafka_cgrp_unassign:2216: new version barrier v6
%7|1536093617.418|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Group "my_group": unassigning 1 partition(s) (v6)
%7|1536093617.418|BARRIER|rdkafka#consumer-1| [thrd:main]: mytopic-test-1 [0]: rd_kafka_toppar_op_fetch_stop:2220: new version barrier v5
%7|1536093617.418|CONSUMER|rdkafka#consumer-1| [thrd:main]: Stop consuming mytopic-test-1 [0] (v5)
%7|1536093617.418|DESP|rdkafka#consumer-1| [thrd:main]: Removing (un)desired topic mytopic-test-1 [0]
%7|1536093617.418|RESUME|rdkafka#consumer-1| [thrd:main]: Library resuming 1 partition(s)
%7|1536093617.418|BARRIER|rdkafka#consumer-1| [thrd:main]: mytopic-test-1 [0]: rd_kafka_toppar_op_pause_resume:2278: new version barrier v6
%7|1536093617.418|RESUME|rdkafka#consumer-1| [thrd:main]: Resume mytopic-test-1 [0] (v6)
%7|1536093617.418|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Unassign not done yet (2 wait_unassign, 1 assigned, 0 wait commit): unassign
%7|1536093617.418|CGRPTERM|rdkafka#consumer-1| [thrd:main]: Group "my_group": waiting for 1 toppar(s), 2 unassignment(s), 0 commit(s) (state up, join-state wait-unassign) before terminating
%7|1536093617.418|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "my_group" received op PARTITION_LEAVE in state up (join state wait-unassign, v6) for mytopic-test-1 [0]
%7|1536093617.418|PARTDEL|rdkafka#consumer-1| [thrd:main]: Group "my_group": delete mytopic-test-1 [0]
%7|1536093617.418|CGRPTERM|rdkafka#consumer-1| [thrd:main]: Group "my_group": waiting for 0 toppar(s), 2 unassignment(s), 0 commit(s) (state up, join-state wait-unassign) before terminating
%7|1536093617.418|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "my_group" received op REPLY:FETCH_STOP in state up (join state wait-unassign, v6) for mytopic-test-1 [0]
%7|1536093617.418|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Unassign not done yet (1 wait_unassign, 0 assigned, 0 wait commit): FETCH_STOP done
%7|1536093617.418|CGRPTERM|rdkafka#consumer-1| [thrd:main]: Group "my_group": waiting for 0 toppar(s), 1 unassignment(s), 0 commit(s) (state up, join-state wait-unassign) before terminating
%7|1536093617.418|OP|rdkafka#consumer-1| [thrd:main]: mytopic-test-1 [0] received op FETCH_STOP (v5) in fetch-state stopped (opv4)
%7|1536093617.418|FETCH|rdkafka#consumer-1| [thrd:main]: Stopping fetch for mytopic-test-1 [0] in state stopped (v5)
%7|1536093617.418|PARTSTATE|rdkafka#consumer-1| [thrd:main]: Partition mytopic-test-1 [0] changed fetch state stopped -> stopping
%7|1536093617.418|STORETERM|rdkafka#consumer-1| [thrd:main]: mytopic-test-1 [0]: offset store terminating
%7|1536093617.418|PARTSTATE|rdkafka#consumer-1| [thrd:main]: Partition mytopic-test-1 [0] changed fetch state stopping -> stopped
%7|1536093617.418|OP|rdkafka#consumer-1| [thrd:main]: mytopic-test-1 [0] received op PAUSE (v6) in fetch-state stopped (opv5)
%7|1536093617.418|RESUME|rdkafka#consumer-1| [thrd:main]: Not resuming stopped mytopic-test-1 [0]: at offset 1 (state stopped, v6)
%7|1536093617.418|CGRPTERM|rdkafka#consumer-1| [thrd:main]: Group "my_group": waiting for 0 toppar(s), 1 unassignment(s), 0 commit(s) (state up, join-state wait-unassign) before terminating
%7|1536093617.418|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "my_group" received op REPLY:FETCH_STOP in state up (join state wait-unassign, v6) for mytopic-test-1 [0]
*** rdkafka_cgrp.c:2818:rd_kafka_cgrp_op_serve: assert: rktp->rktp_assigned ***
rd_kafka_t 0x7f0ee80042f0: rdkafka#consumer-1
 producer.msg_cnt 0 (0 bytes)
 rk_rep reply queue: 0 ops
 brokers:
 rd_kafka_broker_t 0x7f0ee8005770: :0/internal NodeId -1 in state UP (for 1.504s)
  refcnt 2
  outbuf_cnt: 0 waitresp_cnt: 0
  0 messages sent, 0 bytes, 0 errors, 0 timeouts
  0 messages received, 0 bytes, 0 errors
  0 messageset transmissions were retried
  0 toppars:
 rd_kafka_broker_t 0x7f0ee8006290: localhost:9092/bootstrap NodeId -1 in state UP (for 1.503s)
  refcnt 2
  outbuf_cnt: 0 waitresp_cnt: 0
  5 messages sent, 167 bytes, 0 errors, 0 timeouts
  5 messages received, 605 bytes, 0 errors
  0 messageset transmissions were retried
  0 toppars:
 rd_kafka_broker_t 0x7f0ee4000a50: mydev.mydomain.com:9092/0 NodeId 0 in state UP (for 1.502s)
  refcnt 5
  outbuf_cnt: 0 waitresp_cnt: 1
  6 messages sent, 400 bytes, 0 errors, 0 timeouts
  5 messages received, 671 bytes, 0 errors
  0 messageset transmissions were retried
  1 toppars:
   mytopic-test-1 [0] leader mydev.mydomain.com:9092/0
    refcnt 4
    msgq:      0 messages
    xmit_msgq: 0 messages
    total:     0 messages, 0 bytes
 cgrp:
  my_group in state up, flags 0x1
   coord_id 0, managing broker mydev.mydomain.com:9092/0
  toppars:
 topics:
  mytopic-test-1 with 1 partitions, state exists, refcnt 2
   mytopic-test-1 [-1] leader none
    refcnt 1
    msgq:      0 messages
    xmit_msgq: 0 messages
    total:     0 messages, 0 bytes
   desired partitions:

Metadata cache with 1 entries:
  mytopic-test-1 (inserted 1002ms ago, expires in 898997ms, 1 partition(s), valid)
edenhill commented 6 years ago

This looks like a race condition between your last assign() (which does an unassign first) and the unassign triggered by consumer_close(). I tried to reproduce this locally without success; I will try again with increased broker latency and see if that helps.

I bet that if you add a sleep(1) after your last assign() it stops crashing. Can you verify? Also, why are you doing the assign() just prior to close?

zenonparker commented 6 years ago

During construction, my consumer class queries offsets and assigns itself an offset (based on configuration). If the owner of the class immediately destroys it, it calls close(). I wouldn't say that is typical usage of this class, but it is definitely possible and it came up during my testing. I'll try adding a small sleep.

zenonparker commented 6 years ago

I wasn't able to reproduce it with a sleep_for(1ms) after the last assign() so I suppose your theory is correct. :)

Doesn't seem like the most elegant solution but I suppose it will suffice for now. Is the race condition you mention a fairly straightforward one to solve?

Thank you very much for your help!

edenhill commented 6 years ago

Thank you! I'll try to get a fix in before the next release candidate.

johnhckuo commented 4 years ago

@edenhill Hi, has this issue been resolved? I am also hitting this problem with version 1.2.1-r0.

I've written a Kafka consumer function to first retrieve the message from the last offset it read, then start consuming the next offset, then commit the current offset.

I then ran a performance test that calls this function 60 times per second. This error appears when assign is called, and my application crashes:

*** rdkafka_cgrp.c:2961:rd_kafka_cgrp_op_serve: assert: rktp->rktp_assigned ***
rd_kafka_t 0x2605000: rdkafka#consumer-5
 producer.msg_cnt 0 (0 bytes)
 rk_rep reply queue: 61 ops
 brokers:
 rd_kafka_broker_t 0x26072e0: :0/internal NodeId -1 in state INIT (for 31.797s)
  refcnt 2
  outbuf_cnt: 0 waitresp_cnt: 0
  0 messages sent, 0 bytes, 0 errors, 0 timeouts
  0 messages received, 0 bytes, 0 errors
  0 messageset transmissions were retried
  0 toppars:
 rd_kafka_broker_t 0x2607f60: kafka-1:9091/1 NodeId 1 in state UP (for 30.787s)
  refcnt 6
  outbuf_cnt: 0 waitresp_cnt: 1
  18 messages sent, 1329 bytes, 0 errors, 0 timeouts
  17 messages received, 389654 bytes, 0 errors
  0 messageset transmissions were retried
  1 toppars:
   topic1 [0] leader kafka-1:9091/1
    refcnt 68
    msgq:      0 messages
    xmit_msgq: 0 messages
    total:     0 messages, 0 bytes
 rd_kafka_broker_t 0x26065a0: GroupCoordinator NodeId -1 in state UP (for 29.766s)
  refcnt 3
  outbuf_cnt: 0 waitresp_cnt: 0
  10 messages sent, 666 bytes, 0 errors, 0 timeouts
  10 messages received, 843 bytes, 0 errors
  0 messageset transmissions were retried
  0 toppars:
 cgrp:
  G2 in state up, flags 0x0
   coord_id 1, broker kafka-1:9091/1
  toppars:
 topics:
  topic1 with 1 partitions, state exists, refcnt 3
   topic1 [-1] leader none
    refcnt 1
    msgq:      0 messages
    xmit_msgq: 0 messages
    total:     0 messages, 0 bytes
service1    |
Metadata cache with 1 entries:
  topic1 (inserted 552ms ago, expires in 899447ms, 1 partition(s), valid)

I saw that you said adding time.Sleep(time.Duration(1) * time.Second) after the assign() call can solve the issue, but I don't understand why that would help.

Here is my code:

    // Retrieve the lowest/highest offsets in the partition.
    lowOffset, highOffset, _ := c.QueryWatermarkOffsets(topic, partition, -1)

    log.Printf("The highest offset: %v", highOffset)
    log.Printf("The lowest offset: %v", lowOffset)

    if lowOffset == highOffset {
        log.Printf("Topic %v is empty", topic)
        return
    }

    part, _ := c.Committed(kafka.TopicPartitions{{Topic: &topic, Partition: partition}}, -1)
    if len(part) == 0 {
        startOffset = lowOffset
    } 

    var partitions kafka.TopicPartitions

    visitOffset, err := kafka.NewOffset(fmt.Sprintf("%v", startOffset))

    partitions = append(partitions, kafka.TopicPartition{
        Topic:     &topic,
        Partition: partition,
        Offset:    visitOffset,
        Error:     err,
    })

    err = c.Assign(partitions)
    if err != nil {
        log.Fatalf("Assign failed: %s", err)
        return
    }
    defer c.Unassign()
    assignment, _ := c.Assignment()
    log.Printf("Assignment %v\n", assignment)
    timeout := 6000
    ev := c.Poll(timeout)
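
For reference, a minimal sketch of the start-offset choice the snippet above seems to be aiming for, written as a pure function so it can be checked without a broker. `chooseStartOffset` and `offsetInvalid` are hypothetical names, not part of confluent-kafka-go. The sentinel value mirrors librdkafka's `RD_KAFKA_OFFSET_INVALID` (-1001), and it is an assumption here that a partition with no committed offset is reported with that value rather than as an empty list.

```go
package main

import "fmt"

// offsetInvalid mirrors librdkafka's RD_KAFKA_OFFSET_INVALID (-1001).
// Assumption: this is what a "no committed offset" lookup yields.
const offsetInvalid int64 = -1001

// chooseStartOffset is a hypothetical helper. Given the low and high
// watermarks and the committed offset, it returns the offset to assign.
// ok is false when the partition is empty (low == high).
func chooseStartOffset(low, high, committed int64) (start int64, ok bool) {
	if low == high {
		return 0, false // nothing to read
	}
	if committed == offsetInvalid || committed < low {
		return low, true // no usable commit: start at the oldest message
	}
	if committed > high {
		return high, true // stale commit beyond the log end: clamp
	}
	return committed, true // resume where the last commit left off
}

func main() {
	for _, committed := range []int64{offsetInvalid, 2, 7, 12} {
		start, ok := chooseStartOffset(5, 10, committed)
		fmt.Printf("committed=%d start=%d ok=%v\n", committed, start, ok)
	}
}
```

The result could then be used as the Offset of the kafka.TopicPartition passed to c.Assign(). This only covers which offset gets chosen; it does not address the assertion itself.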

nano-zhang commented 4 years ago

@johnhckuo Hi, do you have a solution to this issue? Adding a sleep does not fix it.

palanmihir commented 3 years ago

@edenhill I am facing the same issue with the Confluent Kafka dotnet version 1.0.0. Is there any way to fix this?

edenhill commented 3 years ago

Upgrade to the latest version.

palanmihir commented 3 years ago

Upgrading to the latest version would require a lot of testing, since we have built our own implementation on top of these libraries. It would mean many changes and retesting every application that uses them.

This error is not frequent; it occurs only in some applications, and at random times.

edenhill commented 3 years ago

We follow semver: there are no breaking changes between non-major versions, so an upgrade should be straightforward.