swift-server / swift-kafka-client

Apache License 2.0

Consumer autocommit leads to crash #99

Closed: blindspotbounty closed this issue 1 year ago

blindspotbounty commented 1 year ago

With the consumer's enableAutoCommit option enabled, the library crashes with a fatalError at:

    private func handleOffsetCommitEvent(_ event: OpaquePointer?) {
        guard let opaquePointer = rd_kafka_event_opaque(event) else { // <--- crashes here
            fatalError("Could not resolve reference to captured Swift callback instance")
        }
        // ...
    }

The crash happens after some time (I guess after 5 seconds, which would match librdkafka's default auto.commit.interval.ms of 5000 ms). It is possible either to disable commit events in that case with enabled_events, or just to skip events that have no opaque pointer.

With auto-commit disabled, this does not seem to be reproducible.

felixschlegel commented 1 year ago

Alright, I know where this is coming from. What happens here is the following:

When we have enableAutoCommit = false, we use this method to manually commit offsets:

```swift
func commitSync(_ message: KafkaConsumerMessage) async throws {
    // Declare captured closure outside of withCheckedThrowingContinuation.
    // We do that because we do an unretained pass of the captured closure to
    // librdkafka, which means we have to keep a reference to the closure
    // ourselves to make sure it does not get deallocated before
    // commitSync returns.
    var capturedClosure: CapturedCommitCallback!
    try await withCheckedThrowingContinuation { continuation in
        capturedClosure = CapturedCommitCallback { result in
            continuation.resume(with: result)
        }

        // The offset committed is always the offset of the next requested message.
        // Thus, we increase the offset of the current message by one before committing it.
        // See: https://github.com/edenhill/librdkafka/issues/2745#issuecomment-598067945
        let changesList = RDKafkaTopicPartitionList()
        changesList.setOffset(
            topic: message.topic,
            partition: message.partition,
            offset: Int64(message.offset + 1)
        )

        // Unretained pass because the reference that librdkafka holds to capturedClosure
        // should not be counted in ARC, as this can lead to memory leaks.
        let opaquePointer: UnsafeMutableRawPointer? = Unmanaged.passUnretained(capturedClosure).toOpaque()

        changesList.withListPointer { listPointer in
            rd_kafka_commit_queue(
                self.kafkaHandle,
                listPointer,
                self.mainQueue,
                nil,
                opaquePointer
            )
        }
    }
}
```

This method does an unretained pass of capturedClosure to librdkafka and then waits for the handleOffsetCommitEvent(_:) method to execute the closure.
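The unretained round trip can be tried without librdkafka. In this minimal sketch, `CommitCallbackBox` is a hypothetical stand-in for CapturedCommitCallback and `deliverCommitResult(opaque:result:)` is a hypothetical stand-in for librdkafka handing the opaque pointer back in the offset-commit event; neither is part of swift-kafka-client. `withExtendedLifetime` plays the role of the `capturedClosure` variable in commitSync: it is the strong reference that keeps the box alive while only an unretained pointer is in flight.

```swift
// Hypothetical stand-in for CapturedCommitCallback: a class box around the closure,
// so it can be converted to and from a raw pointer with Unmanaged.
final class CommitCallbackBox {
    let closure: (Result<Void, Error>) -> Void

    init(_ closure: @escaping (Result<Void, Error>) -> Void) {
        self.closure = closure
    }
}

// Hypothetical stand-in for the commit event handler: librdkafka would return the
// opaque pointer unchanged once the commit completes.
func deliverCommitResult(opaque: UnsafeMutableRawPointer, result: Result<Void, Error>) {
    // takeUnretainedValue() does not consume a retain, matching passUnretained().
    let box = Unmanaged<CommitCallbackBox>.fromOpaque(opaque).takeUnretainedValue()
    box.closure(result)
}

// Runs one simulated manual commit and reports whether the callback saw a success.
func simulateManualCommit() -> Bool {
    var succeeded = false
    let box = CommitCallbackBox { result in
        if case .success = result { succeeded = true }
    }
    // The strong reference to `box` keeps it alive; passUnretained() adds no retain.
    withExtendedLifetime(box) {
        let opaque = Unmanaged.passUnretained(box).toOpaque()
        deliverCommitResult(opaque: opaque, result: .success(()))
    }
    return succeeded
}
```

If nothing held a strong reference to the box, the pointer passed around would dangle, which is why commitSync keeps capturedClosure in a local variable.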

The problem we are running into here is that when enableAutoCommit = true, librdkafka commits offsets by itself and we never pass a captured closure along with the commit, but our code in handleOffsetCommitEvent(_:) still expects one.

Fixing this would mean either failing softly, as in:

```swift
guard let opaquePointer = rd_kafka_event_opaque(event) else {
    // No captured closure set (enableAutoCommit = true)
    return
}
```

Or only expecting a value here after checking that enableAutoCommit == false, i.e. when offsets are committed manually through commitSync.
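Both options can be compared in a small sketch that does not need librdkafka. `SimulatedCommitEvent`, `CallbackBox`, `handleSoftly(_:)`, and `handleCheckingConfig(_:enableAutoCommit:)` are hypothetical names for illustration, and the sketch assumes, as described in this thread, that auto-commit events arrive with a nil opaque pointer.

```swift
// Hypothetical model of an offset-commit event: manual commits carry our callback
// as the opaque pointer, while auto-commit events (assumed here) carry nil.
struct SimulatedCommitEvent {
    let opaque: UnsafeMutableRawPointer?
}

// Hypothetical callback target that just counts how often it was resumed.
final class CallbackBox {
    var invocations = 0
    func invoke() { invocations += 1 }
}

enum CommitEventOutcome: Equatable {
    case callbackInvoked
    case ignored
}

// Option 1: fail softly whenever no callback was attached.
func handleSoftly(_ event: SimulatedCommitEvent) -> CommitEventOutcome {
    guard let opaque = event.opaque else {
        // No captured closure set (enableAutoCommit = true).
        return .ignored
    }
    Unmanaged<CallbackBox>.fromOpaque(opaque).takeUnretainedValue().invoke()
    return .callbackInvoked
}

// Option 2: only require an opaque pointer when offsets are committed manually.
func handleCheckingConfig(_ event: SimulatedCommitEvent, enableAutoCommit: Bool) -> CommitEventOutcome {
    if enableAutoCommit {
        // librdkafka committed on its own; there is no Swift callback to resume.
        return .ignored
    }
    guard let opaque = event.opaque else {
        // With manual commits every commit event should carry our callback,
        // so a missing opaque pointer here indicates a real bug.
        preconditionFailure("Could not resolve reference to captured Swift callback instance")
    }
    Unmanaged<CallbackBox>.fromOpaque(opaque).takeUnretainedValue().invoke()
    return .callbackInvoked
}
```

The config check keeps the loud failure on the manual path, so a commit event that genuinely lost its callback is still caught, whereas failing softly is simpler but also silences that case.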

Thanks for raising this issue!

blindspotbounty commented 1 year ago

Yes, exactly! I think it is possible either to remove the commit event from polling with enabled_events=0x7fffffff & ~(RD_KAFKA_EVENT_OFFSET_COMMIT), or, as suggested above, to softly bypass that situation.
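The bitmask arithmetic is easy to check in isolation. This sketch assumes that RD_KAFKA_EVENT_OFFSET_COMMIT is 0x20, as defined in librdkafka's rdkafka.h, and that the enabled_events property accepts values up to 0x7fffffff; verify both against the librdkafka version you build with. The `RDKafkaEventBits` namespace and the `enabledEvents(excluding:)` helper are hypothetical.

```swift
// Hypothetical namespace for the event bits used below.
enum RDKafkaEventBits {
    // Assumed value of RD_KAFKA_EVENT_OFFSET_COMMIT (rdkafka.h); verify against your header.
    static let offsetCommit: Int32 = 0x20
    // Assumed upper bound of the enabled_events property: every event bit set.
    static let all: Int32 = 0x7fffffff
}

// Hypothetical helper: start from every event bit and clear the excluded ones.
func enabledEvents(excluding excluded: Int32) -> Int32 {
    RDKafkaEventBits.all & ~excluded
}

let mask = enabledEvents(excluding: RDKafkaEventBits.offsetCommit)
// librdkafka configuration values are strings, so the mask is passed in decimal form.
let configOverrides: [String: String] = ["enabled_events": String(mask)]
```

Clearing a single bit this way keeps every other event type enabled, so only the offset-commit events stop reaching the poll loop.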

blindspotbounty commented 1 year ago

Oh, it is already in place; it just needs a small addition:

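```swift
// KafkaConsumer
let client = try RDKafkaClient.makeClient(
    type: .consumer,
    configDictionary: config.dictionary,
    // Just add the conditional part: only listen for offset-commit events when
    // offsets are committed manually. The parentheses are required because
    // `?:` binds more loosely than `+` in Swift.
    events: [.log, .fetch] + (config.enableAutoCommit ? [] : [.offsetCommit]),
    logger: logger
)
```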
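To check that the conditional event list behaves as intended, here is a self-contained sketch with a hypothetical `ConsumerEvent` enum standing in for the event type that RDKafkaClient.makeClient accepts; the real makeClient signature is not reproduced here.

```swift
// Hypothetical stand-in for the event type accepted by RDKafkaClient.makeClient.
enum ConsumerEvent: Equatable {
    case log
    case fetch
    case offsetCommit
}

// Builds the event subscription list for a consumer. Offset-commit events are only
// requested for manual commits, because only those carry a Swift callback to resume.
func consumerEvents(enableAutoCommit: Bool) -> [ConsumerEvent] {
    // Without the parentheses this would parse as
    // ([.log, .fetch] + enableAutoCommit) ? [] : [.offsetCommit] and fail to compile.
    [.log, .fetch] + (enableAutoCommit ? [] : [.offsetCommit])
}
```

With auto-commit enabled the list is just `[.log, .fetch]`, so handleOffsetCommitEvent(_:) is never reached for commits that have no captured closure.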