apache / doris

Apache Doris is an easy-to-use, high performance and unified analytics database.
https://doris.apache.org
Apache License 2.0

[Bug] Cannot sink data to Doris(v1.1.1) with flink-doris-connector-1.14_2.12-1.1.0.jar #11590

Closed: leeoo closed this issue 2 years ago

leeoo commented 2 years ago

Version

Doris 1.1.1

What's Wrong?

With Flink 1.14.5 + Doris 1.1.1 + Flink CDC Connector 2.2.1 + Dinky 0.6.6 + MySQL 8.0.29, running a “MySQL CDC to Doris” FlinkSQL job with flink-doris-connector-1.14_2.12-1.1.0.jar (downloaded from https://mvnrepository.com/artifact/org.apache.doris/flink-doris-connector-1.14_2.12/1.1.0 ) fails with the exception "Could not perform checkpoint ...".

The same job runs correctly with flink-doris-connector-1.14_2.12-1.0.3.jar (downloaded from https://mvnrepository.com/artifact/org.apache.doris/flink-doris-connector-1.14_2.12/1.0.3 ).

What You Expected?

With Flink 1.14.5 + Doris 1.1.1 + Flink CDC Connector 2.2.1 + Dinky 0.6.6 + MySQL 8.0.29, the “MySQL CDC to Doris” FlinkSQL job should run correctly with flink-doris-connector-1.14_2.12-1.1.0.jar.

How to Reproduce?

Env: Flink 1.14.5 + Doris 1.1.1 + Flink CDC Connector 2.2.1 + Dinky 0.6.6 + MySQL 8.0.29;

1) Create the source table on MySQL 8 so that it matches the FlinkSQL job at the end of these steps, and insert some rows.

2) Create the sink table on Doris 1.1.1 so that it matches the same FlinkSQL job.

3) Run the following FlinkSQL job on Dinky 0.6.6.

-- enable checkpoint
SET 'execution.checkpointing.interval' = '10s';

DROP TABLE IF EXISTS test_flink_cdc;
CREATE TABLE test_flink_cdc ( 
  id INT, 
  name STRING,
  primary key(id)  NOT ENFORCED
) WITH ( 
    'connector' = 'mysql-cdc',
    'hostname' = 'xxx',
    'port' = '3306',
    'username' = 'xxx',
    'password' = 'xxx',
    'database-name' = 'bigdata',
    'table-name' = 'test_cdc'
);

DROP TABLE IF EXISTS doris_test_sink;
CREATE TABLE doris_test_sink (
   id INT,
   name STRING
) WITH (
    'connector' = 'doris',
    'fenodes' = 'xxx',
    'table.identifier' = 'foobar.doris_test',
    'username' = 'root',
    'password' = '123456',
    'sink.batch.interval'='1'
);

INSERT INTO doris_test_sink SELECT id, name FROM test_flink_cdc;
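
Steps 1 and 2 do not show the underlying DDL. The following is only a sketch of what those tables could look like, inferred from the column names and identifiers in the job above. The column lengths, the Doris data model (UNIQUE KEY), the bucket count, the replication setting, and the sample rows are all assumptions, not taken from the original report.

```sql
-- Step 1 (run on MySQL 8): hypothetical source table matching
-- 'database-name' = 'bigdata' and 'table-name' = 'test_cdc'.
CREATE TABLE bigdata.test_cdc (
  id   INT PRIMARY KEY,
  name VARCHAR(255)          -- length is an assumption
);

-- Sample rows, purely illustrative.
INSERT INTO bigdata.test_cdc (id, name) VALUES (1, 'a'), (2, 'b');

-- Step 2 (run on Doris 1.1.1): hypothetical sink table matching
-- 'table.identifier' = 'foobar.doris_test'.
CREATE TABLE foobar.doris_test (
  id   INT,
  name VARCHAR(255)          -- length is an assumption
)
UNIQUE KEY(id)               -- assumed model, so CDC updates overwrite by key
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES (
  "replication_num" = "1"    -- assumes a single-BE test cluster
);
```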

Anything Else?

No response

JNSimba commented 2 years ago

Hello, it seems that you are using version 1.1.0 of flink-doris-connector. This version requires a label to be specified; the option is mandatory.
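
For readers hitting the same error, here is a minimal sketch of the sink table with a label added. It assumes the required option is named `sink.label-prefix`, as in the connector's option list for this version; the value `doris_test_label` is a hypothetical placeholder and should be unique per job. The other options are copied unchanged from the original report, and whether each is still accepted by 1.1.0 should be checked against the connector documentation.

```sql
DROP TABLE IF EXISTS doris_test_sink;
CREATE TABLE doris_test_sink (
   id INT,
   name STRING
) WITH (
    'connector' = 'doris',
    'fenodes' = 'xxx',
    'table.identifier' = 'foobar.doris_test',
    'username' = 'root',
    'password' = '123456',
    -- Assumed option name; the value is a hypothetical, job-unique prefix.
    'sink.label-prefix' = 'doris_test_label'
);
```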

leeoo commented 2 years ago

Yes, you are right. Many thanks!