microsoft / Trill

Trill is a single-node query processor for temporal or streaming data.
MIT License

how to implement non-invertible user-defined aggregates? #130

Open Ohyoukillkenny opened 4 years ago

Ohyoukillkenny commented 4 years ago

In addition to Trill’s built-in aggregates (i.e., Count, Sum, Average, Max, Min, TopK), Trill provides a framework for users to create their own custom aggregates by implementing the IAggregate interface.

public interface IAggregate<TInput, TState, TResult>
{
    Expression<Func<TState>> InitialState();
    Expression<Func<TState, long, TInput, TState>> Accumulate();
    Expression<Func<TState, long, TInput, TState>> Deaccumulate();
    Expression<Func<TState, TState, TState>> Difference();
    Expression<Func<TState, TResult>> ComputeResult();
}
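
For contrast, here is a minimal sketch of what an invertible aggregate looks like against this interface. The interface is copied from the listing above so the snippet compiles on its own. `LongSumSketch` is a hypothetical name, not a Trill type. The reading that Deaccumulate removes one input's contribution and that Difference(left, right) means "left minus right" is an assumption based on the method names.

```csharp
using System;
using System.Linq.Expressions;

// Copied from the interface quoted above so that this sketch compiles on its own.
public interface IAggregate<TInput, TState, TResult>
{
    Expression<Func<TState>> InitialState();
    Expression<Func<TState, long, TInput, TState>> Accumulate();
    Expression<Func<TState, long, TInput, TState>> Deaccumulate();
    Expression<Func<TState, TState, TState>> Difference();
    Expression<Func<TState, TResult>> ComputeResult();
}

// Hypothetical example type (not part of Trill): a running sum of longs.
// The state is a single long, so every operation has an obvious inverse.
public sealed class LongSumSketch : IAggregate<long, long, long>
{
    // Empty window: the sum is zero.
    public Expression<Func<long>> InitialState() => () => 0L;

    // An input enters the window: add it.
    public Expression<Func<long, long, long, long>> Accumulate() =>
        (sum, timestamp, input) => sum + input;

    // An input leaves the window: subtract it (assumed semantics).
    public Expression<Func<long, long, long, long>> Deaccumulate() =>
        (sum, timestamp, input) => sum - input;

    // Remove the contribution of one partial state from another (assumed: left minus right).
    public Expression<Func<long, long, long>> Difference() =>
        (left, right) => left - right;

    // The state already is the result.
    public Expression<Func<long, long>> ComputeResult() => sum => sum;
}
```

A sum inverts cleanly because subtraction undoes addition on the same scalar state. An order-dependent computation such as FIR filtering has no such scalar inverse, which is what the question below is about.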

However, it looks to me that Trill assumes the user-defined aggregates are invertible and associative, as it requires users to specify the Deaccumulate and the Difference methods.

My question is "Is there a way to implement an aggregate that is meant to be non-invertible in Trill?"

More specifically, how can we implement an operator such as finite impulse response (FIR) filtering that follows the computation as below:

In general, FIR filtering computes the dot product of the FIR parameters with every five consecutive elements in the stream. As a first step, I think a hopping window should be applied to the input stream, but I am stuck on designing a user-defined aggregate that handles the remaining computation.

Can anyone help me get through this problem? I would really appreciate it.

arunkm commented 4 years ago

You can start with a simple approach like this:

  1. State: keep a Deque<(ts, val)>, i.e. a deque of (timestamp, value) tuples.
  2. Accumulate: add the entry to the state (or merge it if an entry with the same timestamp exists). Optionally, save memory by removing older elements that can no longer contribute to the result.
  3. Deaccumulate: remove the entry from the deque/state. If Accumulate already removes older elements, this step must account for that.
  4. Difference: compute the difference of the two deques/states.
  5. ComputeResult:
     a. Select the k most recent items from the state/deque, where k is the number of items required for the FIR product.
     b. Compute out_{t} = f1*val_{t-2} + f2*val_{t-1} + f3*val_{t} from the values obtained above.

Then check if you can optimize further by representing the state in a more concise (less memory) form.
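
The steps above could be sketched roughly as follows. This is a minimal illustration under several assumptions, not a tested Trill operator. `FirState` and `FirAggregateSketch` are hypothetical names, and the interface is copied from the question so the snippet compiles without a Trill reference. Samples are assumed to arrive in timestamp order, entries with equal timestamps are not merged, and Difference is assumed to mean "left minus right". Expression lambdas cannot have statement bodies, so the logic lives in helper methods on the state. Whether Trill's code generation accepts a mutable reference-type state like this one has not been verified here.

```csharp
using System;
using System.Collections.Generic;
using System.Linq.Expressions;

// Copied from the question so that this sketch compiles on its own.
public interface IAggregate<TInput, TState, TResult>
{
    Expression<Func<TState>> InitialState();
    Expression<Func<TState, long, TInput, TState>> Accumulate();
    Expression<Func<TState, long, TInput, TState>> Deaccumulate();
    Expression<Func<TState, TState, TState>> Difference();
    Expression<Func<TState, TResult>> ComputeResult();
}

// Hypothetical state: the window's (timestamp, value) samples in arrival order.
public sealed class FirState
{
    private readonly List<KeyValuePair<long, double>> samples =
        new List<KeyValuePair<long, double>>();

    // Step 2: append the new sample (assumes timestamp-ordered arrival, no merging).
    public FirState Add(long ts, double val)
    {
        samples.Add(new KeyValuePair<long, double>(ts, val));
        return this;
    }

    // Step 3: remove the first matching sample; do nothing if it is absent.
    public FirState Remove(long ts, double val)
    {
        int i = samples.FindIndex(s => s.Key == ts && s.Value == val);
        if (i >= 0) samples.RemoveAt(i);
        return this;
    }

    // Step 4: a new state holding this state's samples minus the other's.
    public FirState Minus(FirState other)
    {
        var result = new FirState();
        result.samples.AddRange(samples);
        foreach (var s in other.samples) result.Remove(s.Key, s.Value);
        return result;
    }

    // Step 5: pair the last coefficient with the newest sample and walk backwards.
    // With fewer samples than coefficients, only the available terms are summed.
    public double DotWithMostRecent(double[] coeffs)
    {
        double sum = 0;
        int n = samples.Count, k = coeffs.Length;
        for (int j = 0; j < k && j < n; j++)
            sum += coeffs[k - 1 - j] * samples[n - 1 - j].Value;
        return sum;
    }
}

// Hypothetical aggregate: each method only forwards to a helper on the state.
public sealed class FirAggregateSketch : IAggregate<double, FirState, double>
{
    private readonly double[] coeffs;

    public FirAggregateSketch(double[] coeffs) { this.coeffs = coeffs; }

    public Expression<Func<FirState>> InitialState() => () => new FirState();

    public Expression<Func<FirState, long, double, FirState>> Accumulate() =>
        (state, ts, val) => state.Add(ts, val);

    public Expression<Func<FirState, long, double, FirState>> Deaccumulate() =>
        (state, ts, val) => state.Remove(ts, val);

    public Expression<Func<FirState, FirState, FirState>> Difference() =>
        (left, right) => left.Minus(right);

    public Expression<Func<FirState, double>> ComputeResult()
    {
        var c = coeffs; // captured as a constant in the expression tree
        return state => state.DotWithMostRecent(c);
    }
}
```

To try it outside Trill, compile each expression with `.Compile()` and call the resulting delegates. With coefficients {1, 2, 3} and samples 1.0, 2.0, 3.0 accumulated at timestamps 1, 2, 3, ComputeResult yields 1*1 + 2*2 + 3*3 = 14.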

Ohyoukillkenny commented 4 years ago

Thanks for the reply; it was very helpful, and I successfully implemented the FIR operator following your suggestion. Is there any other way to implement custom operators? I find it arduous to implement aggregates when the Deaccumulate and Difference methods are difficult or "unnatural" to implement.