kaskada-ai / kaskada

Modern, open-source event-processing
https://kaskada.io/
Apache License 2.0
348 stars, 15 forks

Partitioned: Split watermark from Batch #836

Open jordanrfrazier opened 10 months ago

jordanrfrazier commented 10 months ago

Summary

The current Batch struct bundles the watermark (up_to_time) together with optional data:

#[derive(Clone, PartialEq, Debug)]
pub struct Batch {
    /// The data associated with the batch.
    pub(crate) data: Option<BatchInfo>,

    /// An indication that the batch stream has completed up to the given time.
    /// Any rows in future batches on this stream must have a time strictly
    /// greater than this.
    pub up_to_time: RowTime,
}

Many evaluators are thus forced to handle both the watermark and the possibly absent data, even when they only need one of the two. To simplify the logic and improve readability, a good refactoring would be to separate the watermark from the batch and pass each only where it is needed.
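
To make the problem concrete, here is a minimal sketch of what an evaluator looks like today. RowTime and BatchInfo are simplified stand-ins (the real definitions in the crate differ), and double_values is a hypothetical evaluator invented for this illustration:

```rust
// Stand-in types for illustration only; the real Kaskada definitions differ.
#[derive(Clone, Copy, PartialEq, Debug)]
pub struct RowTime(pub i64);

#[derive(Clone, PartialEq, Debug)]
pub struct BatchInfo {
    pub values: Vec<i64>,
}

#[derive(Clone, PartialEq, Debug)]
pub struct Batch {
    pub data: Option<BatchInfo>,
    pub up_to_time: RowTime,
}

/// Hypothetical evaluator that only cares about the data, yet still has to
/// handle the "no data" case and forward the watermark by hand.
pub fn double_values(input: Batch) -> Batch {
    // The transformation itself: only runs when data is present.
    let data = input.data.map(|info| BatchInfo {
        values: info.values.iter().map(|v| v * 2).collect(),
    });
    // Bookkeeping unrelated to the transformation: pass the watermark along.
    Batch {
        data,
        up_to_time: input.up_to_time,
    }
}
```

Nothing here stops an evaluator from forgetting to propagate up_to_time or from filling in the wrong value, since it is just another public field.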

Possible Solution

#[must_use]
pub struct Watermark(RowTime);

pub struct WatermarkedBatch {
    batch: Option<Batch>,
    watermark: Watermark,
}

impl WatermarkedBatch {
    pub fn take(self) -> (Option<Batch>, Watermark) { ... }
}

So:

  1. The only way to get the batch is to call take
  2. When you call take you get the Watermark
  3. Once you have the Watermark you must use it
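
A fuller sketch of how the three points above could fit together, under some assumptions: RowTime and Batch are simplified stand-ins, the body of take is one plausible implementation, and Watermark::new, Watermark::into_time, WatermarkedBatch::new, and sum_values are hypothetical names added for illustration only.

```rust
// Stand-in types for illustration only; the real Kaskada definitions differ.
#[derive(Clone, Copy, PartialEq, Debug)]
pub struct RowTime(pub i64);

#[derive(Clone, PartialEq, Debug)]
pub struct Batch {
    pub values: Vec<i64>,
}

/// Not Clone or Copy, so each watermark has to be moved somewhere.
#[must_use = "the watermark should be forwarded downstream or explicitly consumed"]
#[derive(Debug, PartialEq)]
pub struct Watermark(RowTime);

impl Watermark {
    // Hypothetical constructor.
    pub fn new(time: RowTime) -> Self {
        Watermark(time)
    }

    // Hypothetical accessor; consuming the watermark counts as using it.
    pub fn into_time(self) -> RowTime {
        self.0
    }
}

pub struct WatermarkedBatch {
    batch: Option<Batch>,
    watermark: Watermark,
}

impl WatermarkedBatch {
    // Hypothetical constructor.
    pub fn new(batch: Option<Batch>, watermark: Watermark) -> Self {
        WatermarkedBatch { batch, watermark }
    }

    /// The fields are private, so this is the only way to reach the batch,
    /// and the caller receives the watermark at the same time.
    pub fn take(self) -> (Option<Batch>, Watermark) {
        (self.batch, self.watermark)
    }
}

/// Hypothetical evaluator: works on the data and hands the watermark back
/// without ever looking inside it.
pub fn sum_values(input: WatermarkedBatch) -> (Option<i64>, Watermark) {
    let (batch, watermark) = input.take();
    let sum = batch.map(|b| b.values.iter().sum::<i64>());
    (sum, watermark)
}
```

One caveat: #[must_use] is a lint rather than a hard guarantee. It produces a warning when the value is discarded, and a binding such as let _ = ... silences it, so point 3 is enforced only as strictly as the project treats warnings (for example by denying unused_must_use).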