apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] [SQLServer CDC2Doris] Data write error #6964

Open. trhlxc opened this issue 1 month ago

trhlxc commented 1 month ago

Search before asking

What happened

The CDC job monitors two tables. When a single row is inserted into table B in SQL Server, that row is written to both table A and table B in Doris.
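To make the expected behavior concrete, here is a minimal sketch assuming table A is iplant.dbo.test and table B is iplant.dbo.test2 (the insert shown in the config section targets test2). It is a hypothetical model of what the sink should do, not SeaTunnel's implementation: each change event carries its source table, and the sink writes it only to the Doris table derived from that name. The event layout and the route_events helper are illustrative assumptions.

```python
from collections import defaultdict

# Hypothetical model of the EXPECTED behavior, not SeaTunnel code.
# Mirrors the sink option table = "ODS_TEST_${table_name}_CDC".
SINK_TEMPLATE = "ODS_TEST_{}_CDC"


def route_events(events):
    """Group change events by the Doris table each one should be written to."""
    writes = defaultdict(list)
    for event in events:
        # "iplant.dbo.test2" -> "test2": keep only the bare table name
        table_name = event["source_table"].split(".")[-1]
        writes[SINK_TEMPLATE.format(table_name)].append(event["row"])
    return dict(writes)


# The single INSERT from the report: one row, captured from table B only.
events = [{"source_table": "iplant.dbo.test2", "row": {"item": "1006"}}]
print(route_events(events))
# {'ODS_TEST_test2_CDC': [{'item': '1006'}]}
```

Under this model the table A target never receives the row; the report says it does.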

SeaTunnel Version

2.3.5

SeaTunnel Config

env {
  # You can set engine configuration here
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {

  SqlServer-CDC {
    result_table_name = "Source1"
    username = "xxx"
    password = "xxx@read"
    startup.mode = "initial"
    incremental.parallelism = "1"
    database-names = ["iplant"]
    table-names = ["iplant.dbo.test","iplant.dbo.test2"]
    base-url = "jdbc:sqlserver://xxxxx:1433;databaseName=iplant"
  }

}

sink {
    Doris {
        # Doris connection settings
        fenodes = "xxxx:8030"
        username = root
        password = ""
        database = "ODS"
        table = "ODS_TEST_${table_name}_CDC"
        source_table_name = ["Source1"]

        # Other general settings
        sink.enable-delete = "true"
        schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
        data_save_mode = "APPEND_DATA"
        sink.label-prefix = "test_onetable"
        sink.enable-2pc = "true"

        # Doris data format settings
        doris.config {
            format = "json"
            read_json_by_line = "true"
        }
    }
}
# Source DDL (SQL Server)
CREATE TABLE iplant.dbo.test (
    item varchar(50) COLLATE Chinese_PRC_CI_AI NOT NULL,
    createdate datetime NULL,
    createby varchar(50) COLLATE Chinese_PRC_CI_AI NULL,
    [date] datetime NULL,
    CONSTRAINT PK_test PRIMARY KEY (item)
);
CREATE TABLE iplant.dbo.test2 (
    item varchar(50) COLLATE Chinese_PRC_CI_AI NOT NULL,
    createdate datetime NULL,
    createby varchar(50) COLLATE Chinese_PRC_CI_AI NULL,
    [date] datetime NULL,
    CONSTRAINT PK_test2 PRIMARY KEY (item)
);

# Sink DDL (Doris)
CREATE TABLE ODS.ODS_TEST_test_CDC (
    item varchar(20) not NULL,
    createdate datetime DEFAULT NULL,
    createby varchar(50) DEFAULT NULL,
    d_date datetime DEFAULT NULL
)
ENGINE=OLAP
UNIQUE KEY(item)
COMMENT "test"
DISTRIBUTED BY HASH(`item`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"function_column.sequence_col" = 'd_date',
"in_memory" = "false",
"storage_format" = "V2"
);
CREATE TABLE ODS.ODS_TEST_test2_CDC (
    item varchar(20) not NULL,
    createdate datetime DEFAULT NULL,
    createby varchar(50) DEFAULT NULL,
    d_date datetime DEFAULT NULL
)
ENGINE=OLAP
UNIQUE KEY(item)
COMMENT "test2"
DISTRIBUTED BY HASH(`item`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"function_column.sequence_col" = 'd_date',
"in_memory" = "false",
"storage_format" = "V2"
);

# Test insert (SQL Server)
INSERT INTO iplant.dbo.test2 (item, createdate, createby, [date])
VALUES ('1006', '2024-06-07', 'Jane Smith', '2024-06-09');
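The Doris DDL above implies the reporter expects ${table_name} in the sink's table option to be replaced by the bare source table name (test, test2), with database = "ODS" as the target database. Python's string.Template happens to use the same ${...} syntax, so the expected resolution can be sketched as below. The resolve_sink_table helper and the "last path component" rule are assumptions for illustration, not SeaTunnel's code.

```python
from string import Template

# Sink options copied from the config above.
TABLE_OPTION = "ODS_TEST_${table_name}_CDC"
DATABASE = "ODS"


def resolve_sink_table(source_table: str) -> str:
    """Hypothetical: expected Doris target for a source path like 'iplant.dbo.test'.

    Assumes ${table_name} stands for the last component of the source path.
    """
    table_name = source_table.split(".")[-1]
    table = Template(TABLE_OPTION).substitute(table_name=table_name)
    return f"{DATABASE}.{table}"


for src in ["iplant.dbo.test", "iplant.dbo.test2"]:
    print(src, "->", resolve_sink_table(src))
# iplant.dbo.test -> ODS.ODS_TEST_test_CDC
# iplant.dbo.test2 -> ODS.ODS_TEST_test2_CDC
```

Both resolved names match tables created by the sink DDL, so under this assumption each source table has exactly one existing target.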

Running Command

./bin/seatunnel.sh --config ./job/sqlserverTest1.config

Error Exception

sink can't find the target table

Zeta or Flink or Spark Version

Zeta

Java or Scala Version

Java 1.8

Screenshots

No response

Are you willing to submit PR?

Code of Conduct

EricJoy2048 commented 1 month ago

Could you change table = "ODS_TEST_${table_name}_CDC" to table = "ODS.ODS_TEST_${table_name}_CDC"?

trhlxc commented 3 weeks ago

@EricJoy2048 That doesn't work; the option can't be written that way. [Screenshot: Snipaste_2024-06-25_16-50-26] Please try to reproduce the bug.