cda-group / arcon

State-first Streaming Applications in Rust
https://cda-group.github.io/arcon/
Apache License 2.0

Explicit Keyed Streams #284

Closed Max-Meldrum closed 2 years ago

Max-Meldrum commented 2 years ago

Currently, keys are declared through the Arcon derive macro, which makes sure that the get_key() function is implemented.

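```rust
// `keys = "id"` declares the key field; the Arcon derive then makes sure get_key() is implemented.
#[derive(Arcon, Arrow, prost::Message, Copy, Clone)]
#[arcon(unsafe_ser_id = 12, reliable_ser_id = 13, version = 1, keys = "id")]
pub struct Event {
    #[prost(uint64)]
    pub id: u64,
    #[prost(float)]
    pub data: f32,
}
```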
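For comparison, here is a rough hand-written equivalent of what `keys = "id"` implies. The `GetKey` trait and hashing `id` with the standard library's `DefaultHasher` are assumptions for illustration, not arcon's actual generated code. The point is that the key is fixed by the type: there can be only one `get_key()` per event type.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Hypothetical stand-in for the method the derive macro guarantees.
trait GetKey {
    fn get_key(&self) -> u64;
}

#[derive(Clone, Copy, Debug)]
pub struct Event {
    pub id: u64,
    pub data: f32,
}

// Roughly what `keys = "id"` amounts to: the key is baked into the type,
// so every stream of `Event` ends up keyed by `id`.
impl GetKey for Event {
    fn get_key(&self) -> u64 {
        let mut hasher = DefaultHasher::new();
        self.id.hash(&mut hasher);
        hasher.finish()
    }
}
```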

This is, however, a bit confusing when working with the API. It would be better to make keying explicit and introduce a KeyExtractor that the runtime can use (see below).

```rust
let mut app = Application::with_conf(conf)
    .iterator((0..1000000).map(|x| Event { id: x, data: 1.5 }), |conf| {
        conf.set_timestamp_extractor(|x: &Event| x.id);
    })
    .key_by(|event| event.id) // KeyExtractor
    // KeyedStream?
    .operator(...)
```

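A minimal sketch of how a runtime could use such a KeyExtractor, assuming it is just a closure `Fn(&T) -> K` whose result is hashed to pick one of `parallelism` operator instances. `partition_for` is a hypothetical helper, not part of arcon.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

#[derive(Clone, Copy, Debug)]
pub struct Event {
    pub id: u64,
    pub data: f32,
}

/// Hypothetical runtime helper: the key extractor is a plain closure `Fn(&T) -> K`,
/// and hashing its result decides which of `parallelism` instances gets the element.
pub fn partition_for<T, K, F>(key_of: &F, elem: &T, parallelism: u64) -> u64
where
    K: Hash,
    F: Fn(&T) -> K,
{
    assert!(parallelism > 0, "parallelism must be at least 1");
    let mut hasher = DefaultHasher::new();
    key_of(elem).hash(&mut hasher);
    hasher.finish() % parallelism
}
```

Elements with the same key always land on the same instance, which is what keyed state needs.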
segeljakt commented 2 years ago

It looks nice. I think one benefit of this is that you can have multiple key extractors for the same event type. Is the stream produced by iterator also a keyed stream, or something else?
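To make the multiple-extractors point concrete, the sketch below keys the same `Event` type two ways. `by_data_bucket` and `group_ids_by` are hypothetical; the grouping just stands in for what a keyed stream would do downstream.

```rust
use std::collections::BTreeMap;

#[derive(Clone, Copy, Debug)]
pub struct Event {
    pub id: u64,
    pub data: f32,
}

// Extractor 1: key by id (what `keys = "id"` hard-codes today).
pub fn by_id(e: &Event) -> u64 {
    e.id
}

// Extractor 2 (hypothetical): key by the integer part of `data`.
pub fn by_data_bucket(e: &Event) -> i64 {
    e.data.floor() as i64
}

/// Groups event ids by whatever key the given extractor returns.
pub fn group_ids_by<K, F>(events: &[Event], key_of: F) -> BTreeMap<K, Vec<u64>>
where
    K: Ord,
    F: Fn(&Event) -> K,
{
    let mut groups: BTreeMap<K, Vec<u64>> = BTreeMap::new();
    for e in events {
        groups.entry(key_of(e)).or_default().push(e.id);
    }
    groups
}
```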

Max-Meldrum commented 2 years ago

> It looks nice. I think one benefit of this is that you can have multiple key extractors for the same event type. Is the stream produced by iterator also a keyed stream?

I take it that it would just be a Stream: iterator() -> Stream -> key_by(..) -> KeyedStream

We only have Stream right now. I guess we would have to introduce more stream types, as in Flink: Stream / KeyedStream / WindowStream?
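One way to get the `iterator() -> Stream -> key_by(..) -> KeyedStream` progression is to make each stage its own type, so key-dependent operators only exist after `key_by`. This is a hypothetical sketch: `Stream`, `KeyedStream`, and `count_per_key` buffer elements in a `Vec` and are not arcon's actual stream types.

```rust
use std::collections::BTreeMap;

#[derive(Clone, Copy, Debug)]
pub struct Event {
    pub id: u64,
    pub data: f32,
}

/// Hypothetical unkeyed stream; elements are buffered in a Vec for illustration.
pub struct Stream<T> {
    elems: Vec<T>,
}

/// Hypothetical keyed stream: the elements plus the extractor that keys them.
pub struct KeyedStream<T, K> {
    elems: Vec<T>,
    key_of: Box<dyn Fn(&T) -> K>,
}

impl<T> Stream<T> {
    pub fn new(elems: Vec<T>) -> Self {
        Stream { elems }
    }

    /// Stream<T> -> KeyedStream<T, K>: the only way to obtain a keyed stream.
    pub fn key_by<K>(self, key_of: impl Fn(&T) -> K + 'static) -> KeyedStream<T, K> {
        KeyedStream { elems: self.elems, key_of: Box::new(key_of) }
    }
}

impl<T, K: Ord> KeyedStream<T, K> {
    /// A key-dependent operator, defined only on KeyedStream.
    pub fn count_per_key(&self) -> BTreeMap<K, usize> {
        let mut counts = BTreeMap::new();
        for elem in &self.elems {
            *counts.entry((self.key_of)(elem)).or_insert(0) += 1;
        }
        counts
    }
}
```

With this design, calling `count_per_key()` directly on a `Stream` fails to compile, which is the main argument for separate stream types.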

segeljakt commented 2 years ago

Flink seems to have:

Max-Meldrum commented 2 years ago

One possible way of defining key_by: use the builder notion, as in other parts of the API, and require an extractor plus a description of how the stream is keyed.

```rust
use std::sync::Arc;

let mut app = Application::with_conf(conf)
    .iterator((0..1000000).map(|x| Event { id: x, data: 1.5 }), |conf| {
        conf.set_timestamp_extractor(|x: &Event| x.id);
    }) // Stream<Event>
    .key_by(KeyBuilder {
        // Argument type annotated: inside Arc::new the closure has no expected signature.
        extractor: Arc::new(|event: &Event| event.id),
        description: "User Name ID",
    }) // KeyedStream<Event>
```
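A possible shape for the KeyBuilder used above, assuming the extractor always returns a `u64` key (as `event.id` does) and must be `Send + Sync` so that parallel operator instances can share it. The field names follow the snippet; everything else, including the `key` method, is hypothetical.

```rust
use std::sync::Arc;

#[derive(Clone, Copy, Debug)]
pub struct Event {
    pub id: u64,
    pub data: f32,
}

/// Hypothetical KeyBuilder: a shareable key extractor plus a
/// human-readable description of how the stream is keyed.
pub struct KeyBuilder<T> {
    /// Send + Sync is an assumption, so one extractor can be used from many threads.
    pub extractor: Arc<dyn Fn(&T) -> u64 + Send + Sync>,
    pub description: &'static str,
}

impl<T> KeyBuilder<T> {
    /// Applies the extractor to a single element.
    pub fn key(&self, elem: &T) -> u64 {
        (self.extractor)(elem)
    }
}

// Written by hand so cloning only bumps the Arc count and does not require T: Clone.
impl<T> Clone for KeyBuilder<T> {
    fn clone(&self) -> Self {
        KeyBuilder {
            extractor: Arc::clone(&self.extractor),
            description: self.description,
        }
    }
}
```

Using `Arc` rather than `Box` means each parallel instance can hold a cheap clone of the same extractor instead of requiring the closure itself to be `Clone`.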