pravega / flink-connectors

Apache Flink connectors for Pravega.
Apache License 2.0

Improve watermark handling (per-segment watermark) #41

Closed EronWright closed 3 years ago

EronWright commented 7 years ago

Problem description

Time is a first-class aspect of the Flink Streaming programming model. The progress of time is based on watermarks that typically track the timestamps seen in records as they're emitted by the source. The system works best if sources emit records in roughly the order they were received in; in other words, Flink's ability to reorder out-of-order elements is limited. Two problems occur when a source emits records in a highly unordered fashion.

  1. Any record with a timestamp older than the current watermark is considered a 'late' record which requires special handling by the application.
  2. Any record with a timestamp newer than the current watermark is buffered in operator state. Depending on the state backend, this can increase memory and/or disk usage.

The Pravega source already processes segments in order; a successor won't be processed until its predecessors have been. However, segments that meet that criterion are simply processed in the order that the reader group encounters them. The ordering isn't strong enough to avoid emission of late records.

Another cause of out-of-orderness is the fact that some segments are much longer than others, combined with the fact that a segment is processed completely once it is assigned to a reader. For example, imagine that segment A contains a full day's worth of records, while segment B was split into segments C and D at mid-day due to a scale event. A given reader may well emit all of A before tackling C/D.
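
The A/C/D example can be made concrete with a small simulation. This is only a sketch under stated assumptions: the hourly timestamps, the `LateRecordDemo` class, and its helper names are hypothetical, and "late" is taken to mean "older than the highest timestamp seen so far", following the problem description above. It does not use the Pravega or Flink APIs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class LateRecordDemo {

    /** Hourly timestamps from..to (inclusive), standing in for one segment's records. */
    static List<Long> hours(long from, long to) {
        List<Long> out = new ArrayList<>();
        for (long h = from; h <= to; h++) {
            out.add(h);
        }
        return out;
    }

    /** Emission order when each segment is drained completely: A, then C, then D. */
    static List<Long> segmentAtATime() {
        List<Long> out = new ArrayList<>(hours(0, 23)); // segment A: a full day (assumed)
        out.addAll(hours(12, 23));                      // segment C: created at mid-day
        out.addAll(hours(12, 23));                      // segment D: created at mid-day
        return out;
    }

    /** Counts records older than the running watermark (the max timestamp seen so far). */
    static int countLate(List<Long> emitted) {
        long watermark = Long.MIN_VALUE;
        int late = 0;
        for (long ts : emitted) {
            if (ts < watermark) {
                late++;
            } else {
                watermark = ts;
            }
        }
        return late;
    }

    public static void main(String[] args) {
        List<Long> drained = segmentAtATime();
        List<Long> timeOrdered = new ArrayList<>(drained);
        Collections.sort(timeOrdered);
        System.out.println("late when draining segments: " + countLate(drained));
        System.out.println("late in time order: " + countLate(timeOrdered));
    }
}
```

With these assumed timestamps, draining A first makes 22 of the 24 records in C and D late, while the same records emitted in time order produce none.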

This problem asserts itself mainly in historical processing scenarios.

Suggestions for an improvement

  1. Consider prioritizing unassigned segments by their start time.
  2. Consider introducing a per-segment watermark assigner, with automatic min-watermark calculation across segments. This may help with problem (1). See also Kafka Per-Partition Watermarks feature docs.
  3. Consider employing a segment selection policy that would discourage a given reader from assigning itself a segment that contains elements older than the reader's current watermark. For example, once a segment has been processed, store the per-segment watermark into reader group state as the effective 'start time' of the successors.
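
As a rough illustration of suggestion 2, the sketch below tracks one watermark per segment and reports the minimum across the segments currently being read. The class name `PerSegmentWatermarks`, its methods, and the use of segment names as strings are all hypothetical; nothing here is an existing Pravega or Flink API, and the per-segment watermark is simply assumed to be the highest timestamp seen in that segment.

```java
import java.util.HashMap;
import java.util.Map;

final class PerSegmentWatermarks {
    // Highest timestamp observed so far in each segment that is still being read.
    private final Map<String, Long> maxTimestampBySegment = new HashMap<>();
    // The reported watermark never moves backwards.
    private long lastEmitted = Long.MIN_VALUE;

    /** Records an event read from the given segment. */
    void onEvent(String segment, long timestamp) {
        maxTimestampBySegment.merge(segment, timestamp, Long::max);
    }

    /** Stops tracking a segment once it has been read to the end. */
    void onSegmentFinished(String segment) {
        maxTimestampBySegment.remove(segment);
    }

    /** Minimum of the per-segment watermarks, clamped so that it is monotonic. */
    long currentWatermark() {
        if (!maxTimestampBySegment.isEmpty()) {
            long min = Long.MAX_VALUE;
            for (long ts : maxTimestampBySegment.values()) {
                min = Math.min(min, ts);
            }
            lastEmitted = Math.max(lastEmitted, min);
        }
        return lastEmitted;
    }
}
```

Note that the clamp only keeps the reported value monotonic. If a newly assigned segment contains timestamps below the current watermark, those records are still late, which is the case suggestion 3 tries to discourage.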

StephanEwen commented 7 years ago

It sounds like per-segment watermarks are a way to go there. That would mean that we need a somewhat lower level reader interface, though.

I am wondering what Pravega's big picture plan on time and order handling is. So far, there is only order by key, but any elaborate event time handling needs some more ordering, or at least information about the out-of-orderness bounds.

fpj commented 7 years ago

There are a number of things to be mentioned here. I hope this comment doesn't get too long.

The main abstraction that Pravega exposes is the stream. Internally, streams are implemented with segments, and composing segments gives us a stream. It is useful to think of Pravega as a segment store, where each segment is a sequence of bytes organized according to temporal order. The core of Pravega does not really understand events, only bytes.

In principle, we can compose segments in any way we want, but we have chosen a few ways to compose them that make sense to the application; for example, scale up corresponds to ending a segment and starting n others. Although this idea of composing segments is really powerful, it brings a fair level of complexity when it comes to reasoning about order. For that reason, we have avoided exposing the segment abstraction at all costs. I also feel that manipulating segments directly leads to designs that have some rigid structure that I'd like to avoid, e.g., the mapping of partitions to sources is static.

We have talked about a low-level reader API because we want the ability to write multiple flavors of reader group rather than cram all possible features into one reader group class. But simply exposing all the segment manipulation would probably be a bad idea because it would lead to confusion and bugs. The API should be minimal, yet powerful enough to enable the different flavors.

As for time tracking, we could offer time references without exposing the notion of segments. We could do it in a way that is assigned by applications or at the AppendProcessor level at the segment store. My main concern here is having to guarantee monotonicity at the segment store when the application decides the timestamps. As a first cut, we don't really want the segment store service itself interpreting the bytes of a segment for anything. Would it be sufficient if the writers added app-provided timestamps on a per segment basis, even if the timestamps aren't synchronized across writers?

In the A B C D example above, let's assume two cases: single reader, multiple readers. For the single reader case, I'd expect the reader to hit the end of B before it finishes reading A. The reader would handle the sealed segment response and would request the successors from the controller. The reader next adds the successors to the state of the reader group, and it will pick them up because there is no one else. The case of multiple readers is pretty much the same, except that other readers might end up picking them up.

Prioritizing unassigned segments by their start time would make sense in the case where partitions sit there waiting for some time for some reader to pick them up. I believe we try to have them assigned as soon as possible, and so if we are doing a good job at it, then we don't really need to do that prioritization.
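
For reference, the prioritization being discussed could look roughly like the sketch below: unassigned segments wait in a queue ordered by start time, and a reader always takes the earliest one. The `UnassignedSegments` class, its `Segment` type, and the numeric start times are hypothetical and are not part of the reader group implementation.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

final class UnassignedSegments {

    /** Hypothetical descriptor: a segment name plus the time at which the segment was created. */
    static final class Segment {
        final String name;
        final long startTime;

        Segment(String name, long startTime) {
            this.name = name;
            this.startTime = startTime;
        }
    }

    // Segments waiting for a reader, earliest start time first.
    private final PriorityQueue<Segment> queue =
            new PriorityQueue<>(Comparator.comparingLong((Segment s) -> s.startTime));

    void add(String name, long startTime) {
        queue.add(new Segment(name, startTime));
    }

    /** Returns the name of the waiting segment with the earliest start time, or null if none. */
    String acquireNext() {
        Segment next = queue.poll();
        return next == null ? null : next.name;
    }
}
```

The ordering only matters while more than one segment is waiting; if segments are assigned as soon as they appear, the queue rarely holds more than one entry and the policy has little effect.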

I think suggestion 1 and 3 are pretty much the same, but perhaps I'm missing the difference.

EronWright commented 7 years ago

This long comment is my way of saying that the Pravega connector (also the Kinesis connector) faces new challenges, and that we shouldn't assume that historical stream processing 'just works' with the wonderful determinism we've come to expect from Flink. I am not quite the expert that other folks here are; these are just my observations.

I find it useful to imagine there being two 'regimes' in stream processing. Consider a job started against some pre-existing stream. Processing begins with the oldest records, working through them as quickly as possible to reach the tail. This is the 'historical' regime. Once at the tail, the processor enters a 'real-time' regime. There are important differences. In the real-time regime, the various time domains are roughly aligned, elements are processed roughly in the order they're received in, and the logic of watermark assignment can reasonably be delegated to the application. In the historical regime, there's an abundance of old data to process, system internals (like the implementation details in the reader group) heavily influence the processing order, and watermark assignment becomes more complex. Unless the connector is designed with historical processing in mind, the net effect is a loss of determinism, late records, etc.

Having per-partition assigners, in combination with watermark aggregation logic within the source and elsewhere, seems to be a good approach to decouple the app-specific aspects of assignment from the system-dependent aspects.

I feel the issue of non-determinism in historical processing is most acute in combination with dynamic work rebalancing and/or dynamic partitioning. Take the Kafka connector - each reader has a fixed set of partitions to process, and so the min-watermark calculation is rational. Throw in dynamic partitioning or rebalancing, and the possibility of late records is greatly increased.

StephanEwen commented 7 years ago

I hope to find some time in the next days to respond with some more details, but for now here is a summary:

EronWright commented 7 years ago

For me, a key question is the semantics of the application-specific watermark generation function. Ideally the function conveys domain knowledge (e.g. IOT specifics) without knowing the system internals (e.g. how the reader group selects segments). Meanwhile, the watermark produced by the function is aggregated by the system at various levels, e.g. across subtask instances and across partitions in the Kafka case.

We have considerable flexibility in how the Pravega source invokes the watermark function and performs aggregation. Could we somehow simplify the app developer's burden of writing a correct watermark function? Here are a few hypothetical solutions with diagrams. Note the function-like syntax W(...), which means a stateful instance of a watermark function.

First, the data we'll be working with. Three keys (A,B,C) with six segments (S1..S6). In this example, the producer is well-behaved, producing ascending timestamps per key.

┌────┬───┬───────┐     ┌────┐┌────┐┌────┐┌────┐
│1   │2  │4      │   A │ A1 ││ A2 ││ A3 ││ .. │
│    │   │ A     │     └────┘└────┘└────┘└────┘
│    │ A ├───┬───┤     ┌────┐┌────┐┌────┐┌────┐
│ A  │ B │5  │6  │   B │ B1 ││ B2 ││ B3 ││ .. │
│ B  │   │ B │   │     └────┘└────┘└────┘└────┘
│ C  ├───┴───┤ B │     ┌────┐┌────┐┌────┐┌────┐
│    │3      │ C │   C │ C1 ││ C2 ││ C3 ││ .. │
│    │ C     │   │     └────┘└────┘└────┘└────┘
└────┴───────┴───┘                             

1) Watermark function per routing key. This is an ideal solution because it matches Pravega's natural ordering guarantee and hides all system internals. In this example, the app could simply use the predefined AscendingTimestampExtractor function.

    ┌────┐┌────┐┌────┐┌────┐  
  W(│ A1 ││ A2 ││ A3 ││ .. │) 
    └────┘└────┘└────┘└────┘  
    ┌────┐┌────┐┌────┐┌────┐  
  W(│ B1 ││ B2 ││ B3 ││ .. │) 
    └────┘└────┘└────┘└────┘  
    ┌────┐┌────┐┌────┐┌────┐  
  W(│ C1 ││ C2 ││ C3 ││ .. │) 
    └────┘└────┘└────┘└────┘  

2) Watermark function per segment. Here, the function must deal with ordering across keys, but may be written as though the stream consists of one fixed segment.

   ┌────┐┌────┐┌────┐┌────┐┌────┐┌────┐┌────┐┌────┐┌────┐  
 W(│ A1 ││ B1 ││ A2 ││ B2 ││ B3 ││ C1 ││ C2 ││ A3 ││ .. │) 
   └────┘└────┘└────┘└────┘└────┘└────┘└────┘└────┘└────┘  

3) Watermark function per sub-task. Here, the function must deal with ordering across keys and with ordering of segment processing. This diagram reflects the current implementation.

   ╔════╗╔════╗╔════╗╔════╗       
 W(║ S1 ║║ S2 ║║ S5 ║║ .. ║)      
   ╚════╝╚════╝╚════╝╚════╝       
   ╔════╗╔════╗╔════╗╔════╗╔════╗ 
 W(║ -  ║║ S3 ║║ S6 ║║ S4 ║║ .. ║)
   ╚════╝╚════╝╚════╝╚════╝╚════╝ 

For reference, here's the source for commonly-used watermark functions:

tkaitchuck commented 7 years ago

In the above examples, do the W() groupings also correspond to the data received by one reader?

Sections 1) and 2) use A1, B1, C3, etc. to label events; 3) uses S1-S5. Do these refer to segments? If so, the order of segments listed would violate ordering guarantees and, that aside, this is not the way the code works now. You did not mention having a watermark function per stream or per reader group; these are also options. However, at the end of the day I think we are most interested in event time and not write time, in which case the only extractor that really makes sense is BoundedOutOfOrdernessTimestampExtractor, as we don't control event time.
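
The bounded out-of-orderness idea can be sketched without any Flink dependency: the watermark trails the highest timestamp seen so far by a fixed bound, so events that arrive out of order by less than the bound are not late. The class below is a hypothetical stand-in written for illustration, not Flink's BoundedOutOfOrdernessTimestampExtractor itself, and the "older than the watermark" lateness test follows the wording of the problem description.

```java
final class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrderness;
    private long maxTimestamp = Long.MIN_VALUE;

    BoundedOutOfOrdernessSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    /** Observes an event; returns true if it is older than the watermark at arrival. */
    boolean observe(long timestamp) {
        boolean late = timestamp < currentWatermark();
        maxTimestamp = Math.max(maxTimestamp, timestamp);
        return late;
    }

    /** Highest timestamp seen minus the bound, or Long.MIN_VALUE before the first event. */
    long currentWatermark() {
        return maxTimestamp == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxTimestamp - maxOutOfOrderness;
    }
}
```

For example, with a bound of 5, the sequence 10, 8, 20, 14 marks only 14 as late: by the time it arrives the watermark has advanced to 15. Choosing the bound is the hard part in the historical regime, since the reordering introduced by segment processing order can be far larger than the disorder in the data itself.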

EronWright commented 7 years ago

Sorry, diagram (3) is not very clear. It illustrates only the order of segment processing as experienced by a particular (stateful) watermark function instance. It does not imply that S5 and S6 would be processed concurrently. I stand by my assertion that a subtask could process S6 before S4.

tkaitchuck commented 7 years ago

The client is not returning segments. So a segment is never 'processed'. If you mean that some events in segment 4 could go after some events in segment 6, then yes. That shouldn't be surprising. After all, they are in totally independent parts of the routing key space, and the events were written in parallel, so it is only natural that they would be read in parallel. It is also possible in the above example that A1, A2, A3 will all be read before C1, or all after. The contract is ordering by keys. The segments are an irrelevant implementation detail. There is no way to know they exist at an API level. Hence the Flink connector should not know or care about their existence or structure.

EronWright commented 7 years ago

I think that Tom is rightly concerned about keeping segments as an implementation detail. The reason they are mentioned is that segments affect processing order in today's reader group implementation. Sorry for any undue implications about exposing segments to users.

EronWright commented 5 years ago

Just a note, there's improved support for tracking a global watermark at the Flink level, based on a general-purpose 'global aggregate tracker' provided by the JM. Might be useful. https://issues.apache.org/jira/browse/FLINK-10886