swift-server / swift-kafka-client

Apache License 2.0

`KafkaConsumer` may crash if deinit is called before run #147

Closed: blindspotbounty closed this issue 11 months ago

blindspotbounty commented 1 year ago

I've accidentally discovered two cases in which `KafkaConsumer` may crash with a fatal error.

Here is a simplified test:

/// KafkaConsumerTests.swift
    func testConsumerMessagesConstructDeinit() async throws {
        let uniqueGroupID = UUID().uuidString
        let config = KafkaConsumerConfiguration(
            consumptionStrategy: .group(id: uniqueGroupID, topics: ["this-topic-does-not-exist"]),
            bootstrapBrokerAddresses: []
        )

        _ = try KafkaConsumer(configuration: config, logger: .kafkaTest) // deinit called before run
    }

Output:

Test Case '-[KafkaTests.KafkaConsumerTests testConsumerMessagesConstructDeinit]' started.
Kafka/KafkaConsumer.swift:890: Fatal error: Subscribe to consumer group / assign to topic partition pair before reading messages

Another, similar but slightly different scenario: cancelling the consumption task before `serviceGroup.run()`:

/// KafkaConsumerTests.swift
    func testConsumerMessagesReadCancelledBeforeRun() async throws {
        let uniqueGroupID = UUID().uuidString
        let config = KafkaConsumerConfiguration(
            consumptionStrategy: .group(id: uniqueGroupID, topics: ["this-topic-does-not-exist"]),
            bootstrapBrokerAddresses: []
        )

        let consumer = try KafkaConsumer(configuration: config, logger: .kafkaTest)

        let svcGroupConfig = ServiceGroupConfiguration(services: [consumer], logger: .kafkaTest)
        let serviceGroup = ServiceGroup(configuration: svcGroupConfig)

        // explicitly start the message-consuming task before serviceGroup.run()
        let consumingTask = Task {
            for try await record in consumer.messages {
                XCTFail("Unexpected record \(record)")
            }
        }

        try await Task.sleep(for: .seconds(1))

        // explicitly cancel the message-consuming task before serviceGroup.run()
        consumingTask.cancel()

        try await withThrowingTaskGroup(of: Void.self) { group in
            // Run the service group
            group.addTask {
                try await serviceGroup.run()
            }

            try await Task.sleep(for: .seconds(1))

            // Shutdown the serviceGroup
            await serviceGroup.triggerGracefulShutdown()
        }
    }

Output:

Test Case '-[KafkaTests.KafkaConsumerTests testConsumerMessagesReadCancelledBeforeRun]' started.
Kafka/KafkaConsumer.swift:890: Fatal error: Subscribe to consumer group / assign to topic partition pair before reading messages

Could you please advise whether this is a bug or a misuse on my side?

FranzBusch commented 1 year ago

Good catch. Our state machine is too strict here. We should probably allow those transitions and simply treat them as moving straight to finished.
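To illustrate the direction suggested in the reply, here is a minimal sketch of a more lenient state machine. All names (`ConsumerStateMachine`, `ConsumerState`, `RunAction`, `messageSequenceTerminated()`) are hypothetical and do not reflect swift-kafka-client's actual internals. The sketch only assumes that terminating the `messages` sequence before `run()` (through deinit or task cancellation) should move the consumer to a finished state instead of hitting `fatalError`, and that a later `run()` then returns without polling.

```swift
// Hypothetical, simplified model of a consumer state machine.
// States and method names are illustrative assumptions only.
enum ConsumerState: Equatable {
    case initializing   // constructed, run() not yet called
    case running        // run() is polling for messages
    case finished       // nothing left to do
}

enum RunAction: Equatable {
    case startPolling
    case returnImmediately
}

struct ConsumerStateMachine {
    private(set) var state: ConsumerState = .initializing

    /// Called when the messages sequence terminates, e.g. because the
    /// consuming task was cancelled or the consumer was deinitialized.
    mutating func messageSequenceTerminated() {
        switch state {
        case .initializing:
            // A strict machine would call fatalError here.
            // The lenient version treats it as finishing early.
            state = .finished
        case .running:
            state = .finished
        case .finished:
            break // already finished; nothing to do
        }
    }

    /// Called at the start of run(); tells the caller what to do next.
    mutating func run() -> RunAction {
        switch state {
        case .initializing:
            state = .running
            return .startPolling
        case .running:
            // Assumption: calling run() twice remains a programming error.
            preconditionFailure("run() must only be called once")
        case .finished:
            // The sequence was already terminated: exit without polling.
            return .returnImmediately
        }
    }
}

// Scenario from the issue: sequence terminated before run().
var machine = ConsumerStateMachine()
machine.messageSequenceTerminated()
print(machine.state)  // finished
print(machine.run())  // returnImmediately
```

With this shape, both reported scenarios end in `.finished` rather than crashing, while the normal path (`run()` first, then termination) behaves as before.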