confluentinc / librdkafka

The Apache Kafka C/C++ library

Producer never reconnects to broker #4615

Closed cschwarz73 closed 4 months ago

cschwarz73 commented 4 months ago

We have an application that submits a continuous stream of messages to a Kafka broker via a single topic. The application runs on Linux (CentOS 7, 32-bit) and uses librdkafka v1.7.0 (C++ API).

Occasionally, and for unknown reasons, the producer loses connection to the broker and never reconnects. In some instances we observed weeks without recovery.

To recover, we manually restart the application. This recreates the producer, which then connects to the broker successfully.

While the application was in the problem state, we observed an ESTABLISHED socket to the broker still present in the kernel, with empty Recv and Send queues. On the broker, however, the socket no longer existed. The broker was likely restarted in the meantime without the producer noticing. Example:

# netstat -nta
Active Internet connections (servers and established)
Proto Recv-Q Send-Q Local Address           Foreign Address         State      
tcp        0      0 10.10.0.1:55053         10.10.0.2:9094          ESTABLISHED

The application has broker tracing enabled (debug=broker) by default, and while in the problem state it continuously logs the following three messages:

2024-01-25 13:01:02.311834 21978 3 log event: Cluster connection already in progress: refresh unavailable topics
2024-01-25 13:01:02.311942 21978 3 log event: Not selecting any broker for cluster connection: still suppressed for 49ms: no cluster connection
2024-01-25 13:01:02.312197 21978 4 send :1315620 -> true
2024-01-25 13:01:03.312280 21978 3 log event: Cluster connection already in progress: refresh unavailable topics
2024-01-25 13:01:03.312376 21978 3 log event: Not selecting any broker for cluster connection: still suppressed for 49ms: no cluster connection
2024-01-25 13:01:03.312585 21978 4 send :1315621 -> true
...
(same messages repeat for 2+ hours)
...
2024-01-25 15:12:20.455235 21978 3 log event: Cluster connection already in progress: refresh unavailable topics
2024-01-25 15:12:20.455352 21978 3 log event: Not selecting any broker for cluster connection: still suppressed for 49ms: no cluster connection
2024-01-25 15:12:20.455893 21978 4 send :1323494 -> true
2024-01-25 15:12:21.455775 21978 3 log event: Cluster connection already in progress: refresh unavailable topics
2024-01-25 15:12:21.455889 21978 3 log event: Not selecting any broker for cluster connection: still suppressed for 49ms: no cluster connection
2024-01-25 15:12:21.456393 21978 4 send :1323495 -> true

The application injects a "heartbeat" message every second, which is the "send -> true" trace; it indicates a successful call to the producer's produce method. The application also polls the producer every second, and the log events are delivered during the poll (log.queue=true).

The log events seem to indicate that a connection is in progress, but from the timestamps we can tell that nothing happened for over 2 hours.

The last error event delivered by librdkafka, which is tracked separately, was:

Connection failed, ssl://10.10.0.2:9094/0: ApiVersionRequest failed: Local: Timed out: probably due to broker version < 0.10 (see api.version.request configuration) (after 10917ms in state APIVERSION_QUERY)

Appreciate any ideas how to narrow this down.

cschwarz73 commented 4 months ago

Today we collected the following information from one of the application instances in problem state:

TID   Thread name
25719 Application main thread (==PID)
25751 rdk:broker0
25750 rdk:broker-1
25749 rdk:main
++ lsof -nPp 25719
COMMAND     PID USER   FD      TYPE             DEVICE SIZE/OFF     NODE NAME
...
Application 25719 root   44u     IPv4           52857439      0t0      TCP 10.xx.xx.xx:43377->xx.xx.xxx.xxx:9094 (ESTABLISHED)
...
++ netstat -nta
Active Internet connections (servers and established)
Proto Recv-Q Send-Q Local Address           Foreign Address         State      
...
tcp        0      0 10.xx.xx.xx:43377       xx.xx.xxx.xxx:9094      ESTABLISHED
...
strace for TID 25750

25750 15:52:19.445544 restart_syscall(<... resuming interrupted futex ...> <unfinished ...>
25750 15:52:19.492201 <... restart_syscall resumed>) = -1 ETIMEDOUT (Connection timed out) <0.046623>
25750 15:52:19.492286 futex(0x5888d960, FUTEX_WAKE_PRIVATE, 1) = 0 <0.000022>
25750 15:52:19.492393 clock_gettime(CLOCK_REALTIME, {tv_sec=1707771139, tv_nsec=492423791}) = 0 <0.000021>
25750 15:52:19.492483 futex(0x5888d97c, FUTEX_WAIT_PRIVATE, 4380609, {tv_sec=0, tv_nsec=999952102} <unfinished ...>
25750 15:52:20.492580 <... futex resumed>) = -1 ETIMEDOUT (Connection timed out) <1.000054>
25750 15:52:20.492841 futex(0x5888d960, FUTEX_WAKE_PRIVATE, 1) = 0 <0.000021>
25750 15:52:20.495545 clock_gettime(CLOCK_REALTIME, {tv_sec=1707771140, tv_nsec=495581241}) = 0 <0.000023>
25750 15:52:20.495644 futex(0x5888d97c, FUTEX_WAIT_PRIVATE, 4380611, {tv_sec=0, tv_nsec=999948699} <unfinished ...>
...
(continues indefinitely)
strace for TID 25751

25751 15:52:19.445350 restart_syscall(<... resuming interrupted restart_syscall ...> <unfinished ...>
25751 15:52:20.154198 <... restart_syscall resumed>) = 0 <0.708654>
25751 15:52:20.154304 poll([{fd=44, events=POLLIN}, {fd=41, events=POLLIN}], 2, 1000 <unfinished ...>
25751 15:52:21.155207 <... poll resumed>) = 0 (Timeout) <1.000844>
25751 15:52:21.155328 poll([{fd=44, events=POLLIN}, {fd=41, events=POLLIN}], 2, 1000 <unfinished ...>
25751 15:52:22.155492 <... poll resumed>) = 0 (Timeout) <1.000106>
25751 15:52:22.155624 poll([{fd=44, events=POLLIN}, {fd=41, events=POLLIN}], 2, 1000 <unfinished ...>
25751 15:52:23.156527 <... poll resumed>) = 0 (Timeout) <1.000841>
25751 15:52:23.156642 poll([{fd=44, events=POLLIN}, {fd=41, events=POLLIN}], 2, 1000 <unfinished ...>
25751 15:52:24.157475 <... poll resumed>) = 0 (Timeout) <1.000773>
25751 15:52:24.157606 poll([{fd=44, events=POLLIN}, {fd=41, events=POLLIN}], 2, 1000 <unfinished ...>
25751 15:52:25.158173 <... poll resumed>) = 0 (Timeout) <1.000509>
...
(continues indefinitely)

Backtraces for librdkafka threads:

Thread 4 (Thread 0xe98c0b40 (LWP 25751) "rdk:broker0"):
#0  0xf7eded79 in __kernel_vsyscall ()
#1  0xf7776c6b in poll () from /lib/libc.so.6
#2  0xf41b5c34 in rd_kafka_transport_poll.localalias () from /lib/librdkafka.so.1
#3  0xf41b5ca1 in rd_kafka_transport_io_serve () from /lib/librdkafka.so.1
#4  0xf419bf68 in rd_kafka_broker_ops_io_serve () from /lib/librdkafka.so.1
#5  0xf419dd60 in rd_kafka_broker_serve () from /lib/librdkafka.so.1
#6  0xf419e5dc in rd_kafka_broker_thread_main () from /lib/librdkafka.so.1
#7  0xf7658f63 in start_thread () from /lib/libpthread.so.0
#8  0xf7782abe in clone () from /lib/libc.so.6
Thread 3 (Thread 0xea0c1b40 (LWP 25750) "rdk:broker-1"):
#0  0xf7eded79 in __kernel_vsyscall ()
#1  0xf765d604 in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib/libpthread.so.0
#2  0xf4215b00 in cnd_timedwait () from /lib/librdkafka.so.1
#3  0xf4216004 in cnd_timedwait_abs () from /lib/librdkafka.so.1
#4  0xf41bb2a7 in rd_kafka_q_pop_serve.localalias () from /lib/librdkafka.so.1
#5  0xf41bb35d in rd_kafka_q_pop () from /lib/librdkafka.so.1
#6  0xf419be3c in rd_kafka_broker_ops_serve () from /lib/librdkafka.so.1
#7  0xf419bfe6 in rd_kafka_broker_ops_io_serve () from /lib/librdkafka.so.1
#8  0xf419e1e4 in rd_kafka_broker_serve () from /lib/librdkafka.so.1
#9  0xf419e9bc in rd_kafka_broker_thread_main () from /lib/librdkafka.so.1
#10 0xf7658f63 in start_thread () from /lib/libpthread.so.0
#11 0xf7782abe in clone () from /lib/libc.so.6
Thread 2 (Thread 0xea8c2b40 (LWP 25749) "rdk:main"):
#0  0xf7eded79 in __kernel_vsyscall ()
#1  0xf765d604 in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib/libpthread.so.0
#2  0xf4215b00 in cnd_timedwait () from /lib/librdkafka.so.1
#3  0xf4216004 in cnd_timedwait_abs () from /lib/librdkafka.so.1
#4  0xf41bb541 in rd_kafka_q_serve.localalias () from /lib/librdkafka.so.1
#5  0xf4182208 in rd_kafka_thread_main () from /lib/librdkafka.so.1
#6  0xf7658f63 in start_thread () from /lib/libpthread.so.0
#7  0xf7782abe in clone () from /lib/libc.so.6

Application/rdkafka logs show messages being enqueued and then timing out, while the cluster connection is reported as in progress indefinitely:

2024-02-12 12:52:13.895781 6 305595.260980257, +267444 "doQueueRecord"   <<<< Producer::produce
2024-02-12 12:52:14.895876 3 305596.261075545, +2400134222 "log event: Cluster connection already in progress: refresh unavailable topics" <<< Event callback
2024-02-12 12:52:14.895907 3 305596.261106053, +73217 "log event: Not selecting any broker for cluster connection: still suppressed for 49ms: no cluster connection" <<< Event callback
2024-02-12 12:52:14.895930 2 305596.261129146, +55418 "handleDeliveryError 2289446 Local: Message timed out"  <<<<< DeliveryReport callback
2024-02-12 12:52:14.895938 6 305596.261137256, +19464 "doQueueRecord"
2024-02-12 12:52:14.896046 6 305596.261245122, +258867 "doQueueRecord"
2024-02-12 12:52:15.896204 3 305597.261402497, +2400069664 "log event: Cluster connection already in progress: refresh unavailable topics"
2024-02-12 12:52:15.896234 3 305597.261432628, +72313 "log event: Not selecting any broker for cluster connection: still suppressed for 49ms: no cluster connection"
2024-02-12 12:52:15.896258 2 305597.261457172, +58902 "handleDeliveryError 2289447 Local: Message timed out"
2024-02-12 12:52:15.896265 6 305597.261464013, +16416 "doQueueRecord"
2024-02-12 12:52:15.896381 6 305597.261579964, +278271 "doQueueRecord"
2024-02-12 12:52:16.899101 3 305598.264300236, +2406433938 "log event: Cluster connection already in progress: refresh unavailable topics"
2024-02-12 12:52:16.899133 3 305598.264332280, +76900 "log event: Not selecting any broker for cluster connection: still suppressed for 49ms: no cluster connection"
2024-02-12 12:52:16.899157 2 305598.264356289, +57618 "handleDeliveryError 2289448 Local: Message timed out"
...
(continues indefinitely)

As far as we can tell, librdkafka keeps polling the dead broker connection. Polls keep timing out after 1 second but this doesn't have any effect. It never hits an application timeout.

From an application log we can see that the last successful message delivery to the broker was 24 days ago. The application instance has been in this state ever since.
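
Since the only recovery so far has been recreating the producer by hand, an application-side watchdog could automate that decision. The following is a minimal sketch, not anything librdkafka provides: the `delivery_watchdog` type and its functions are hypothetical names, and it assumes the application records the time of each successful delivery report and checks the watchdog once per poll cycle.

```c
#include <assert.h>
#include <stdbool.h>
#include <time.h>

/* Hypothetical application-side watchdog; not part of librdkafka. */
typedef struct {
    time_t last_success;   /* time of the last successful delivery report */
    double max_silence_s;  /* tolerated gap before the producer counts as stalled */
} delivery_watchdog;

void watchdog_init(delivery_watchdog *w, time_t now, double max_silence_s) {
    assert(max_silence_s > 0.0);
    w->last_success = now;
    w->max_silence_s = max_silence_s;
}

/* Call from the delivery report callback when a message was delivered
 * without error. */
void watchdog_on_success(delivery_watchdog *w, time_t now) {
    w->last_success = now;
}

/* Call once per poll cycle; returns true when no delivery has succeeded
 * for longer than max_silence_s, i.e. the producer should be recreated. */
bool watchdog_stalled(const delivery_watchdog *w, time_t now) {
    return difftime(now, w->last_success) > w->max_silence_s;
}
```

With a one-second heartbeat, a threshold of a few minutes would have flagged this instance long before the 24-day mark. A monotonic clock would be more robust against wall-clock jumps than `time_t`; the sketch uses `time_t` only for brevity.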

cschwarz73 commented 4 months ago

Examined the core file from an application instance in problem state:

(gdb) info thr
  Id   Target Id                     Frame 
* 1    Thread 0xf6b14b00 (LWP 25719) 0xf7eded79 in __kernel_vsyscall ()
  2    Thread 0xea8c2b40 (LWP 25749) 0xf7eded79 in __kernel_vsyscall ()
  3    Thread 0xea0c1b40 (LWP 25750) 0xf7eded79 in __kernel_vsyscall ()
  4    Thread 0xe98c0b40 (LWP 25751) 0xf7eded79 in __kernel_vsyscall ()
(gdb) thr 2
[Switching to thread 2 (Thread 0xea8c2b40 (LWP 25749))]
#0  0xf7eded79 in __kernel_vsyscall ()
(gdb) bt 
#0  0xf7eded79 in __kernel_vsyscall ()
#1  0xf765d604 in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib/libpthread.so.0
#2  0xf4215b00 in cnd_timedwait (cond=0x587ea128, mtx=0x587ea110, ts=0xea8c1dd8) at tinycthread.c:462
#3  0xf4216004 in cnd_timedwait_abs (cnd=0x587ea128, mtx=0x587ea110, tspec=0x80) at tinycthread_extra.c:106
#4  0xf41bb541 in rd_kafka_q_serve (rkq=0x587ea110, timeout_ms=999, max_cnt=0, cb_type=RD_KAFKA_Q_CB_CALLBACK, callback=0x0, 
    opaque=0x0) at rdkafka_queue.c:482
#5  0xf4182208 in rd_kafka_thread_main (arg=0x5880bfe0) at rdkafka.c:2058
#6  0xf7658f63 in start_thread () from /lib/libpthread.so.0
#7  0xf7782abe in clone () from /lib/libc.so.6
(gdb) thr 3 
[Switching to thread 3 (Thread 0xea0c1b40 (LWP 25750))]
#0  0xf7eded79 in __kernel_vsyscall ()
(gdb) bt
#0  0xf7eded79 in __kernel_vsyscall ()
#1  0xf765d604 in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib/libpthread.so.0
#2  0xf4215b00 in cnd_timedwait (cond=0x5888d978, mtx=0x5888d960, ts=0xea0c0a28) at tinycthread.c:462
#3  0xf4216004 in cnd_timedwait_abs (cnd=0x5888d978, mtx=0x5888d960, tspec=0x80) at tinycthread_extra.c:106
#4  0xf41bb2a7 in rd_kafka_q_pop_serve (rkq=0x5888d960, timeout_us=999999, version=0, cb_type=RD_KAFKA_Q_CB_RETURN, callback=0x0, 
    opaque=0x0) at rdkafka_queue.c:412
#5  0xf41bb35d in rd_kafka_q_pop (rkq=0x5888d960, timeout_us=999999, version=0) at rdkafka_queue.c:436
#6  0xf419be3c in rd_kafka_broker_ops_serve (rkb=0x5888e220, timeout_us=549760194657) at rdkafka_broker.c:3377
#7  0xf419bfe6 in rd_kafka_broker_ops_io_serve (rkb=0x5888e220, abs_timeout=999999) at rdkafka_broker.c:3433
#8  0xf419e1e4 in rd_kafka_broker_internal_serve (abs_timeout=<optimized out>, rkb=<optimized out>) at rdkafka_broker.c:3610
#9  rd_kafka_broker_serve (rkb=0xfffffdfc, timeout_ms=4380769) at rdkafka_broker.c:5159
#10 0xf419e9bc in rd_kafka_broker_thread_main (arg=0x5888e220) at rdkafka_broker.c:5222
#11 0xf7658f63 in start_thread () from /lib/libpthread.so.0
#12 0xf7782abe in clone () from /lib/libc.so.6
(gdb) thr 4
[Switching to thread 4 (Thread 0xe98c0b40 (LWP 25751))]
#0  0xf7eded79 in __kernel_vsyscall ()
(gdb) bt
#0  0xf7eded79 in __kernel_vsyscall ()
#1  0xf7776c6b in poll () from /lib/libc.so.6
#2  0xf41b5c34 in poll (__timeout=<optimized out>, __nfds=<optimized out>, __fds=<optimized out>) at /usr/include/bits/poll2.h:41
#3  rd_kafka_transport_poll (rktrans=0xe8d22660, tmout=1000) at rdkafka_transport.c:977
#4  0xf41b5ca1 in rd_kafka_transport_io_serve (rktrans=0xe8d22660, timeout_ms=1000) at rdkafka_transport.c:809
#5  0xf419bf68 in rd_kafka_broker_ops_io_serve (rkb=0x5888ece0, abs_timeout=7629017360111) at rdkafka_broker.c:3425
#6  0xf419dd60 in rd_kafka_broker_producer_serve (abs_timeout=<optimized out>, rkb=<optimized out>) at rdkafka_broker.c:4032
#7  rd_kafka_broker_serve (rkb=0xfffffdfc, timeout_ms=1000) at rdkafka_broker.c:5164
#8  0xf419e5dc in rd_kafka_broker_thread_main (arg=0x5888ece0) at rdkafka_broker.c:5313
#9  0xf7658f63 in start_thread () from /lib/libpthread.so.0
#10 0xf7782abe in clone () from /lib/libc.so.6
(gdb) frame 3
#3  rd_kafka_transport_poll (rktrans=0xe8d22660, tmout=1000) at rdkafka_transport.c:977
977     r = poll(rktrans->rktrans_pfd, rktrans->rktrans_pfd_cnt, tmout);
(gdb) p *rktrans
$15 = {rktrans_s = 44, rktrans_rkb = 0x5888ece0, rktrans_ssl = 0xe8d20d48, rktrans_sasl = {state = 0x0, complete = 0, msg = {msg_name = 0x0, msg_namelen = 0, msg_iov = 0x0, msg_iovlen = 0, msg_control = 0x0, 
      msg_controllen = 0, msg_flags = 0}, iov = {{iov_base = 0x0, iov_len = 0}, {iov_base = 0x0, iov_len = 0}}, recv_buf = 0x0, recv_of = 0, recv_len = 0}, rktrans_recv_buf = 0x0, rktrans_pfd = {{fd = 44, 
      events = 1, revents = 0}, {fd = 41, events = 1, revents = 0}}, rktrans_pfd_cnt = 2, rktrans_rcvbuf_size = 131072, rktrans_sndbuf_size = 65536}
(gdb) p *rktrans->rktrans_rkb
$16 = {rkb_link = {tqe_next = 0x0, tqe_prev = 0x5880bfe8}, rkb_nodeid = 0, rkb_rsal = 0xe8d19110, rkb_ts_rsal_last = 5519115830224, rkb_addr_last = 0xe8d19118, rkb_transport = 0xe8d22660, rkb_corrid = 145669, 
  rkb_connid = 4, rkb_ops = 0x5888d9f0, rkb_lock = {__data = {__lock = 0, __count = 0, __owner = 0, __kind = 0, __nusers = 0, {__elision_data = {__espins = 0, __elision = 0}, __list = {__next = 0x0}}}, 
    __size = '\000' <repeats 23 times>, __align = 0}, rkb_blocking_max_ms = 0, rkb_toppars = {tqh_first = 0xf1d00710, tqh_last = 0xf1d00718}, rkb_toppar_cnt = 1, rkb_active_toppars = {cqh_first = 0xf1d00710, 
    cqh_last = 0xf1d00710}, rkb_active_toppar_cnt = 1, rkb_active_toppar_next = 0xf1d00710, rkb_cgrp = 0x0, rkb_ts_fetch_backoff = 0, rkb_fetching = 0, rkb_state = RD_KAFKA_BROKER_STATE_SSL_HANDSHAKE, 
  rkb_ts_state = 5519115870416, rkb_timeout_scan_intvl = {ri_ts_last = 7629016360107, ri_fixed = 0, ri_backoff = 0}, rkb_blocking_request_cnt = {val = 0}, rkb_features = 6141, rkb_ApiVersions = 0xe8d14f08, 
  rkb_ApiVersions_cnt = 43, rkb_ApiVersion_fail_intvl = {ri_ts_last = 5519105605764, ri_fixed = 0, ri_backoff = 0}, rkb_source = RD_KAFKA_CONFIGURED, rkb_c = {tx_bytes = {val = 55821447}, tx = {val = 145670}, 
    tx_err = {val = 0}, tx_retries = {val = 0}, req_timeouts = {val = 5}, rx_bytes = {val = 11062390}, rx = {val = 145355}, rx_err = {val = 0}, rx_corrid_err = {val = 0}, rx_partial = {val = 0}, zbuf_grow = {
      val = 0}, buf_grow = {val = 0}, wakeups = {val = 457351}, connects = {val = 5}, disconnects = {val = 4}, reqtype = {{val = 145394}, {val = 0}, {val = 0}, {val = 268}, {val = 0} <repeats 14 times>, {
        val = 7}, {val = 0} <repeats 40 times>}, ts_send = {val = 0}, ts_recv = {val = 0}}, rkb_req_timeouts = 1, rkb_thread = 3918269248, rkb_refcnt = {val = 5}, rkb_rk = 0x5880bfe0, rkb_recv_buf = 0x0, 
  rkb_max_inflight = 1, rkb_outbufs = {rkbq_bufs = {tqh_first = 0x0, tqh_last = 0x5888f00c}, rkbq_cnt = {val = 0}, rkbq_msg_cnt = {val = 0}}, rkb_waitresps = {rkbq_bufs = {tqh_first = 0x0, 
      tqh_last = 0x5888f01c}, rkbq_cnt = {val = 0}, rkbq_msg_cnt = {val = 0}}, rkb_retrybufs = {rkbq_bufs = {tqh_first = 0x0, tqh_last = 0x5888f02c}, rkbq_cnt = {val = 0}, rkbq_msg_cnt = {val = 0}}, 
  rkb_avg_int_latency = {ra_v = {maxv = 0, minv = 0, avg = 0, sum = 0, cnt = 0, start = 0}, ra_lock = {__data = {__lock = 0, __count = 0, __owner = 0, __kind = 0, __nusers = 0, {__elision_data = {__espins = 0, 
            __elision = 0}, __list = {__next = 0x0}}}, __size = '\000' <repeats 23 times>, __align = 0}, ra_enabled = 0, ra_type = RD_AVG_GAUGE, ra_hdr = 0x0, ra_hist = {p50 = 0, p75 = 0, p90 = 0, p95 = 0, 
      p99 = 0, p99_99 = 0, oor = 0, hdrsize = 0, stddev = 0, mean = 0}}, rkb_avg_outbuf_latency = {ra_v = {maxv = 0, minv = 0, avg = 0, sum = 0, cnt = 0, start = 0}, ra_lock = {__data = {__lock = 0, 
        __count = 0, __owner = 0, __kind = 0, __nusers = 0, {__elision_data = {__espins = 0, __elision = 0}, __list = {__next = 0x0}}}, __size = '\000' <repeats 23 times>, __align = 0}, ra_enabled = 0, 
    ra_type = RD_AVG_GAUGE, ra_hdr = 0x0, ra_hist = {p50 = 0, p75 = 0, p90 = 0, p95 = 0, p99 = 0, p99_99 = 0, oor = 0, hdrsize = 0, stddev = 0, mean = 0}}, rkb_avg_rtt = {ra_v = {maxv = 0, minv = 0, avg = 0, 
      sum = 0, cnt = 0, start = 0}, ra_lock = {__data = {__lock = 0, __count = 0, __owner = 0, __kind = 0, __nusers = 0, {__elision_data = {__espins = 0, __elision = 0}, __list = {__next = 0x0}}}, 
      __size = '\000' <repeats 23 times>, __align = 0}, ra_enabled = 0, ra_type = RD_AVG_GAUGE, ra_hdr = 0x0, ra_hist = {p50 = 0, p75 = 0, p90 = 0, p95 = 0, p99 = 0, p99_99 = 0, oor = 0, hdrsize = 0, 
      stddev = 0, mean = 0}}, rkb_avg_throttle = {ra_v = {maxv = 0, minv = 0, avg = 0, sum = 0, cnt = 0, start = 0}, ra_lock = {__data = {__lock = 0, __count = 0, __owner = 0, __kind = 0, __nusers = 0, {
          __elision_data = {__espins = 0, __elision = 0}, __list = {__next = 0x0}}}, __size = '\000' <repeats 23 times>, __align = 0}, ra_enabled = 0, ra_type = RD_AVG_GAUGE, ra_hdr = 0x0, ra_hist = {p50 = 0, 
      p75 = 0, p90 = 0, p95 = 0, p99 = 0, p99_99 = 0, oor = 0, hdrsize = 0, stddev = 0, mean = 0}}, rkb_name = "ssl://xx.xx.xxx.xxx:9094/0\000otstrap", '\000' <repeats 221 times>, 
  rkb_nodename = "xx.xx.xxx.xxx:9094", '\000' <repeats 237 times>, rkb_port = 9094, rkb_origname = 0x587d1f80 "xx.xx.xxx.xxx", rkb_nodename_epoch = 0, rkb_connect_epoch = 0, 
  rkb_logname = 0xe8d1b810 "ssl://xx.xx.xxx.xxx:9094/0", rkb_logname_lock = {__data = {__lock = 0, __count = 0, __owner = 0, __kind = 0, __nusers = 0, {__elision_data = {__espins = 0, __elision = 0}, __list = {
          __next = 0x0}}}, __size = '\000' <repeats 23 times>, __align = 0}, rkb_wakeup_fd = {41, 42}, rkb_toppar_wakeup_fd = -1, rkb_reconnect_backoff_ms = 400, rkb_ts_reconnect = 5519116046046, 
  rkb_persistconn = {internal = 0, coord = {val = 0}}, rkb_monitors = {tqh_first = 0x5888f500, tqh_last = 0x5888f500}, rkb_coord_monitor = {rkbmon_link = {tqe_next = 0x0, tqe_prev = 0x5888f4f8}, 
    rkbmon_rkb = 0x5888ece0, rkbmon_q = 0x587ea110, rkbmon_cb = 0xf425e790 <rd_kafka_coord_rkb_monitor_cb>}, rkb_proto = RD_KAFKA_PROTO_SSL, rkb_down_reported = 1, rkb_sasl_kinit_refresh_tmr = {rtmr_link = {
      tqe_next = 0x0, tqe_prev = 0x0}, rtmr_next = 0, rtmr_interval = 0, rtmr_oneshot = 0 '\000', rtmr_callback = 0x0, rtmr_arg = 0x0}, rkb_suppress = {unsupported_compression = {ri_ts_last = 0, ri_fixed = 0, 
      ri_backoff = 0}, unsupported_kip62 = {ri_ts_last = 0, ri_fixed = 0, ri_backoff = 0}, unsupported_kip345 = {ri_ts_last = 0, ri_fixed = 0, ri_backoff = 0}, fail_error = {ri_ts_last = 5518576545665, 
      ri_fixed = 0, ri_backoff = 0}}, rkb_last_err = {
    errstr = "ApiVersionRequest failed: Local: Timed out: probably due to broker version < 0.10 (see api.version.request configuration)", '\000' <repeats 390 times>, err = RD_KAFKA_RESP_ERR__TRANSPORT, 
    cnt = 1}}
(gdb) p *rktrans->rktrans_ssl
$16 = {version = 771, type = 4096, method = 0xf4a234e0, rbio = 0xe8d16ae0, wbio = 0xe8d16ae0, bbio = 0x0, rwstate = 3, in_handshake = 0, handshake_func = 0xf49e7960 <ssl23_connect>, server = 0, 
  new_session = 0, quiet_shutdown = 0, shutdown = 0, state = 4640, rstate = 240, init_buf = 0xe8d21760, init_msg = 0x0, init_num = 0, init_off = 0, packet = 0xe8d0ad78 "", packet_length = 0, s2 = 0x0, 
  s3 = 0xe8d173e0, d1 = 0x0, read_ahead = 0, msg_callback = 0x0, msg_callback_arg = 0x0, hit = 0, param = 0xe8d04b08, cipher_list = 0x0, cipher_list_by_id = 0x0, mac_flags = 0, enc_read_ctx = 0x0, 
  read_hash = 0x0, expand = 0x0, enc_write_ctx = 0x0, write_hash = 0x0, compress = 0x0, cert = 0xe8d22810, sid_ctx_length = 0, sid_ctx = '\000' <repeats 31 times>, session = 0xe8d20420, 
  generate_session_id = 0x0, verify_mode = 1, verify_callback = 0xf6262490 <ssl_cert_verify_cb(int, x509_store_ctx_st*)>, info_callback = 0x0, error = 0, error_code = 0, kssl_ctx = 0xe8d049d8, 
  psk_client_callback = 0x0, psk_server_callback = 0x0, ctx = 0x5880cb00, debug = 0, verify_result = 0, ex_data = {sk = 0x0, dummy = 0}, client_CA = 0x0, references = 1, options = 50462724, mode = 17, 
  max_cert_list = 102400, first_packet = 0, client_version = 771, max_send_fragment = 16384, tlsext_debug_cb = 0x0, tlsext_debug_arg = 0x0, tlsext_hostname = 0x0, servername_done = 0, tlsext_status_type = -1, 
  tlsext_status_expected = 0, tlsext_ocsp_ids = 0x0, tlsext_ocsp_exts = 0x0, tlsext_ocsp_resp = 0x0, tlsext_ocsp_resplen = -1, tlsext_ticket_expected = 0, tlsext_ecpointformatlist_length = 0, 
  tlsext_ecpointformatlist = 0x0, tlsext_ellipticcurvelist_length = 0, tlsext_ellipticcurvelist = 0x0, tlsext_opaque_prf_input = 0x0, tlsext_opaque_prf_input_len = 0, tlsext_session_ticket = 0x0, 
  tls_session_ticket_ext_cb = 0x0, tls_session_ticket_ext_cb_arg = 0x0, tls_session_secret_cb = 0x0, tls_session_secret_cb_arg = 0x0, initial_ctx = 0x5880cb00, next_proto_negotiated = 0x0, 
  next_proto_negotiated_len = 0 '\000', srtp_profiles = 0x0, srtp_profile = 0x0, tlsext_heartbeat = 0, tlsext_hb_pending = 0, tlsext_hb_seq = 0, renegotiate = 0, alpn_client_proto_list = 0x0, 
  alpn_client_proto_list_len = 0}
(gdb) p *rktrans->rktrans_ssl->ctx
$17 = {method = 0xf4a234e0, cipher_list = 0x5888cf20, cipher_list_by_id = 0x5888bd90, cert_store = 0x587b1150, sessions = 0x5880ce28, session_cache_size = 20480, session_cache_head = 0x0, 
  session_cache_tail = 0x0, session_cache_mode = 2, session_timeout = 300, new_session_cb = 0x0, remove_session_cb = 0x0, get_session_cb = 0x0, stats = {sess_connect = 5, sess_connect_renegotiate = 0, 
    sess_connect_good = 4, sess_accept = 0, sess_accept_renegotiate = 0, sess_accept_good = 0, sess_miss = 0, sess_timeout = 0, sess_cache_full = 0, sess_hit = 0, sess_cb_hit = 0}, references = 3, 
  app_verify_callback = 0x0, app_verify_arg = 0x0, default_passwd_callback = 0xf42a0810 <rd_kafka_transport_ssl_passwd_cb>, default_passwd_callback_userdata = 0x5880bfe0, client_cert_cb = 0x0, 
  app_gen_cookie_cb = 0x0, app_verify_cookie_cb = 0x0, ex_data = {sk = 0x5888bc30, dummy = 0}, rsa_md5 = 0xf623d840, md5 = 0xf623d840, sha1 = 0xf623da80, extra_certs = 0x0, comp_methods = 0x587f19a0, 
  info_callback = 0x0, client_CA = 0x587d1fc0, options = 50462724, mode = 17, max_cert_list = 102400, cert = 0x5880cca0, read_ahead = 0, msg_callback = 0x0, msg_callback_arg = 0x0, verify_mode = 1, 
  sid_ctx_length = 0, sid_ctx = '\000' <repeats 31 times>, default_verify_callback = 0xf6262490 <ssl_cert_verify_cb(int, x509_store_ctx_st*)>, generate_session_id = 0x0, param = 0x587b0550, 
  quiet_shutdown = 0, max_send_fragment = 16384, client_cert_engine = 0x0, tlsext_servername_callback = 0x0, tlsext_servername_arg = 0x0, 
  tlsext_tick_key_name = "\300\000Dq\354&\255\367\254;?\246\354\306\325\312", tlsext_tick_hmac_key = "Q\365\306\312\207\034\034\345Jv\023}\254hT\035", 
  tlsext_tick_aes_key = "\265\366\203\217\311\234\351\062vM\317X3-\251C", tlsext_ticket_key_cb = 0x0, tlsext_status_cb = 0x0, tlsext_status_arg = 0x0, tlsext_opaque_prf_input_callback = 0x0, 
  tlsext_opaque_prf_input_callback_arg = 0x0, psk_identity_hint = 0x0, psk_client_callback = 0x0, psk_server_callback = 0x0, freelist_max_len = 32, wbuf_freelist = 0x587d20d8, rbuf_freelist = 0x587d2128, 
  next_protos_advertised_cb = 0x0, next_protos_advertised_cb_arg = 0x0, next_proto_select_cb = 0x0, next_proto_select_cb_arg = 0x0, srtp_profiles = 0x0, alpn_select_cb = 0x0, alpn_select_cb_arg = 0x0, 
  alpn_client_proto_list = 0x0, alpn_client_proto_list_len = 0, tlsext_ecpointformatlist_length = 0, tlsext_ecpointformatlist = 0x0, tlsext_ellipticcurvelist_length = 0, tlsext_ellipticcurvelist = 0x0}

It looks like the broker thread is in the RD_KAFKA_BROKER_STATE_SSL_HANDSHAKE state, waiting for data from the socket. Once data arrives, it will call SSL_do_handshake again to continue the SSL handshake (rd_kafka_transport_io_serve -> rd_kafka_transport_io_event -> rd_kafka_transport_ssl_handshake -> SSL_do_handshake).
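
The dump also hints at how long the broker has been sitting in this state. As a rough check, assuming that `rkb_ts_state` and `rkb_timeout_scan_intvl.ri_ts_last` are both microsecond timestamps from the same monotonic clock (the helper names below are made up for illustration), the difference between the two values from the gdb output above can be converted to days:

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

/* A microsecond interval split into whole days, hours and minutes. */
typedef struct {
    int64_t days, hours, minutes;
} elapsed_t;

elapsed_t elapsed_between_us(int64_t start_us, int64_t end_us) {
    assert(end_us >= start_us);
    int64_t secs = (end_us - start_us) / 1000000;
    elapsed_t e;
    e.days = secs / 86400;
    e.hours = (secs % 86400) / 3600;
    e.minutes = (secs % 3600) / 60;
    return e;
}

/* Values copied from the gdb dump of *rktrans->rktrans_rkb. */
void print_handshake_age(void) {
    const int64_t rkb_ts_state = 5519115870416LL;      /* last state change */
    const int64_t last_timeout_scan = 7629016360107LL; /* rkb_timeout_scan_intvl.ri_ts_last */
    elapsed_t e = elapsed_between_us(rkb_ts_state, last_timeout_scan);
    printf("%lld d %lld h %lld min\n",
           (long long)e.days, (long long)e.hours, (long long)e.minutes);
}
```

Under that assumption the state has not changed for about 24 days and 10 hours, which is consistent with the last successful delivery seen in the application log.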

Because the remote end of the socket no longer exists (the broker was rebooted in the meantime), no data ever arrives. And because TCP keepalive is disabled by default, the dead socket lingers forever.

 795 int rd_kafka_transport_io_serve (rd_kafka_transport_t *rktrans,
 796                                   int timeout_ms) {
...
 809         if ((r = rd_kafka_transport_poll(rktrans, timeout_ms)) <= 0)
 810                 return r;  // <<< always returns here because poll returns 0 (see strace)
 811 
 812         /* Only handle events on the broker socket, the wakeup
 813          * socket is just for waking up the blocking boll. */
 814         events = rktrans->rktrans_pfd[0].revents;
 815         if (events) {
 816                 rd_kafka_transport_poll_clear(rktrans, POLLOUT);
 817 
 818                 rd_kafka_transport_io_event(rktrans, events); // <<< needs to call this to resume the SSL handshake
 819         }
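
On the keepalive point: librdkafka has a `socket.keepalive.enable` configuration property for this. To illustrate what keepalive does at the socket level, here is a small sketch using the Linux option names; `enable_keepalive` is a hypothetical helper and the timing values are the caller's choice, not librdkafka defaults.

```c
#include <assert.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>

/* Enable TCP keepalive on a TCP socket and tune the probe timing
 * (Linux-specific TCP_KEEP* options).
 * Returns 0 on success, -1 on error with errno set by setsockopt(). */
int enable_keepalive(int fd, int idle_s, int interval_s, int probes) {
    int on = 1;
    assert(idle_s > 0 && interval_s > 0 && probes > 0);
    /* Turn keepalive probing on for this socket. */
    if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &on, sizeof(on)) != 0)
        return -1;
    /* Idle seconds before the first probe is sent. */
    if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &idle_s, sizeof(idle_s)) != 0)
        return -1;
    /* Seconds between unanswered probes. */
    if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, &interval_s, sizeof(interval_s)) != 0)
        return -1;
    /* Unanswered probes before the connection is declared dead. */
    if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, &probes, sizeof(probes)) != 0)
        return -1;
    return 0;
}
```

With probing active, a rebooted peer would normally answer a probe with a reset, so the socket reports an error and poll() returns an event instead of timing out forever. Note that enabling SO_KEEPALIVE alone leaves the kernel defaults in place, which on Linux typically means two hours of idle time before the first probe.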

There doesn't seem to be a timeout for SSL handshake completion.
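
To make the missing piece concrete, here is a sketch of a poll loop with an overall deadline. It is not librdkafka code: `wait_readable_with_deadline`, `now_ms` and `demo_silent_peer` are hypothetical names, and a pipe whose write end stays silent stands in for the dead broker socket.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <time.h>
#include <unistd.h>

/* Monotonic clock in milliseconds. */
static long long now_ms(void) {
    struct timespec ts;
    clock_gettime(CLOCK_MONOTONIC, &ts);
    return (long long)ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
}

/* Wait for an event on fd, polling in slices of poll_ms, but give up once
 * setup_timeout_ms has elapsed in total.
 * Returns 1 if an event is pending, 0 on overall timeout, -1 on poll error. */
int wait_readable_with_deadline(int fd, int poll_ms, int setup_timeout_ms) {
    assert(poll_ms > 0 && setup_timeout_ms > 0);
    const long long deadline = now_ms() + setup_timeout_ms;
    for (;;) {
        long long remaining = deadline - now_ms();
        if (remaining <= 0)
            return 0; /* deadline hit: caller should close and reconnect */
        struct pollfd pfd = { .fd = fd, .events = POLLIN, .revents = 0 };
        int slice = remaining < poll_ms ? (int)remaining : poll_ms;
        int r = poll(&pfd, 1, slice);
        if (r > 0)
            return 1; /* readable, error or hangup: let the caller handle it */
        if (r < 0 && errno != EINTR)
            return -1;
        /* Slice timed out or poll was interrupted: re-check the deadline. */
    }
}

/* Demo: nothing is ever written to the pipe, so the wait ends by deadline. */
int demo_silent_peer(void) {
    int fds[2];
    if (pipe(fds) != 0)
        return -1;
    int r = wait_readable_with_deadline(fds[0], 10, 50);
    close(fds[0]);
    close(fds[1]);
    return r;
}
```

The individual poll timeouts behave as in the strace output, each one simply expiring, but the loop also tracks total elapsed time, so a connection that never produces data is eventually abandoned rather than polled indefinitely.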

cschwarz73 commented 4 months ago

We figured out that this issue is actually fixed in 1.9.0.

From the release notes:

Added socket.connection.setup.timeout.ms (default 30s).
The maximum time allowed for broker connection setups (TCP connection as
well as SSL and SASL handshakes) is now limited to this value.
This fixes the issue with stalled broker connections in the case of network
or load balancer problems.

The relevant commit is: https://github.com/confluentinc/librdkafka/commit/cd78ea46ae49a3241a0d767da0a93576cb138f60