vesoft-inc / nebula-flink-connector

Flink Connector for Nebula Graph

Support all insert/update/delete operations for dynamic table sink #81

Closed linhr closed 1 year ago

linhr commented 1 year ago

What type of PR is this?

What problem(s) does this PR solve?

Issue(s) number: #77

Description:

This code change enables the dynamic table sink to process the upstream change log and perform vertex/edge insert, update, and delete operations for each individual row.

How do you solve it?

The solution buffers the rows (and deduplicates them by primary keys) and delegates the execution to three executors based on the row kind (for insert, update, and delete, respectively) when committing the batch.

The primary key for vertices is the vertex ID, and the primary key for edges is the combination of source vertex ID, destination vertex ID, and the rank.
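The buffering step described above can be sketched roughly as follows. This is a minimal illustration and not the code in this PR: `BufferReduceSketch`, `Kind`, `Row`, `vertexKey`, and `edgeKey` are hypothetical names, the three lists stand in for the real insert/update/delete executors, and the rule that the last row seen for a key wins is an assumption about how deduplication might behave.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: every name here is hypothetical, not the connector's API. */
final class BufferReduceSketch {

    /** Simplified stand-in for the row kinds carried by a change log. */
    enum Kind { INSERT, UPDATE, DELETE }

    /** One buffered change: its kind, its primary key, and an opaque payload. */
    static final class Row {
        final Kind kind;
        final List<Object> key;
        final String props;

        Row(Kind kind, List<Object> key, String props) {
            this.kind = kind;
            this.key = key;
            this.props = props;
        }
    }

    /** Vertex primary key: the vertex ID. */
    static List<Object> vertexKey(String vid) {
        return Arrays.<Object>asList(vid);
    }

    /** Edge primary key: source vertex ID, destination vertex ID, and rank. */
    static List<Object> edgeKey(String src, String dst, long rank) {
        return Arrays.<Object>asList(src, dst, rank);
    }

    // Keyed buffer. Assumption: a later row with the same key replaces the earlier one.
    private final Map<List<Object>, Row> buffer = new LinkedHashMap<>();

    // Stand-ins for the three executors; here they only record what they receive.
    final List<Row> inserted = new ArrayList<>();
    final List<Row> updated = new ArrayList<>();
    final List<Row> deleted = new ArrayList<>();

    /** Buffers a row, deduplicating by primary key. */
    void add(Row row) {
        buffer.put(row.key, row);
    }

    /** Commits the batch: routes each surviving row by kind, then clears the buffer. */
    void flush() {
        for (Row row : buffer.values()) {
            switch (row.kind) {
                case INSERT: inserted.add(row); break;
                case UPDATE: updated.add(row); break;
                case DELETE: deleted.add(row); break;
                default: throw new IllegalStateException("Unknown kind: " + row.kind);
            }
        }
        buffer.clear();
    }
}
```

Under these assumptions, adding an INSERT and then an UPDATE for the same edge key leaves only the UPDATE in the buffer, so only the update stand-in sees that row at flush time. A real implementation may need a different merge rule for such sequences.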

New and existing test cases demonstrate that the solution works.

Special notes for your reviewer, ex. impact of this fix, design document, etc:

To improve code readability, there are some minor (but backward-incompatible) changes to the public interface. The changes can be seen from the diff in README.md and the Java code in the example directory. Specifically, the following has changed:

  1. The .builder() method of the various execution option builder classes has been renamed to .build(); the old .builder() remains for compatibility.
  2. The generic type design of some classes, along with some method signatures, has changed (e.g. NebulaBatchOutputFormat and NebulaSinkFunction).
  3. The "batch" option has been renamed to "batch size" in the DataStream API; the old "batch" option remains for compatibility.
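The compatibility approach in items 1 and 3 could look something like the sketch below. It is only an illustration of keeping a deprecated alias next to a renamed method: `ExampleExecutionOptions`, `setGraphSpace`, `setBatchSize`, `setBatch`, and the default of 100 are hypothetical and are not taken from the connector's source.

```java
/** Illustrative only: a hypothetical options class, not one from the connector. */
final class ExampleExecutionOptions {
    private final String graphSpace;
    private final int batchSize;

    private ExampleExecutionOptions(String graphSpace, int batchSize) {
        this.graphSpace = graphSpace;
        this.batchSize = batchSize;
    }

    String getGraphSpace() {
        return graphSpace;
    }

    int getBatchSize() {
        return batchSize;
    }

    static final class Builder {
        private String graphSpace;
        private int batchSize = 100; // arbitrary default chosen for this sketch

        Builder setGraphSpace(String graphSpace) {
            this.graphSpace = graphSpace;
            return this;
        }

        /** New, clearer name for the option. */
        Builder setBatchSize(int batchSize) {
            this.batchSize = batchSize;
            return this;
        }

        /** Old name, kept so existing callers still compile. */
        @Deprecated
        Builder setBatch(int batch) {
            return setBatchSize(batch);
        }

        /** New, conventional name for the terminal method. */
        ExampleExecutionOptions build() {
            if (graphSpace == null) {
                throw new IllegalStateException("graphSpace must be set");
            }
            return new ExampleExecutionOptions(graphSpace, batchSize);
        }

        /** Old name, kept as a thin alias of build(). */
        @Deprecated
        ExampleExecutionOptions builder() {
            return build();
        }
    }
}
```

With aliases like these, callers on the old names keep working and get a deprecation warning at compile time, while new code can use the renamed methods.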

codecov-commenter commented 1 year ago

Codecov Report

Base: 61.59% // Head: 65.18% // Increases project coverage by +3.59% :tada:

Coverage data is based on head (e3541c2) compared to base (29a0db2). Patch coverage: 90.50% of modified lines in pull request are covered.

Additional details and impacted files

```diff
@@             Coverage Diff              @@
##             master      #81      +/-   ##
============================================
+ Coverage     61.59%   65.18%   +3.59%
- Complexity      291      308      +17
============================================
  Files            52       53       +1
  Lines          1786     1873      +87
  Branches        166      167       +1
============================================
+ Hits           1100     1221     +121
+ Misses          596      566      -30
+ Partials         90       86       -4
```

| [Impacted Files](https://codecov.io/gh/vesoft-inc/nebula-flink-connector/pull/81?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=vesoft-inc) | Coverage Δ | |
|---|---|---|
| [...ebula/connection/NebulaMetaConnectionProvider.java](https://codecov.io/gh/vesoft-inc/nebula-flink-connector/pull/81/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=vesoft-inc#diff-Y29ubmVjdG9yL3NyYy9tYWluL2phdmEvb3JnLmFwYWNoZS5mbGluay9jb25uZWN0b3IvbmVidWxhL2Nvbm5lY3Rpb24vTmVidWxhTWV0YUNvbm5lY3Rpb25Qcm92aWRlci5qYXZh) | `59.25% <0.00%> (+3.70%)` | :arrow_up: |
| [...ector/nebula/sink/NebulaEdgeBatchOutputFormat.java](https://codecov.io/gh/vesoft-inc/nebula-flink-connector/pull/81/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=vesoft-inc#diff-Y29ubmVjdG9yL3NyYy9tYWluL2phdmEvb3JnLmFwYWNoZS5mbGluay9jb25uZWN0b3IvbmVidWxhL3NpbmsvTmVidWxhRWRnZUJhdGNoT3V0cHV0Rm9ybWF0LmphdmE=) | `0.00% <0.00%> (ø)` | |
| [...e.flink/connector/nebula/utils/NebulaConstant.java](https://codecov.io/gh/vesoft-inc/nebula-flink-connector/pull/81/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=vesoft-inc#diff-Y29ubmVjdG9yL3NyYy9tYWluL2phdmEvb3JnLmFwYWNoZS5mbGluay9jb25uZWN0b3IvbmVidWxhL3V0aWxzL05lYnVsYUNvbnN0YW50LmphdmE=) | `95.00% <ø> (ø)` | |
| [...e.flink/connector/nebula/utils/PartitionUtils.java](https://codecov.io/gh/vesoft-inc/nebula-flink-connector/pull/81/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=vesoft-inc#diff-Y29ubmVjdG9yL3NyYy9tYWluL2phdmEvb3JnLmFwYWNoZS5mbGluay9jb25uZWN0b3IvbmVidWxhL3V0aWxzL1BhcnRpdGlvblV0aWxzLmphdmE=) | `85.71% <ø> (ø)` | |
| [...connector/nebula/table/NebulaDynamicTableSink.java](https://codecov.io/gh/vesoft-inc/nebula-flink-connector/pull/81/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=vesoft-inc#diff-Y29ubmVjdG9yL3NyYy9tYWluL2phdmEvb3JnLmFwYWNoZS5mbGluay9jb25uZWN0b3IvbmVidWxhL3RhYmxlL05lYnVsYUR5bmFtaWNUYWJsZVNpbmsuamF2YQ==) | `84.84% <71.42%> (-5.16%)` | :arrow_down: |
| [...connector/nebula/sink/NebulaBatchOutputFormat.java](https://codecov.io/gh/vesoft-inc/nebula-flink-connector/pull/81/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=vesoft-inc#diff-Y29ubmVjdG9yL3NyYy9tYWluL2phdmEvb3JnLmFwYWNoZS5mbGluay9jb25uZWN0b3IvbmVidWxhL3NpbmsvTmVidWxhQmF0Y2hPdXRwdXRGb3JtYXQuamF2YQ==) | `47.14% <83.33%> (-2.27%)` | :arrow_down: |
| [...link/connector/nebula/sink/NebulaSinkFunction.java](https://codecov.io/gh/vesoft-inc/nebula-flink-connector/pull/81/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=vesoft-inc#diff-Y29ubmVjdG9yL3NyYy9tYWluL2phdmEvb3JnLmFwYWNoZS5mbGluay9jb25uZWN0b3IvbmVidWxhL3NpbmsvTmVidWxhU2lua0Z1bmN0aW9uLmphdmE=) | `66.66% <83.33%> (ø)` | |
| [.../nebula/sink/NebulaTableBufferReducedExecutor.java](https://codecov.io/gh/vesoft-inc/nebula-flink-connector/pull/81/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=vesoft-inc#diff-Y29ubmVjdG9yL3NyYy9tYWluL2phdmEvb3JnLmFwYWNoZS5mbGluay9jb25uZWN0b3IvbmVidWxhL3NpbmsvTmVidWxhVGFibGVCdWZmZXJSZWR1Y2VkRXhlY3V0b3IuamF2YQ==) | `90.00% <90.00%> (ø)` | |
| [...ector/nebula/statement/VertexExecutionOptions.java](https://codecov.io/gh/vesoft-inc/nebula-flink-connector/pull/81/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=vesoft-inc#diff-Y29ubmVjdG9yL3NyYy9tYWluL2phdmEvb3JnLmFwYWNoZS5mbGluay9jb25uZWN0b3IvbmVidWxhL3N0YXRlbWVudC9WZXJ0ZXhFeGVjdXRpb25PcHRpb25zLmphdmE=) | `92.53% <95.00%> (+25.87%)` | :arrow_up: |
| [...nnector/nebula/statement/EdgeExecutionOptions.java](https://codecov.io/gh/vesoft-inc/nebula-flink-connector/pull/81/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=vesoft-inc#diff-Y29ubmVjdG9yL3NyYy9tYWluL2phdmEvb3JnLmFwYWNoZS5mbGluay9jb25uZWN0b3IvbmVidWxhL3N0YXRlbWVudC9FZGdlRXhlY3V0aW9uT3B0aW9ucy5qYXZh) | `93.75% <95.45%> (+26.00%)` | :arrow_up: |
| ... and [14 more](https://codecov.io/gh/vesoft-inc/nebula-flink-connector/pull/81/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=vesoft-inc) | | |


Nicole00 commented 1 year ago

It's an excellent PR, thanks very much for your contribution @linhr

linhr commented 1 year ago

@Nicole00 Thanks for your review! I've made some changes to the code according to your feedback. Let me know if they look good.

linhr commented 1 year ago

@Nicole00 Do you think this PR can make it into the 3.4.0 release? Let me know what you think about my changes after your initial review. Thanks a lot!