ArroyoSystems / arroyo

Distributed stream processing engine in Rust
https://arroyo.dev
Apache License 2.0
3.81k stars 220 forks

Add function for getting current wallclock time #687

Open hazelnut-99 opened 4 months ago

hazelnut-99 commented 4 months ago

Hi! I am trying to add a processing time timestamp to each row with the following query:

select now() as current_ts, "timestampMillis" as original_ts, some_other_column
from my_table

However, it seems that the now() function is executed only once, resulting in a static current_ts field that does not update over time.

Are there any keywords or functions that can generate a real-time timestamp for each record in the output?

mwylde commented 4 months ago

Thanks for reporting. There are at least four possible definitions for "current time" in Arroyo:

  1. The time the query is started (which is now() in Arroyo)
  2. The current clock time
  3. The event time of the row it's called on
  4. The current watermark time

Currently Arroyo implements only the first one (via now()), but the others would be useful as well. The second one is easy to get in a UDF today, but the other two would need to be built in.
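As a rough illustration of option 2, the body of such a UDF could look like the sketch below. The function name wallclock_millis is hypothetical, and the registration step is an assumption: Arroyo Rust UDFs are normally annotated with a #[udf] attribute from the arroyo_udf_plugin crate, which is left out here so the snippet compiles with only the standard library. Whether the planner re-evaluates a zero-argument UDF for every row is not confirmed in this thread.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Hypothetical helper: current wall-clock time as milliseconds since the
/// Unix epoch. In an Arroyo UDF this body would sit under a `#[udf]`
/// attribute (an assumption, not shown here).
fn wallclock_millis() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        // Convert the Duration to whole milliseconds.
        .map(|d| d.as_millis() as i64)
        // If the system clock is set before 1970, fall back to 0.
        .unwrap_or(0)
}
```

Each call reads the system clock again, so two calls made at different times return different values, unlike now(), which is fixed at query start.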

In Flink, now() is the current clock time (2) in streaming mode, but query start time (1) in batch mode. We might want to adopt that behavior, but I think the current version of now() is also useful in some cases and is worth preserving. Adding functions for 3 and 4 would also be great (in Flink, 4 is CURRENT_WATERMARK, which seems like a fine name).

Writing new functions is pretty straightforward, so this could be a great first issue for someone.

kpe commented 1 month ago

I'm interested in obtaining the current event time of a row (i.e. 3 above). I took a quick look at how this could be implemented, but couldn't find a similar function. (~I actually was not able to find the implementation of any of the standard functions in the source code~ I just found extract_json_string)

@mwylde - can you give me some guidance on where to look?

Can something like row_time() be implemented as a UDF? Is there a UDF type that has access to the row "attributes" like the event_time? Or should the _timestamp field that I see in the source code just be made accessible from SQL somehow?

mwylde commented 1 month ago

Hey @kpe, happy to help walk through how you'd implement this. row_time() (i.e., 3 in the list above) can't be implemented as a UDF. This is because the expression trees are planned from SQL with DataFusion, which doesn't know about the _timestamp field, as it's not actually part of the SQL schema. We add it in as part of various rewrites of the plan (you can find the various calls to add_timestamp_field throughout arroyo-planner).

So instead we would need to define a placeholder UDF, like hop/tumble/session:

https://github.com/ArroyoSystems/arroyo/blob/1d95e13dcad1afe4debc67fd078290a4cc6d5f15/crates/arroyo-planner/src/lib.rs#L137-L139

This would then need to be rewritten (by traversing through the logical plan and all expressions) into an expression that gets the _timestamp field.
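
To make the rewrite idea concrete, here is a minimal sketch of that kind of traversal. It uses a toy Expr enum rather than DataFusion's actual expression types, and the names Expr and rewrite_row_time are hypothetical. The real change would use the planner's own logical plan and expression rewriting machinery.

```rust
/// Toy expression tree standing in for a planner's logical expressions.
/// This is NOT DataFusion's `Expr`; it only illustrates the rewrite.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Literal(i64),
    Call { name: String, args: Vec<Expr> },
}

/// Walk the tree and replace every zero-argument `row_time()` placeholder
/// call with a reference to the hidden `_timestamp` column.
fn rewrite_row_time(expr: Expr) -> Expr {
    match expr {
        // The placeholder itself: swap it for the timestamp column.
        Expr::Call { name, args } if name == "row_time" && args.is_empty() => {
            Expr::Column("_timestamp".to_string())
        }
        // Any other call: keep it, but rewrite its arguments recursively.
        Expr::Call { name, args } => Expr::Call {
            name,
            args: args.into_iter().map(rewrite_row_time).collect(),
        },
        // Columns and literals have no children and pass through unchanged.
        other => other,
    }
}
```

Under these assumptions, an expression such as some_fn(1, row_time()) becomes some_fn(1, _timestamp), so the placeholder never has to be evaluated at runtime.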