vesoft-inc / nebula-flink-connector

Flink Connector for Nebula Graph
49 stars 30 forks source link

NebulaBatchOutputFormat.commit is not return when source data amount is small #5

Closed ystaticy closed 3 years ago

ystaticy commented 3 years ago

In NebulaBatchOutputFormat.commit there is :

numPendingRow.compareAndSet(executionOptions.getBatch(), 0);

when we set executionOptions.setBatch = 100; But we only have 50 rows in DataSource;

numPendingRow( =50) will not euqals executionOptions.getBatch()( =100); numPendingRow will not reset to 0;

the flush() while loop will running all the time;it will block the flink checkpoint

so I think we should change the first parameter of numPendingRow.compareAndSet

it should be:

long pendingRow=numPendingRow.get();
numPendingRow.compareAndSet(pendingRow ,0);
Nicole00 commented 3 years ago

Sorry for reply late. This is a bug for flush, the update for numPendingRow should be changed. Can you please push a pr to fix this?

ystaticy commented 3 years ago

Sorry for reply late. This is a bug for flush, the update for numPendingRow should be changed. Can you please push a pr to fix this?

Thanks for your reply. I will push a pr to fix it.

ystaticy commented 3 years ago

Sorry for reply late. This is a bug for flush, the update for numPendingRow should be changed. Can you please push a pr to fix this?

I already push a pr : https://github.com/vesoft-inc/nebula-flink-connector/pull/6 can you help me to review it?

Nicole00 commented 3 years ago

Thank you for your contribution! Please sign our Contributor License Agreement as mentioned in your PR.