Closed heketong closed 6 years ago
In worst case you may need to wait for up to session.timeout.ms
(def 30 seconds) to get an assignment.
Your debug output only covers a couple of seconds.
Please also share your (relevant) code.
add something: i got time out(timeout:one seconds) when i call rd_kafka_consumer_poll. and in my case it consume two topic in cycle,when one ie EOF,it will consume another one. the first one always be ok(call rebalance after rd_kafka_consumer_poll), but the second always be not ok(do not call rebalance after rd_kafka_consumer_poll)
but why i need to wait for up to session.timeout.ms(def 30 seconds)? you mean librdkafka lost the connect with kafka broker?
If the group exists and has had a member recently fall out it may take up to session.timeout.ms to rebalance the group.
but when i start two client to consume one topic(with 2 partition),when i kill one client, the other client's log shows me that it will rebalance the group immediately. why?
If a client is shut down in a controlled fashion it will send a LeaveGroup request which immediately triggers a rebalance.
Read the FAQ first: https://github.com/edenhill/librdkafka/wiki/FAQ
Description
it don't call Rebalance after rd_kafka_consumer_poll,so i can't consume a thing from one topic. the librdkafka debug logs are below. This question has been bothering me for a long time,please help me out,tks!
How to reproduce
<your steps how to reproduce goes here, or remove section if not relevant>
IMPORTANT: Always try to reproduce the issue on the latest released version (see https://github.com/edenhill/librdkafka/releases), if it can't be reproduced on the latest version the issue has been fixed.
Checklist
IMPORTANT: We will close issues where the checklist has not been completed.
Please provide the following information:
<REPLACE with e.g., v0.10.5 or a git sha. NOT "latest" or "current">
<REPLACE with e.g., 0.10.2.3>
Global config
builtin.features = gzip,snappy,sasl,regex,lz4,sasl_plain,plugins client.id = rdkafka message.max.bytes = 1000000 message.copy.max.bytes = 65535 receive.message.max.bytes = 100000000 max.in.flight.requests.per.connection = 1000000 metadata.request.timeout.ms = 60000 topic.metadata.refresh.interval.ms = 300000 metadata.max.age.ms = -1 topic.metadata.refresh.fast.interval.ms = 250 topic.metadata.refresh.fast.cnt = 10 topic.metadata.refresh.sparse = true debug = topic,cgrp,fetch socket.timeout.ms = 60000 socket.blocking.max.ms = 1000 socket.send.buffer.bytes = 0 socket.receive.buffer.bytes = 0 socket.keepalive.enable = false socket.nagle.disable = false socket.max.fails = 1 broker.address.ttl = 1000 broker.address.family = any reconnect.backoff.jitter.ms = 500 statistics.interval.ms = 0 enabled_events = 0 log_cb = 0x7f47f5bdb5e0 log_level = 6 log.queue = false log.thread.name = true log.connection.close = true socket_cb = 0x7f47f5be7ca0 open_cb = 0x7f47f5bfc1d0 default_topic_conf = 0xcb6330 internal.termination.signal = 0 api.version.request = true api.version.request.timeout.ms = 10000 api.version.fallback.ms = 1200000 broker.version.fallback = 0.9.0 security.protocol = plaintext sasl.mechanisms = GSSAPI sasl.kerberos.service.name = kafka sasl.kerberos.principal = kafkaclient sasl.kerberos.kinit.cmd = kinit -S "%{sasl.kerberos.service.name}/%{broker.name}" -k -t "%{sasl.kerberos.keytab}" %{sasl.kerberos.principal} sasl.kerberos.min.time.before.relogin = 60000 group.id = CBE_DSF_VOICE01_20_9520 partition.assignment.strategy = range,roundrobin session.timeout.ms = 30000 heartbeat.interval.ms = 1000 group.protocol.type = consumer coordinator.query.interval.ms = 600000 enable.auto.commit = true auto.commit.interval.ms = 60 enable.auto.offset.store = true queued.min.messages = 100000 queued.max.messages.kbytes = 1048576 fetch.wait.max.ms = 100 fetch.message.max.bytes = 1048576 fetch.max.bytes = 52428800 fetch.min.bytes = 1 fetch.error.backoff.ms = 500 offset.store.method = broker enable.partition.eof = true check.crcs = false queue.buffering.max.messages = 100000 queue.buffering.max.kbytes = 1048576 queue.buffering.max.ms = 0 message.send.max.retries = 2 retry.backoff.ms = 100 queue.buffering.backpressure.threshold = 10 compression.codec = none batch.num.messages = 10000 delivery.report.only.error = false
Topic config
request.required.acks = 1 request.timeout.ms = 5000 message.timeout.ms = 300000 queuing.strategy = fifo produce.offset.report = false partitioner = consistent_random compression.codec = inherit auto.commit.enable = false auto.commit.interval.ms = 60 auto.offset.reset = smallest offset.store.path = . offset.store.sync.interval.ms = -1 offset.store.method = broker consume.callback.max.messages = 0
[ SUSE Linux Enterprise Server 12 SP1] Operating system:
[ ] Provide logs (with
debug=..
as necessary) from librdkafka %7|1526545183.133|MEMBERID|rdkafka#consumer-2| [thrd:app]: Group "CBE_DSF_VOICE01_20_9520": updating member id "(not-set)" -> "" %7|1526545183.133|BRKREASSIGN|rdkafka#consumer-2| [thrd:main]: Group "CBE_DSF_VOICE01_20_9520" management reassigned from broker (none) to :0/internal %7|1526545183.133|CGRPSTATE|rdkafka#consumer-2| [thrd:main]: Group "CBE_DSF_VOICE01_20_9520" changed state init -> wait-broker (v1, join-state init) %7|1526545183.133|BRKASSIGN|rdkafka#consumer-2| [thrd:main]: Group "CBE_DSF_VOICE01_20_9520" management assigned to broker :0/internal %7|1526545183.134|CGRPOP|rdkafka#consumer-2| [thrd:main]: Group "CBE_DSF_VOICE01_20_9520" received op SUBSCRIBE (v0) in state wait-broker (join state init, v1 vs 0) %7|1526545183.134|SUBSCRIBE|rdkafka#consumer-2| [thrd:main]: Group "CBE_DSF_VOICE01_20_9520": subscribe to new subscription of 1 topics (join state init) %7|1526545183.134|UNSUBSCRIBE|rdkafka#consumer-2| [thrd:main]: Group "CBE_DSF_VOICE01_20_9520": unsubscribe from current unset subscription of 0 topics (leave group=no, join state init, v1) %7|1526545183.134|GRPLEADER|rdkafka#consumer-2| [thrd:main]: Group "CBE_DSF_VOICE01_20_9520": resetting group leader info: unsubscribe %7|1526545183.134|CGRPJOINSTATE|rdkafka#consumer-2| [thrd:main]: Group "CBE_DSF_VOICE01_20_9520" changed join state init -> wait-unassign (v1, state wait-broker) %7|1526545183.134|UNASSIGN|rdkafka#consumer-2| [thrd:main]: Group "CBE_DSF_VOICE01_20_9520": unassign done in state wait-broker (join state wait-unassign): without new assignment: unassign (no previous assignment) %7|1526545183.134|CGRPJOINSTATE|rdkafka#consumer-2| [thrd:main]: Group "CBE_DSF_VOICE01_20_9520" changed join state wait-unassign -> init (v1, state wait-broker) %7|1526545183.134|CGRPQUERY|rdkafka#consumer-2| [thrd:main]: Group "CBE_DSF_VOICE01_20_9520": no broker available for coordinator query: intervaled in state wait-broker %7|1526545183.140|SUBSCRIPTION|rdkafka#consumer-2| [thrd:main]: Group "CBE_DSF_VOICE01_20_9520": no topics in metadata matched subscription %7|1526545183.140|BRKREASSIGN|rdkafka#consumer-2| [thrd:main]: Group "CBE_DSF_VOICE01_20_9520" management reassigned from broker :0/internal to 10.64.131.83:9092/bootstrap %7|1526545183.140|BRKUNASSIGN|rdkafka#consumer-2| [thrd:main]: Group "CBE_DSF_VOICE01_20_9520" management unassigned from broker handle :0/internal %7|1526545183.140|BRKASSIGN|rdkafka#consumer-2| [thrd:main]: Group "CBE_DSF_VOICE01_20_9520" management assigned to broker 10.64.131.83:9092/bootstrap %7|1526545183.140|CGRPQUERY|rdkafka#consumer-2| [thrd:main]: 10.64.131.83:9094/bootstrap: Group "CBE_DSF_VOICE01_20_9520": querying for coordinator: intervaled in state wait-broker %7|1526545183.141|LEADER|rdkafka#consumer-2| [thrd:10.64.131.83:9093/bootstrap]: 10.64.131.83:9093/1: Mapped 0 partition(s) to broker %7|1526545183.141|LEADER|rdkafka#consumer-2| [thrd:10.64.131.83:9094/bootstrap]: 10.64.131.83:9094/2: Mapped 0 partition(s) to broker %7|1526545183.141|LEADER|rdkafka#consumer-2| [thrd:10.64.131.83:9092/bootstrap]: 10.64.131.83:9092/0: Mapped 0 partition(s) to broker %7|1526545183.143|CGRPCOORD|rdkafka#consumer-2| [thrd:main]: 10.64.131.83:9094/2: Group "CBE_DSF_VOICE01_20_9520" coordinator is 10.64.131.83:9093 id 1 %7|1526545183.143|CGRPCOORD|rdkafka#consumer-2| [thrd:main]: Group "CBE_DSF_VOICE01_20_9520" changing coordinator -1 -> 1 %7|1526545183.143|BRKREASSIGN|rdkafka#consumer-2| [thrd:main]: Group "CBE_DSF_VOICE01_20_9520" management reassigned from broker 10.64.131.83:9092/0 to 10.64.131.83:9093/1 %7|1526545183.143|BRKUNASSIGN|rdkafka#consumer-2| [thrd:main]: Group "CBE_DSF_VOICE01_20_9520" management unassigned from broker handle 10.64.131.83:9092/0 %7|1526545183.143|BRKASSIGN|rdkafka#consumer-2| [thrd:main]: Group "CBE_DSF_VOICE01_20_9520" management assigned to broker 10.64.131.83:9093/1 %7|1526545183.143|CGRPSTATE|rdkafka#consumer-2| [thrd:main]: Group "CBE_DSF_VOICE01_20_9520" changed state wait-broker -> wait-broker-transport (v1, join-state init) %7|1526545183.143|CGRPSTATE|rdkafka#consumer-2| [thrd:main]: Group "CBE_DSF_VOICE01_20_9520" changed state wait-broker-transport -> up (v1, join-state init) %7|1526545183.143|JOIN|rdkafka#consumer-2| [thrd:main]: Group "CBE_DSF_VOICE01_20_9520": join with 0 (1) subscribed topic(s) %7|1526545183.143|CGRPMETADATA|rdkafka#consumer-2| [thrd:main]: consumer join: metadata for subscription only available for 0/1 topics (-1ms old) %7|1526545183.143|JOIN|rdkafka#consumer-2| [thrd:main]: Group "CBE_DSF_VOICE01_20_9520": postponing join until up-to-date metadata is available %7|1526545183.143|CGRPQUERY|rdkafka#consumer-2| [thrd:main]: 10.64.131.83:9093/1: Group "CBE_DSF_VOICE01_20_9520": querying for coordinator: intervaled in state up %7|1526545183.145|CGRPCOORD|rdkafka#consumer-2| [thrd:main]: 10.64.131.83:9093/1: Group "CBE_DSF_VOICE01_20_9520" coordinator is 10.64.131.83:9093 id 1 %7|1526545183.145|METADATA|rdkafka#consumer-2| [thrd:main]: 10.64.131.83:9094/2: 1/1 requested topic(s) seen in metadata %7|1526545183.145|SUBSCRIPTION|rdkafka#consumer-2| [thrd:main]: Group "CBE_DSF_VOICE01_20_9520": effective subscription list changed from 0 to 1 topic(s): %7|1526545183.145|SUBSCRIPTION|rdkafka#consumer-2| [thrd:main]: Topic CBE_DSF_VOICE01_20_9520 with 3 partition(s) %7|1526545183.145|REJOIN|rdkafka#consumer-2| [thrd:main]: Group "CBE_DSF_VOICE01_20_9520": subscription updated from metadata change: rejoining group %7|1526545183.145|GRPLEADER|rdkafka#consumer-2| [thrd:main]: Group "CBE_DSF_VOICE01_20_9520": resetting group leader info: Group rejoin %7|1526545183.145|REJOIN|rdkafka#consumer-2| [thrd:main]: Group "CBE_DSF_VOICE01_20_9520" rejoining in join-state init without an assignment %7|1526545183.145|JOIN|rdkafka#consumer-2| [thrd:main]: Group "CBE_DSF_VOICE01_20_9520": join with 1 (1) subscribed topic(s) %7|1526545183.145|CGRPMETADATA|rdkafka#consumer-2| [thrd:main]: consumer join: metadata for subscription is up to date (0ms old) %7|1526545183.145|CGRPJOINSTATE|rdkafka#consumer-2| [thrd:main]: Group "CBE_DSF_VOICE01_20_9520" changed join state init -> wait-join (v1, state up) %7|1526545183.193|COMMIT|rdkafka#consumer-2| [thrd:main]: OffsetCommit internal error: Local: No offset stored %7|1526545183.193|COMMIT|rdkafka#consumer-2| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored %7|1526545183.193|UNASSIGN|rdkafka#consumer-2| [thrd:main]: Group "CBE_DSF_VOICE01_20_9520": unassign done in state up (join state wait-join): without new assignment: OffsetCommit done (NO_OFFSET) %7|1526545183.253|COMMIT|rdkafka#consumer-2| [thrd:main]: OffsetCommit internal error: Local: No offset stored %7|1526545183.253|COMMIT|rdkafka#consumer-2| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored %7|1526545183.253|UNASSIGN|rdkafka#consumer-2| [thrd:main]: Group "CBE_DSF_VOICE01_20_9520": unassign done in state up (join state wait-join): without new assignment: OffsetCommit done (__NO_OFFSET) %7|1526545183.313|COMMIT|rdkafka#consumer-2| [thrd:main]: OffsetCommit internal error: Local: No offset stored %7|1526545183.314|COMMIT|rdkafka#consumer-2| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored %7|1526545183.314|UNASSIGN|rdkafka#consumer-2| [thrd:main]: Group "CBE_DSF_VOICE01_20_9520": unassign done in state up (join state wait-join): without new assignment: OffsetCommit done (NO_OFFSET) %7|1526545183.374|COMMIT|rdkafka#consumer-2| [thrd:main]: OffsetCommit internal error: Local: No offset stored %7|1526545183.374|COMMIT|rdkafka#consumer-2| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored %7|1526545183.374|UNASSIGN|rdkafka#consumer-2| [thrd:main]: Group "CBE_DSF_VOICE01_20_9520": unassign done in state up (join state wait-join): without new assignment: OffsetCommit done (NO_OFFSET) %7|1526545183.434|COMMIT|rdkafka#consumer-2| [thrd:main]: OffsetCommit internal error: Local: No offset stored %7|1526545183.434|COMMIT|rdkafka#consumer-2| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored %7|1526545183.434|UNASSIGN|rdkafka#consumer-2| [thrd:main]: Group "CBE_DSF_VOICE01_20_9520": unassign done in state up (join state wait-join): without new assignment: OffsetCommit done (__NO_OFFSET) %7|1526545183.494|COMMIT|rdkafka#consumer-2| [thrd:main]: OffsetCommit internal error: Local: No offset stored %7|1526545183.494|COMMIT|rdkafka#consumer-2| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored %7|1526545183.494|UNASSIGN|rdkafka#consumer-2| [thrd:main]: Group "CBE_DSF_VOICE01_20_9520": unassign done in state up (join state wait-join): without new assignment: OffsetCommit done (NO_OFFSET) %7|1526545183.554|COMMIT|rdkafka#consumer-2| [thrd:main]: OffsetCommit internal error: Local: No offset stored %7|1526545183.554|COMMIT|rdkafka#consumer-2| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored %7|1526545183.554|UNASSIGN|rdkafka#consumer-2| [thrd:main]: Group "CBE_DSF_VOICE01_20_9520": unassign done in state up (join state wait-join): without new assignment: OffsetCommit done (NO_OFFSET) %7|1526545183.614|COMMIT|rdkafka#consumer-2| [thrd:main]: OffsetCommit internal error: Local: No offset stored %7|1526545183.614|COMMIT|rdkafka#consumer-2| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored %7|1526545183.614|UNASSIGN|rdkafka#consumer-2| [thrd:main]: Group "CBE_DSF_VOICE01_20_9520": unassign done in state up (join state wait-join): without new assignment: OffsetCommit done (__NO_OFFSET) %7|1526545183.674|COMMIT|rdkafka#consumer-2| [thrd:main]: OffsetCommit internal error: Local: No offset stored %7|1526545183.674|COMMIT|rdkafka#consumer-2| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored %7|1526545183.674|UNASSIGN|rdkafka#consumer-2| [thrd:main]: Group "CBE_DSF_VOICE01_20_9520": unassign done in state up (join state wait-join): without new assignment: OffsetCommit done (NO_OFFSET) %7|1526545183.734|COMMIT|rdkafka#consumer-2| [thrd:main]: OffsetCommit internal error: Local: No offset stored %7|1526545183.734|COMMIT|rdkafka#consumer-2| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored %7|1526545183.734|UNASSIGN|rdkafka#consumer-2| [thrd:main]: Group "CBE_DSF_VOICE01_20_9520": unassign done in state up (join state wait-join): without new assignment: OffsetCommit done (NO_OFFSET) %7|1526545183.794|COMMIT|rdkafka#consumer-2| [thrd:main]: OffsetCommit internal error: Local: No offset stored %7|1526545183.794|COMMIT|rdkafka#consumer-2| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored %7|1526545183.794|UNASSIGN|rdkafka#consumer-2| [thrd:main]: Group "CBE_DSF_VOICE01_20_9520": unassign done in state up (join state wait-join): without new assignment: OffsetCommit done (__NO_OFFSET) %7|1526545183.854|COMMIT|rdkafka#consumer-2| [thrd:main]: OffsetCommit internal error: Local: No offset stored %7|1526545183.854|COMMIT|rdkafka#consumer-2| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored %7|1526545183.854|UNASSIGN|rdkafka#consumer-2| [thrd:main]: Group "CBE_DSF_VOICE01_20_9520": unassign done in state up (join state wait-join): without new assignment: OffsetCommit done (NO_OFFSET) %7|1526545183.914|COMMIT|rdkafka#consumer-2| [thrd:main]: OffsetCommit internal error: Local: No offset stored %7|1526545183.914|COMMIT|rdkafka#consumer-2| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored %7|1526545183.914|UNASSIGN|rdkafka#consumer-2| [thrd:main]: Group "CBE_DSF_VOICE01_20_9520": unassign done in state up (join state wait-join): without new assignment: OffsetCommit done (NO_OFFSET) %7|1526545183.974|COMMIT|rdkafka#consumer-2| [thrd:main]: OffsetCommit internal error: Local: No offset stored %7|1526545183.974|COMMIT|rdkafka#consumer-2| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored %7|1526545183.974|UNASSIGN|rdkafka#consumer-2| [thrd:main]: Group "CBE_DSF_VOICE01_20_9520": unassign done in state up (join state wait-join): without new assignment: OffsetCommit done (__NO_OFFSET) %7|1526545184.034|COMMIT|rdkafka#consumer-2| [thrd:main]: OffsetCommit internal error: Local: No offset stored %7|1526545184.035|COMMIT|rdkafka#consumer-2| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored %7|1526545184.035|UNASSIGN|rdkafka#consumer-2| [thrd:main]: Group "CBE_DSF_VOICE01_20_9520": unassign done in state up (join state wait-join): without new assignment: OffsetCommit done (NO_OFFSET) %7|1526545184.095|COMMIT|rdkafka#consumer-2| [thrd:main]: OffsetCommit internal error: Local: No offset stored %7|1526545184.095|COMMIT|rdkafka#consumer-2| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored %7|1526545184.095|UNASSIGN|rdkafka#consumer-2| [thrd:main]: Group "CBE_DSF_VOICE01_20_9520": unassign done in state up (join state wait-join): without new assignment: OffsetCommit done (__NO_OFFSET)[ ] Provide broker log excerpts
[ ] Critical issue