flink-extended / flink-remote-shuffle

Remote Shuffle Service for Flink
Apache License 2.0

Ensure all resources will be released for result partition and input gate when closed #55

Closed: wsry closed this 2 years ago

wsry commented 2 years ago

Motivation

The BufferPacker may release (recycle) the same buffer twice, which can cause the following exception:

org.apache.flink.shaded.netty4.io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
        at org.apache.flink.shaded.netty4.io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.shaded.netty4.io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:138) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at com.alibaba.flink.shuffle.plugin.transfer.BufferPacker.close(BufferPacker.java:130) ~[shuffle-plugin-1.1-SNAPSHOT.jar:?]
        at com.alibaba.flink.shuffle.plugin.transfer.RemoteShuffleOutputGate.close(RemoteShuffleOutputGate.java:139) ~[shuffle-plugin-1.1-SNAPSHOT.jar:?]
        at com.alibaba.flink.shuffle.plugin.transfer.RemoteShuffleResultPartition.close(RemoteShuffleResultPartition.java:331) ~[shuffle-plugin-1.1-SNAPSHOT.jar:?]
        at org.apache.flink.runtime.taskmanager.Task.closeAllResultPartitions(Task.java:1016) [flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.runtime.taskmanager.Task.releaseResources(Task.java:997) [flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:885) [flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) [flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at java.lang.Thread.run(Thread.java:874) [?:1.8.0_242]
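The message "refCnt: 0, decrement: 1" means the buffer's reference count had already reached 0 when it was released again. The sketch below is a minimal model of that check, assuming a simplified reference-counted buffer. The class RefCountedBuffer is hypothetical and is not Netty's or Flink's actual implementation.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of a reference-counted buffer (not Netty's real class).
final class RefCountedBuffer {
    // A newly allocated buffer starts with one reference, held by its creator.
    private final AtomicInteger refCnt = new AtomicInteger(1);

    int refCnt() {
        return refCnt.get();
    }

    // Drops one reference. Releasing a buffer whose count is already 0 is a bug,
    // so this fails fast instead of letting the count go negative.
    void release() {
        while (true) {
            int current = refCnt.get();
            if (current <= 0) {
                throw new IllegalStateException("refCnt: " + current + ", decrement: 1");
            }
            if (refCnt.compareAndSet(current, current - 1)) {
                return;
            }
        }
    }
}
```

In this model, the first release() brings the count to 0; a second release() of the same buffer throws, which mirrors the duplicate recycle in BufferPacker.close() shown in the trace.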

Because the exception is thrown from close(), it can further prevent other resources from being released. This can be fixed by:

  1. Fixing the duplicate recycle issue;
  2. Trying to release all resources even when releasing one of them fails. (This was not done before because we did not expect any exception to be thrown here.)
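The second fix follows a common Java pattern: attempt every release, remember the first failure, attach later failures as suppressed exceptions, and rethrow only after everything has been tried. The sketch below assumes each resource can be modeled as an AutoCloseable; the helper ResourceCloser.closeAll is hypothetical and is not the plugin's actual code.

```java
import java.util.List;

// Hypothetical helper: release every resource even if some of them fail.
final class ResourceCloser {
    private ResourceCloser() {}

    // Closes all resources in order. The first failure is kept and rethrown after
    // every resource has been attempted; later failures are added as suppressed.
    static void closeAll(List<? extends AutoCloseable> resources) throws Exception {
        Exception first = null;
        for (AutoCloseable resource : resources) {
            if (resource == null) {
                continue; // nothing to release
            }
            try {
                resource.close();
            } catch (Exception e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }
}
```

With this shape, a failure while closing one component (for example, the output gate) no longer skips the components closed after it; the caller still sees the original exception.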

Changes

  1. Fix the duplicate recycle issue;
  2. Try to release all resources even when releasing one of them fails.
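The PR text does not describe exactly how the duplicate recycle happened. One common way to prevent it is to clear a held buffer reference before recycling it, and to clear it whenever ownership is handed to someone else, so close() is idempotent. The sketch below illustrates that idea; PooledBuffer and SimplePacker are hypothetical stand-ins, not the real BufferPacker or Flink's NetworkBuffer.

```java
// Hypothetical stand-in for a pooled network buffer that rejects a second recycle.
final class PooledBuffer {
    private int recycleCount;

    void recycle() {
        if (recycleCount > 0) {
            throw new IllegalStateException("buffer recycled twice");
        }
        recycleCount++;
    }

    int recycleCount() {
        return recycleCount;
    }
}

// Hypothetical packer that holds at most one pending buffer (not the real BufferPacker).
final class SimplePacker {
    private PooledBuffer cached;

    void cache(PooledBuffer buffer) {
        close(); // recycle any buffer still owned before replacing it
        cached = buffer;
    }

    // Hands the cached buffer to the caller; the packer no longer owns it.
    PooledBuffer take() {
        PooledBuffer buffer = cached;
        cached = null;
        return buffer;
    }

    // Idempotent: the reference is cleared before recycling, so calling close()
    // twice, or after take(), never recycles the same buffer again.
    void close() {
        PooledBuffer buffer = cached;
        cached = null;
        if (buffer != null) {
            buffer.recycle();
        }
    }
}
```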

Test