swift-server / swift-kafka-client

Apache License 2.0

Store offset on .finishing may lead to fatalError #119

Closed blindspotbounty closed 1 year ago

blindspotbounty commented 1 year ago

If the client is still iterating through the async sequence when triggerGracefulShutdown() is called, storing an offset may lead to a fatal error:

        func storeOffset() -> StoreOffsetAction {
            switch self.state {
            case .uninitialized:
                fatalError("\(#function) invoked while still in state \(self.state)")
            case .initializing:
                fatalError("Subscribe to consumer group / assign to topic partition pair before committing offsets")
            case .consumptionStopped:
                fatalError("Cannot store offset when consumption has been stopped")
            case .consuming(let client, _):
                return .storeOffset(client: client)
            case .finishing, .finished:
                fatalError("\(#function) invoked while still in state \(self.state)") // <---- here
            }
        }

Unlike with consumptionStopped, I believe the client should be allowed to continue iterating during graceful shutdown until the end of the current sequence.

FranzBusch commented 1 year ago

It's a tricky one. We should definitely avoid the error somehow, but what do we do about the offsets? It would be very unexpected to me if you iterated over the messages and then had to iterate over them again next time because we hadn't committed the offset. We should probably wait until all messages have been consumed and then transition to finished. cc @felixschlegel

felixschlegel commented 1 year ago

TL;DR It is ok to store message offsets when we are in the state .finishing, because we receive no new messages after triggerGracefulShutdown and all changes will be committed before closing completely.

librdkafka does not fetch any new messages after we invoke triggerGracefulShutdown (which closes the librdkafka consumer); instead, it enqueues RD_KAFKA_OP_TERMINATE on its local queue. Once the RD_KAFKA_OP_TERMINATE operation is reached, and if enable.auto.commit (aka isAutoCommitEnabled) is true, librdkafka commits all offsets. Therefore it is ok to store message offsets when we are in the state .finishing, because we receive no new messages after triggerGracefulShutdown and all changes will be committed before closing completely.


Here is my analysis on this topic:

(Stacktrace for when triggerGracefulShutdown is invoked)

triggerGracefulShutdown -> RDKafkaClient.consumerClose -> rd_kafka_consumer_close_queue -> ... -> rd_kafka_cgrp_terminate -> enqueue RD_KAFKA_OP_TERMINATE on librdkafka's internal replyq

(Backtrace for how final offsets are committed on consumer termination)

librdkafka's replyq is processed in rd_kafka_cgrp_op_serve:

rd_kafka_cgrp_op_serve -> case RD_KAFKA_OP_TERMINATE -> rd_kafka_cgrp_terminate0 -> rd_kafka_assignment_serve -> rd_kafka_assignment_serve_removals -> (rd_kafka_toppar_op_fetch_stop) + rd_kafka_cgrp_assigned_offsets_commit (See implementation)
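The reasoning above can be illustrated with a small toy model. This is only a sketch under simplifying assumptions: ToyConsumer and all of its members are hypothetical names invented for illustration, they are not part of librdkafka or swift-kafka-client, and error paths are ignored entirely. It shows the three claimed properties: no new messages are fetched after shutdown is triggered, already buffered messages can still be consumed and their offsets stored, and the last stored offset is committed on termination when auto-commit is enabled.

```swift
// Toy model only: these names are hypothetical and not part of
// librdkafka or swift-kafka-client.
struct ToyConsumer {
    private(set) var buffered: [Int]              // offsets fetched but not yet consumed
    private(set) var storedOffset: Int? = nil     // last offset stored by the application
    private(set) var committedOffset: Int? = nil  // last offset committed
    private(set) var isShuttingDown = false
    let autoCommitEnabled: Bool

    init(buffered: [Int], autoCommitEnabled: Bool) {
        self.buffered = buffered
        self.autoCommitEnabled = autoCommitEnabled
    }

    // New messages are only accepted while no shutdown is in progress.
    mutating func fetch(_ offset: Int) {
        guard !isShuttingDown else { return }
        buffered.append(offset)
    }

    mutating func triggerGracefulShutdown() {
        isShuttingDown = true
    }

    // Already buffered messages can still be drained during shutdown.
    mutating func next() -> Int? {
        buffered.isEmpty ? nil : buffered.removeFirst()
    }

    mutating func storeOffset(_ offset: Int) {
        storedOffset = offset
    }

    // Stands in for the terminate step: commit whatever was stored.
    mutating func terminate() {
        if autoCommitEnabled {
            committedOffset = storedOffset
        }
    }
}

var consumer = ToyConsumer(buffered: [0, 1, 2], autoCommitEnabled: true)
consumer.triggerGracefulShutdown()
consumer.fetch(3)                      // ignored: shutdown already triggered
while let offset = consumer.next() {
    consumer.storeOffset(offset)       // still allowed while finishing
}
consumer.terminate()
print(consumer.committedOffset as Any)
```

In this model the offset of the last buffered message ends up committed, which is the behaviour the analysis argues for.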

felixschlegel commented 1 year ago

Addendum:

As it turns out, continuing to consume and store offsets after triggerGracefulShutdown has been invoked can result in an RD_KAFKA_RESP_ERR__STATE error being triggered here:

https://github.com/confluentinc/librdkafka/blob/49f180a36c247100dc246379ace84833b91f4038/src/rdkafka_offset.h#L102

This is due to the partition already being unassigned (caused by the shutdown process).

Conclusion: I would say we allow reading and storing offsets while in the .finishing state, though at the risk of throwing.
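A minimal sketch of what this conclusion could look like in a state machine, not the actual fix. Several things here are assumptions made for illustration: FakeClient is a hypothetical stand-in for the real client, .finishing is assumed to carry the client as an associated value, and .finished is treated as a no-op (returning nil) rather than a fatalError. The point is only that .finishing returns the same action as .consuming, and that the store call itself may throw once the partition has been unassigned.

```swift
// Hypothetical stand-in for the real client; not the library's API.
final class FakeClient {
    struct StateError: Error {}
    var partitionAssigned = true

    // Throws once the partition is gone, loosely mirroring the
    // RD_KAFKA_RESP_ERR__STATE case described above.
    func storeOffset(_ offset: Int) throws {
        guard partitionAssigned else { throw StateError() }
    }
}

enum ConsumerState {
    case uninitialized
    case initializing
    case consumptionStopped
    case consuming(FakeClient)
    case finishing(FakeClient)   // assumption: the client is kept while finishing
    case finished
}

enum StoreOffsetAction {
    case storeOffset(client: FakeClient)
}

struct StateMachine {
    var state: ConsumerState = .uninitialized

    func storeOffset() -> StoreOffsetAction? {
        switch state {
        case .uninitialized:
            fatalError("\(#function) invoked while still in state \(state)")
        case .initializing:
            fatalError("Subscribe to consumer group / assign to topic partition pair before committing offsets")
        case .consumptionStopped:
            fatalError("Cannot store offset when consumption has been stopped")
        case .consuming(let client), .finishing(let client):
            // Storing is allowed while finishing; the call may still throw.
            return .storeOffset(client: client)
        case .finished:
            // Assumption: nothing left to store once fully finished.
            return nil
        }
    }
}

let client = FakeClient()
let machine = StateMachine(state: .finishing(client))
client.partitionAssigned = false   // simulate unassignment during shutdown

if case .storeOffset(let c)? = machine.storeOffset() {
    do {
        try c.storeOffset(42)
    } catch {
        // The caller decides how to handle a failed store during shutdown.
        print("storing offset failed during shutdown: \(error)")
    }
}
```

Surfacing the failure as a thrown error instead of a fatalError leaves the decision to the caller, which matches the "at the risk of throwing" trade-off.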

blindspotbounty commented 1 year ago

Thank you for the fix!