Closed heketong closed 6 years ago
If you call destroy() without calling close() first, destroy() will call close() internally to leave the group and commit final offsets, which may take a while. Please also upgrade to the latest librdkafka, v0.11.4.
I already call close() before I call destroy(), if you mean rd_kafka_consumer_close(). The API code:

    /* The legacy/simple consumer lacks an API to close down the consumer */
    if (rk->rk_cgrp) {
            rd_kafka_dbg(rk, GENERIC, "TERMINATE", "Closing consumer group");
            rd_kafka_consumer_close(rk);
    }
All the code is below:

    KafkaHighConsumer::destroy(){
        ...
        rd_kafka_message_destroy(m_pRkmessage);
        rd_kafka_consumer_close(m_pKafka);
        rd_kafka_topic_partition_list_destroy(m_pTopicList);
        rd_kafka_destroy(m_pKafka);
        rd_kafka_wait_destroyed(1000);
        ...
    }

The detailed log is below:

2018-04-03 18:04:36.035960 :[DEBUG] : KafkaHighConsumer::destroy: befor call rd_kafka_consumer_close
2018-04-03 18:04:36.035993 :[DEBUG] : logger : 1522749876.035 RDKAFKA-7-CGRPOP: rdkafka#consumer-6: [thrd:main]: Group "CBE_DSF_FCDR9520" received op TERMINATE (v0) in state up (join state wait-join, v1 vs 0)
2018-04-03 18:04:36.036010 :[DEBUG] : logger : 1522749876.036 RDKAFKA-7-CGRPTERM: rdkafka#consumer-6: [thrd:main]: Terminating group "CBE_DSF_FCDR9520" in state up with 0 partition(s)
2018-04-03 18:04:36.036027 :[DEBUG] : logger : 1522749876.036 RDKAFKA-7-UNSUBSCRIBE: rdkafka#consumer-6: [thrd:main]: Group "CBE_DSF_FCDR9520": unsubscribe from current subscription of 1 topics (leave group=yes, join state wait-join, v1)
2018-04-03 18:04:36.036037 :[DEBUG] : logger : 1522749876.036 RDKAFKA-7-SUBSCRIPTION: rdkafka#consumer-6: [thrd:main]: Group "CBE_DSF_FCDR9520": clearing subscribed topics list (1)
2018-04-03 18:04:36.036045 :[DEBUG] : logger : 1522749876.036 RDKAFKA-7-SUBSCRIPTION: rdkafka#consumer-6: [thrd:main]: Group "CBE_DSF_FCDR9520": effective subscription list changed from 1 to 0 topic(s):
2018-04-03 18:04:36.036054 :[DEBUG] : logger : 1522749876.036 RDKAFKA-7-CGRPJOINSTATE: rdkafka#consumer-6: [thrd:main]: Group "CBE_DSF_FCDR9520" changed join state wait-join -> wait-unassign (v1, state up)
2018-04-03 18:04:36.036062 :[DEBUG] : logger : 1522749876.036 RDKAFKA-7-UNASSIGN: rdkafka#consumer-6: [thrd:main]: Group "CBE_DSF_FCDR9520": unassign done in state up (join state wait-unassign): without new assignment: unassign (no previous assignment)
2018-04-03 18:04:36.036069 :[DEBUG] : logger : 1522749876.036 RDKAFKA-7-LEAVE: rdkafka#consumer-6: [thrd:main]: Group "CBE_DSF_FCDR9520": leave
2018-04-03 18:04:36.036090 :[DEBUG] : logger : 1522749876.036 RDKAFKA-7-CGRPJOINSTATE: rdkafka#consumer-6: [thrd:main]: Group "CBE_DSF_FCDR9520" changed join state wait-unassign -> init (v1, state up)
2018-04-03 18:04:36.036100 :[DEBUG] : logger : 1522749876.036 RDKAFKA-7-CGRPSTATE: rdkafka#consumer-6: [thrd:main]: Group "CBE_DSF_FCDR9520" changed state up -> term (v1, join-state init)
2018-04-03 18:04:36.036107 :[DEBUG] : logger : 1522749876.036 RDKAFKA-7-CGRPJOINSTATE: rdkafka#consumer-6: [thrd:main]: Group "CBE_DSF_FCDR9520" changed join state init -> wait-unassign (v1, state term)
2018-04-03 18:04:36.036114 :[DEBUG] : logger : 1522749876.036 RDKAFKA-7-UNASSIGN: rdkafka#consumer-6: [thrd:main]: Group "CBE_DSF_FCDR9520": unassign done in state term (join state wait-unassign): without new assignment: unassign (no previous assignment)
2018-04-03 18:04:36.036120 :[DEBUG] : logger : 1522749876.036 RDKAFKA-7-CGRPJOINSTATE: rdkafka#consumer-6: [thrd:main]: Group "CBE_DSF_FCDR9520" changed join state wait-unassign -> init (v1, state term)
2018-04-03 18:04:36.036129 :[DEBUG] : logger : 1522749876.036 RDKAFKA-7-BRKUNASSIGN: rdkafka#consumer-6: [thrd:main]: Group "CBE_DSF_FCDR9520" management unassigned from broker handle 10.64.131.83:9093/1
2018-04-03 18:04:36.036145 :[DEBUG] : KafkaHighConsumer::destroy: after call rd_kafka_consumer_close
2018-04-03 18:04:36.036156 :[DEBUG] : KafkaHighConsumer::destroy: Consumer closed..
2018-04-03 18:04:36.036162 :[DEBUG] : KafkaHighConsumer::destroy: befor call rd_kafka_topic_partition_list_destroy
2018-04-03 18:04:36.036168 :[DEBUG] : KafkaHighConsumer::destroy: after call rd_kafka_topic_partition_list_destroy
2018-04-03 18:04:36.036173 :[DEBUG] : KafkaHighConsumer::destroy: befor call rd_kafka_destroy
2018-04-03 18:04:36.036181 :[DEBUG] : logger : 1522749876.036 RDKAFKA-7-DESTROY: rdkafka#consumer-6: [thrd:app]: Terminating instance
2018-04-03 18:04:36.036205 :[DEBUG] : logger : 1522749876.036 RDKAFKA-7-DESTROY: rdkafka#consumer-6: [thrd:main]: Destroy internal
2018-04-03 18:04:36.036218 :[DEBUG] : logger : 1522749876.036 RDKAFKA-7-DESTROY: rdkafka#consumer-6: [thrd:main]: Removing all topics
2018-04-03 18:04:48,377:42568(0x7fe8bcb10700):ZOO_WARN@zookeeper_interest@1557: Exceeded deadline by 13ms
2018-04-03 18:04:48,379:42568(0x7fe8bcb10700):ZOO_DEBUG@zookeeper_process@2264: Got ping response in 2 ms
2018-04-03 18:05:01.428318 :[DEBUG] : logger : 1522749901.428 RDKAFKA-7-JOINGROUP: rdkafka#consumer-6: [thrd:10.64.131.83:9093/bootstrap]: JoinGroup response: discarding outdated request (now in join-state init)
2018-04-03 18:05:01.530445 :[DEBUG] : logger : 1522749901.530 RDKAFKA-7-MEMBERID: rdkafka#consumer-6: [thrd:app]: Group "CBE_DSF_FCDR9520": updating member id "" -> "(not-set)"
2018-04-03 18:05:01.530525 :[DEBUG] : KafkaHighConsumer::destroy: after call rd_kafka_destroy
Read the FAQ first: https://github.com/edenhill/librdkafka/wiki/FAQ

