confluentinc / librdkafka


Crash (null-pointer access) in rd_kafka_metadata_cache_entry_by_id_cmp() during rd_avl_insert() #4778

Open GerKr opened 4 months ago

GerKr commented 4 months ago

Description

After about 2 months of usage, a crash occurred. The crash dump shows that it happened in the function rd_kafka_metadata_cache_entry_by_id_cmp(const void *_a, const void *_b), where _b is 0x0000000000000000. This leads to a write access to address 0x0000000000000088. During the call to rd_avl_insert(ravl, elm, ran), the variable ravl contains:

- ravl->ravl_root == 0x12eaf651500 (not a null pointer)
- ravl->ravl_root.ran_height == 0x61657268 (as text this is "hrea", part of the "hread_0" string visible in the memory dump below)
- ravl->ravl_root.ran_elm == 0x0000000000000000 (null pointer!)

The memory that ravl->ravl_root points to looks as follows:

0f 00 00 00 00 00 00 00 00 40 89 b8 2e 01 00 00
68 72 65 61 64 5f 30 00 00 00 00 00 00 00 00 00
0f 00 00 00 00 00 00 00 00 74 e2 b6 2e 01 00 00
06 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
0f 00 00 00 00 00 00 00 00 65 74 43 68 61 6e
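To make the reading of the dump easier to check, here is a minimal sketch that decodes its first 32 bytes. It assumes a 64-bit little-endian build in which the AVL node consists of two child pointers, a 4-byte height, 4 bytes of padding, and an element pointer. That layout and the names `node_dump`, `read_le`, `print_node_fields` and the `OFF_*` offsets are assumptions made for illustration, not taken from the crash dump.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* First 32 bytes of the dump quoted above (memory at ravl->ravl_root). */
static const unsigned char node_dump[32] = {
    0x0f, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    0x00, 0x40, 0x89, 0xb8, 0x2e, 0x01, 0x00, 0x00,
    0x68, 0x72, 0x65, 0x61, 0x64, 0x5f, 0x30, 0x00,
    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
};

/* ASSUMED field offsets for a 64-bit build (not confirmed by the dump). */
enum { OFF_P0 = 0, OFF_P1 = 8, OFF_HEIGHT = 16, OFF_ELM = 24 };

/* Read an n-byte little-endian unsigned integer (n <= 8). */
static uint64_t read_le(const unsigned char *p, size_t n) {
    uint64_t v = 0;
    assert(n <= 8);
    while (n-- > 0)
        v = (v << 8) | p[n];
    return v;
}

/* Hypothetical helper: print each field as decoded under the assumed layout. */
static void print_node_fields(const unsigned char *d) {
    char text[5] = {0};
    memcpy(text, d + OFF_HEIGHT, 4); /* raw height bytes viewed as ASCII */

    printf("child[0] = 0x%016llx\n", (unsigned long long)read_le(d + OFF_P0, 8));
    printf("child[1] = 0x%016llx\n", (unsigned long long)read_le(d + OFF_P1, 8));
    printf("height   = 0x%08llx (\"%s\")\n",
           (unsigned long long)read_le(d + OFF_HEIGHT, 4), text);
    printf("elm      = 0x%016llx\n", (unsigned long long)read_le(d + OFF_ELM, 8));
}
```

Under these assumptions the height field decodes to 0x61657268 (the ASCII bytes "hrea") and the element pointer to zero, matching the values reported above. A height made of string bytes suggests the node's memory had been reused for other data by the time of the insert.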

The call stack looks as follows:

librdkafka.dll!rd_kafka_metadata_cache_entry_by_id_cmp(const void _a, const void _b) Line 700 C
librdkafka.dll!rd_avl_insert_node(rd_avl_s ravl, rd_avl_node_s parent, rd_avl_node_s ran, rd_avl_node_s existing) Line 104 C
librdkafka.dll!rd_kafka_metadata_cache_insert(rd_kafka_s rk, const rd_kafka_metadata_topic mtopic, const rd_kafka_metadata_topic_internal_s metadata_internal_topic, int64 now, int64 ts_expires, unsigned char include_racks, rd_kafka_metadata_broker_internal_s brokers_internal, unsigned __int64 broker_cnt) Line 380 C
librdkafka.dll!rd_kafka_metadata_cache_topic_update(rd_kafka_s rk, const rd_kafka_metadata_topic mdt, const rd_kafka_metadata_topic_internal_s mdit, unsigned char propagate, unsigned char include_racks, rd_kafka_metadata_broker_internal_s brokers, unsigned __int64 broker_cnt, unsigned char only_existing) Line 508 C
librdkafka.dll!rd_kafka_parse_Metadata0(rd_kafka_broker_s rkb, rd_kafka_buf_s request, rd_kafka_buf_s rkbuf, rd_kafka_metadata_internal_s mdip, rd_list_s request_topics, const char reason) Line 857 C
librdkafka.dll!rd_kafka_parse_Metadata(rd_kafka_broker_s rkb, rd_kafka_buf_s request, rd_kafka_buf_s rkbuf, rd_kafka_metadata_internal_s mdip) Line 1113 C
librdkafka.dll!rd_kafka_handle_Metadata(rd_kafka_s rk, rd_kafka_broker_s rkb, rd_kafka_resp_err_t err, rd_kafka_buf_s rkbuf, rd_kafka_buf_s request, void opaque) Line 2490 C
librdkafka.dll!rd_kafka_buf_callback(rd_kafka_s rk, rd_kafka_broker_s rkb, rd_kafka_resp_err_t err, rd_kafka_buf_s response, rd_kafka_buf_s request) Line 512 C
librdkafka.dll!rd_kafka_buf_handle_op(rd_kafka_op_s rko, rd_kafka_resp_err_t err) Line 453 C
librdkafka.dll!rd_kafka_op_handle_std(rd_kafka_s rk, rd_kafka_q_s rkq, rd_kafka_op_s rko, int cb_type) Line 884 C
librdkafka.dll!rd_kafka_op_handle(rd_kafka_s rk, rd_kafka_q_s rkq, rd_kafka_op_s rko, rd_kafka_q_cb_type_t cb_type, void opaque, rd_kafka_op_res_t()(rd_kafka_s , rd_kafka_q_s , rd_kafka_op_s , rd_kafka_q_cb_type_t, void ) callback) Line 916 C
librdkafka.dll!rd_kafka_q_serve(rd_kafka_q_s rkq, int timeout_ms, int max_cnt, rd_kafka_q_cb_type_t cb_type, rd_kafka_op_res_t()(rd_kafka_s , rd_kafka_q_s , rd_kafka_op_s , rd_kafka_q_cb_type_t, void ) callback, void opaque) Line 581 C
librdkafka.dll!rd_kafka_thread_main(void arg) Line 2138 C
librdkafka.dll!_thrd_wrapper_function(void aArg) Line 589 C
kernel32.dll!00007ffd36527e94() Unknown
ntdll.dll!RtlUserThreadStart() Unknown

How to reproduce

I don't know how to reproduce it: the crash did not occur during the preceding 2 months of operation, so it seems to be a sporadic problem. The crash dump, the corresponding PDB file, and the sources (v2.4.0) are available. On request I can provide the values of variables or memory dumps.


GerKr commented 4 months ago

For decoding the memory dump: I use a 64-bit release build of librdkafka.

marcin-krystianc commented 2 months ago

Hi,

I've also encountered this issue and can reproduce it, though not consistently. I've investigated the problem but don't fully understand it yet. However, I've identified that it started occurring between v2.3.0 and v2.4.0 (https://github.com/confluentinc/librdkafka/pull/4676). I can confirm the issue still affects v2.5.3.

How to reproduce it

My test scenario involves re-creating (removing and creating) many topics quickly and producing to newly created topics.

C#/C++

- Start the [test cluster](https://github.com/marcin-krystianc/KafkaPlayground/tree/master/docker/compose-cluster3)
- Run the [KafkaTool](https://github.com/marcin-krystianc/KafkaPlayground/tree/master/java/KafkaTool):

```
dotnet run --project KafkaTool.csproj -- \
    producer \
    --config allow.auto.create.topics=false \
    --config bootstrap.servers=localhost:40001,localhost:40002,localhost:40003 \
    --topics=3000 \
    --partitions=1 \
    --replication-factor=3 \
    --min-isr=2 \
    --messages-per-second=1000 \
    --config request.timeout.ms=180000 \
    --config message.timeout.ms=180000 \
    --config request.required.acks=-1 \
    --config enable.idempotence=true \
    --config max.in.flight.requests.per.connection=1 \
    --config topic.metadata.propagation.max.ms=60000 \
    --producers=1 \
    --recreate-topics-delay=700 \
    --recreate-topics-batch-size=500
```

Sample stack-trace

librdkafka.so!rd_kafka_metadata_cache_entry_by_id_cmp(const void * _a, const void * _b) (\workspace\librdkafka\src\rdkafka_metadata_cache.c:700)
librdkafka.so!rd_avl_insert_node(rd_avl_t * ravl, rd_avl_node_t * parent, rd_avl_node_t * ran, rd_avl_node_t ** existing) (\workspace\librdkafka\src\rdavl.c:104)
librdkafka.so!rd_avl_insert_node(rd_avl_t * ravl, rd_avl_node_t * parent, rd_avl_node_t * ran, rd_avl_node_t ** existing) (\workspace\librdkafka\src\rdavl.c:119)
librdkafka.so!rd_avl_insert(rd_avl_t * ravl, void * elm, rd_avl_node_t * ran) (\workspace\librdkafka\src\rdavl.h:211)
librdkafka.so!rd_kafka_metadata_cache_insert(rd_kafka_t * rk, const rd_kafka_metadata_topic_t * mtopic, const rd_kafka_metadata_topic_internal_t * metadata_internal_topic, rd_ts_t now, rd_ts_t ts_expires, rd_bool_t include_racks, rd_kafka_metadata_broker_internal_t * brokers_internal, size_t broker_cnt) (\workspace\librdkafka\src\rdkafka_metadata_cache.c:380)
librdkafka.so!rd_kafka_metadata_cache_topic_update(rd_kafka_t * rk, const rd_kafka_metadata_topic_t * mdt, const rd_kafka_metadata_topic_internal_t * mdit, rd_bool_t propagate, rd_bool_t include_racks, rd_kafka_metadata_broker_internal_t * brokers, size_t broker_cnt, rd_bool_t only_existing) (\workspace\librdkafka\src\rdkafka_metadata_cache.c:502)
librdkafka.so!rd_kafka_parse_Metadata0(rd_kafka_broker_t * rkb, rd_kafka_buf_t * request, rd_kafka_buf_t * rkbuf, rd_kafka_metadata_internal_t ** mdip, rd_list_t * request_topics, const char * reason) (\workspace\librdkafka\src\rdkafka_metadata.c:858)
librdkafka.so!rd_kafka_parse_Metadata(rd_kafka_broker_t * rkb, rd_kafka_buf_t * request, rd_kafka_buf_t * rkbuf, rd_kafka_metadata_internal_t ** mdip) (\workspace\librdkafka\src\rdkafka_metadata.c:1111)
librdkafka.so!rd_kafka_handle_Metadata(rd_kafka_t * rk, rd_kafka_broker_t * rkb, rd_kafka_resp_err_t err, rd_kafka_buf_t * rkbuf, rd_kafka_buf_t * request, void * opaque) (\workspace\librdkafka\src\rdkafka_request.c:2546)
librdkafka.so!rd_kafka_buf_callback(rd_kafka_t * rk, rd_kafka_broker_t * rkb, rd_kafka_resp_err_t err, rd_kafka_buf_t * response, rd_kafka_buf_t * request) (\workspace\librdkafka\src\rdkafka_buf.c:509)
librdkafka.so!rd_kafka_buf_handle_op(rd_kafka_op_t * rko, rd_kafka_resp_err_t err) (\workspace\librdkafka\src\rdkafka_buf.c:452)
librdkafka.so!rd_kafka_op_handle_std(rd_kafka_t * rk, rd_kafka_q_t * rkq, rd_kafka_op_t * rko, int cb_type) (\workspace\librdkafka\src\rdkafka_op.c:905)
librdkafka.so!rd_kafka_op_handle(rd_kafka_t * rk, rd_kafka_q_t * rkq, rd_kafka_op_t * rko, rd_kafka_q_cb_type_t cb_type, void * opaque, rd_kafka_q_serve_cb_t * callback) (\workspace\librdkafka\src\rdkafka_op.c:945)
librdkafka.so!rd_kafka_q_serve(rd_kafka_q_t * rkq, int timeout_ms, int max_cnt, rd_kafka_q_cb_type_t cb_type, rd_kafka_q_serve_cb_t * callback, void * opaque) (\workspace\librdkafka\src\rdkafka_queue.c:578)
librdkafka.so!rd_kafka_thread_main(void * arg) (\workspace\librdkafka\src\rdkafka.c:2143)
libc.so.6!start_thread(void * arg) (pthread_create.c:444)
libc.so.6!__GI___clone3() (clone3.S:78)

More details

I built the library with ASAN (AddressSanitizer) enabled, which helped me understand the problem better. In the rd_kafka_metadata_cache_delete function, the memory pointed to by rkmce is freed while it is still referenced from the rk_metadata_cache.rkmc_avl_by_id tree. AddressSanitizer reports a "use after free" error when the entire tree is traversed immediately after rkmce is freed, but not when it is traversed immediately before.
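The ASAN observation points at a general invariant: an entry held in two indexes has to be unlinked from both before it is freed. The sketch below illustrates that invariant with a toy cache. It is not librdkafka code: it uses fixed-size pointer arrays instead of AVL trees, and every name in it (`entry_t`, `cache_t`, `cache_insert`, `cache_delete`, `cache_find_by_id`, `index_count`) is hypothetical.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define CACHE_MAX 8

/* Hypothetical cache entry, reachable by name and by numeric id. */
typedef struct entry {
    char name[32];
    int id;
} entry_t;

/* Two independent indexes that hold pointers to the same entries. */
typedef struct cache {
    entry_t *by_name[CACHE_MAX];
    entry_t *by_id[CACHE_MAX];
} cache_t;

/* Store e in the first free slot; returns 1 on success, 0 if full. */
static int index_add(entry_t **index, entry_t *e) {
    for (int i = 0; i < CACHE_MAX; i++) {
        if (!index[i]) {
            index[i] = e;
            return 1;
        }
    }
    return 0;
}

/* Clear every slot that points at e. */
static void index_remove(entry_t **index, const entry_t *e) {
    for (int i = 0; i < CACHE_MAX; i++)
        if (index[i] == e)
            index[i] = NULL;
}

/* Number of occupied slots in an index. */
static int index_count(entry_t *const *index) {
    int n = 0;
    for (int i = 0; i < CACHE_MAX; i++)
        n += index[i] != NULL;
    return n;
}

static entry_t *cache_insert(cache_t *c, const char *name, int id) {
    entry_t *e = calloc(1, sizeof(*e));
    if (!e)
        return NULL;
    strncpy(e->name, name, sizeof(e->name) - 1);
    e->id = id;
    if (!index_add(c->by_name, e) || !index_add(c->by_id, e)) {
        index_remove(c->by_name, e); /* undo a partial insert */
        index_remove(c->by_id, e);
        free(e);
        return NULL;
    }
    return e;
}

/* Unlink from BOTH indexes before freeing. Omitting either removal
 * would leave a dangling pointer for a later lookup to dereference. */
static void cache_delete(cache_t *c, entry_t *e) {
    index_remove(c->by_name, e);
    index_remove(c->by_id, e);
    free(e);
}

static entry_t *cache_find_by_id(const cache_t *c, int id) {
    for (int i = 0; i < CACHE_MAX; i++)
        if (c->by_id[i] && c->by_id[i]->id == id)
            return c->by_id[i];
    return NULL;
}
```

If `cache_delete` skipped the `by_id` removal, `cache_find_by_id` would dereference freed memory on the next lookup. That is the class of error AddressSanitizer reports as a use after free, though whether the same mechanism is at work in librdkafka is not established here.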