faust-streaming / faust

Python Stream Processing. A Faust fork
https://faust-streaming.github.io/faust/

DatadogMonitor - IndexError: deque index out of range #113

Closed pwilczynskiclearcode closed 3 years ago

pwilczynskiclearcode commented 3 years ago

Steps to reproduce

import faust
from faust.sensors.datadog import DatadogMonitor

app = faust.App(..., monitor=DatadogMonitor(prefix="faustapp"))

process a stream with manual acknowledgement:

@app.agent(users_topic)
async def process_users(users_stream):
    async for event in users_stream.noack().events():
        ...
        await users_stream.ack(event)
        yield None

Expected behavior

stream to be processed without issues

Actual behavior

Agent crashes and restarts

Full traceback

[^----Agent*: __main__.process_users]: Crashed reason="IndexError('deque index out of range')"
IndexError: deque index out of range
  File "faust/agents/agent.py", line 688, in _execute_actor
    await coro
  File "faust/agents/agent.py", line 710, in _slurp
    async for value in it:
  File "faust.py", line 458, in process_users
    await process_users.ack(event)
  File "faust/streams.py", line 961, in ack
    self._on_stream_event_out(tp, offset, self, event)
  File "faust/sensors/base.py", line 196, in on_stream_event_out
    sensor.on_stream_event_out(
  File "faust/sensors/datadog.py", line 186, in on_stream_event_out
    self.secs_to_ms(self.events_runtime[-1]),
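To illustrate the crash, here is a minimal sketch of the failure mode (assumed mechanics paraphrased from the traceback, not faust's actual code): the Datadog monitor keeps a bounded deque of per-event runtimes and reads the newest sample with `[-1]`. On the manual-ack path no runtime was ever recorded, so the deque is empty and the `[-1]` lookup raises `IndexError`. A guarded lookup avoids the crash:

```python
from collections import deque
from typing import Optional

def secs_to_ms(seconds: float) -> float:
    return seconds * 1000.0

def latest_runtime_ms(runtimes: "deque[float]") -> Optional[float]:
    # guarded access: emit no gauge instead of crashing when no sample exists
    if runtimes:
        return secs_to_ms(runtimes[-1])
    return None

events_runtime: "deque[float]" = deque(maxlen=100)
assert latest_runtime_ms(events_runtime) is None  # empty deque: no IndexError
events_runtime.append(0.25)
assert latest_runtime_ms(events_runtime) == 250.0
```

Note that guarding the read only hides the symptom: the runtime metric for manually acked events is still never recorded.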

Versions

pwilczynskiclearcode commented 3 years ago

class Stream(StreamT[T_co], Service):
    ....

    async def _py_aiter(self) -> AsyncIterator[T_co]:
        ...

        try:
            while not self.should_stop:
                event = None
                do_ack = self.enable_acks  # set to False to not ack event.
                # wait for next message
                value: Any = None
                # we iterate until on_merge gives value.
                while value is None and event is None:
                    ...

                    if isinstance(channel_value, event_cls):
                        ...

                        # call Sensors
                        sensor_state = on_stream_event_in(tp, offset, self, event)
                        ...
                    ...
                if value is skipped_value:
                    continue
                self.events_total += 1
                try:
                    yield value
                finally:
                    self.current_event = None
                    if do_ack and event is not None:
                        # This inlines self.ack
                        last_stream_to_ack = event.ack()
                        message = event.message
                        tp = event.message.tp
                        offset = event.message.offset
                        on_stream_event_out(tp, offset, self, event, sensor_state)
                        ...

    ...

    async def ack(self, event: EventT) -> bool:
        ...
        # WARNING: This function is duplicated in __aiter__
        last_stream_to_ack = event.ack()
        message = event.message
        tp = message.tp
        offset = message.offset
        self._on_stream_event_out(tp, offset, self, event)
        ...

When calling Stream.ack() manually, we don't have access to sensor_state. This differs from _py_aiter and _c_aiter, which propagate sensor_state through to on_stream_event_out.
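The state handoff that the iterators perform (and that manual ack skips) can be sketched as follows. All names here are assumed for illustration, not faust's real classes: on_stream_event_in returns a per-event state object, on_stream_event_out receives it back to compute the runtime, and the monitor must tolerate state=None on the manual-ack path:

```python
import time
from typing import List, Optional

class SensorState:
    def __init__(self) -> None:
        # timestamp recorded when the event entered the stream
        self.time_in = time.monotonic()

class Monitor:
    def __init__(self) -> None:
        self.events_runtime: List[float] = []

    def on_stream_event_in(self) -> SensorState:
        return SensorState()

    def on_stream_event_out(self, state: Optional[SensorState]) -> None:
        if state is None:
            # manual-ack path: no state was threaded through, so we cannot
            # compute a runtime; skip the sample instead of indexing an
            # empty deque later
            return
        self.events_runtime.append(time.monotonic() - state.time_in)

monitor = Monitor()
monitor.on_stream_event_out(monitor.on_stream_event_in())  # iterator path
monitor.on_stream_event_out(None)                          # manual-ack path
assert len(monitor.events_runtime) == 1
```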

pwilczynskiclearcode commented 3 years ago

#119: partial fix

pwilczynskiclearcode commented 3 years ago

I think the issue in general still requires a proper solution. https://github.com/faust-streaming/faust/pull/119 is a fix that prevents the exception, but the full solution is to write the code so that manually acknowledged events produce Datadog/statsd/Prometheus metrics correctly.
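One shape the full fix could take is sketched below. This is purely illustrative (assumed names, not a real faust patch): remember the sensor_state per (tp, offset) when the event enters the stream, and let ack() retrieve it, so on_stream_event_out receives the same state on both the iterator path and the manual-ack path:

```python
from typing import Any, Dict, Optional, Tuple

class StreamStateStore:
    def __init__(self) -> None:
        # (tp, offset) -> sensor_state recorded when the event came in
        self._sensor_states: Dict[Tuple[str, int], Any] = {}

    def on_event_in(self, tp: str, offset: int, state: Any) -> None:
        self._sensor_states[(tp, offset)] = state

    def on_ack(self, tp: str, offset: int) -> Optional[Any]:
        # pop so a double ack cannot reuse stale state; returns None if the
        # event was never registered, instead of raising as today
        return self._sensor_states.pop((tp, offset), None)

store = StreamStateStore()
store.on_event_in("users-0", 42, {"time_in": 1.0})
assert store.on_ack("users-0", 42) == {"time_in": 1.0}
assert store.on_ack("users-0", 42) is None  # no stored state: no crash
```

With state retrievable at ack time, the metrics for manually acked events would be computed the same way as on the iterator path.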