apache / flink-cdc

Flink CDC is a streaming data integration tool
https://nightlies.apache.org/flink/flink-cdc-docs-stable
Apache License 2.0

[Failed to perform CDC of SQL Server memory optimized table] #2585

Closed Tandoy closed 11 months ago

Tandoy commented 11 months ago

Flink version

1.13.6

Flink CDC version

2.4.1

Database and its version

SQL Server 2016

Minimal reproduce step

    public static void main(String[] args) throws Exception {
        SqlServerSourceBuilder.SqlServerIncrementalSource<String> sqlServerSource =
                new SqlServerSourceBuilder()
                        .hostname("")
                        .port(1433)
                        .databaseList("")
                        .tableList("") // this table is a memory-optimized table
                        .username("")
                        .password("")
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .startupOptions(StartupOptions.initial())
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // enable checkpoint
        env.enableCheckpointing(3000);
        // set the source parallelism to 2
        env.fromSource(
                        sqlServerSource,
                        WatermarkStrategy.noWatermarks(),
                        "SqlServerIncrementalSource")
                .setParallelism(2)
                .print()
                .setParallelism(1);

        env.execute("Print SqlServer Snapshot + Change Stream");
    }

What did you expect to see?

Data is captured from the table successfully.

What did you see instead?

Data capture fails.

Anything else?

Is there any method or initialization parameter in Flink CDC for setting the transaction isolation level, similar to what is described at the link below? https://learn.microsoft.com/zh-cn/sql/relational-databases/in-memory-oltp/transactions-with-memory-optimized-tables?view=sql-server-ver16

Tandoy commented 11 months ago

Later, I tried the Flink SQL option 'debezium.snapshot.isolation.mode' = 'snapshot' instead. The same error is still reported.
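
For readers unfamiliar with the option naming: Flink SQL table options that start with `debezium.` are forwarded to Debezium with the prefix removed, so the option above reaches Debezium as `snapshot.isolation.mode`. The following is only a minimal sketch of that mapping in plain Java, not the connector's actual code; the class name `DebeziumOptionsSketch` and the method `toDebeziumProperties` are hypothetical. For the DataStream job in the report, the assumption is that the resulting `Properties` would be handed to the source builder through its `debeziumProperties` setter.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper: illustrates how 'debezium.'-prefixed table options
// map to plain Debezium property names. Not taken from the Flink CDC source.
class DebeziumOptionsSketch {

    static final String PREFIX = "debezium.";

    /** Copies every option that starts with "debezium." into a Properties object, without the prefix. */
    static Properties toDebeziumProperties(Map<String, String> tableOptions) {
        Properties props = new Properties();
        for (Map.Entry<String, String> entry : tableOptions.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(PREFIX)) {
                props.setProperty(key.substring(PREFIX.length()), entry.getValue());
            }
        }
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "sqlserver-cdc");
        options.put("debezium.snapshot.isolation.mode", "snapshot");

        Properties props = toDebeziumProperties(options);
        // Only the prefixed option is forwarded; "connector" is not a Debezium property.
        System.out.println(props.getProperty("snapshot.isolation.mode"));
    }
}
```

As the comment above notes, setting this option did not resolve the failure for the memory-optimized table.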

Tandoy commented 11 months ago

Adding the WITH (SNAPSHOT) table hint after the table name solves this problem.
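
The comment does not say where the hint was added. As a rough illustration only, the sketch below assumes it goes into a plain full-table `SELECT` against the memory-optimized table; the class `SnapshotHintSketch`, its methods, and the `dbo.orders` table are hypothetical and are not part of Flink CDC.

```java
// Hypothetical helper: builds a SELECT statement with the WITH (SNAPSHOT)
// table hint placed directly after the table name.
class SnapshotHintSketch {

    /** Quotes a SQL Server identifier with brackets, escaping any closing bracket. */
    static String quote(String identifier) {
        return "[" + identifier.replace("]", "]]") + "]";
    }

    /** Returns a full-table SELECT that reads the table with the SNAPSHOT table hint. */
    static String selectWithSnapshotHint(String schema, String table) {
        return "SELECT * FROM " + quote(schema) + "." + quote(table) + " WITH (SNAPSHOT)";
    }

    public static void main(String[] args) {
        // prints: SELECT * FROM [dbo].[orders] WITH (SNAPSHOT)
        System.out.println(selectWithSnapshotHint("dbo", "orders"));
    }
}
```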