Is your feature request related to a problem? Please describe.
Add internal telemetry that reports how far the system's output is behind the current time, i.e. the end-to-end lag introduced by the pipeline.
Describe the solution you'd like
Each core keeps the current timestamp in a member variable named current_timestamp_, defined in reducer/core.h. This timestamp is updated from the timestamps of incoming (received) messages (see Core::handle_rpc in reducer/core.cc). Ingest cores timestamp originating messages using the system clock (the monotonic() function). As a result, aggregation cores can calculate the current end-to-end lag as monotonic() + get_boot_time() - current_timestamp_.
We can add a new internal stat named processing_time (or processing_lag, TBD), per aggregation core.
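A minimal sketch of the lag calculation described above, written as a standalone function so the arithmetic can be checked in isolation. The function name processing_lag_ns and its parameters are hypothetical and not part of the reducer code; the sketch assumes all three values are nanosecond counts on the same epoch, which the issue does not state explicitly.

```cpp
#include <cstdint>

// Hypothetical helper, not part of the reducer code base.
// Assumption: all inputs are nanoseconds.
//   monotonic_now_ns     : stand-in for the value returned by monotonic()
//   boot_time_ns         : stand-in for the value returned by get_boot_time()
//   current_timestamp_ns : stand-in for Core::current_timestamp_
std::int64_t processing_lag_ns(std::uint64_t monotonic_now_ns,
                               std::uint64_t boot_time_ns,
                               std::uint64_t current_timestamp_ns) {
  // Monotonic time plus boot time approximates the current wall-clock time.
  const std::uint64_t wall_now_ns = monotonic_now_ns + boot_time_ns;
  // Return a signed value so that clock skew between hosts shows up as a
  // negative lag instead of wrapping around to a huge unsigned number.
  return static_cast<std::int64_t>(wall_now_ns) -
         static_cast<std::int64_t>(current_timestamp_ns);
}
```

An aggregation core could, under these assumptions, sample this value periodically and report it as the proposed processing_lag stat. Returning a signed result is a design choice of this sketch: ingest and aggregation cores may run on hosts whose clocks differ slightly, so the latest message timestamp can be ahead of the local clock.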
Describe alternatives you've considered
No response
Additional context
No response