My code:

    void KafkaHighConsumer::destroy(){
        ......
        log_debug("befor call rd_kafka_topic_partition_list_destroy");
        rd_kafka_topic_partition_list_destroy(m_pTopicList);
        log_debug("after call rd_kafka_topic_partition_list_destroy");
        ...............
    }
My debug log:

2018-04-03 18:04:36.036162 :[DEBUG] : KafkaHighConsumer::destroy: befor call rd_kafka_topic_partition_list_destroy
2018-04-03 18:04:36.036168 :[DEBUG] : KafkaHighConsumer::destroy: after call rd_kafka_topic_partition_list_destroy
2018-04-03 18:04:36.036173 :[DEBUG] : KafkaHighConsumer::destroy: befor call rd_kafka_destroy
2018-04-03 18:04:36.036181 :[DEBUG] : logger : 1522749876.036 RDKAFKA-7-DESTROY: rdkafka#consumer-6: [thrd:app]: Terminating instance
2018-04-03 18:04:36.036205 :[DEBUG] : logger : 1522749876.036 RDKAFKA-7-DESTROY: rdkafka#consumer-6: [thrd:main]: Destroy internal
2018-04-03 18:04:36.036218 :[DEBUG] : logger : 1522749876.036 RDKAFKA-7-DESTROY: rdkafka#consumer-6: [thrd:main]: Removing all topics
2018-04-03 18:04:48,377:42568(0x7fe8bcb10700):ZOO_WARN@zookeeper_interest@1557: Exceeded deadline by 13ms
2018-04-03 18:04:48,379:42568(0x7fe8bcb10700):ZOO_DEBUG@zookeeper_process@2264: Got ping response in 2 ms
2018-04-03 18:05:01.428318 :[DEBUG] : logger : 1522749901.428 RDKAFKA-7-JOINGROUP: rdkafka#consumer-6: [thrd:10.64.131.83:9093/bootstrap]: JoinGroup response: discarding outdated request (now in join-state init)
2018-04-03 18:05:01.530445 :[DEBUG] : logger : 1522749901.530 RDKAFKA-7-MEMBERID: rdkafka#consumer-6: [thrd:app]: Group "CBE_DSF_FCDR9520": updating member id "" -> "(not-set)"
2018-04-03 18:05:01.530525 :[DEBUG] : KafkaHighConsumer::destroy: after call rd_kafka_destroy
My librdkafka version: 0.11.0

Description