faust-streaming / faust

Python Stream Processing. A Faust fork
https://faust-streaming.github.io/faust/

Event runtime metric is zero when using `stream.take(...)` #319

Status: Open. Jarvis1Tube opened this issue 2 years ago.

Jarvis1Tube commented 2 years ago


Steps to reproduce

Create an agent that uses `take()` over a stream, and set up the Prometheus monitor for the Faust app.

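import asyncio

from faust import App
from faust.sensors.prometheus import setup_prometheus_sensors
from faust.types import StreamT

app = App('app-name', broker='kafka://localhost')
setup_prometheus_sensors(app)

topic = app.topic('versions')  # topic name is illustrative

@app.agent(topic)
async def start_process_versions(stream: StreamT) -> None:
    # Batches of up to 10 events, or whatever arrived within 5 seconds.
    async for messages in stream.take(max_=10, within=5):
        # Simulate 1 s of work per event; gather() needs the awaitables unpacked.
        await asyncio.gather(*(asyncio.sleep(1) for _ in messages))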

Expected behavior

The `events_runtime_ms_sum` metric is reported with non-zero values, probably about 10_000 ms.

Actual behavior

The `events_runtime_ms_bucket` metric is reported with a zero value.


artem-ilin commented 6 days ago

I recently ran into the same issue and spent some time digging; here is what I found. When using `stream.take()` (or `stream.filter().take()`), automatic acks are disabled for the events that end up in the batch. Events that don't satisfy the filter, however, are still acked automatically, so they trigger the call to `on_stream_event_out`: https://github.com/faust-streaming/faust/blob/da2d10e65826bd26d2b434f6dcc67d7457702d4a/faust/streams.py#L1224-L1232 Most importantly, this code path passes the `sensor_state` to the callback.
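To make the two code paths easier to compare, here is a toy, self-contained model of the automatic-ack path. It does not import Faust: `ToyRuntimeSensor` and `auto_ack` are hypothetical names, and the sensor only mimics the idea described above, where the state produced when an event enters the stream is handed back to `on_stream_event_out` when the event leaves.

```python
import time
from typing import Any, Dict, List, Optional


class ToyRuntimeSensor:
    """Hypothetical stand-in for a runtime-measuring sensor (not Faust's API)."""

    def __init__(self) -> None:
        self.runtimes_ms: List[float] = []

    def on_stream_event_in(self, event: Any) -> Dict[str, float]:
        # The returned dict plays the role of the per-event "sensor state".
        return {"time_in": time.perf_counter()}

    def on_stream_event_out(self, event: Any, state: Optional[Dict[str, float]] = None) -> None:
        # A runtime can only be computed when the entry state is handed back.
        if state is None:
            return
        self.runtimes_ms.append((time.perf_counter() - state["time_in"]) * 1000.0)


def auto_ack(sensor: ToyRuntimeSensor, event: Any) -> None:
    """Modelled automatic-ack path: the state from entry is passed back on exit."""
    state = sensor.on_stream_event_in(event)
    sensor.on_stream_event_out(event, state)


sensor = ToyRuntimeSensor()
for event in ["rejected-by-filter-1", "rejected-by-filter-2"]:
    auto_ack(sensor, event)

print(len(sensor.runtimes_ms))  # 2
```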

On the other hand, events selected by `stream.filter()`, or all events when `stream.take()` is used without filtering, go through manual acking: https://github.com/faust-streaming/faust/blob/da2d10e65826bd26d2b434f6dcc67d7457702d4a/faust/streams.py#L1255-L1274 This path does not pass the `sensor_state` (because it doesn't have one). Later, the Prometheus sensor checks whether the state exists, finds that it doesn't, and therefore skips computing the metric: https://github.com/faust-streaming/faust/blob/da2d10e65826bd26d2b434f6dcc67d7457702d4a/faust/sensors/prometheus.py#L381-L384
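The same kind of toy model (hypothetical names again, not Faust internals) reproduces the symptom for the manual-ack path: the exit hook is called without the state, so the existence check returns early and no runtime is ever recorded, mirroring the behaviour described above.

```python
import time
from typing import Any, Dict, List, Optional


class ToyRuntimeSensor:
    """Hypothetical stand-in for a runtime-measuring sensor (not Faust's API)."""

    def __init__(self) -> None:
        self.runtimes_ms: List[float] = []

    def on_stream_event_in(self, event: Any) -> Dict[str, float]:
        return {"time_in": time.perf_counter()}

    def on_stream_event_out(self, event: Any, state: Optional[Dict[str, float]] = None) -> None:
        # Mirrors the "does the state exist?" check: without it, nothing is recorded.
        if state is None:
            return
        self.runtimes_ms.append((time.perf_counter() - state["time_in"]) * 1000.0)


def manual_ack(sensor: ToyRuntimeSensor, event: Any) -> None:
    """Modelled take()-style ack: the exit hook is called without any state."""
    sensor.on_stream_event_out(event)


sensor = ToyRuntimeSensor()
batch = [f"event-{i}" for i in range(10)]
for event in batch:
    sensor.on_stream_event_in(event)  # a state is produced, but nobody keeps it
for event in batch:
    manual_ack(sensor, event)

print(sum(sensor.runtimes_ms))  # 0
```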

If we could save the `sensor_state` before yielding the events and reuse it in the `ack()` method, that would solve the issue.
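A minimal sketch of that idea, still on a toy model rather than Faust itself: `Event`, `ToyRuntimeSensor` and `ToyTakeStream` are hypothetical, and keying the saved state by offset is an assumption made for illustration. The stream records each event's state before the batch is yielded and hands it back in `ack()` once the agent has finished with the batch.

```python
import asyncio
import time
from dataclasses import dataclass
from typing import AsyncIterator, Dict, List, Optional


@dataclass(frozen=True)
class Event:
    """Hypothetical event identified by its offset."""
    offset: int


class ToyRuntimeSensor:
    """Hypothetical stand-in for a runtime-measuring sensor (not Faust's API)."""

    def __init__(self) -> None:
        self.runtimes_ms: List[float] = []

    def on_stream_event_in(self, event: Event) -> Dict[str, float]:
        return {"time_in": time.perf_counter()}

    def on_stream_event_out(self, event: Event, state: Optional[Dict[str, float]] = None) -> None:
        if state is None:
            return
        self.runtimes_ms.append((time.perf_counter() - state["time_in"]) * 1000.0)


class ToyTakeStream:
    """Hypothetical take() that keeps each event's sensor state until the event is acked."""

    def __init__(self, sensor: ToyRuntimeSensor, events: List[Event]) -> None:
        self.sensor = sensor
        self.events = events
        self._states: Dict[int, Dict[str, float]] = {}  # sensor state per event offset

    def ack(self, event: Event) -> None:
        # Hand the saved state back so the runtime can be computed.
        self.sensor.on_stream_event_out(event, self._states.pop(event.offset, None))

    async def take(self, max_: int) -> AsyncIterator[List[Event]]:
        buffer: List[Event] = []
        for index, event in enumerate(self.events):
            # Save the state *before* the batch is yielded to the agent.
            self._states[event.offset] = self.sensor.on_stream_event_in(event)
            buffer.append(event)
            if len(buffer) >= max_ or index == len(self.events) - 1:
                try:
                    yield list(buffer)
                finally:
                    # Ack only after the agent has finished with the batch.
                    for consumed in buffer:
                        self.ack(consumed)
                    buffer.clear()


async def main() -> ToyRuntimeSensor:
    sensor = ToyRuntimeSensor()
    stream = ToyTakeStream(sensor, [Event(offset=i) for i in range(10)])
    async for batch in stream.take(max_=4):
        await asyncio.gather(*(asyncio.sleep(0.01) for _ in batch))
    return sensor


sensor = asyncio.run(main())
print(len(sensor.runtimes_ms))  # 10
```

In this model every event gets a runtime that includes the time the agent spent on its batch; whether the real `ack()` can be given such a lookup without other side effects is left open.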

@wbarnha, what do you think? Do you have an idea of how to implement this? I've noticed some `contextvars` usage in the codebase; maybe we could go that way?
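If the `contextvars` route is explored, it may help to keep in mind how context propagates in asyncio. This standard-library-only sketch uses a hypothetical variable `current_state` (not part of Faust). It shows that a value set in the current task is visible later in that task, while a value set inside a task created with `asyncio.create_task()` stays in that task's copied context.

```python
import asyncio
import contextvars

# Hypothetical context variable holding a sensor state; not part of Faust.
current_state = contextvars.ContextVar("current_state", default=None)


async def set_state_in_child_task() -> None:
    # Runs in a *copy* of the parent's context, so this set() stays local to the task.
    current_state.set({"time_in": 0.0})


async def main():
    # Set and read within the same task: the value is visible.
    current_state.set({"time_in": 1.0})
    same_task = current_state.get()

    # Reset, then set the variable from a separately created task.
    current_state.set(None)
    await asyncio.create_task(set_state_in_child_task())
    other_task = current_state.get()
    return same_task, other_task


same_task, other_task = asyncio.run(main())
print(same_task)   # {'time_in': 1.0}
print(other_task)  # None
```

So a context-variable approach would only carry the state across if it is set and read in the same task; whether that holds for the code paths linked above would need to be checked.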

UPD: I'm using the current version of faust-streaming, 0.11.3.