swift-server / swift-kafka-client

Apache License 2.0
81 stars 20 forks source link

Crash when sending message with headers #150

Open blindspotbounty opened 10 months ago

blindspotbounty commented 10 months ago

For some situations, headers could be freed by librdkafka thus leading to crash:

Thread 5 Queue : com.apple.root.user-initiated-qos.cooperative (concurrent)
#0  0x0000000189d4d11c in __pthread_kill ()
#1  0x0000000189d84cc0 in pthread_kill ()
#2  0x0000000189c94a40 in abort ()
#3  0x0000000189babb08 in malloc_vreport ()
#4  0x0000000189baf3f4 in malloc_report ()
#5  0x0000000189bc3ebc in find_zone_and_free ()
#6  0x0000000106c1a644 in rd_free at <path>/Sources/Crdkafka/librdkafka/src/rd.h:151
#7  0x0000000106c1a570 in rd_list_destroy_elems at <path>/Sources/Crdkafka/librdkafka/src/rdlist.c:284
#8  0x0000000106c1a5dc in rd_list_destroy at <path>/Sources/Crdkafka/librdkafka/src/rdlist.c:298
#9  0x0000000106af0fe4 in rd_kafka_headers_destroy at <path>/Sources/Crdkafka/librdkafka/src/rdkafka_header.c:37
#10 0x0000000106b60008 in rd_kafka_produceva at <path>/Sources/Crdkafka/librdkafka/src/rdkafka_msg.c:520
#11 0x0000000106d0ee88 in RDKafkaClient._produceVariadic(topicHandle:partition:messageFlags:key:value:opaque:cHeaders:) at <path>/Sources/Kafka/RDKafka/RDKafkaClient.swift:231
#12 0x0000000106d0df1c in closure #1 in closure #1 in closure #1 in RDKafkaClient.produce<τ_0_0, τ_0_1>(message:newMessageID:topicConfiguration:topicHandles:) at <path>/Sources/Kafka/RDKafka/RDKafkaClient.swift:150
#13 0x0000000106d1667c in partial apply for closure #1 in closure #1 in closure #1 in RDKafkaClient.produce<τ_0_0, τ_0_1>(message:newMessageID:topicConfiguration:topicHandles:) ()
#14 0x0000000106d0fbec in static RDKafkaClient._withKafkaCHeadersRecursive<τ_0_0>(kafkaHeaders:cHeaders:_:) at <path>/Sources/Kafka/RDKafka/RDKafkaClient.swift:280
#15 0x0000000106d0feb4 in closure #1 in static RDKafkaClient._withKafkaCHeadersRecursive<τ_0_0>(kafkaHeaders:cHeaders:_:) at <path>/Sources/Kafka/RDKafka/RDKafkaClient.swift:301
#16 0x0000000106d1683c in partial apply for closure #1 in static RDKafkaClient._withKafkaCHeadersRecursive<τ_0_0>(kafkaHeaders:cHeaders:_:) ()
#17 0x0000000199692518 in String.withCString<τ_0_0>(_:) ()
#18 0x0000000106d0fb24 in static RDKafkaClient._withKafkaCHeadersRecursive<τ_0_0>(kafkaHeaders:cHeaders:_:) at <path>/Sources/Kafka/RDKafka/RDKafkaClient.swift:287
#19 0x0000000106d10220 in closure #1 in closure #1 in static RDKafkaClient._withKafkaCHeadersRecursive<τ_0_0>(kafkaHeaders:cHeaders:_:) at <path>/Sources/Kafka/RDKafka/RDKafkaClient.swift:292
#20 0x0000000106d1688c in partial apply for closure #1 in closure #1 in static RDKafkaClient._withKafkaCHeadersRecursive<τ_0_0>(kafkaHeaders:cHeaders:_:) ()
#21 0x00000001064fd70c in ByteBuffer.withUnsafeReadableBytes<τ_0_0>(_:) at /Users/pav/Library/Developer/Xcode/DerivedData/swift-kafka-ordo-bmzbbfqgoulzpcazwapzymnuanmm/SourcePackages/checkouts/swift-nio/Sources/NIOCore/ByteBuffer-core.swift:704
#22 0x0000000106d1001c in closure #1 in static RDKafkaClient._withKafkaCHeadersRecursive<τ_0_0>(kafkaHeaders:cHeaders:_:) at <path>/Sources/Kafka/RDKafka/RDKafkaClient.swift:289
#23 0x0000000106d1683c in partial apply for closure #1 in static RDKafkaClient._withKafkaCHeadersRecursive<τ_0_0>(kafkaHeaders:cHeaders:_:) ()
#24 0x0000000199692518 in String.withCString<τ_0_0>(_:) ()
#25 0x0000000106d0fb24 in static RDKafkaClient._withKafkaCHeadersRecursive<τ_0_0>(kafkaHeaders:cHeaders:_:) at <path>/Sources/Kafka/RDKafka/RDKafkaClient.swift:287
#26 0x0000000106d0f06c in static RDKafkaClient.withKafkaCHeaders<τ_0_0>(for:_:) at <path>/Sources/Kafka/RDKafka/RDKafkaClient.swift:267
#27 0x0000000106d0dc68 in closure #1 in closure #1 in RDKafkaClient.produce<τ_0_0, τ_0_1>(message:newMessageID:topicConfiguration:topicHandles:) at <path>/Sources/Kafka/RDKafka/RDKafkaClient.swift:148
#28 0x0000000106d161a4 in partial apply for closure #1 in closure #1 in RDKafkaClient.produce<τ_0_0, τ_0_1>(message:newMessageID:topicConfiguration:topicHandles:) ()
#29 0x0000000106d0f848 in closure #1 in closure #1 in static RDKafkaClient.withMessageKeyAndValueBuffer<τ_0_0, τ_0_1, τ_0_2>(for:_:) at <path>/Sources/Kafka/RDKafka/RDKafkaClient.swift:249
#30 0x0000000106d16588 in partial apply for closure #1 in closure #1 in static RDKafkaClient.withMessageKeyAndValueBuffer<τ_0_0, τ_0_1, τ_0_2>(for:_:) ()
#31 0x0000000106ce4d10 in closure #1 in String.withUnsafeBytes<τ_0_0>(_:) at <path>/Sources/Kafka/Data/String+KafkaContiguousBytes.swift:22
#32 0x0000000106ce4d80 in partial apply for closure #1 in String.withUnsafeBytes<τ_0_0>(_:) ()
#33 0x000000019980ad80 in String.UTF8View.withContiguousStorageIfAvailable<τ_0_0>(_:) ()
#34 0x0000000106ce49b8 in String.withUnsafeBytes<τ_0_0>(_:) at <path>/Sources/Kafka/Data/String+KafkaContiguousBytes.swift:19
#35 0x0000000106ce4e20 in protocol witness for KafkaContiguousBytes.withUnsafeBytes<τ_0_0>(_:) in conformance String ()
#36 0x0000000106d0f6dc in closure #1 in static RDKafkaClient.withMessageKeyAndValueBuffer<τ_0_0, τ_0_1, τ_0_2>(for:_:) at <path>/Sources/Kafka/RDKafka/RDKafkaClient.swift:248
#37 0x0000000106d1634c in partial apply for closure #1 in static RDKafkaClient.withMessageKeyAndValueBuffer<τ_0_0, τ_0_1, τ_0_2>(for:_:) ()
#38 0x0000000106ce4d10 in closure #1 in String.withUnsafeBytes<τ_0_0>(_:) at <path>/Sources/Kafka/Data/String+KafkaContiguousBytes.swift:22
#39 0x0000000106ce4d80 in partial apply for closure #1 in String.withUnsafeBytes<τ_0_0>(_:) ()
#40 0x000000019980ad80 in String.UTF8View.withContiguousStorageIfAvailable<τ_0_0>(_:) ()
#41 0x0000000106ce49b8 in String.withUnsafeBytes<τ_0_0>(_:) at <path>/Sources/Kafka/Data/String+KafkaContiguousBytes.swift:19
#42 0x0000000106ce4e20 in protocol witness for KafkaContiguousBytes.withUnsafeBytes<τ_0_0>(_:) in conformance String ()
#43 0x0000000106d0f30c in static RDKafkaClient.withMessageKeyAndValueBuffer<τ_0_0, τ_0_1, τ_0_2>(for:_:) at <path>/Sources/Kafka/RDKafka/RDKafkaClient.swift:246
#44 0x0000000106d0d4dc in closure #1 in RDKafkaClient.produce<τ_0_0, τ_0_1>(message:newMessageID:topicConfiguration:topicHandles:) at <path>/Sources/Kafka/RDKafka/RDKafkaClient.swift:133
#45 0x0000000106d14678 in partial apply for closure #1 in RDKafkaClient.produce<τ_0_0, τ_0_1>(message:newMessageID:topicConfiguration:topicHandles:) ()
#46 0x0000000106d1a330 in RDKafkaTopicHandles.withTopicHandlePointer<τ_0_0>(topic:topicConfiguration:_:) at <path>/Sources/Kafka/RDKafka/RDKafkaTopicHandles.swift:51
#47 0x0000000106d0d120 in RDKafkaClient.produce<τ_0_0, τ_0_1>(message:newMessageID:topicConfiguration:topicHandles:) at <path>/Sources/Kafka/RDKafka/RDKafkaClient.swift:129
#48 0x0000000106d04f70 in KafkaProducer.send<τ_0_0, τ_0_1>(_:) at <path>/Sources/Kafka/KafkaProducer.swift:257
#49 0x00000001061360b8 in closure #2 in closure #1 in KafkaTests.testProduceAndConsumeWithMessageHeaders() at <path>/Tests/IntegrationTests/KafkaTests.swift:375
#50 0x000000010614b014 in partial apply for closure #2 in closure #1 in KafkaTests.testProduceAndConsumeWithMessageHeaders() ()

