vesoft-inc / nebula-flink-connector

Flink Connector for Nebula Graph

Improve connector failure handling logic #85

Closed: linhr closed this pull request 1 year ago

linhr commented 1 year ago

What type of PR is this?

What problem(s) does this PR solve?

Issue(s) number

N/A

Description

We have found a few edge cases where the connector does not handle failures gracefully. This PR makes the implementation more robust, based on what we observed while running the connector in production over the past few months.

How do you solve it?

This PR makes the following changes to the connector:

  1. Support retries in the executor.
  2. Support raising exceptions so that the task fails fast when there are unrecoverable errors. (This behavior is configurable; see the configuration options below.)
  3. Support renewing the session when execution fails. This fixes #83.
  4. Do not return or store the error message from the NebulaBatchExecutor.executeBatch() method. This prevents the Java heap from growing without bound when a large number of correlated errors occur.
  5. Polish various error messages in logs and exceptions.
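To make changes 1 to 4 concrete, here is a minimal, self-contained Java sketch of how retries, session renewal, and a fail-fast policy can fit together in a batch executor. It is not the connector's actual code: `RetryingBatchWriter`, `GraphSession`, `FailurePolicy`, and the `maxRetries`/`retryDelayMs` parameters are hypothetical names chosen for illustration, and `GraphSession` is a stand-in rather than the nebula-java `Session` API. In the spirit of change 4, the sketch keeps only a failure counter instead of accumulating error strings.

```java
import java.util.function.Supplier;

// Hypothetical sketch; none of these names come from nebula-flink-connector.
final class RetryingBatchWriter {

    /** Hypothetical stand-in for a graph session (NOT the nebula-java Session API). */
    public interface GraphSession {
        /** Executes one statement and reports whether it succeeded. */
        boolean execute(String statement);
    }

    /** Hypothetical policy: FAIL throws so the task fails fast, IGNORE drops the batch. */
    public enum FailurePolicy { FAIL, IGNORE }

    private final Supplier<GraphSession> sessionFactory;
    private final int maxRetries;
    private final long retryDelayMs;
    private final FailurePolicy policy;
    private GraphSession session;
    // A counter instead of a list of error messages, so memory stays bounded.
    private long failedBatches;

    public RetryingBatchWriter(Supplier<GraphSession> sessionFactory, int maxRetries,
                               long retryDelayMs, FailurePolicy policy) {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        this.sessionFactory = sessionFactory;
        this.maxRetries = maxRetries;
        this.retryDelayMs = retryDelayMs;
        this.policy = policy;
        this.session = sessionFactory.get();
    }

    /** Returns true if the batch was written; the error text is never returned or stored. */
    public boolean executeBatch(String statement) {
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            if (session.execute(statement)) {
                return true;
            }
            // Renew the session so the next attempt (or the next batch) starts fresh,
            // e.g. after the server has expired the old session.
            session = sessionFactory.get();
            if (attempt < maxRetries && retryDelayMs > 0) {
                try {
                    Thread.sleep(retryDelayMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting to retry", e);
                }
            }
        }
        failedBatches++;
        if (policy == FailurePolicy.FAIL) {
            // Unrecoverable after all retries: surface the failure so the task fails fast.
            throw new IllegalStateException(
                "batch failed after " + (maxRetries + 1) + " attempts");
        }
        // IGNORE: a real connector would log here; this sketch only counts the failure.
        return false;
    }

    public long failedBatches() {
        return failedBatches;
    }
}
```

Renewing the session on every failed attempt is a simplifying assumption; a production implementation would likely distinguish transient errors (such as an expired session) from errors that no retry can fix (such as a schema mismatch) and fail fast on the latter without retrying.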

We introduce three new options for the Table API connector:

The DataStream connector can be configured similarly using ExecutionOptions.

Special notes for your reviewer, ex. impact of this fix, design document, etc:

N/A

codecov-commenter commented 1 year ago

Codecov Report

Patch coverage: 71.27%; project coverage change: +1.88%.

Comparison is base (6b3cad3) 65.12% compared to head (a8cc8d1) 67.00%.

Note: the current head a8cc8d1 differs from the pull request's most recent head 2e86172. Consider uploading reports for commit 2e86172 to get more accurate results.


Additional details and impacted files

```diff
@@            Coverage Diff             @@
##             master      #85      +/-   ##
============================================
+ Coverage     65.12%   67.00%   +1.88%
- Complexity      309      330      +21
============================================
  Files             53       55       +2
  Lines           1878     1961      +83
  Branches         167      170       +3
============================================
+ Hits            1223     1314      +91
+ Misses           570      563       -7
+ Partials          85       84       -1
```

| Impacted Files | Coverage Δ |
|---|---|
| `....flink/connector/nebula/catalog/NebulaCatalog.java` | `29.29% <0.00%> (ø)` |
| `...link/connector/nebula/sink/NebulaSinkFunction.java` | `66.66% <ø> (ø)` |
| `...ink/connector/nebula/source/NebulaInputFormat.java` | `68.75% <0.00%> (ø)` |
| `...e.flink/connector/nebula/utils/NebulaConstant.java` | `95.00% <ø> (ø)` |
| `...connector/nebula/sink/NebulaBatchOutputFormat.java` | `54.90% <54.09%> (+7.75%)` ↑ |
| `.../nebula/sink/NebulaTableBufferReducedExecutor.java` | `88.57% <80.00%> (-1.43%)` ↓ |
| `...ink/connector/nebula/sink/NebulaBatchExecutor.java` | `83.33% <83.33%> (ø)` |
| `...connector/nebula/sink/NebulaEdgeBatchExecutor.java` | `87.09% <100.00%> (+17.09%)` ↑ |
| `...nnector/nebula/sink/NebulaVertexBatchExecutor.java` | `86.66% <100.00%> (+17.43%)` ↑ |
| `...nnector/nebula/statement/EdgeExecutionOptions.java` | `92.55% <100.00%> (+1.08%)` ↑ |
| ... and 5 more | |

... and 3 files with indirect coverage changes.


linhr commented 1 year ago

@Nicole00 Thanks for your review! I've addressed the comments accordingly. Let me know if there is more feedback on this PR.

cc @wey-gu

wey-gu commented 1 year ago

Thank you @linhr for the contribution, it's great to have you in the NebulaGraph community!