pravega / flink-connectors

Apache Flink connectors for Pravega.
Apache License 2.0
96 stars 67 forks source link

Implement watermark idleness #17

Closed EronWright closed 4 years ago

EronWright commented 7 years ago

Problem description Subtasks that are without assigned segments/shards/partitions for an indefinite period should enter an idle state. Otherwise downstream operators may stall waiting for watermark progression. See FLINK-5017.

From SourceFunction.SourceContext:

/**
 * Marks the source to be temporarily idle. This tells the system that this source will
 * temporarily stop emitting records and watermarks for an indefinite amount of time. This
 * is only relevant when running on {@link TimeCharacteristic#IngestionTime} and
 * {@link TimeCharacteristic#EventTime}, allowing downstream tasks to advance their
 * watermarks without the need to wait for watermarks from this source while it is idle.
 *
 * <p>Source functions should make a best effort to call this method as soon as they
 * acknowledge themselves to be idle. The system will consider the source to resume activity
 * again once {@link SourceContext#collect(T)}, {@link SourceContext#collectWithTimestamp(T, long)},
 * or {@link SourceContext#emitWatermark(Watermark)} is called to emit elements or watermarks from the source.
 */
 @PublicEvolving
 void markAsTemporarilyIdle();

Problem location FlinkPravegaReader

Suggestions for an improvement Call markAsTemporarilyIdle whenever the Pravega reader has no segments assigned.

StephanEwen commented 7 years ago

Yes, that would definitely be good.

What we need for that is callbacks when readers go to zero subscribed segments and when they go again to at least one subscribed segment.

The notification (zero subscribed segments) should also come when the reader starts with zero segments.

skrishnappa commented 7 years ago

I believe this is applicable only when our sources start sending events with timestamps (using SourceContext.collectWithTimestamp) which we currently don't; until we have this - https://github.com/pravega/pravega/issues/191.

StephanEwen commented 7 years ago

it is also relevant if users explicitly extract timestamps and data-driven watermarks after the source, like that:

DataStream<Event> stream = env.addSource(new FlinkPravegaReader<>(...));
stream
    .assignTimestampsAndWatermarks(new MyExtractor())
    .keyBy( ... )
    .window(...)
    .apply(...);
EronWright commented 7 years ago

Opened pravega/pravega#1285 to track the enhancement needed to the reader.

fpj commented 7 years ago

I'm not sure I follow. What's the difference between a source that has a segment assigned and no data for an unbounded amount of time versus a source that has no segment assigned?

Currently, the Pravega code makes no distinction between the two cases.

EronWright commented 7 years ago

@fpj @tkaitchuck it is a nuanced distinction to be sure. But I think there's a good rationale. In the following, I will use the term idle and active to distinguish between a source that has no segments assigned and one that is actively waiting (maybe indefinitely) for new elements.

The issue boils down to how watermarks are generated and propagated. Streams have an associated watermark assigner, and that assigner has considerable freedom as to when to advance the event-time clock (by emitting a watermark). Typically the assigner uses incoming elements to produce a watermark, perhaps with a bounded out-of-orderness. As far as I know, the system does not automatically advance the event-time clock due to inactivity; unless you use a specialized assigner, the event-time clock will remain at a fixed point in time until a new element arrives.

How watermarks propagate thru the distributed system is, a given operator instance takes the minimum watermark across all active source instances that it is wired to. If an active source instance never emits a watermark, time will simply not advance downstream of that source. That is by design (however, see FLINK-5018 which intends to add an idle timeout). When a source instance marks itself as idle, it indicates to downstream operator instances to not consider that source in the min-watermark calculation, thus allowing the clock to advance. In other words, an idle source releases its hold on the event-time clock.

One more thing: if an idle instance were to later become active (assumedly by being assigned a segment), any record whose timestamp is older than the current watermark is considered late. Late records are by default dropped (though may be handled by advanced programs).

Some classes I found useful to survey in preparing this:

tkaitchuck commented 7 years ago

We should only emit an idleness marker if both the reader has no segments AND there are no unassigned segments. Because if there are unassigned segments we don't want those excluded from the min-time calculation.

But this actually highlights the problem is more complicated.

Even if there are assigned segments, the mere fact that some are unassigned could lead it to falsely conclude that all of the data is accounted for when it is not. So instead we would want to add some sort of other marker that means the reverse of idle when there are unassigned segments. That way if there are say 10 readers and each has one segment the presence of an 11th segment that nobody is reading from should be taken into account.

This of course is further complicated by the fact that not all of the readers will be emitting the same status concurrently. So if for example one segment were passed from one reader to another, becoming briefly unassigned, this interval might be short enough that it would not be perceptible in the minimum calculation. But if it was not the time could suddenly jump forward and then back. Alternately if we have the readers flagging this property, they might disagree as to when the segment was actually unavailable resulting in a much large window during which the minimum could not be safely calculated.

We could get a consistent snapshot as to where all the readers are would be to using the existing ReaderGroup checkpoint mechanism. If ever say 5 seconds some process invoked another checkpoint with an ID that was a monotonic sequence, each of the readers would eventually emit that CPID, this is guaranteed to represent a coherent snapshot of the stream. This would be great for tracking time if all the segments were assigned. But as they might not be we don't have a way to know the timestamp for those are not. If we did have a notion of time, even a application supplied one, then we could provide this. We could obviously stick timestamps into the events in our system, but that would only give us write time and not event time. So we could instead support a mechanism where upon CP (or perhaps randomly) the application could supply a timestamp. Then that could be stored and used as a point of reference for the unassigned segments. Then we could simply have method on the reader group to extract the minimum time of the group.

