vesoft-inc / nebula-flink-connector

Flink Connector for Nebula Graph

Is there any example or guide for Flink SQL? #88

Closed lichunown closed 1 year ago

lichunown commented 1 year ago

General Question

I compiled nebula-flink-connector (v3.5) into a JAR package following the "Compile Nebula Flink Connector" guide and put it into [flink_server_path]/lib. Then I tried to execute SQL from flink/bin/sql-client.sh.

In Nebula, I have the following schema:

> desc tag user;
| Field        | Type                | Null  | Default  | Comment                                           |
+--------------+---------------------+-------+----------+---------------------------------------------------+
| "name"       | "fixed_string(100)" | "NO"  |          | "name"                                            |
| "sex"        | "int8"              | "YES" | __NULL__ | "sex: 0 male; 1 female"                           |
| "auth_flag"  | "int8"              | "NO"  | 0        | "verified registration: 1 verified; 0 unverified" |
| "wx_open_id" | "fixed_string(32)"  | "YES" | __NULL__ | "WeChat openID"                                   |

In Flink, I tried to connect to the Nebula server with the following statement:

CREATE TABLE t_user_info (
    `name` VARCHAR(100),
    `auth_flag` INT,
    `wx_open_id` VARCHAR(32)
  ) WITH (
    'connector' = 'nebula',
    'meta-address' = '127.0.0.1:9559',
    'graph-address' = '127.0.0.1:9669',
    'username' = 'root',
    'password' = 'nebula',
    'graph-space' = 'jmt',
    'label-name' = 'user'
  );

However, when I tried to select the data, it raised a NullPointerException:

Flink SQL> select * from  t_user_info; 
[ERROR] Could not execute SQL statement. Reason:
java.lang.NullPointerException

By the way, how can I insert data into Nebula? I tried the approach from #57, but the write-mode option is not supported.

So, is there a guide or any examples?

For example:

How do I access vertices or edges? And does the id(vertex) attribute affect the results?

wey-gu commented 1 year ago

Could you please take a look at this question? @Nicole00 thanks!

Nicole00 commented 1 year ago

@lichunown please refer to https://github.com/vesoft-inc/nebula-flink-connector/blob/24a751463f2db0c045a83aec2408e3301aa5b146/connector/src/test/java/org/apache/flink/connector/nebula/sink/AbstractNebulaOutputFormatITTest.java#L112

lichunown commented 1 year ago

Thanks, I found the solution.


Here is my summary.

Taking the nba space as an example:

For vertex data, a vid column should be added as the first column to hold the id(vertex) value.

CREATE TABLE nba_player (
    `vid` VARCHAR(32),
    `age` BIGINT,
    `name` VARCHAR(100)
  ) WITH (
    'connector' = 'nebula',
    'meta-address' = '127.0.0.1:9559',
    'graph-address' = '127.0.0.1:9669',
    'username' = 'root',
    'password' = 'nebula',
    'graph-space' = 'nba',
    'data-type' = 'vertex',
    'label-name' = 'player'
  );
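
Applying the same pattern to the user tag from the original question would give something like the sketch below. It is untested: the VARCHAR(32) type for `vid` assumes the jmt space uses a FIXED_STRING(32) vid type, which this thread does not state, and the nullable sex property is left out as in the original attempt.

```sql
-- Hypothetical rewrite of t_user_info: vid column first, plus 'data-type'.
CREATE TABLE t_user_info (
    `vid` VARCHAR(32),        -- assumed vid type; adjust to the space's vid_type
    `name` VARCHAR(100),
    `auth_flag` INT,
    `wx_open_id` VARCHAR(32)
  ) WITH (
    'connector' = 'nebula',
    'meta-address' = '127.0.0.1:9559',
    'graph-address' = '127.0.0.1:9669',
    'username' = 'root',
    'password' = 'nebula',
    'graph-space' = 'jmt',
    'data-type' = 'vertex',   -- missing from the original definition
    'label-name' = 'user'
  );
```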

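For edge data, three fields should be defined to indicate the src, dst, and rank of each edge. The src-id-index, dst-id-index, and rank-id-index options give the zero-based positions of those columns.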

CREATE TABLE nba_like (
    `sid` VARCHAR(32),
    `did` VARCHAR(32),
    `rid` BIGINT,
    `likeness` BIGINT
  ) WITH (
    'connector' = 'nebula',
    'meta-address' = '127.0.0.1:9559',
    'graph-address' = '127.0.0.1:9669',
    'username' = 'root',
    'password' = 'nebula',
    'graph-space' = 'nba',
    'data-type' = 'edge',
    'label-name' = 'like',
    'src-id-index' = '0',
    'dst-id-index' = '1',
    'rank-id-index' = '2'
  );

Then I can use SQL to read from and write to the database.

select * from nba_player;
select * from nba_like;

insert into nba_player values ('awtf', 18, 'awtf'); 
insert into nba_player values ('aaaaaaaaaaaa', 18, 'aaaaaaaaaaaaaaaa'); 
insert into nba_like values ('awtf', 'aaaaaaaaaaaa', 0, 99);
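
Since nba_player and nba_like are registered as regular Flink tables, standard Flink SQL filters and joins should in principle apply to them as well. The queries below are an untested sketch using only the tables defined above; whether the connector handles them well is not confirmed in this thread, and the age threshold is an arbitrary illustration.

```sql
-- Filter on the vertex table (standard Flink SQL).
SELECT `vid`, `name`, `age`
FROM nba_player
WHERE `age` > 30;

-- Resolve the destination of each like edge to a player name.
SELECT l.`sid`, p.`name` AS liked_player, l.`likeness`
FROM nba_like AS l
JOIN nba_player AS p ON l.`did` = p.`vid`;
```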