morningman opened this issue 3 years ago
By the way, in my program the RdKafka::KafkaConsumer
is reused by many threads, but only one thread uses it at a time. Hope this info can help. Thank you!
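Sharing one consumer among many threads with at most one user at a time is usually done by funnelling every call through a single lock. Below is a minimal sketch of that pattern with plain pthreads; the "consumer" here is just a placeholder counter, not a real RdKafka handle, and `consumer_poll_locked`/`run_workers` are invented names for illustration:

```c
#include <pthread.h>

/* Placeholder for the shared consumer handle (an RdKafka::KafkaConsumer
 * or rd_kafka_t * in real code); a plain counter stands in for it here. */
static pthread_mutex_t consumer_lock = PTHREAD_MUTEX_INITIALIZER;
static long consumer_polls = 0;

/* All threads funnel consumer calls through this wrapper so that only
 * one thread touches the handle at any moment. */
static void consumer_poll_locked(void) {
    pthread_mutex_lock(&consumer_lock);
    consumer_polls++;  /* rd_kafka_consumer_poll(...) would go here */
    pthread_mutex_unlock(&consumer_lock);
}

static void *worker(void *arg) {
    (void)arg;
    for (int i = 0; i < 100000; i++)
        consumer_poll_locked();
    return NULL;
}

/* Runs four workers against the shared handle; returns the total number
 * of serialized "polls" (400000 when no increment is lost). */
long run_workers(void) {
    pthread_t t[4];
    consumer_polls = 0;
    for (int i = 0; i < 4; i++)
        pthread_create(&t[i], NULL, worker, NULL);
    for (int i = 0; i < 4; i++)
        pthread_join(t[i], NULL);
    return consumer_polls;
}
```

Note that this only serializes API calls; it does not prevent librdkafka's own internal broker threads from running concurrently, which is where the crash below occurs.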
The same error occurs under Kafka v2.5.
Please provide a reproducible test program, thanks.
Hi @edenhill, sorry, it is hard to reproduce... it just happens occasionally. So I hope to suggest some possible causes of this problem so that we can investigate further.
There is not enough information to go on.
Please provide:
- the assert reason itself (should be printed to stderr or stdout)
- when is this happening? Are there any other events (disconnects, rebalances, etc) at the same time?
- logs
- check your object usage: make sure you are not re-creating Topic objects and are not using destroyed/deleted objects.
Thanks a lot! I will check my program to see if more info can be provided!
Hi @edenhill, I traced the code and found that when calling https://github.com/edenhill/librdkafka/blob/c0cfc24cce8b77a1e42d9668dd1cfd015bbf0d6b/src/rdkafka_metadata.c#L863
and execution enters this branch: https://github.com/edenhill/librdkafka/blob/c0cfc24cce8b77a1e42d9668dd1cfd015bbf0d6b/src/rdkafka_metadata.c#L880-L899
the rkb is obtained from rd_kafka_broker_any_usable(), and destroy_rkb is set to 1, so the broker is destroyed at the end of the function.
I am not sure whether the rkb returned by rd_kafka_broker_any_usable() is necessarily one created by the current thread, because when rd_kafka_broker_destroy_final() is called, the application crashes at rd_assert(thrd_is_current(rkb->rkb_thread));.
Could this be a possible cause of the problem?
Hi @edenhill, same here as well. It happened when restarting the program after the process had not consumed data for a period of time. My program crashed with the following stack; I hope it can help you:
#6 <signal handler called>
#7 0x0000ffffa02d3200 in raise () from /lib64/libc.so.6
#8 0x0000ffffa02d45ac in abort () from /lib64/libc.so.6
#9 0x0000ffffa02cc60c in __assert_fail_base () from /lib64/libc.so.6
#10 0x0000ffffa02cc68c in __assert_fail () from /lib64/libc.so.6
#11 0x0000ffff9b4ccd30 in rd_kafka_broker_destroy_final (rkb=0xaaaadee22e60) at rdkafka_broker.c:5412
#12 0x0000ffff9b5529ac in rd_kafka_metadata_refresh_topics (rk=rk@entry=0xaaaadeea2140, rkb=0xaaaadee22e60, rkb@entry=0x0,
topics=topics@entry=0xffff18ca9ae0, force=force@entry=1 '\001', allow_auto_create=0 '\000',
cgrp_update=cgrp_update@entry=0 '\000', reason=reason@entry=0xffff9b600840 "broker down") at rdkafka_metadata.c:954
#13 0x0000ffff9b552d90 in rd_kafka_metadata_refresh_known_topics (rk=0xaaaadeea2140, rkb=rkb@entry=0x0, force=force@entry=1 '\001',
reason=reason@entry=0xffff9b600840 "broker down") at rdkafka_metadata.c:994
#14 0x0000ffff9b4c86e4 in rd_kafka_broker_fail (rkb=rkb@entry=0xaaaadeea3d90, level=level@entry=7,
err=err@entry=RD_KAFKA_RESP_ERR__TRANSPORT, fmt=fmt@entry=0xffff9b6022d8 "Closing connection due to nodename change")
at rdkafka_broker.c:625
#15 0x0000ffff9b4d6040 in rd_kafka_broker_op_serve (rkb=rkb@entry=0xaaaadeea3d90, rko=0xaaaadf5ac450) at rdkafka_broker.c:3342
#16 0x0000ffff9b4d6e0c in rd_kafka_broker_ops_serve (rkb=rkb@entry=0xaaaadeea3d90, timeout_us=<optimized out>, timeout_us@entry=0)
at rdkafka_broker.c:3378
#17 0x0000ffff9b4d6ef4 in rd_kafka_broker_ops_io_serve (rkb=rkb@entry=0xaaaadeea3d90, abs_timeout=<optimized out>)
at rdkafka_broker.c:3433
#18 0x0000ffff9b4d76f0 in rd_kafka_broker_consumer_serve (rkb=rkb@entry=0xaaaadeea3d90, abs_timeout=abs_timeout@entry=281471097680320)
at rdkafka_broker.c:5023
#19 0x0000ffff9b4d8924 in rd_kafka_broker_serve (rkb=rkb@entry=0xaaaadeea3d90, timeout_ms=<optimized out>, timeout_ms@entry=1000)
at rdkafka_broker.c:5166
#20 0x0000ffff9b4d8fd4 in rd_kafka_broker_thread_main (arg=0xaaaadeea3d90) at rdkafka_broker.c:5328
#21 0x0000ffffa0a7e7dc in start_thread () from /lib64/libpthread.so.0
#22 0x0000ffffa03759fc in thread_start () from /lib64/libc.so.6
(gdb) p *(rd_kafka_broker_t *)0xaaaadee22e60
$3 = {rkb_link = {tqe_next = 0xaaaadeea3d90, tqe_prev = 0xaaaadee27a30}, rkb_nodeid = 0, rkb_rsal = 0xaaaadf4029a0, rkb_ts_rsal_last = 188244515693, rkb_addr_last = 0xaaaadf4029a8, rkb_transport = 0x0, rkb_corrid = 3965,
rkb_connid = 1, rkb_ops = 0xaaaadee23ab0, rkb_lock = {__size = '\000' <repeats 17 times>, "\002", '\000' <repeats 29 times>, __align = 0}, rkb_blocking_max_ms = 0, rkb_toppars = {tqh_first = 0x0, tqh_last = 0xaaaadee22ee0},
rkb_toppar_cnt = 0, rkb_active_toppars = {cqh_first = 0xaaaadee22ef8, cqh_last = 0xaaaadee22ef8}, rkb_active_toppar_cnt = 0, rkb_active_toppar_next = 0x0, rkb_cgrp = 0x0, rkb_ts_fetch_backoff = 0, rkb_fetching = 1,
rkb_state = RD_KAFKA_BROKER_STATE_DOWN, rkb_ts_state = 190226149453, rkb_timeout_scan_intvl = {ri_ts_last = 190225875214, ri_fixed = 0, ri_backoff = 0}, rkb_blocking_request_cnt = {val = 0}, rkb_features = 8191,
rkb_ApiVersions = 0xaaaadf41fac0, rkb_ApiVersions_cnt = 48, rkb_ApiVersion_fail_intvl = {ri_ts_last = 188244522436, ri_fixed = 0, ri_backoff = 0}, rkb_source = RD_KAFKA_CONFIGURED, rkb_c = {tx_bytes = {val = 499041}, tx = {
val = 3965}, tx_err = {val = 0}, tx_retries = {val = 0}, req_timeouts = {val = 0}, rx_bytes = {val = 475700}, rx = {val = 3964}, rx_err = {val = 0}, rx_corrid_err = {val = 0}, rx_partial = {val = 0}, zbuf_grow = {
val = 0}, buf_grow = {val = 0}, wakeups = {val = 7998}, connects = {val = 1}, disconnects = {val = 1}, reqtype = {{val = 0}, {val = 3958}, {val = 2}, {val = 1}, {val = 0}, {val = 0}, {val = 0}, {val = 0}, {val = 0}, {
val = 0}, {val = 1}, {val = 0}, {val = 0}, {val = 0}, {val = 0}, {val = 0}, {val = 0}, {val = 1}, {val = 1}, {val = 0} <repeats 17 times>, {val = 1}, {val = 0} <repeats 22 times>}, ts_send = {val = 0}, ts_recv = {
val = 0}}, rkb_req_timeouts = 0, rkb_thread = 281471072502208, rkb_refcnt = {val = 0}, rkb_rk = 0xaaaadeea2140, rkb_recv_buf = 0x0, rkb_max_inflight = 1000000, rkb_outbufs = {rkbq_bufs = {tqh_first = 0x0,
tqh_last = 0xaaaadee23210}, rkbq_cnt = {val = 0}, rkbq_msg_cnt = {val = 0}}, rkb_waitresps = {rkbq_bufs = {tqh_first = 0x0, tqh_last = 0xaaaadee23228}, rkbq_cnt = {val = 0}, rkbq_msg_cnt = {val = 0}}, rkb_retrybufs = {
rkbq_bufs = {tqh_first = 0x0, tqh_last = 0xaaaadee23240}, rkbq_cnt = {val = 0}, rkbq_msg_cnt = {val = 0}}, rkb_avg_int_latency = {ra_v = {maxv = 0, minv = 0, avg = 0, sum = 0, cnt = 0, start = 0}, ra_lock = {
__size = '\000' <repeats 17 times>, "\002", '\000' <repeats 29 times>, __align = 0}, ra_enabled = 0, ra_type = RD_AVG_GAUGE, ra_hdr = 0x0, ra_hist = {p50 = 0, p75 = 0, p90 = 0, p95 = 0, p99 = 0, p99_99 = 0, oor = 0,
hdrsize = 0, stddev = 0, mean = 0}}, rkb_avg_outbuf_latency = {ra_v = {maxv = 0, minv = 0, avg = 0, sum = 0, cnt = 0, start = 0}, ra_lock = {__size = '\000' <repeats 17 times>, "\002", '\000' <repeats 29 times>,
__align = 0}, ra_enabled = 0, ra_type = RD_AVG_GAUGE, ra_hdr = 0x0, ra_hist = {p50 = 0, p75 = 0, p90 = 0, p95 = 0, p99 = 0, p99_99 = 0, oor = 0, hdrsize = 0, stddev = 0, mean = 0}}, rkb_avg_rtt = {ra_v = {maxv = 0,
minv = 0, avg = 0, sum = 0, cnt = 0, start = 0}, ra_lock = {__size = '\000' <repeats 17 times>, "\002", '\000' <repeats 29 times>, __align = 0}, ra_enabled = 0, ra_type = RD_AVG_GAUGE, ra_hdr = 0x0, ra_hist = {p50 = 0,
p75 = 0, p90 = 0, p95 = 0, p99 = 0, p99_99 = 0, oor = 0, hdrsize = 0, stddev = 0, mean = 0}}, rkb_avg_throttle = {ra_v = {maxv = 0, minv = 0, avg = 0, sum = 0, cnt = 0, start = 0}, ra_lock = {
__size = '\000' <repeats 17 times>, "\002", '\000' <repeats 29 times>, __align = 0}, ra_enabled = 0, ra_type = RD_AVG_GAUGE, ra_hdr = 0x0, ra_hist = {p50 = 0, p75 = 0, p90 = 0, p95 = 0, p99 = 0, p99_99 = 0, oor = 0,
hdrsize = 0, stddev = 0, mean = 0}}, rkb_name = "sasl_ssl://193.1.93.77:9092/0\000otstrap", '\000' <repeats 218 times>, rkb_nodename = "193.1.93.77:9092", '\000' <repeats 239 times>, rkb_port = 9092,
rkb_origname = 0xaaaadeea7fb0 "193.1.93.77", rkb_nodename_epoch = 0, rkb_connect_epoch = 0, rkb_logname = 0xaaaadeea5b00 "sasl_ssl://193.1.93.77:9092/0", rkb_logname_lock = {
__size = '\000' <repeats 17 times>, "\002", '\000' <repeats 29 times>, __align = 0}, rkb_wakeup_fd = {100, 101}, rkb_toppar_wakeup_fd = -1, rkb_reconnect_backoff_ms = 10000, rkb_ts_reconnect = 188251240592,
rkb_persistconn = {internal = 0, coord = {val = 0}}, rkb_monitors = {tqh_first = 0x0, tqh_last = 0xaaaadee237c8}, rkb_coord_monitor = {rkbmon_link = {tqe_next = 0x0, tqe_prev = 0xaaaadee237c8}, rkbmon_rkb = 0x0,
rkbmon_q = 0xaaaadeea3160, rkbmon_cb = 0xffff9b58caa0 <rd_kafka_coord_rkb_monitor_cb>}, rkb_proto = RD_KAFKA_PROTO_SASL_SSL, rkb_down_reported = 1, rkb_sasl_kinit_refresh_tmr = {rtmr_link = {tqe_next = 0x0,
tqe_prev = 0x0}, rtmr_next = 0, rtmr_interval = 0, rtmr_oneshot = 0 '\000', rtmr_callback = 0x0, rtmr_arg = 0x0}, rkb_suppress = {unsupported_compression = {ri_ts_last = 0, ri_fixed = 0, ri_backoff = 0},
unsupported_kip62 = {ri_ts_last = 0, ri_fixed = 0, ri_backoff = 0}, unsupported_kip345 = {ri_ts_last = 0, ri_fixed = 0, ri_backoff = 0}, fail_error = {ri_ts_last = 0, ri_fixed = 0, ri_backoff = 0}}, rkb_last_err = {
errstr = "Broker handle is terminating", '\000' <repeats 483 times>, err = RD_KAFKA_RESP_ERR__DESTROY, cnt = 1}}
(gdb) info threads
Id Target Id Frame
* 1 Thread 0xffff18cab1c0 (LWP 16597) 0x0000ffff9b5529ac in rd_kafka_metadata_refresh_topics (rk=rk@entry=0xaaaadeea2140, rkb=0xaaaadee22e60, rkb@entry=0x0, topics=topics@entry=0xffff18ca9ae0, force=force@entry=1 '\001',
allow_auto_create=0 '\000', cgrp_update=cgrp_update@entry=0 '\000', reason=reason@entry=0xffff9b600840 "broker down") at rdkafka_metadata.c:954
2 Thread 0xffff9df761c0 (LWP 15599) 0x0000ffffa0a88ec0 in nanosleep () from /lib64/libpthread.so.0
3 Thread 0xffff9e7771c0 (LWP 15598) 0x0000ffffa0a88ec0 in nanosleep () from /lib64/libpthread.so.0
4 Thread 0xffff922a71c0 (LWP 15621) 0x0000ffffa0a88ec0 in nanosleep () from /lib64/libpthread.so.0
5 Thread 0xffff9536f1c0 (LWP 15614) 0x0000ffffa036e76c in select () from /lib64/libc.so.6
6 Thread 0xffff93b2b1c0 (LWP 15617) 0x0000ffffa0a88ec0 in nanosleep () from /lib64/libpthread.so.0
7 Thread 0xffff9332a1c0 (LWP 15618) 0x0000ffffa0a877bc in do_futex_wait () from /lib64/libpthread.so.0
8 Thread 0xffff94b2d1c0 (LWP 15615) 0x0000ffffa0a87540 in do_futex_wait.constprop () from /lib64/libpthread.so.0
9 Thread 0xffff9432c1c0 (LWP 15616) 0x0000ffffa036e76c in select () from /lib64/libc.so.6
10 Thread 0xffff8e29f1c0 (LWP 15629) 0x0000ffffa0a88ec0 in nanosleep () from /lib64/libpthread.so.0
11 Thread 0xffff91aa61c0 (LWP 15622) 0x0000ffffa0a87540 in do_futex_wait.constprop () from /lib64/libpthread.so.0
12 Thread 0xffff90aa41c0 (LWP 15624) 0x0000ffffa0a88ec0 in nanosleep () from /lib64/libpthread.so.0
13 Thread 0xffff8eaa01c0 (LWP 15628) 0x0000ffffa0375b24 in epoll_pwait () from /lib64/libc.so.6
14 Thread 0xffff8da9e1c0 (LWP 15630) 0x0000ffffa0a7fb2c in __pthread_timedjoin_ex () from /lib64/libpthread.so.0
15 Thread 0xffff932a91c0 (LWP 15619) 0x0000ffffa0a87540 in do_futex_wait.constprop () from /lib64/libpthread.so.0
16 Thread 0xffffa0ad6010 (LWP 15577) 0x0000ffffa0346538 in nanosleep () from /lib64/libc.so.6
17 Thread 0xffff8ba9a1c0 (LWP 15634) 0x0000ffffa0375b24 in epoll_pwait () from /lib64/libc.so.6
18 Thread 0xffff8f2a11c0 (LWP 15627) 0x0000ffffa0a87540 in do_futex_wait.constprop () from /lib64/libpthread.so.0
19 Thread 0xffff95ebf1c0 (LWP 15613) 0x0000ffffa0346538 in nanosleep () from /lib64/libc.so.6
20 Thread 0xffff8c29b1c0 (LWP 15633) 0x0000ffffa0375b24 in epoll_pwait () from /lib64/libc.so.6
21 Thread 0xffff8ca9c1c0 (LWP 15632) 0x0000ffffa0375b24 in epoll_pwait () from /lib64/libc.so.6
22 Thread 0xffff844681c0 (LWP 15659) 0x0000ffffa0346538 in nanosleep () from /lib64/libc.so.6
23 Thread 0xffff8a2971c0 (LWP 15637) 0x0000ffffa0375b24 in epoll_pwait () from /lib64/libc.so.6
24 Thread 0xffff8b2991c0 (LWP 15635) 0x0000ffffa0375b24 in epoll_pwait () from /lib64/libc.so.6
25 Thread 0xffff8d29d1c0 (LWP 15631) 0x0000ffffa0375b24 in epoll_pwait () from /lib64/libc.so.6
26 Thread 0xffff912a51c0 (LWP 15623) 0x0000ffffa0a87540 in do_futex_wait.constprop () from /lib64/libpthread.so.0
27 Thread 0xffff8aa981c0 (LWP 15636) 0x0000ffffa0375b24 in epoll_pwait () from /lib64/libc.so.6
28 Thread 0xffff88a941c0 (LWP 15651) 0x0000ffffa0346538 in nanosleep () from /lib64/libc.so.6
29 Thread 0xffff80c611c0 (LWP 16063) 0x0000ffffa0a84c38 in pthread_cond_wait@@GLIBC_2.17 () from /lib64/libpthread.so.0
30 Thread 0xffff87a921c0 (LWP 15653) 0x0000ffffa0a7fb2c in __pthread_timedjoin_ex () from /lib64/libpthread.so.0
31 Thread 0xffff89a961c0 (LWP 15638) 0x0000ffffa0375b24 in epoll_pwait () from /lib64/libc.so.6
32 Thread 0xffff892951c0 (LWP 15639) 0x0000ffffa0375b24 in epoll_pwait () from /lib64/libc.so.6
33 Thread 0xffff804601c0 (LWP 16064) 0x0000ffffa0a84c38 in pthread_cond_wait@@GLIBC_2.17 () from /lib64/libpthread.so.0
34 Thread 0xffff814621c0 (LWP 16062) 0x0000ffffa0346538 in nanosleep () from /lib64/libc.so.6
35 Thread 0xffff7e45c1c0 (LWP 16068) 0x0000ffffa0346538 in nanosleep () from /lib64/libc.so.6
36 Thread 0xffff92aa81c0 (LWP 15620) 0x0000ffffa0375b24 in epoll_pwait () from /lib64/libc.so.6
37 Thread 0xffff8528d1c0 (LWP 15658) 0x0000ffffa036e76c in select () from /lib64/libc.so.6
38 Thread 0xffff7ec5d1c0 (LWP 16067) 0x0000ffffa0346538 in nanosleep () from /lib64/libc.so.6
39 Thread 0xffff902a31c0 (LWP 15625) 0x0000ffffa0a87540 in do_futex_wait.constprop () from /lib64/libpthread.so.0
40 Thread 0xffff81c631c0 (LWP 16061) 0x0000ffffa0346538 in nanosleep () from /lib64/libc.so.6
41 Thread 0xffff7f45e1c0 (LWP 16066) 0x0000ffffa0346538 in nanosleep () from /lib64/libc.so.6
42 Thread 0xffff7dc5b1c0 (LWP 16069) 0x0000ffffa0346538 in nanosleep () from /lib64/libc.so.6
43 Thread 0xffff2de3b1c0 (LWP 16569) 0x0000ffffa0a88ec0 in nanosleep () from /lib64/libpthread.so.0
I'm trying to solve this problem. Can you tell me how to reproduce it?
I'm sorry, I have no idea how to reproduce it. It happens only occasionally.
If thread_1 (a rdkafka_broker_thread_main) is executing rd_kafka_metadata_refresh_known_topics (which selects one rkb to fetch topics from), and thread_2 (the rdkafka_broker_thread_main that created that rkb) then exits, thread_1 will end up destroying the rkb, so the check is problematic:
starrocks_be: rdkafka_broker.c:5412: rd_kafka_broker_destroy_final: Assertion `thrd_is_current(rkb->rkb_thread)' failed
@edenhill I am also troubled by this problem, can you provide some help?
I'm guessing this only happens on client termination, i.e. in rd_kafka_destroy(), correct?
I've seen it happen once as well. Cannot reproduce though and all I got is:
rdkafka_broker.c:4633: rd_kafka_broker_destroy_final: Assertion `thrd_is_current(rkb->rkb_thread)' failed.
It happens on 2.2.0 via rdkafka-ruby in the context of a short-lived consumer without a consumer group assigned. I use it to read data in one shot.
I need to reproduce it with a test, but I think the problem here comes from the assert checking that the thread calling destroy final for a broker is that broker's own thread. Given that the struct is ref-counted, the count can reach zero in another thread after the broker thread has already exited.
In the previous stack trace destroy is called from rd_kafka_broker_thread_main, but given that rd_kafka_metadata_refresh_topics calls rd_kafka_broker_any_usable, it can be a different broker's thread.
@emasab I was not able to reproduce it, but I wonder whether running rd_kafka_unsubscribe before closing would mitigate the case of a metadata refresh running on the brokers alongside?
@mensfeld probably not, because the metadata refresh can happen independently from subscription, with a producer too.
About the fix, it could be removing the assert completely, or setting rkb_thread to NULL when the thread exits and not failing the assert in that case.
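To make the race concrete, here is a toy model of it. None of this is librdkafka code: toy_broker_t, thread_alive, run_scenario and friends are invented names that merely echo rkb_refcnt/rkb_thread. It shows the last reference to a ref-counted broker being dropped by a different thread after the owning thread has exited, together with the NULL-sentinel relaxation suggested above:

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdlib.h>

/* Toy model only: names echo librdkafka's rkb_refcnt/rkb_thread,
 * but none of this is real librdkafka code. */
typedef struct {
    atomic_int  refcnt;
    pthread_t   thread;        /* the owning broker thread's id */
    atomic_bool thread_alive;  /* proposed sentinel, cleared on thread exit */
} toy_broker_t;

static void toy_destroy_final(toy_broker_t *rkb) {
    /* Relaxed assert: only insist on the owning thread while it is alive.
     * The original, unconditional form is what fires in this issue. */
    if (atomic_load(&rkb->thread_alive))
        assert(pthread_equal(rkb->thread, pthread_self()));
    free(rkb);
}

static void toy_broker_destroy(toy_broker_t *rkb) {
    /* The last reference may well be dropped by a different thread. */
    if (atomic_fetch_sub(&rkb->refcnt, 1) == 1)
        toy_destroy_final(rkb);
}

static void *toy_broker_thread_main(void *arg) {
    toy_broker_t *rkb = arg;
    rkb->thread = pthread_self();
    /* ... serve ops ...; on exit, clear the sentinel, drop our ref. */
    atomic_store(&rkb->thread_alive, 0);
    toy_broker_destroy(rkb);
    return NULL;
}

/* Reproduces the sequence discussed above: the broker thread exits
 * first, then another thread (here, the caller) drops the last ref.
 * Returns 0 if destruction completed without tripping the assert. */
int run_scenario(void) {
    toy_broker_t *rkb = calloc(1, sizeof(*rkb));
    atomic_store(&rkb->refcnt, 2);   /* broker thread + metadata caller */
    atomic_store(&rkb->thread_alive, 1);
    pthread_t t;
    pthread_create(&t, NULL, toy_broker_thread_main, rkb);
    pthread_join(t, NULL);           /* broker thread has exited */
    toy_broker_destroy(rkb);         /* last ref dropped in this thread */
    return 0;
}
```

With the unconditional assert, the final toy_broker_destroy() would abort exactly like the stack traces above; with the sentinel it completes cleanly.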
Thanks. Two more questions as a follow-up:
I would love to at least partially mitigate this prior to having a fix (or even a repro).
I will try to reproduce it as well.
@emasab can I mitigate it (at least partially) by checking the metadata age for brokers in the statistics and "waiting out" the refresh if the close is too near it in time? If I know the frequency (which is user-controlled) and I know the age (if that is what statistics publish), then, putting aside edge cases (cluster changes that would trigger a refresh), the periodic one could be "waited out", right?
@edenhill @mensfeld Any update here? We faced a similar issue of an assert failing while calling consumer close after unsubscribe. It resulted in a critical application going down. It would also be OK if someone could suggest a quick workaround while it gets fixed.
@atul-raghuwanshi you can use statistics to figure out the metadata refresh time (putting aside cluster changes, of course) and make sure you do not close around it.
I have the same problem; has anyone solved it?
We have a customer who has also run into this problem a few times when using a newer version of our software, which uses librdkafka 1.9.2. Unfortunately we haven't been able to reproduce the issue ourselves in our tests. However, the customer had previously been using an older version of our software built against librdkafka 1.3.0 without running into it, so we gave them a newer version of our software with that older library and they have not been able to reproduce it. So it seems the issue was introduced somewhere between versions 1.3.0 and 1.9.2.
@Long-Wu-code @JSoet we used a forced metadata fetch before closing the consumer to bypass the issue, and that seems to work for our case. Since then we have not encountered the issue.
@atul-raghuwanshi Thanks, so you just do the metadata fetch and are then able to immediately close? You then don't need to do anything with the statistics and calculating exactly when to close, as I think was suggested above by @mensfeld?
@atul-raghuwanshi Just wanted to check whether you saw my question above? Just curious if that's all you had to do...
Description
My program crashes with the following stack:
How to reproduce
I don't know how to reproduce it; my program is a bit complicated, and everything goes well in most cases. This error only happens occasionally.
I just want to know under what circumstances the above problem may occur, so that I can continue to troubleshoot my program.
Checklist
Please provide the following information:
- librdkafka version (release number or git tag): 1.8.0
- Apache Kafka version: 0.11
- Operating system: centos7 x86_64
- Logs (with debug=.. as necessary) from librdkafka: No logs, just crash...