The problem I see with this is that it puts a bunch more logic into the Reader itself. Is there a simpler solution to this? Certainly we have an API on reader group to inquire if there are any unassigned segments, or to generate a StreamCut from something, or to inject a marker. But I'm not really sure what a clean API would be.

EronWright commented 7 years ago

Tom's comments seem relevant to #41 too. Basically the dynamic assignment of segments to readers complicates watermark handling.

I agree that event time cannot advance if there are unassigned segments, i.e. some part of the keyspace is not being consumed by someone. Future segments are a different matter, right?

I'm drawn to per-segment watermarks, provided by app code (WatermarkAssigner) and persisted into reader group state, to help coordinate watermark progression (and maybe segment assignment).

fpj commented 7 years ago

It feels like there are two different, but related, issues being discussed: 1- How a source should emit idle markers 2- How to deal with the dynamic assignment of segments

The distinction is important because the consequences of the dynamic assignment of segments are present even in the absence of idle sources.

As I understand it, emitting an idle marker say that the source has no segment assigned, independent of the state of the rest of the group. There might be unassigned segments, which the idle source might pick up at a later time, but at the moment it has none and it doesn't want advancing the clock to be blocked on it unnecessarily.

The second point refers to the correctness of the event-time clock in the presence of dynamic assignment of segments. It seems that the main concern is that an unassigned segment might be unaccounted for in the computation that advances the logical clock. This computation depends on each source computing the minimum locally and sending downstream.

I have a couple of questions about this point: 1- If there is a transfer of ownership for a segment, and the segment is such that the new owner is ahead with respect to the time it has reported so far, then a number of events in the segment will be late. For example, if a segment S is at time 10 and it moves to a reader R that is at time 11, then all events in S up to 11 will be late. This sounds correct from the description above, although possibly not entirely desirable. 2- In the presence of stream scaling (say a segment splits into two), is there any concern with determining the initial time of the new segments? This is possibly transparent to the watermark assigner and it would know what to do, in which case this is simply a clarification.

Doing per segment watermarks and using the reader group state to coordinate sounds correct, but it seems heavyweight and it loses the benefit of simply computing a local minimum (operator instance takes the minimum watermark across all active source instances). If we continue computing a local minimum, then it might be fine from consistency perspective, but possibly at the cost of declaring some events late unnecessarily.

StephanEwen commented 7 years ago

Thanks for all the good input and thoughts here!

If I understood this correctly, to determine if a source is idle, we'd need a consistent point. Can we just say that a FlinkPravegaReader instance checks at checkpoint whether it has any segment (pending)? Checkpoints should come frequently enough to be suitable points for that.

To avoid the problem that Flavio illustrated seems more tricky. We would need to exploit the partial order between records or segments to fix that. That partial order is dependent on who created the records or segments when. Thinking a bit wild here: In some sense, it is almost easier to solve that on the "producer" side. The producers is where any notion of order and out of orderness that impacts event time actually comes from. Reconstructing watermarks in the streaming source (the readers) is actually a workaround for the fact that producing into the storage does not add any form of "progress" information.

Actually, hah, here is another cool problem - how do we guarantee order per keys through the entire streaming topology? For some users that is pretty crucial. When a segment moves from one reader to another, its events can overtake events emitted by the reader that previously had the segment.

EronWright commented 7 years ago

Stephan, your last point relates to the lack of total ordering in the topology. My understanding is that event time and the progression of watermarks is the basis for correct, ordered processing; one cannot depend on the physical ordering. I believe this is why certain operators (e.g. CEP) use buffers to reorder elements into event time order. Your point is still valid and points to a real problem, at least in other time domains.

tkaitchuck commented 7 years ago

@StephanEwen

Actually, hah, here is another cool problem - how do we guarantee order per keys through the entire streaming topology? For some users that is pretty crucial. When a segment moves from one reader to another, its events can overtake events emitted by the reader that previously had the segment.

I think there, we have at least a partial solution. Within a reader group segments that are 'available' can get picked up by any reader at any time. But the only way for segments to become available are:

It is fairly reasonable to assume that the application "flushed" all the data it is going to emit before shutting down or being declared dead. If the same was true of resuming reading after encountering a checkpoint, then aside from when the stream is scaling, all of the data should be "flushed" from the first reader before any is seen by the second.

Perhaps we could inject an automatic checkpoint to handle the split case. Or we could give checkpoints an 'ack' to delay the pickup of the segment. We have a few options.

Using checkpoints as a time marker is not a bad idea, but obviously there would still need to be some logic to extract and track the timestamps from events themselves. This probably makes the most sense, but a crazier alternative would be to inject checkpoint markers into the stream at Production time. That would obviously involve creating some new weird API, but it would allow for checkpoints to come directly from the data stream, rather than relying on a master process, and they could have timestamps embedded into them.

fpj commented 4 years ago

This has been resolved with the current watermarking support and corresponding implementation in the connector, issues #265 #266.