Crash is around:

rdkafka_msg.c:521:17
   518                  rd_kafka_topic_destroy0(rkt);
   519  
   520          if (hdrs)
-> 521                  rd_kafka_headers_destroy(hdrs);
   522  
   523          rd_assert(error != NULL);
   524          return error;

I used slightly modified test for headers:

diff --git a/Tests/IntegrationTests/KafkaTests.swift b/Tests/IntegrationTests/KafkaTests.swift
index e6cf82e..4b92338 100644
--- a/Tests/IntegrationTests/KafkaTests.swift
+++ b/Tests/IntegrationTests/KafkaTests.swift
@@ -332,7 +332,7 @@ final class KafkaTests: XCTestCase {

     func testProduceAndConsumeWithMessageHeaders() async throws {
         let testMessages = Self.createTestMessages(
-            topic: self.uniqueTestTopic,
+            topic: "unknowntopic",
             headers: [
                 KafkaHeader(key: "some.header", value: ByteBuffer(string: "some-header-value")),
                 KafkaHeader(key: "some.null.header", value: nil),
@@ -340,7 +340,7 @@ final class KafkaTests: XCTestCase {
             count: 10
         )

-        let (producer, events) = try KafkaProducer.makeProducerWithEvents(configuration: self.producerConfig, logger: .kafkaTest)
+        let producer = try KafkaProducer(configuration: self.producerConfig, logger: .kafkaTest)

         var consumerConfig = KafkaConsumerConfiguration(
             consumptionStrategy: .group(id: "commit-sync-test-group-id", topics: [self.uniqueTestTopic]),
@@ -366,11 +366,12 @@ final class KafkaTests: XCTestCase {

             // Producer Task
             group.addTask {
-                try await Self.sendAndAcknowledgeMessages(
-                    producer: producer,
-                    events: events,
-                    messages: testMessages
-                )
+                let sleepInterval: Duration = .seconds(120)
+
+                for message in testMessages {
+                    try await Task.sleep(for: sleepInterval)
+                    try producer.send(message)
+                }
             }

             // Consumer Task

Expected behaviour: exception thrown instead of crash

blindspotbounty commented 7 months ago

Prepared an issue with asan stacks: https://github.com/confluentinc/librdkafka/issues/4627