pravega / flink-connectors

Apache Flink connectors for Pravega.
Apache License 2.0
96 stars 67 forks source link

FlinkPravegaReader fails to achieve exactly-once semantics in some cases #89

Closed EronWright closed 5 years ago

EronWright commented 6 years ago

Problem description In the edge case where a task fails before the first checkpoint is successful, the behavior should be that the group is rewound to the initial state. For example, if the group was configured to start from the beginning of the stream, it should restart from the beginning.

Due to how the source is implemented by creating the reader group in the source constructor (which is not re-executed in this case), and that the hook isn't invoked when there's no state to restore, the actual behavior is that the group simply continues from where it left off. Actually, when the replacement tasks start up, an error occurs due to dirty state:

java.lang.IllegalStateException: The requested reader: Source: Custom Source -> Sink: Unnamed (1/4) cannot be added to the group because it is already in the group. Perhaps close() was not called?
    at io.pravega.client.stream.impl.ReaderGroupStateManager.initializeReader(ReaderGroupStateManager.java:118)
    at io.pravega.client.stream.impl.ClientFactoryImpl.createReader(ClientFactoryImpl.java:138)
    at io.pravega.client.stream.impl.ClientFactoryImpl.createReader(ClientFactoryImpl.java:124)
    at io.pravega.connectors.flink.util.FlinkPravegaUtils.createPravegaReader(FlinkPravegaUtils.java:110)
    at io.pravega.connectors.flink.FlinkPravegaReader.run(FlinkPravegaReader.java:227)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
    at org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask.run(StoppableSourceStreamTask.java:39)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:748)

Simply removing oneself from the online readers would fix the above symptom but would produce undesirable at-most-once behavior.

Suggestions for an improvement The obvious fix is to change Flink's hook functionality to invoke the hook in the non-restore case too. In that case, the hook would reinitialize the group. Alternately, the tasks could catch the above exception and reset the reader group state, with some additional coordination.

As a workaround, the reader could wait for the first Flink checkpoint to arrive before processing any elements. There's a catch-22: the Flink checkpoints are communicated to the task via the reader group state!

EronWright commented 6 years ago

We need a workaround for Flink 1.4 and a true fix for Flink 1.5. Of course I'll file a ticket for the latter with some concrete ideas.

One idea is to add an initializeState method to MasterTriggerRestoreHook, which would be called unconditionally (on initial execution and on recovery, with or without checkpoint state). The Pravega connector would initialize or forcibly reinitialize the reader group state here. A subsequent call to restoreCheckpoint would restore reader group checkpoint state (as it does now). A nice side-benefit of this approach is to move reader group initialization from client to JM.

Update: opened FLINK-8533.

Update 2: patch submitted.

EronWright commented 6 years ago

The patch was merged for Flink 1.5. Once 1.5 is out, we will update the connector to use the new functionality. The tricky part is whether to continue to support 1.4.

Basically the ReaderCheckpointHook class will have the following new code:

    // lifecycle

    @Override
    public void reset() {
        // reset the reader group to its initial condition
        log.debug("Resetting the state of reader group {} to its initial state.", readerGroup.getGroupName());
        readerGroup.resetReaderGroup(readerGroupConfig);
    }

    @Override
    public void close() {
        readerGroup.close();
    }
vijikarthi commented 5 years ago

closed https://github.com/pravega/flink-connectors/pull/233