apache / doris-flink-connector

Flink Connector for Apache Doris
https://doris.apache.org/
Apache License 2.0

[Bug] (concurrency) Thread safety problem of loading data to Doris #100

Closed DarvenDuan closed 1 year ago

DarvenDuan commented 1 year ago

Version

1.1.1

What's Wrong?

When I used flink-doris-connector_1.14_2.12 (1.1.1) to load data into Doris, the checkpoint failed with a java.lang.InterruptedException thrown by the exception checker. However, the Stream Load job in Doris itself was normal.

In the source code, the exception checker invokes DorisStreamLoad.handlePreCommitResponse() when the variable loading is true. In DorisWriter.prepareCommit(), the main thread sets loading to false to disable the exception checker and then invokes DorisStreamLoad.handlePreCommitResponse() itself. However, the exception checker thread may invoke DorisStreamLoad.handlePreCommitResponse() before the main thread has set loading to false. In that case the exception checker thread and the main thread invoke DorisStreamLoad.handlePreCommitResponse() at the same time, which may cause concurrent access problems.
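One way to picture a guard against this race is sketched below. It is a minimal illustration, not the connector's code: the class `GuardedPreCommitSketch`, its `lock` field, the call counters, and the helper `maxOverlap` are all hypothetical, and only the method names `checkDone`, `prepareCommit`, and `handlePreCommitResponse` are borrowed from the description above. The idea is that reading `loading` and calling the handler happen under the same lock that the main thread holds while it clears `loading` and calls the handler.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the writer described above; not the connector's real code.
class GuardedPreCommitSketch {
    private final Object lock = new Object();
    private boolean loading = true;                       // guarded by lock
    private final AtomicInteger active = new AtomicInteger();
    private final AtomicInteger maxActive = new AtomicInteger();

    // Stand-in for handlePreCommitResponse(); records how many calls overlap.
    private void handlePreCommitResponse() {
        int now = active.incrementAndGet();
        maxActive.accumulateAndGet(now, Math::max);
        Thread.yield();                                   // widen the window for overlap
        active.decrementAndGet();
    }

    // Checker thread: the test of `loading` and the call share one critical section.
    void checkDone() {
        synchronized (lock) {
            if (loading) {
                handlePreCommitResponse();
            }
        }
    }

    // Main thread: clearing `loading` and the call share the same critical section.
    void prepareCommit() {
        synchronized (lock) {
            loading = false;
            handlePreCommitResponse();
        }
    }

    // Runs both threads repeatedly and returns the largest number of overlapping calls seen.
    static int maxOverlap(int rounds) {
        int worst = 0;
        for (int i = 0; i < rounds; i++) {
            GuardedPreCommitSketch sketch = new GuardedPreCommitSketch();
            Thread checker = new Thread(sketch::checkDone);
            Thread main = new Thread(sketch::prepareCommit);
            checker.start();
            main.start();
            try {
                checker.join();
                main.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            worst = Math.max(worst, sketch.maxActive.get());
        }
        return worst;
    }

    public static void main(String[] args) {
        System.out.println("max overlapping calls: " + maxOverlap(1000));
    }
}
```

With the lock in place the handler may still run twice in one round (once per thread), but never in both threads at once. Whether running it twice is acceptable depends on the real handler, which this sketch does not model.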

What You Expected?

The exception checker thread and the main thread should invoke handlePreCommitResponse() in a synchronized way, never concurrently.

How to Reproduce?

No response

Anything Else?

No response

JNSimba commented 1 year ago

But method DorisStreamLoad.handlePreCommitResponse() may be invoked by exception checker thread before loading is set to true by main thread.

In the checkDone() method, the check is skipped directly when loading is false, so shouldn't this problem already be avoided?

GoGoWen commented 1 year ago

This looks like an issue similar to the one in https://github.com/apache/doris-flink-connector/pull/102.

DarvenDuan commented 1 year ago

But method DorisStreamLoad.handlePreCommitResponse() may be invoked by exception checker thread before loading is set to true by main thread.

In the checkdone method, when loading is false, it will be skipped directly, should this problem be avoided?

Thank you for your reply. I'm sorry, my description was wrong. It should be "But method DorisStreamLoad.handlePreCommitResponse() may be invoked by exception checker thread before loading is set to false by main thread". To be exact, the InterruptedException may be caused by dorisStreamLoad.getPendingLoadFuture().

When a checkpoint is taken, dorisStreamLoad.startLoad() is invoked and replaces pendingLoadFuture, but dorisStreamLoad.getPendingLoadFuture() in checkDone() may still return the old pendingLoadFuture.

Might PR #102 fix this problem?
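The stale pendingLoadFuture scenario described in this comment can be illustrated with a small, single-threaded sketch. It only shows the effect of holding an old reference after the field has been replaced; it does not reproduce the connector's threading. The class `StaleFutureSketch` and the helper `observe` are hypothetical, and the field and method names are borrowed from the discussion purely for readability.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical stand-in for DorisStreamLoad; not the connector's real code.
class StaleFutureSketch {
    // Starts out as an already completed future, standing in for the previous load.
    private Future<String> pendingLoadFuture = CompletableFuture.completedFuture("previous load");

    Future<String> getPendingLoadFuture() {
        return pendingLoadFuture;
    }

    // Stand-in for startLoad(): replaces the field with a future that has not finished.
    void startLoad() {
        pendingLoadFuture = new CompletableFuture<>();
    }

    // Returns {isDone of the old reference, isDone of the current field} after a new load starts.
    static boolean[] observe() {
        StaleFutureSketch load = new StaleFutureSketch();
        Future<String> seenByChecker = load.getPendingLoadFuture(); // checker reads the field
        load.startLoad();                                           // checkpoint starts a new load
        return new boolean[] {
            seenByChecker.isDone(),
            load.getPendingLoadFuture().isDone()
        };
    }

    public static void main(String[] args) {
        boolean[] result = observe();
        System.out.println("old reference done: " + result[0]
                + ", current field done: " + result[1]);
    }
}
```

A checker that acts on the old reference would treat the load as finished even though the new load has only just begun.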

JNSimba commented 1 year ago

When doing checkpoint, method dorisStreamLoad.startLoad() will be invoked and changes pendingLoadFuture, but in checkDone() dorisStreamLoad.getPendingLoadFuture() may get old pendingLoadFuture.

Sorry, I didn't understand what you meant. Under normal circumstances, the InterruptedException in checkDone() occurs because the Stream Load has ended but dorisStreamLoad.stopLoad() has not yet been reached. During a checkpoint, the snapshotState method opens a new pendingLoadFuture through dorisStreamLoad.startLoad(). At that point, however, the dorisStreamLoad.handlePreCommitResponse() step in checkDone() should not be reachable, right? Because dorisStreamLoad.getPendingLoadFuture().isDone() is false?

GoGoWen commented 1 year ago

However, at this time, the dorisStreamLoad.handlePreCommitResponse step in the checkDone method should not be possible, right? Because dorisStreamLoad.getPendingLoadFuture().isDone() is false?

Guys, in my opinion, dorisStreamLoad.getPendingLoadFuture().isDone() may still return true, because the Future field in dorisStreamLoad is not volatile. dorisStreamLoad.getPendingLoadFuture() in checkDone() may return the previous, already completed Future even after startLoad() has set the new Future for the next Stream Load.
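The visibility concern raised here can be sketched as follows. This is an assumption-laden illustration, not the connector's code and not necessarily what PR #102 does: the class `VolatileFutureSketch` and the helper `checkerSeesNewFuture` are hypothetical. It shows the field declared `volatile`, which under the Java memory model guarantees that a write in one thread becomes visible to later reads in another thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for DorisStreamLoad; not the connector's real code.
class VolatileFutureSketch {
    // volatile: a write by the main thread is visible to later reads by the checker thread.
    private volatile Future<String> pendingLoadFuture =
            CompletableFuture.completedFuture("previous load");

    Future<String> getPendingLoadFuture() {
        return pendingLoadFuture;
    }

    // Stand-in for startLoad(): publishes a future that has not finished.
    void startLoad() {
        pendingLoadFuture = new CompletableFuture<>();
    }

    // Returns true if a polling thread observes the unfinished future after startLoad().
    static boolean checkerSeesNewFuture() {
        VolatileFutureSketch load = new VolatileFutureSketch();
        CountDownLatch sawNew = new CountDownLatch(1);
        Thread checker = new Thread(() -> {
            // Re-reads the volatile field on every iteration.
            while (load.getPendingLoadFuture().isDone()) {
                Thread.yield();
            }
            sawNew.countDown();
        });
        checker.setDaemon(true);
        checker.start();
        load.startLoad();
        try {
            return sawNew.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("checker saw new future: " + checkerSeesNewFuture());
    }
}
```

Note that `volatile` addresses visibility only. It does not make a check of `isDone()` followed by a call to the response handler atomic, so a lock or similar coordination may still be needed for that part.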