Description
Calls to consumer.Assign([]kafka.TopicPartition) or consumer.Subscribe() result in partitions being assigned either directly (Assign) or via group management (rebalancing).
But in dev/CI testing there is an indeterminate gap of a few seconds between the assignment and calls to consumer.Poll() actually returning messages.
We have integration tests that start consuming, publish a message then wait for it to arrive before shutting down the connections.
=== BUT ===
we find the test hangs: the consumer takes some time to set up, so our publish always happens 'too fast' and a consumer using the latest-offset strategy waits forever.
Goal: to achieve synchronous Assign, Subscribe and Close calls and orchestrate stable tests without relying on sleeps.
How to reproduce
consumer.Subscribe(offset.Latest)
//start goroutine poll loop
producer.Produce(event)
//wait for message
NOTE: if we consume from the earliest offset we always get the message, but we need to test use cases where only events published after the subscription started are returned.
We are trying to avoid sleeps, which are unstable in slow CI environments, and want to know whether there is a way to determine that librdkafka has actually completed its handshaking and set everything up. Both consumer.Assign/AssignAll and Subscribe appear to be async calls.
Checklist
Please provide the following information:
[ ] confluent-kafka-go and librdkafka version (LibraryVersion()): confluent-kafka-go (at least 0.11.4), librdkafka 0.11.5 (checked with yum info librdkafka)
[ ] Provide client logs (with "debug": ".." as necessary): N/A as this is an API query
[ ] Provide broker log excerpts: N/A as this is an API query
[ ] Critical issue: No. We are trying to improve integration test stability and performance by orchestrating turn-taking with channels rather than sleeps.
In a perfect world, Subscribe, Assign and Close would be synchronous calls that return only once librdkafka has set everything up or torn everything down. There may be good technical reasons why they are not, or I may not have used the API correctly.
[ ] Client configuration (ConfigMap{...}): brokers, acks=-1, linger=100, session timeout 30000, other settings.
Thanks for the input,
Alex