tulios / kafkajs

A modern Apache Kafka client for node.js
https://kafka.js.org
MIT License

Data race in e2e tests: consumer is not up and running after calling `.run()` #1629

Open Baptiste-Garcin opened 1 year ago

Baptiste-Garcin commented 1 year ago

Hi,

Describe the bug

I am facing a really frustrating issue while writing e2e tests around Kafka consumers. It only occurs in CI, which makes it really hard to reproduce.

I am testing a component that subscribes to topic A, executes some business logic, and publishes the result to topic B. To test it properly, I add a dummyConsumer that receives the event from topic B and makes it available to a set of expect assertions.

My problem is that, most of the time, the first test fails because the dummyConsumer never receives the message under test. I am using wait-for-expect, so I wait around 5 seconds before concluding that the message was lost somewhere. I noticed, and this is the workaround I am using, that if I explicitly wait 1 second between subscribing with the dummy consumer and starting to publish messages, the issue goes away.

To Reproduce

1. Start a consumer that subscribes to topic A and forwards every message to topic B.
2. Start a consumer (dummyConsumer) that subscribes to topic B.
3. Start a series of tests that publish to topic A and run expect on the message received from topic B.
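For readers trying to reproduce step 1, the forwarding component could look roughly like the sketch below. It is not the reporter's code: `forwardTo` is a hypothetical helper, the "business logic" is reduced to copying the payload unchanged, and the consumer and producer are assumed to be already connected (and the consumer already subscribed to topic A). Only `consumer.run({ eachMessage })` and `producer.send({ topic, messages })` are KafkaJS calls.

```javascript
// Hypothetical helper (not part of KafkaJS): forward every consumed message
// to `targetTopic`. Assumes `consumer` is connected and subscribed, and
// `producer` is connected.
async function forwardTo(consumer, producer, targetTopic) {
  await consumer.run({
    eachMessage: async ({ message }) => {
      // Real business logic would go here; this sketch copies the payload as-is.
      await producer.send({
        topic: targetTopic,
        messages: [{ key: message.key, value: message.value }],
      });
    },
  });
}

// Usage sketch, inside an async setup hook:
//   await forwardTo(consumerA, producer, 'topic-B');
```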

As it is a data race, here are my hooks and what they do:

Expected behavior

Every message should be received by the dummyConsumer.

Observed behavior

The first (and sometimes the second) message never reaches the dummyConsumer. If I add a 1-second timeout before each test (in the beforeEach hook), the issue goes away. This workaround makes me think that the promise returned by consumer.run() resolving does not imply that the consumer is up and ready to receive messages.

Environment:

Additional context

I checked the "Instrumentation Events" documentation but did not find a relevant event. I tried waiting for some of those signals, but I never managed to avoid the explicit 1-second wait between each test.
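For reference, one of the instrumentation signals that can be awaited is the consumer's GROUP_JOIN event, which KafkaJS emits once the consumer has joined its group. The sketch below wraps it in a promise. `waitForGroupJoin` and its default timeout are hypothetical, not part of KafkaJS; the only library calls assumed are `consumer.on(...)`, which returns a function that removes the listener, and the `consumer.events.GROUP_JOIN` constant. Joining the group does not by itself prove that the first fetch has been issued, so this may narrow the race rather than remove it.

```javascript
// Hypothetical helper (not part of KafkaJS): resolves with the event payload
// when the consumer emits GROUP_JOIN, rejects if that takes longer than timeoutMs.
function waitForGroupJoin(consumer, timeoutMs = 10000) {
  return new Promise((resolve, reject) => {
    let removeListener = () => {};
    const timer = setTimeout(() => {
      removeListener();
      reject(new Error(`consumer did not join its group within ${timeoutMs} ms`));
    }, timeoutMs);
    // consumer.on returns a function that unregisters the listener.
    removeListener = consumer.on(consumer.events.GROUP_JOIN, (event) => {
      clearTimeout(timer);
      removeListener();
      resolve(event);
    });
  });
}

// Usage sketch, inside an async beforeAll. The listener is registered
// before run() so the event cannot be missed:
//   const joined = waitForGroupJoin(dummyConsumer);
//   await dummyConsumer.run({ eachMessage });
//   await joined;
```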

wrslatz commented 8 months ago

I was able to work around this issue by using https://kafka.js.org/docs/admin#a-name-reset-offsets-by-timestamp-a-reset-consumer-group-offsets-by-timestamp implemented in https://github.com/tulios/kafkajs/pull/604.

Specifically, using

await admin.setOffsets({ groupId, topic, partitions: await admin.fetchTopicOffsetsByTimestamp(topic, timestamp) })

and using a consumer on the same groupId and topic causes that consumer to start consuming messages at the required offset. We then followed this example for a promise that resolves when a matching message is found within consumer.run.
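A minimal sketch of how these two pieces might fit together is shown below. It is not the code we actually used: `rewindGroupTo` and `runUntilMatch` are hypothetical helper names, and the timeout value is arbitrary. The KafkaJS calls are the two admin methods quoted above plus `consumer.run({ eachMessage })`. The sketch assumes the consumer group has no running members when `setOffsets` is called, and that the consumer is already connected and subscribed before `runUntilMatch`.

```javascript
// Hypothetical helper: set the group's offsets for `topic` to the offsets
// that correspond to `timestamp` (milliseconds since epoch).
// Assumption: the group has no running members at this point.
async function rewindGroupTo(admin, { groupId, topic, timestamp }) {
  const partitions = await admin.fetchTopicOffsetsByTimestamp(topic, timestamp);
  await admin.setOffsets({ groupId, topic, partitions });
}

// Hypothetical helper: start the consumer and resolve with the first message
// accepted by `matches`; reject if none arrives within timeoutMs.
function runUntilMatch(consumer, matches, timeoutMs = 5000) {
  return new Promise((resolve, reject) => {
    const timer = setTimeout(
      () => reject(new Error(`no matching message within ${timeoutMs} ms`)),
      timeoutMs
    );
    consumer
      .run({
        eachMessage: async ({ topic, partition, message }) => {
          if (matches(message)) {
            clearTimeout(timer);
            resolve({ topic, partition, message });
          }
        },
      })
      .catch((err) => {
        clearTimeout(timer);
        reject(err);
      });
  });
}

// Usage sketch, inside an async test:
//   const startedAt = Date.now();
//   await rewindGroupTo(admin, { groupId, topic: 'topic-B', timestamp: startedAt });
//   const found = runUntilMatch(dummyConsumer, (m) => m.value.toString() === expected);
//   await producer.send({ topic: 'topic-A', messages: [{ value: input }] });
//   const { message } = await found;
```

Because the group's offset is pinned to a timestamp taken before publishing, the test no longer depends on the consumer being ready at the moment the message is produced.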

Hope this helps!