getsentry / arroyo

A library to build streaming applications that consume from and produce to Kafka.
https://getsentry.github.io/arroyo/
Apache License 2.0
39 stars, 6 forks

fix: Ensure carried over message is in buffer #283

Closed: lynnagara closed this 10 months ago

lynnagara commented 10 months ago

Since _run_once can return early (https://github.com/getsentry/arroyo/blob/6286c7921978065edeb5ce72d829031f952d2e5e/arroyo/processing/processor.py#L369), it was possible that a message was never placed in self.buffered_messages. If we try to retrieve it later, it can crash the consumer.

This is suspected to be the cause of the "Invalid message not found in buffer" errors we saw in prod.
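To make the failure mode concrete, here is a minimal sketch of the ordering problem. It is a toy model, not arroyo's actual processor: `ToyProcessor`, `buffer_on_receive`, `carried_over`, and `run_with_backpressure` are hypothetical names invented for illustration, and the real `_run_once` does considerably more. The sketch assumes the buggy path only buffers a message after the early-return check, so a message that is carried over skips the buffer entirely.

```python
from collections import deque


class ToyProcessor:
    """Hypothetical model of a processor loop; not arroyo's actual code."""

    def __init__(self, incoming, buffer_on_receive):
        self.incoming = deque(incoming)
        # True models the fix: buffer the message as soon as it is received.
        self.buffer_on_receive = buffer_on_receive
        self.buffered_messages = deque()
        self.carried_over = None
        self.backpressure = False
        self.submitted = []

    def run_once(self):
        fresh = self.carried_over is None
        if fresh:
            if not self.incoming:
                return
            self.carried_over = self.incoming.popleft()
            if self.buffer_on_receive:
                self.buffered_messages.append(self.carried_over)
        if self.backpressure:
            # Early return: the message stays carried over for the next call.
            return
        if fresh and not self.buffer_on_receive:
            # Buggy ordering: only reached when no early return happened
            # on the call that first received the message.
            self.buffered_messages.append(self.carried_over)
        self.submitted.append(self.carried_over)
        self.carried_over = None


def run_with_backpressure(buffer_on_receive):
    """Receive one message under backpressure, then release it."""
    processor = ToyProcessor(["m1"], buffer_on_receive)
    processor.backpressure = True
    processor.run_once()  # message received, early return
    processor.run_once()  # still carried over
    processor.backpressure = False
    processor.run_once()  # message finally submitted
    return processor
```

In this model, `run_with_backpressure(False)` submits the message without ever buffering it, while `run_with_backpressure(True)` buffers it exactly once no matter how many calls return early.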

lynnagara commented 10 months ago

Wondering if it's better to remove the early return on line 369. This way we might be putting the same message in the buffer multiple times if we have backpressure.

lynnagara commented 10 months ago

@untitaker you're right, does this look right to you now?

untitaker commented 10 months ago

i believe we should ensure that poll returns successfully at least once before we send a message in. otherwise I suspect we might discover new ways to overload processing strategies.

an older version called poll in a tight loop instead of returning because of that, but that had its own problems.

lynnagara commented 10 months ago

> i believe we should ensure that poll returns successfully at least once before we send a message in. otherwise I suspect we might discover new ways to overload processing strategies.

do you mean strategy.poll() rather than consumer.poll()? I don't think this can overload the strategy: we never poll the consumer again or run this block of code for as long as there is a carried-over message.

untitaker commented 10 months ago
> do you mean strategy.poll() rather than consumer.poll()?

yes, I mean strategy poll
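A rough sketch of the ordering being suggested, assuming a strategy object with poll() and submit() methods and an exception that signals backpressure. `ToyStrategy`, its `capacity` parameter, the local `MessageRejected` class, and `submit_carried_over` are hypothetical stand-ins written for this example, not arroyo's classes:

```python
class MessageRejected(Exception):
    """Toy stand-in for a backpressure signal raised by a strategy."""


class ToyStrategy:
    """Hypothetical strategy that holds at most `capacity` messages."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.in_flight = []
        self.calls = []

    def poll(self):
        self.calls.append("poll")
        # Pretend one in-flight message finishes on every poll.
        if self.in_flight:
            self.in_flight.pop(0)

    def submit(self, message):
        self.calls.append("submit")
        if len(self.in_flight) >= self.capacity:
            raise MessageRejected()
        self.in_flight.append(message)


def submit_carried_over(strategy, message):
    """Poll the strategy once, then try to submit the carried-over message.

    Returns True if the message was accepted, False if it must stay
    carried over until a later attempt.
    """
    strategy.poll()
    try:
        strategy.submit(message)
    except MessageRejected:
        return False
    return True
```

The point of polling first is that the strategy gets a chance to make progress before it is handed more work; if it still rejects the message, the caller keeps carrying it over rather than retrying in a tight loop.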