vesoft-inc / nebula-flink-connector

Flink Connector for Nebula Graph

support nebula source for flink sql connector #58

Closed · Nicole00 closed this 2 years ago

Nicole00 commented 2 years ago

as title

liuxiaocs7 commented 2 years ago

/assign

spike-liu commented 2 years ago

@Nicole00 An implementation of Nebula as a source has been pushed as follows:

https://github.com/vesoft-inc/nebula-flink-connector/pull/57/commits/b847b5bed5b190f5a376635bac25dd80cb3a2708

liuxiaocs7 commented 2 years ago

This issue was created for better tracking of OSPP 2022.

Connector Options

| Option | Required | Default | Type | Description |
|--------|----------|---------|------|-------------|
| meta-address | required | none | String | The Nebula meta server address. |
| graph-address | required | none | String | The Nebula graph server address. |
| username | required | none | String | The Nebula server user name. |
| password | required | none | String | The Nebula server password. |
| graph-space | optional | none | String | The Nebula graph space name. |
| label-name | optional | none | String | The Nebula graph space label name. |
| data-type | optional | none | Enum | The Nebula graph data type. |
| timeout | optional | 1000ms | Integer | The Nebula execute timeout duration. |
| src-id-index | optional | -1 | Integer | The index of the edge src id field. |
| dst-id-index | optional | -1 | Integer | The index of the edge dst id field. |
| rank-id-index | optional | -1 | Integer | The index of the edge rank field. |
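
For illustration, a sketch of how these options might appear in a Flink SQL DDL. The `'connector' = 'nebula'` identifier and all values below are assumptions for illustration, not a confirmed spec:

```sql
-- Hypothetical edge table wired up with the options above.
CREATE TABLE friend (
    src    STRING,
    dst    STRING,
    degree BIGINT
) WITH (
    'connector'     = 'nebula',          -- assumed connector identifier
    'meta-address'  = '127.0.0.1:9559',
    'graph-address' = '127.0.0.1:9669',
    'username'      = 'root',
    'password'      = 'nebula',
    'graph-space'   = 'flinkSpace',
    'label-name'    = 'friend',
    'data-type'     = 'EDGE',
    'src-id-index'  = '0',               -- src id comes from the first column
    'dst-id-index'  = '1',               -- dst id comes from the second column
    'rank-id-index' = '-1'               -- no rank column
);
```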

Data Type Mapping

| Flink Type | Nebula Type |
|------------|-------------|
| CHAR | FIXED_STRING / GEOGRAPHY |
| VARCHAR | STRING |
| STRING | STRING |
| BOOLEAN | BOOL |
| TINYINT | INT8 |
| SMALLINT | INT16 |
| INTEGER | INT32 |
| BIGINT | INT64 / INT / TIMESTAMP |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DATE | DATE |
| TIME | TIME |
| TIMESTAMP | DATETIME |
| TIMESTAMP_LTZ | DATETIME |
| BYTES | Not supported |
| DECIMAL | Not supported |
| INTERVAL_YEAR_MONTH | Not supported |
| INTERVAL_DAY_TIME | Not supported |
| ARRAY | Not supported |
| MAP | Not supported |
| ROW | Not supported |
| MULTISET | Not supported |
| RAW | Not supported |
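
As a sketch of the mapping in practice, a tag (vertex) table declaring several of the supported Flink types; all names and option values here are hypothetical:

```sql
-- Hypothetical vertex table; the comment on each column notes the Nebula type
-- it maps to according to the table above.
CREATE TABLE player (
    vid      STRING,     -- STRING
    age      INTEGER,    -- INT32
    score    DOUBLE,     -- DOUBLE
    birthday DATE,       -- DATE
    created  TIMESTAMP   -- DATETIME
) WITH (
    'connector'     = 'nebula',          -- assumed connector identifier
    'meta-address'  = '127.0.0.1:9559',
    'graph-address' = '127.0.0.1:9669',
    'username'      = 'root',
    'password'      = 'nebula',
    'graph-space'   = 'flinkSpace',
    'label-name'    = 'player',
    'data-type'     = 'VERTEX'
);
```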

More information coming soon.

spike-liu commented 2 years ago

The PR has been submitted as https://github.com/vesoft-inc/nebula-flink-connector/pull/67

liuxiaocs7 commented 2 years ago

Hello, PR #60 already includes a Nebula source for the Flink SQL connector, but it doesn't seem to be compatible with the current sink function implementation (#57). I will try to fix it.

liuxiaocs7 commented 2 years ago

Hello, @spike-liu. I am new to Flink and Nebula, and your work (#57) has inspired me. I have a few questions about the implementation of Flink SQL as a sink.

  1. I think the table created by Flink SQL is currently a temporary table whose full name is default_catalog.default_database.table_name, and in your implementation the table name in the Flink SQL CREATE statement is the same as the vertex/edge name in Nebula, right? If so, how do we create two tables in Flink SQL from different Nebula graph spaces that share the same name? For example, should we add a parameter in the WITH clause?

At the same time, I noticed that in the listTables function of NebulaCatalog, tag names start with `VERTEX.` and edge names start with `EDGE.`. Should we stay compatible with this table name design if we want to use our own catalog?

  2. I noticed that in your example the sink table's data types are all String, but if I create a source table (e.g. upstream from JDBC) with different data types (CREATE TABLE xxx (col1 INT, col2 DATE, col3 TIME, ...)) and select data from it, I can't sink the data because the data types are incompatible; see the sketch below.

Should we customize the type conversions instead of using the internal RowData-to-Row conversion?
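
To make the incompatibility concrete, a minimal sketch, assuming a hypothetical JDBC source and an all-STRING Nebula sink as in the #57 example (all table names and option values are illustrative):

```sql
-- Upstream source with typed columns, read via the Flink JDBC connector.
CREATE TABLE upstream (
    col1 INT,
    col2 DATE,
    col3 TIME
) WITH (
    'connector'  = 'jdbc',
    'url'        = 'jdbc:mysql://localhost:3306/db',
    'table-name' = 'upstream'
);

-- Hypothetical Nebula sink declared with all-STRING columns.
CREATE TABLE nebula_sink (
    col1 STRING,
    col2 STRING,
    col3 STRING
) WITH (
    'connector'     = 'nebula',          -- assumed connector identifier
    'meta-address'  = '127.0.0.1:9559',
    'graph-address' = '127.0.0.1:9669',
    'username'      = 'root',
    'password'      = 'nebula',
    'graph-space'   = 'flinkSpace',
    'label-name'    = 'sink_tag',
    'data-type'     = 'VERTEX'
);

-- Fails to plan: INT/DATE/TIME are not implicitly compatible with STRING.
INSERT INTO nebula_sink SELECT col1, col2, col3 FROM upstream;
```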

Looking forward to your reply, thank you.

spike-liu commented 2 years ago

> Hello, PR #60 already includes a Nebula source for the Flink SQL connector, but it doesn't seem to be compatible with the current sink function implementation (#57). I will try to fix it.

Oops, this is awkward. It seems there is duplicated work here. Anyway, first come, first served. #67 has been closed. Go ahead, @liuxiaocs7.

spike-liu commented 2 years ago

Hello, @liuxiaocs7. As a matter of fact, we have also been using Flink in our project recently and are glad to discuss these details with you.

For question 1: I agree with you. Would you please create an issue to track this enhancement? For question 2: would you please check the Flink native data type casting below? I am not sure whether it resolves the incompatibility issue you mentioned above.

https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/types/#casting

If not, I would say there are two recipes to resolve the data type cast issue: 1. put the casts in Flink SQL; 2. put them in nebula-flink-connector. If these casts are specific to Nebula, I would prefer recipe 2. Otherwise recipe 1 would be better, because it is shared among all connectors, such as MySQL, Kafka, HBase, etc. A sketch of recipe 1 follows below.
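
For reference, recipe 1 amounts to explicit casts in the Flink SQL query itself, so the query output matches the sink schema; `upstream` and `nebula_sink` are the hypothetical tables from the sketch above:

```sql
-- Recipe 1: cast in Flink SQL so the SELECT output matches the all-STRING sink.
INSERT INTO nebula_sink
SELECT
    CAST(col1 AS STRING),
    CAST(col2 AS STRING),
    CAST(col3 AS STRING)
FROM upstream;
```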

spike-liu commented 2 years ago

@liuxiaocs7, just a friendly suggestion: how about creating separate issues for discussion in the future? I think your questions are valuable for us.

liuxiaocs7 commented 2 years ago

> @liuxiaocs7, just a friendly suggestion: how about creating separate issues for discussion in the future? I think your questions are valuable for us.

Thanks for your suggestion. I'm going to create a new issue to discuss this question, and develop the habit of discussing only one question per issue.

liuxiaocs7 commented 2 years ago

> > Hello, PR #60 already includes a Nebula source for the Flink SQL connector, but it doesn't seem to be compatible with the current sink function implementation (#57). I will try to fix it.
>
> Oops, this is awkward. It seems there is duplicated work here. Anyway, first come, first served. #67 has been closed. Go ahead, @liuxiaocs7.

I'll try my best to get it done. Thanks a lot for your help.

liuxiaocs7 commented 2 years ago

> Hello, @liuxiaocs7. As a matter of fact, we have also been using Flink in our project recently and are glad to discuss these details with you.
>
> For question 1: I agree with you. Would you please create an issue to track this enhancement? For question 2: would you please check the Flink native data type casting below? I am not sure whether it resolves the incompatibility issue you mentioned above.
>
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/types/#casting
>
> If not, I would say there are two recipes to resolve the data type cast issue: 1. put the casts in Flink SQL; 2. put them in nebula-flink-connector. If these casts are specific to Nebula, I would prefer recipe 2. Otherwise recipe 1 would be better, because it is shared among all connectors, such as MySQL, Kafka, HBase, etc.

@spike-liu, sorry for the late reply. We can now discuss question 1 in #70; welcome to join the discussion there. As for question 2, I will first study the data types and casting docs linked above; maybe it will also need a new issue.