microsoft / Trill

Trill is a single-node query processor for temporal or streaming data.
MIT License

questions about the FlushPolicy #132

Closed: Ohyoukillkenny closed this issue 4 years ago

Ohyoukillkenny commented 4 years ago

In Trill's documentation on FlushPolicy, I found:

FlushPolicy: This policy specifies when to flush batched output events through the entire query from the ingress site.

  • None: Does not automatically flush. Output events will be batched and egressed normally.
  • FlushOnPunctuation (non-partitioned streams only): When a punctuation is ingressed or generated, a flush will also be propagated through the query. This is the default policy for non-partitioned streams.
  • FlushOnBatchBoundary: When a batch is filled on ingress, a flush will be propagated through the query.

I have two questions about the above description. First, why is data always flushed when the batch is full in practice? Is the "batch boundary" the size of the batch? For example, I set up an input stream of type {payload:long, startTime:long, endTime:long} as follows:

{1, 1, infinity}, {2, 2, infinity}, {3, 3, infinity}, {4, 4, infinity}, {5, 5, infinity}

Then I set the batch size to 2:

Config.DataBatchSize = 2;

I also print each input value as it is emitted:

using System;
using System.Reactive.Disposables; // Disposable.Empty comes from System.Reactive

public class InputLongStream : IObservable<long>
{
    private readonly long N;
    public InputLongStream(long N) {
        this.N = N;
    }

    public IDisposable Subscribe(IObserver<long> observer)
    {
        for (long i = 1; i <= this.N; i++) {
            Console.WriteLine("input sends: " + i);
            observer.OnNext(i);
        }
        observer.OnCompleted();

        // all values were pushed synchronously above, so there is nothing left to cancel
        return Disposable.Empty;
    }
}

Then I set the FlushPolicy to FlushOnPunctuation and aggregated the sum of the payloads. I expected data to be flushed only at the end of the stream. However, I observed that data was still flushed whenever a batch was full. Here is the code I ran:

public static void Run()
{
    Config.DataBatchSize = 2;

    IObservable<long> rxStream = new InputLongStream(5);

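    // Printer<T> is my own IObserver<T> helper (not shown); it prints "Received: ..." and "End of the Stream"
    IObserver<StreamEvent<long>> rxConsumer = new Printer<StreamEvent<long>>();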

    // ingress the rx stream into Trill
    IObservableIngressStreamable<long> ingressStream =
        rxStream.Select(x => StreamEvent.CreateInterval(x, StreamEvent.InfinitySyncTime, x))
            .ToStreamable(DisorderPolicy.Throw(), FlushPolicy.FlushOnPunctuation);

    // egress Trill's output into rx
    IObservable<StreamEvent<long>> outputStream = 
        ingressStream
            .Sum(x => x)
            .ToStreamEventObservable();

    // execution of the query
    outputStream.Subscribe(rxConsumer);
}

And here is the console output:

input sends: 1
input sends: 2
Received: [Start: 1,1]
Received: [End: 2,1,1]
input sends: 3
input sends: 4
Received: [Start: 2,3]
Received: [End: 3,2,3]
Received: [Start: 3,6]
Received: [End: 4,3,6]
input sends: 5
Received: [Start: 4,10]
Received: [End: 5,4,10]
Received: [Start: 5,15]
Received: [Punctuation: +inf]
End of the Stream

It looks like FlushOnPunctuation has the same functionality as FlushOnBatchBoundary.

My second question: when the flush policy is None, the documentation says that "output events will be batched and egressed normally". What exactly does "normally" mean? When I set the flush policy to None in the above code, I observed exactly the same console output. I am very confused about the differences between these flush policies.

Could anyone help me understand these policies? I would really appreciate it!

peterfreiling commented 4 years ago

FlushPolicy.None (or "egressed normally") means that every operator independently buffers events until the number of buffered outputs reaches the batch size (Config.DataBatchSize), at which point the operator egresses that batch, i.e., passes it to the next downstream operator. Since any given operator may egress a different number of events than it receives (e.g., a Where operator may egress fewer events, and a SelectMany operator may egress more), operators will egress at different times.

A flush differs from this behavior: it is propagated through the entire query, so every operator egresses all output events that are ready, regardless of whether its batch is full. FlushPolicy.FlushOnBatchBoundary waits for the ingress batch to reach the max size (the batch boundary), then flushes the entire pipeline from that ingress site. So if you have an ingress followed by a single Where operator that reduces the number of output events, those events will be egressed through to the output even though the Where operator's output buffer is not full. Under FlushPolicy.None, by contrast, the Where operator would wait for ingress to pass the next batch(es) until its buffered output reaches the batch size. FlushPolicy.FlushOnPunctuation flushes the query in response to any punctuation.
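To make the difference between None and FlushOnBatchBoundary concrete, here is a small, hypothetical C# simulation of the behavior described above. It is not Trill code: `FlushSim`, `SimFlushPolicy`, and the two-stage ingress-then-Where pipeline are invented for illustration. The model assumes that the ingress and the Where operator use the same batch size and that completing the stream always flushes. It feeds the values 1 to 8 through a filter that keeps even numbers, with a batch size of 2.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical model of the batching semantics; NOT Trill's API.
public enum SimFlushPolicy { None, FlushOnBatchBoundary }

public static class FlushSim
{
    // Runs ingress -> Where(predicate) -> sink and returns the batches that
    // reached the sink, each tagged with how many inputs had been ingressed.
    public static List<(int afterInput, List<long> batch)> Run(
        IEnumerable<long> input, Func<long, bool> predicate,
        int batchSize, SimFlushPolicy policy)
    {
        var sink = new List<(int afterInput, List<long> batch)>();
        var ingressBuffer = new List<long>();
        var whereBuffer = new List<long>();   // Where operator's output buffer
        int ingressed = 0;

        // The Where operator egresses only full batches, unless a flush arrives.
        void WhereEgress(bool flush)
        {
            while (whereBuffer.Count >= batchSize)
            {
                sink.Add((ingressed, whereBuffer.Take(batchSize).ToList()));
                whereBuffer.RemoveRange(0, batchSize);
            }
            if (flush && whereBuffer.Count > 0)
            {
                sink.Add((ingressed, new List<long>(whereBuffer)));
                whereBuffer.Clear();
            }
        }

        foreach (var x in input)
        {
            ingressed++;
            ingressBuffer.Add(x);
            if (ingressBuffer.Count < batchSize) continue;

            // Ingress batch is full (batch boundary): hand it to Where.
            whereBuffer.AddRange(ingressBuffer.Where(predicate));
            ingressBuffer.Clear();
            // Only FlushOnBatchBoundary propagates a flush at this point.
            WhereEgress(flush: policy == SimFlushPolicy.FlushOnBatchBoundary);
        }

        // Stream completion: assume everything is flushed under any policy.
        whereBuffer.AddRange(ingressBuffer.Where(predicate));
        ingressBuffer.Clear();
        WhereEgress(flush: true);
        return sink;
    }

    public static void Main()
    {
        var input = Enumerable.Range(1, 8).Select(i => (long)i);
        foreach (var policy in new[] { SimFlushPolicy.None, SimFlushPolicy.FlushOnBatchBoundary })
        {
            Console.WriteLine(policy);
            foreach (var (after, batch) in Run(input, x => x % 2 == 0, 2, policy))
                Console.WriteLine($"  after input {after}: [{string.Join(", ", batch)}]");
        }
    }
}
```

Under this model, None delivers [2, 4] after input 4 and [6, 8] after input 8, because the Where operator waits until its own buffer holds two events, while FlushOnBatchBoundary delivers each even value on its own as soon as its ingress batch closes. The same model also suggests why the outputs in the question look identical under None and FlushOnPunctuation: a flush policy seems to only add flushes on top of the normal batch-full egress, so with DataBatchSize = 2 the Sum operator's output batches still fill up and egress on their own, and, assuming no punctuations are generated before the end of the stream, the punctuation-triggered flush only happens at completion.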

Ohyoukillkenny commented 4 years ago

Thanks a lot! I get the idea. Thanks again for your time and your discussion.