microsoft / Trill

Trill is a single-node query processor for temporal or streaming data.

Trill multicast vs publish #52

Open tadeuszwojcik opened 5 years ago

tadeuszwojcik commented 5 years ago

Hello,

I'd like to experiment with the Trill library a bit, and I was wondering what the best approach is for a use case where there is a single 'source stream' (for example trading data, such as individual trades) and many queries over that stream, where the first operator of each query would be a Where that narrows the trades down to an individual instrument. In the query writing guide I read that Multicast is necessary for such a use case, but when looking at the source code I also found Publish. I'm not really familiar with Rx, so it's a bit confusing to me when to use Multicast vs Publish. What would you suggest? Or is it perhaps better to create a separate stream for each Where instead of sharing a single one, so that Multicast is not necessary? Do you know of any guidelines or lessons learned about that? I'd love to read more, but I couldn't find anything in the docs.

I've also seen mentions of partitioning in the source code. Is this only related to the group operator, or is it something that could also be useful for my use case?

I'll be setting a low batch size (< 5, maybe less). Are there any settings I could tweak for near-real-time queries to get the lowest possible latency, sacrificing throughput?

Thanks a lot!

badrishc commented 5 years ago

Multicast would be the best fit when you have a source that you would like to feed to a fixed number (known a priori) of receiver sub-queries. The source is subscribed to exactly once; Trill ingress, batching, and/or columnarization occur exactly once; and the same data is fed to all the multicast subscribers. The Subscribe to the source happens as soon as the required number of Subscribe operations have been performed on the Multicast endpoint.

Publish is the dynamic version: you create a Publish endpoint that anyone can dynamically Subscribe to, even at runtime. The (single) Subscribe to upstream occurs when you call Connect on the endpoint. Any new subscribers after a Connect simply receive the stream from that point forward. Note that because such a subscriber latches on to the stream mid-stream, the user needs to be careful not to use end edges, because then you could have an end edge without a corresponding start edge, which would be a malformed stream. For this reason, I would avoid Publish unless you know what you are doing.

A third option is to use neither, just call Subscribe on the source separately for each query. This results in multiple Subscribe calls being made to the source, which may be more expensive (as each Subscribe will have its own Trill ingress, batching, etc.), but in this case, the source becomes responsible for generating a correct stream to each subscriber independently.
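The end-edge hazard described for Publish can be illustrated with a small plain C# model. This is only a sketch of the semantics, not Trill code: the `Edge` class, `EdgeKind` enum, and `LateSubscriberModel` helper are hypothetical names invented for this illustration. It counts end edges whose matching start edge a subscriber never saw.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public enum EdgeKind { Start, End }

// Hypothetical stand-in for an edge event; this is not Trill's StreamEvent type.
public sealed class Edge
{
    public EdgeKind Kind;
    public long Time;          // time at which this edge occurs
    public long OriginalStart; // for End edges: the time of the matching Start edge
    public string Payload;
}

public static class LateSubscriberModel
{
    // A tiny stream: one event that starts at t=1 and ends at t=5.
    public static Edge[] SampleStream() => new[]
    {
        new Edge { Kind = EdgeKind.Start, Time = 1, OriginalStart = 1, Payload = "trade" },
        new Edge { Kind = EdgeKind.End, Time = 5, OriginalStart = 1, Payload = "trade" },
    };

    // Counts End edges whose matching Start edge was never seen by this subscriber.
    public static int CountOrphanedEnds(IEnumerable<Edge> seen)
    {
        var open = new HashSet<(long, string)>();
        var orphans = 0;
        foreach (var e in seen)
        {
            if (e.Kind == EdgeKind.Start) open.Add((e.Time, e.Payload));
            else if (!open.Remove((e.OriginalStart, e.Payload))) orphans++;
        }
        return orphans;
    }

    public static void Main()
    {
        var stream = SampleStream();
        // Subscriber that was present from the beginning of the stream.
        Console.WriteLine(CountOrphanedEnds(stream));
        // Subscriber that joined after the Start edge had already gone by.
        Console.WriteLine(CountOrphanedEnds(stream.Skip(1)));
    }
}
```

In this model the first subscriber sees a well-formed stream, while the late one is left holding an end edge with nothing to close, which is the malformed case the comment above warns about.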

cybertyche commented 5 years ago

The "partitioned" versions of the operators that you see in the code have to do with a feature called Partitioned Streams. If you ingress data as PartitionedStreamEvents or specify a partition lambda at ingress, you essentially turn this feature on.

What the feature does is allow Trill to handle multiple timelines - one per key - instead of a single one. Handling disorder, for instance, becomes a per-partition concern. Ordinarily, time in Trill is considered a global construct that is uniform across all data that is seen. With partitions, each individual partition is allowed to progress time individually. The downside is that you cannot then query across partitions; whatever query you specify is applied per-partition.

For example, consider a scenario where you have 10k sensors measuring temperature, and you want to find the maximum temperature per sensor per day. Without partitions, each sensor's data is measured against a single advancing timeline, and the disorder policies are applied globally. That means that if 100 of those 10k sensors are lagging well behind, then either you will not see results until they have caught up, or the lagging data will be dropped or have its time adjusted.

However, given that the query is returning answers per-sensor, there is really no reason for one sensor's data being behind or ahead to impact any other sensor's data. That's what partitions allow - each sensor will have its own timeline that is not impacted by any other.
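The difference between one global timeline and per-sensor timelines can be sketched in plain C#. This is a simplified model, not Trill code: it assumes a policy where any event older than the current high-water mark counts as late, and the `TimelineModel` class and its method names are hypothetical.

```csharp
using System;
using System.Collections.Generic;

public static class TimelineModel
{
    // Counts events that arrive behind a single, global high-water mark.
    public static int LateOnGlobalTimeline(IEnumerable<(string Sensor, long Time)> events)
    {
        long watermark = long.MinValue;
        int late = 0;
        foreach (var e in events)
        {
            if (e.Time < watermark) late++;
            else watermark = e.Time;
        }
        return late;
    }

    // Counts events that arrive behind the high-water mark of their own sensor only.
    public static int LateOnPerSensorTimelines(IEnumerable<(string Sensor, long Time)> events)
    {
        var watermarks = new Dictionary<string, long>();
        int late = 0;
        foreach (var e in events)
        {
            if (watermarks.TryGetValue(e.Sensor, out var w) && e.Time < w) late++;
            else watermarks[e.Sensor] = e.Time;
        }
        return late;
    }

    // Sensor A is current; sensor B lags far behind but is in order with itself.
    public static (string Sensor, long Time)[] Sample() =>
        new[] { ("A", 100L), ("B", 10L), ("A", 101L), ("B", 11L) };

    public static void Main()
    {
        Console.WriteLine(LateOnGlobalTimeline(Sample()));
        Console.WriteLine(LateOnPerSensorTimelines(Sample()));
    }
}
```

On the sample input, both of sensor B's readings are late against the global timeline, while neither is late against B's own timeline.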

tadeuszwojcik commented 5 years ago

@badrishc , @cybertyche I really appreciate your thoughtful responses, thank you!

@badrishc I think I get it now, and I think I'll use Publish: from what I understand its perf characteristics are the same as Multicast's, but it's more flexible for my use case, and I'm perfectly fine with its drawbacks (missing events if subscribing later on; I also don't use edge events). Am I understanding correctly that I call streamable.Publish() only once, something like:

var streamSource = new Subject<StreamEvent<SensorEvent>>();

var streamable = streamSource.ToStreamable();

var connectableStreamable = streamable.Publish();

connectableStreamable.Connect();

var query1 = connectableStreamable
                 .Where(...).ToStreamEventObservable();
var query2 = connectableStreamable
                 .Where(...).ToStreamEventObservable();

await foreach (var sensorEvent in client.SensorEventsStream()) 
{
   streamSource.OnNext(StreamEvent.CreatePoint(DateTimeOffset.UtcNow.Ticks, sensorEvent));
}

And that code subscribes to the ingress observable once, so Trill ingress, batching, and/or columnarization occur exactly once. If I didn't use Publish, each query would cause a separate subscription to the underlying ingress observable, which would be less performant; please correct me if I'm wrong. BTW, is the pattern in the code above, going from IAsyncEnumerable to an observable, correct, assuming I don't have an observable to begin with?

@cybertyche all makes sense, thanks for the details, very useful info.

I'd also like to ask how I could add high availability for queries like the ones in the example above. In the docs I've seen an example with a single source and a single query, but I'm not sure how to implement that for a single source/ingress with multiple queries. I know I need to register both input and output, but in this case there is one input and it's shared.

Regarding optimization for low latency, are there any special tweaks/knobs I should know about? Like using row storage vs column storage or vice versa, small buffers size, or dropping of out of order messages? Or it's a matter of experimentation for given use case?

Do you recommend using structs as stream event payload? Or that would be the case only for small structs? Currently I'm using readonly structs but given that there is no way to pass those by reference (in operator) it won't bring any benefits, am I right?

EDIT: I wanted to write a query that calculates, at most every 1 millisecond, the sum of events from the last minute. Here's what I got; is it correct?

connectableStreamable
                 .Where(...)
                 .HoppingWindowLifetime(TimeSpan.FromMinutes(1).Ticks, TimeSpan.FromMilliseconds(1).Ticks)
                 .Sum(x => x.Size)
                 .AlterEventDuration(1)
                 .ToStreamEventObservable();

I've noticed that if I remove AlterEventDuration(1) it always produces two values, one from the previous computation and then the current one, and I'm a bit lost as to why that is the case. Also, could it be written like this (like a hopping window but with duration 1)?

connectableStreamable
                 .Where(...)
                 .AlterEventLifetime(origTime=> (1 + ((origTime - 1) / ONE_MS_TICKS)) * ONE_MS_TICKS, 1)
                 .Sum(x => x.Size)
                 .ToStreamEventObservable();
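To make the start-time lambda in the snippet above concrete, here is the same arithmetic in isolation. It is plain C# with no Trill dependency; `SnapMath` and `SnapUp` are hypothetical names, and ONE_MS_TICKS is assumed to equal `TimeSpan.TicksPerMillisecond` (10,000 ticks).

```csharp
using System;

public static class SnapMath
{
    // Same expression as the lambda above, with the period passed in explicitly.
    public static long SnapUp(long origTime, long period) =>
        (1 + ((origTime - 1) / period)) * period;

    public static void Main()
    {
        long oneMs = TimeSpan.TicksPerMillisecond; // 10,000 ticks
        foreach (var t in new long[] { 1, 10_000, 10_001, 25_000 })
        {
            Console.WriteLine($"{t} -> {SnapUp(t, oneMs)}");
        }
    }
}
```

For positive timestamps the expression rounds up to the next multiple of the period, leaving exact multiples unchanged: 1 and 10,000 both map to 10,000, 10,001 maps to 20,000, and 25,000 maps to 30,000. Because C# integer division truncates toward zero, a timestamp of 0 maps to 10,000 rather than 0.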

The query I'd really want to write would be a one-minute sliding window that produces only the last value on flush/punctuation, something like:

connectableStreamable
                 .Where(...)
                 .AlterEventDuration(TimeSpan.FromMinutes(1).Ticks)
                 .Sum(x => x.Size)
                 .ToStreamEventObservable();

but it produces new data on every new input. What I'd like it to do is produce new data on flush (punctuation/full buffer), as the flush happens after a whole batch of input has been ingested, but I think that's not possible, so the second-best thing for me is a hopping window recalculated every 1 millisecond. Or are there other alternatives?

Thanks a lot!

cybertyche commented 5 years ago

I'm going to write a longer response to cover more of the above, but I wanted to say one thing quickly. Just because your input events are not start-end edge pairs does not mean that they won't be generated by operators in the Trill query. Whenever possible, Trill operators try to produce intervals, but sometimes for latency reduction they will produce start-end edge pairs so that downstream computation is not affected.

It's actually what you're seeing in your sample query when you remove the AlterEventDuration(1) call - you're not seeing duplicate data, you're seeing start/end edge pairs.

More to come...

tadeuszwojcik commented 5 years ago

@cybertyche oh, that makes sense, thank you! Looking forward to more of your thoughts. Maybe I should also clarify what I'd like to achieve in general. There will be one input stream (an IAsyncEnumerable, to be precise) with data, and many different queries over it (aggregations, grouping, etc.) using sliding or hopping windows with various window sizes. I'd like to add high availability as well, so that I wouldn't have to replay everything on each restart, but as mentioned I'm not sure how to implement that in a one-input, many-outputs scenario. Input events will be point events ingested in batches (between 1 and 50 or so), and after each batch I'd like to 'flush' all queries and produce outputs.

I'd like to achieve constant low latency if possible as well, sacrificing overall throughput as I don't expect more than 500-10000 events per second.

I've experimented with setting DataBatchSize to 1 but that is not optimal as it produces outputs for each event in batch obviously.

I've experimented with adding punctuation events after each batch being added to input stream with startTime = lastEventTicks + 1 (as otherwise last event won't be flushed from what I understand). That seems to be working, but I can't achieve reliable latency characteristics, sometimes it's 10 microseconds, sometimes it's 10 milliseconds (I measured calls to streamSource.OnNext(StreamEvent.CreatePunctuation where stream source is rx Subject). Setting ForceRowBasedExecution wasn't helping either, in general it was slower.

I've tried sliding windows (so output is produced for each input) and hopping windows (where the period was set to one millisecond in ticks), but that also didn't help with reliable latency. What I'd like to achieve is more or less constant latency: it can be around 2ms, but always in that ballpark (0-2ms), not 0.05ms most of the time and 10ms sometimes. Would partitioning the stream help here? Is something like that even possible with Trill? I find this lib pretty innovative and fascinating, but perhaps it's not tailored to my use case and I shouldn't fight with it. I would appreciate your thoughts on that.

BTW, sliding/hopping windows I've experimented with never reset to 0 if no event is provided during window period (there were punctuation events only), is that expected? Thanks!

cybertyche commented 5 years ago

The behavior you are seeing with windows not resetting to 0 is curious; we're not seeing it. If you have a small repro for it, please let us know and we will investigate.

One thing that may be helpful for you is rather than injecting punctuation to induce flushing, you can set an ingress policy of FlushPolicy.FlushOnBatchBoundary. That will induce a flush whenever the ingress batch is full.

Another thing is that a single query can have multiple outputs with no trouble in Trill - you can register a single ingress point and multiple egress points in a QueryContainer object and have each output subscribed to separately.

A couple of questions about your scenario that may help me figure out what may be the next step.

tadeuszwojcik commented 5 years ago

Sure, I've created a simple project (.NET Core 3 preview) with a repro - https://github.com/tadeuszwojcik/trill-sample . I bet it's me doing something incorrectly ... I'm just not sure what.

Regarding QueryContainer, ok that makes sense, just to make sure I understand it correctly for sample code like below (uses publish):

var streamSource = new Subject<StreamEvent<SensorEvent>>();

var streamable = streamSource.ToStreamable();

var connectableStreamable = streamable.Publish();

connectableStreamable.Connect();

var query1 = connectableStreamable
                 .Where(...);
var query2 = connectableStreamable
                 .Where(...);

I could register two QueryContainers, the first for (streamSource, query1) and the second for (streamSource, query2), correct?

My data is already sorted by time, I think most of the queries would be done per-key, yes, I've already looked a little bit into partitioned streams. Regarding latency constraints, being below threshold is good enough for me.

Thanks!

cybertyche commented 5 years ago

I'm looking at our code for QueryContainer and the Publish method - I'm thinking it's not going to work in its current form because QueryContainer needs to know all of the input and output registry sites in order to do checkpoint restoration properly. I'm looking into this though and may have a different solution for you.

I do think I've gotten to the bottom of the repro. The issue is that by default Trill does not output data when a group has no data. Think of it like SQL in this respect - when using a GROUP BY clause, one only gets aggregate results returned for groups that contain data. Trill followed this precedent.

Now there is a way to change this. Instead of your call to .Count(), you can instead call: .Aggregate(w => w.Count().OutputDefaultWhenEmpty())

Count() is just a macro that expands to Aggregate(w => w.Count()), and OutputDefaultWhenEmpty is a method that modifies an aggregate expression to override the default behavior of not outputting when there is no matching data.

cybertyche commented 5 years ago

Also, as your data is pre-sorted, you may be able to benefit from a faster ingress method. Instead of a Subject<StreamEvent> you can instead create a Subject<ArraySegment<StreamEvent>>. It's a little generic-happy, I know. But it reduces the method overhead of all of the OnNext calls in favor of indexing through the array segment. It's an array segment so that you can reuse the same array repeatedly and populate it with start and end positions.

The downside is that FlushPolicy is not implemented on it yet, so it will only flush on punctuations (default behavior) but if that is important here we can do that fairly quickly.
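The "reuse the same array and hand out start and end positions" idea can be sketched in plain C#. This is a minimal illustration, not Trill code: `ReusableBatch<T>` is a hypothetical helper, and in a real pipeline the sink passed to `Flush` would presumably be the `OnNext` of the array-segment subject mentioned above.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: fills one preallocated array and exposes the filled part
// as an ArraySegment, so no new array is allocated per batch.
public sealed class ReusableBatch<T>
{
    private readonly T[] buffer;
    private int count;

    public ReusableBatch(int capacity) { buffer = new T[capacity]; }

    public bool IsFull => count == buffer.Length;

    // The caller is expected to Flush when IsFull is true before adding more.
    public void Add(T item) { buffer[count++] = item; }

    // Hands the filled portion [0, count) to the sink, then rewinds for reuse.
    // The sink must finish reading the segment before the next Add overwrites it.
    public void Flush(Action<ArraySegment<T>> sink)
    {
        if (count == 0) return;
        sink(new ArraySegment<T>(buffer, 0, count));
        count = 0;
    }
}

public static class ReusableBatchDemo
{
    public static void Main()
    {
        var batch = new ReusableBatch<int>(4);
        batch.Add(1); batch.Add(2); batch.Add(3);
        batch.Flush(seg => Console.WriteLine($"offset={seg.Offset} count={seg.Count}"));
    }
}
```

The design trade-off is that the segment is only valid until the buffer is refilled, so this pattern suits a synchronous consumer that copies or processes the data before returning.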

tadeuszwojcik commented 5 years ago

Great, thanks for looking into this! I'll play around with ArraySegment as it seems to be perfect for my use case.

tadeuszwojcik commented 5 years ago

@cybertyche I've looked a bit more into ingress via ArraySegment and it's looking really nice so far, thanks for the tip! I've been reading the query writing guide over and over, as I wanted to find out whether I can model potentially infinite (time-wise) events in Trill. It seems to be possible using Start/End edge events, but only if I know the start time when emitting the End event, and I don't. Would it be possible to add End events without knowing the start time of the corresponding event, or is that not possible because both the start time and the event payload are used to 'match' start and end edges? I wonder whether I could model it differently. For example, let's say I have sensors sending temperature readings, and the last reading is what gets reported, so unless I get an update I should always return the last read value. I was thinking that whenever an update occurs I would add two events, one 'End' event and one 'Start' event for the new temperature, but maybe that's not the way to model it? I would really appreciate your thoughts on that, as I'm a bit lost right now. EDIT: Actually I think ClipEventDuration may do the trick; I will experiment with it.

cybertyche commented 5 years ago

ClipEventDuration will indeed do the trick for you, but the method "Sessionize" is a convenient macro method that you may want to use that could give you a simpler way to model the operation.

I sure wish we didn't need to have end events list the start time associated with it, but unfortunately there are semantic consequences with removing that dependency. Oh, the discussions we had about that one...
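The "last reading stays valid until the next one arrives" model discussed above can be illustrated in plain C#. This is only a sketch of the intended result, not of how ClipEventDuration or Sessionize is implemented: `LastValueModel` and `ToIntervals` are hypothetical names, and `long.MaxValue` is used here as a stand-in for an open-ended lifetime.

```csharp
using System;
using System.Collections.Generic;

public static class LastValueModel
{
    // Turns time-ordered point readings into intervals: each reading is valid from
    // its own time until the next reading of the same sensor (or open-ended).
    public static List<(string Sensor, long Start, long End, double Value)> ToIntervals(
        IEnumerable<(string Sensor, long Time, double Value)> readings)
    {
        var result = new List<(string Sensor, long Start, long End, double Value)>();
        var openIndex = new Dictionary<string, int>(); // sensor -> index of its open interval

        foreach (var r in readings)
        {
            if (openIndex.TryGetValue(r.Sensor, out var i))
            {
                // Clip the previous interval of this sensor at the new reading's time.
                var prev = result[i];
                result[i] = (prev.Sensor, prev.Start, r.Time, prev.Value);
            }
            openIndex[r.Sensor] = result.Count;
            result.Add((r.Sensor, r.Time, long.MaxValue, r.Value));
        }
        return result;
    }

    public static void Main()
    {
        var readings = new[] { ("A", 10L, 20.0), ("B", 12L, 5.0), ("A", 15L, 21.5) };
        foreach (var iv in ToIntervals(readings))
        {
            Console.WriteLine($"{iv.Sensor}: [{iv.Start}, {iv.End}) = {iv.Value}");
        }
    }
}
```

Note that the producer never has to emit an end event here: the end of each interval is derived from the arrival of the next reading for the same sensor, which is the property that makes this shape attractive when start times are not known at end time.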

wassim-k commented 3 years ago

For the benefit of others, I ran into the same issue and was able to achieve consistent low latency by using:

var punctuationInterval = TimeSpan.FromMilliseconds(200);
var ingress = eventStream.ToAtemporalStreamable(TimelinePolicy.WallClock(punctuationInterval));

This seems to be the only method of ingress which uses an internal timer that pumps punctuations periodically and reliably, which helps maintain consistent low latency.

Goes without saying that this method does not allow for disorder policies but that wasn't required in our case, especially given that any disorder policy will delay the output by the re-order latency specified.