vesoft-inc / nebula-flink-connector

Flink Connector for Nebula Graph
48 stars, 30 forks

FlinkSQL Nebula as Sink #57

Closed. spike-liu closed this pull request 2 years ago.

spike-liu commented 2 years ago

Flink SQL: use Nebula as a sink.

Please feel free to review.

By the way, the integration tests have been run locally with the docker-compose.yaml below:

https://github.com/vesoft-inc/nebula-flink-connector/pull/57/files#diff-881de9edd148f6d70ee8c0ebc96cc09268471ba279c5d0b953b0c8abcf9d2e43

CLAassistant commented 2 years ago

CLA assistant check
All committers have signed the CLA.

spike-liu commented 2 years ago

@Nicole00 Thanks for the build approval.

Meanwhile, I have fixed the build error caused by Checkstyle and updated the pull request. Please review again.

By the way, this was my mistake; Checkstyle has now been enabled in my development environment.

spike-liu commented 2 years ago

@Nicole00 We understand you are very busy. Would you please spare some time to review this PR?

We are enthusiastic about this project, and our colleagues would like to contribute more code based on this PR.

Alternatively, would it be possible to assign someone else to help us out?

QingZ11 commented 2 years ago

Please wait a moment. I have contacted Nicole; she is on a long leave, so PRs have not been processed promptly.

Sorry for our slow reply. And thank you for your patience.

(supported by Google Translate)

spike-liu commented 2 years ago

@QingZ11 it is great to hear from you and thanks for letting us know.

Nicole00 commented 2 years ago

So sorry for the late reply. I'm on maternity leave, so I couldn't review it in time. We'll review this PR as soon as possible.

Nicole00 commented 2 years ago

It's a great PR! Thanks again for your work and attention to Nebula. 👍🏻

Nicole00 commented 2 years ago
  1. How can we create a graph space using Flink SQL?
  2. Is there any point in defining the VID (src id, dst id) data type in the CREATE statement?
spike-liu commented 2 years ago

It made our day to hear from you :)

Please forgive us for pushing you during your leave.

spike-liu commented 2 years ago
  1. How can we create a graph space using Flink SQL? [Spike] Currently Flink SQL supports only a few statements (listed in the overview below), so there is no way for us to create a graph space: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/overview/
  2. Is there any point in defining the VID (src id, dst id) data type in the CREATE statement? [Spike] Based on my understanding, srcId and dstId have to be included in INSERT statements, and every column in an INSERT statement has to be declared in the CREATE TABLE statement. Hence this seems unavoidable. Correct me if I am wrong ^-^
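To illustrate why srcId and dstId end up as table columns, here is a minimal Java sketch that renders one row as an nGQL INSERT EDGE statement. It assumes string VIDs and all-STRING property columns; the class InsertEdgeSketch, its methods, and the friend edge type are hypothetical and are not the connector's actual executor code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Hypothetical sketch; not the connector's NebulaEdgeTableBatchExecutor. */
final class InsertEdgeSketch {

    /** Quotes a value as a double-quoted nGQL string literal, escaping \ and ". */
    static String quote(String value) {
        return "\"" + value.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    /**
     * Renders one row (column name -> value) as an INSERT EDGE statement.
     * The source and destination VIDs are read from ordinary row columns,
     * which is why they must also be declared in the Flink CREATE TABLE.
     */
    static String insertEdge(String edgeType, Map<String, String> row,
                             String srcColumn, String dstColumn) {
        String src = Objects.requireNonNull(row.get(srcColumn), "missing source VID column");
        String dst = Objects.requireNonNull(row.get(dstColumn), "missing destination VID column");
        List<String> names = new ArrayList<>();
        List<String> values = new ArrayList<>();
        for (Map.Entry<String, String> e : row.entrySet()) {
            String column = e.getKey();
            if (column.equals(srcColumn) || column.equals(dstColumn)) {
                continue; // VIDs identify the edge; they are not edge properties
            }
            names.add("`" + column + "`");
            values.add(quote(e.getValue()));
        }
        return "INSERT EDGE `" + edgeType + "`(" + String.join(", ", names) + ") VALUES "
                + quote(src) + "->" + quote(dst)
                + ":(" + String.join(", ", values) + ");";
    }

    public static void main(String[] args) {
        Map<String, String> row = new LinkedHashMap<>(); // keeps column order stable
        row.put("srcId", "player100");
        row.put("dstId", "player101");
        row.put("remark", "met at work");
        // INSERT EDGE `friend`(`remark`) VALUES "player100"->"player101":("met at work");
        System.out.println(insertEdge("friend", row, "srcId", "dstId"));
    }
}
```

If the srcId or dstId column is absent from the row, the sketch fails fast instead of emitting an edge without endpoints.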
Nicole00 commented 2 years ago
  1. The statements supported by Flink SQL include CREATE DATABASE, which for Nebula is equivalent to CREATE SPACE.
  2. I agree that srcId and dstId should be included in INSERT statements. But how should the VID data type be defined in the CREATE TABLE statement, e.g. vid STRING or vid BIGINT? In fact, the data type in the CREATE TABLE statement has no effect; what matters is the VID type declared in the CREATE SPACE statement.
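A minimal Java sketch of the point above, assuming Nebula 3.x nGQL syntax, where the VID type is fixed by vid_type (INT64 or FIXED_STRING(N)) when the space is created. A Flink CREATE DATABASE could in principle be translated into a statement like the one below; CreateSpaceSketch and its parameters are hypothetical, and the connector does not necessarily expose such a helper.

```java
import java.util.Objects;

/** Hypothetical helper: not part of nebula-flink-connector. */
final class CreateSpaceSketch {

    /** Integer VIDs. */
    static final String INT64 = "INT64";

    /** Fixed-length string VIDs, e.g. FIXED_STRING(32). */
    static String fixedString(int length) {
        if (length <= 0) {
            throw new IllegalArgumentException("length must be positive");
        }
        return "FIXED_STRING(" + length + ")";
    }

    /**
     * Builds a CREATE SPACE statement. The VID type lives here, so a column
     * type such as STRING or BIGINT on srcId/dstId in a Flink CREATE TABLE
     * cannot change it.
     */
    static String createSpace(String spaceName, int partitionNum,
                              int replicaFactor, String vidType) {
        Objects.requireNonNull(spaceName, "spaceName");
        Objects.requireNonNull(vidType, "vidType");
        return String.format(
                "CREATE SPACE IF NOT EXISTS `%s` (partition_num = %d, replica_factor = %d, vid_type = %s);",
                spaceName, partitionNum, replicaFactor, vidType);
    }

    public static void main(String[] args) {
        // CREATE SPACE IF NOT EXISTS `flink_sink` (partition_num = 10, replica_factor = 1, vid_type = FIXED_STRING(32));
        System.out.println(createSpace("flink_sink", 10, 1, fixedString(32)));
    }
}
```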
Nicole00 commented 2 years ago

To keep the functionality clear and make review easier, could you please submit the source function in a separate PR? Thanks~

spike-liu commented 2 years ago

Yes, you are right. CREATE DATABASE is exactly what you want. However, in our use case production is highly sensitive to stability, so creating a database while processing a single data stream is forbidden (not only for Nebula, but also for MySQL, etc.).

Maybe we could create a separate issue to track this requirement.

spike-liu commented 2 years ago

Sorry for the extra complexity. There is some duplicated code between Source and Sink, so it is hard to separate them.

However, I have reverted this commit and may bring it back in the future if it is still needed.

Nicole00 commented 2 years ago

I totally agree with submitting a separate issue and PR for CREATE SPACE. For the Flink SQL connector, though, there's no need to forbid creating a space (most of the time, a space is created together with its tags/edges). Creating a MySQL database is not allowed because the connection to MySQL requires a database name, but a connection to Nebula does not.

spike-liu commented 2 years ago

The issue has been created: https://github.com/vesoft-inc/nebula-flink-connector/issues/62

codecov-commenter commented 2 years ago

Codecov Report

Merging #57 (6388c1a) into master (b94c0a6) will increase coverage by 11.28%. The diff coverage is 73.68%.

@@              Coverage Diff              @@
##             master      #57       +/-   ##
=============================================
+ Coverage     38.43%   49.72%   +11.28%     
- Complexity      152      190       +38     
=============================================
  Files            49       50        +1     
  Lines          1517     1613       +96     
  Branches        142      153       +11     
=============================================
+ Hits            583      802      +219     
+ Misses          881      743      -138     
- Partials         53       68       +15     
Impacted Files Coverage Δ
...r/nebula/catalog/factory/NebulaCatalogFactory.java 0.00% <0.00%> (ø)
...connector/nebula/sink/NebulaEdgeBatchExecutor.java 64.10% <ø> (ø)
...nnector/nebula/sink/NebulaVertexBatchExecutor.java 63.15% <ø> (ø)
...connector/nebula/sink/NebulaBatchOutputFormat.java 49.41% <62.50%> (+10.17%) :arrow_up:
...nector/nebula/table/NebulaDynamicTableFactory.java 80.85% <74.44%> (+80.85%) :arrow_up:
...ctor/nebula/sink/NebulaEdgeTableBatchExecutor.java 83.33% <83.33%> (ø)
...or/nebula/sink/NebulaVertexTableBatchExecutor.java 83.33% <83.33%> (ø)
...nnector/nebula/connection/NebulaClientOptions.java 90.00% <100.00%> (+0.12%) :arrow_up:
...ctor/nebula/sink/NebulaBatchTableOutputFormat.java 100.00% <100.00%> (ø)
...connector/nebula/table/NebulaDynamicTableSink.java 90.00% <100.00%> (+90.00%) :arrow_up:
... and 7 more

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data. Last update b94c0a6...6388c1a.

spike-liu commented 2 years ago

Quoting @Nicole00: "Great work! @spike-liu Thanks again for your contribution~"

It is my pleasure to work with you:)

liuxiaocs7 commented 2 years ago

Hello, @spike-liu. I am new to Flink and Nebula, and your work inspires me. I have a few questions about the implementation of Flink SQL as a sink.

  1. I think the table is currently a temporary table named default_catalog.default_database.table_name. In your implementation, is the table name in the Flink SQL CREATE statement the same as the vertex/edge name in Nebula? If so, how can we create two tables with the same name for different Nebula graph spaces?
liuxiaocs7 commented 2 years ago
  2. I noticed that in your sink table example all data types are String. If I create a source table with different data types and select data from it, I can't sink the data because the data types are incompatible. This may not be a problem, since I don't know how you use it in your work.
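One way to avoid casting every column to STRING is to render each value according to its Java type before building the nGQL statement. The sketch below assumes row values have already been converted to plain Java objects (String, Integer, Long, Boolean, or null); NgqlLiteralSketch is hypothetical, and floating-point, date and time types are deliberately left out because their literal formats would need checking against the nGQL documentation.

```java
/** Hypothetical sketch; the connector's actual type handling may differ. */
final class NgqlLiteralSketch {

    /**
     * Renders a typed value as an nGQL literal, so a sink column declared
     * as BIGINT or BOOLEAN does not have to be cast to STRING first.
     */
    static String toLiteral(Object value) {
        if (value == null) {
            return "NULL";
        }
        if (value instanceof String) {
            String s = (String) value;
            // double-quoted string literal with \ and " escaped
            return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
        }
        if (value instanceof Long || value instanceof Integer
                || value instanceof Short || value instanceof Byte
                || value instanceof Boolean) {
            // integers and booleans are written without quotes
            return value.toString();
        }
        throw new IllegalArgumentException(
                "type not covered by this sketch: " + value.getClass().getName());
    }

    public static void main(String[] args) {
        System.out.println(toLiteral("player100") + ", " + toLiteral(42L) + ", " + toLiteral(true));
    }
}
```

Rejecting unsupported types loudly, rather than falling back to toString(), keeps a silent type mismatch from reaching the database.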
liuxiaocs7 commented 2 years ago

Looking forward to your reply. If this really is a problem, I'd like to try to improve it, because that is my OSPP task. Thank you.

spike-liu commented 2 years ago

@liuxiaocs7 replied in https://github.com/vesoft-inc/nebula-flink-connector/issues/58

liuxiaocs7 commented 2 years ago

Thank you for your reply. I posted a comment here but then noticed that this PR was already closed. I wasn't sure you would see it in time, so I also mentioned it in #58. Sorry about that.