kaskada-ai / kaskada

Modern, open-source event-processing
https://kaskada.io/
Apache License 2.0

Partitioned: Interpolation of Merge outputs #834

Open jordanrfrazier opened 10 months ago

jordanrfrazier commented 10 months ago

Summary

Merge uses a special spread kernel that keeps track of past values for each column being merged. The behavior may be latched or unlatched, depending on whether the column is interpolated as continuous or discrete.
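To make the latched/unlatched distinction concrete, here is a hypothetical sketch (not the actual Kaskada kernel) of spreading one column's values across a merged timeline. `signal[i]` marks merged rows that originate from this column's input; latched spread carries the last value forward, unlatched spread emits null:

```rust
// Illustrative only: models the latched vs. unlatched spread behavior
// described above for a single i64 column.
fn spread_latched(values: &[i64], signal: &[bool]) -> Vec<Option<i64>> {
    let mut iter = values.iter();
    let mut last = None;
    signal
        .iter()
        .map(|&s| {
            if s {
                // A new value arrives at this row; latch it.
                last = iter.next().copied();
            }
            // Rows without a new value repeat the latched value.
            last
        })
        .collect()
}

fn spread_unlatched(values: &[i64], signal: &[bool]) -> Vec<Option<i64>> {
    let mut iter = values.iter();
    signal
        .iter()
        // Rows without a new value are null (discrete interpolation).
        .map(|&s| if s { iter.next().copied() } else { None })
        .collect()
}

fn main() {
    let values = [5, 6];
    let signal = [true, false, true]; // rows 0 and 2 come from this input
    assert_eq!(
        spread_latched(&values, &signal),
        vec![Some(5), Some(5), Some(6)]
    );
    assert_eq!(
        spread_unlatched(&values, &signal),
        vec![Some(5), None, Some(6)]
    );
    println!("ok");
}
```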

The new partitioned code attempts to simplify the output types by using

Batch { time: TimestampNanosecondArray, subsort: UInt64Array, key_hash: UInt64Array, data: StructArray }

where data holds the individual columns. This allows us to work with datatypes rather than passing a separate Schema and individual columns around. It also simplifies accessing columns of a merge output: rather than naming each column and looking it up by name, we can keep each side's columns in its own output struct and access them by side and index. This avoids naming collisions.

However, a consequence is that we can no longer (with the current implementation) interpolate each column individually. Since the values are now in a struct array, each column is interpreted as part of the struct value at that time, meaning we cannot support latched and unlatched values within the same input.

For example, if we have {a: 5} at 10:00 AM, {a: 6} at 11:00 AM, and {c: 4} at 10:30 AM, we likely expect (rows in time order):

{a: 5, a_sum: 5, c: null}
{a: null, a_sum: 5, c: 4}
{a: 6, a_sum: 11, c: null}

But with each struct interpolated as a single value, we would see:

{a: 5, a_sum: 5, c: null}
{a: null, a_sum: null, c: 4}
{a: 6, a_sum: 11, c: null}
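The difference can be modeled directly. In this hypothetical sketch (field names mirror the example; none of this is Kaskada API), the left input produces (a, a_sum) at merged rows 0 and 2 and the right input produces c at row 1. Per-column interpolation latches the continuous aggregate a_sum while leaving the discrete a null; struct-level interpolation nulls the whole left struct, losing a_sum at 10:30:

```rust
#[derive(Debug, PartialEq)]
struct Out {
    a: Option<i64>,
    a_sum: Option<i64>,
    c: Option<i64>,
}

/// Per-column interpolation: `a` and `c` are discrete (unlatched), while
/// the aggregate `a_sum` is continuous (latched).
fn per_column(left: &[(i64, i64)], from_left: &[bool], right: &[i64]) -> Vec<Out> {
    let mut l = left.iter();
    let mut r = right.iter();
    let mut last_sum = None;
    from_left
        .iter()
        .map(|&is_left| {
            if is_left {
                let &(a, a_sum) = l.next().unwrap();
                last_sum = Some(a_sum); // latch the aggregate
                Out { a: Some(a), a_sum: Some(a_sum), c: None }
            } else {
                Out { a: None, a_sum: last_sum, c: r.next().copied() }
            }
        })
        .collect()
}

/// Struct-level interpolation: the left side's (a, a_sum) struct is one
/// discrete value, so `a_sum` is nulled wherever `a` is.
fn per_struct(left: &[(i64, i64)], from_left: &[bool], right: &[i64]) -> Vec<Out> {
    let mut l = left.iter();
    let mut r = right.iter();
    from_left
        .iter()
        .map(|&is_left| {
            if is_left {
                let &(a, a_sum) = l.next().unwrap();
                Out { a: Some(a), a_sum: Some(a_sum), c: None }
            } else {
                Out { a: None, a_sum: None, c: r.next().copied() }
            }
        })
        .collect()
}

fn main() {
    // {a: 5} at 10:00, {c: 4} at 10:30, {a: 6} at 11:00.
    let left = [(5, 5), (6, 11)];
    let right = [4];
    let from_left = [true, false, true];

    // Desired middle row: a_sum stays 5.
    let want = Out { a: None, a_sum: Some(5), c: Some(4) };
    assert_eq!(per_column(&left, &from_left, &right)[1], want);

    // Struct-level middle row: a_sum is lost.
    let got = Out { a: None, a_sum: None, c: Some(4) };
    assert_eq!(per_struct(&left, &from_left, &right)[1], got);
    println!("ok");
}
```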

Ideas

Fastest to parity:

The fastest way to get back to parity with the old merge behavior is likely to flatten the data back into individual columns. Some refactoring may be needed to work with named columns rather than indices, but it's preferable to revert to existing behavior we believe works.
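A rough sketch of what flattening could look like, assuming each side's data is a map of named columns (the `Column` alias, side prefixes, and map shapes here are assumptions, not the actual Batch representation). Prefixing each name with its side keeps the collision-avoidance property of the struct layout:

```rust
use std::collections::BTreeMap;

// Simplified stand-in for a nullable column of values.
type Column = Vec<Option<i64>>;

/// Flatten each side's struct-like column map into one flat namespace,
/// prefixing names with the side to avoid collisions. Each flat column
/// could then be spread (latched or unlatched) independently.
fn flatten(sides: &[(&str, BTreeMap<String, Column>)]) -> BTreeMap<String, Column> {
    let mut out = BTreeMap::new();
    for (side, cols) in sides {
        for (name, col) in cols {
            out.insert(format!("{side}.{name}"), col.clone());
        }
    }
    out
}

fn main() {
    let mut step1 = BTreeMap::new();
    step1.insert("a".to_string(), vec![Some(5), None, Some(6)]);
    let mut step2 = BTreeMap::new();
    step2.insert("c".to_string(), vec![None, Some(4), None]);

    let flat = flatten(&[("step1", step1), ("step2", step2)]);
    assert!(flat.contains_key("step1.a"));
    assert!(flat.contains_key("step2.c"));
    assert_eq!(flat.len(), 2);
    println!("ok");
}
```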

Future ideas:

The logical -> physical conversion may be able to detect interpolation of merged columns and add a project afterwards that uses the spread kernel.

Step 0: Source0

Step 1: Project(Step0)
  { a: step0.a,
    a_sum: step0.a.sum(),
  }

Step 2: Source1

Step 3: Merge (Step 1, Step 2)

Step 4: Project(Step 3)
  { 
    a: step3.step1.a,
    a_sum: step3.step1.a_sum.spread(???),
    c: step3.step2.c
  }

The benefits of this approach are:

Open questions: