apache / flink-cdc

Flink CDC is a streaming data integration tool
https://nightlies.apache.org/flink/flink-cdc-docs-stable
Apache License 2.0

[Bug] Flink CDC Sqlserver #2448

Closed shuxian6688 closed 1 year ago

shuxian6688 commented 1 year ago

Search before asking

Flink version

1.17.1

Flink CDC version

2.4.1

Database and its version

sqlserver 2014

Minimal reproduce step

private static DataType convertFromColumn(Column column) {
    switch (column.jdbcType()) {
        case Types.CHAR:
        case Types.VARCHAR:
        case Types.NCHAR:
        case Types.NVARCHAR:
        case Types.STRUCT:
        case Types.CLOB:
            return DataTypes.STRING();
        case Types.BIGINT:
            return DataTypes.BIGINT();
        case Types.BLOB:
            return DataTypes.BYTES();
        case Types.INTEGER:
        case Types.SMALLINT:
        case Types.TINYINT:
            return DataTypes.INT();
        case Types.FLOAT:
        case Types.REAL:
        case Types.DOUBLE:
        case Types.NUMERIC:
        case Types.DECIMAL:
            return DataTypes.DECIMAL(column.length(), column.scale().orElse(0));
        case Types.DATE:
            return DataTypes.DATE();
        case Types.TIMESTAMP:
        case Types.TIMESTAMP_WITH_TIMEZONE:
            return column.length() >= 0
                    ? DataTypes.TIMESTAMP(column.length())
                    : DataTypes.TIMESTAMP();
        case Types.BOOLEAN:
            return DataTypes.BOOLEAN();
        default:
            throw new UnsupportedOperationException(
                    String.format(
                            "Don't support SqlSever type '%s' yet, jdbcType:'%s'.",
                            column.typeName(), column.jdbcType()));
    }
}

What did you expect to see?

The test succeeds.

What did you see instead?

The test run fails with the exception shown under "Anything else?".

Anything else?

Caused by: java.lang.UnsupportedOperationException: Don't support SqlSever type 'bigint identity' yet, jdbcType:'-5'.
    at com.ververica.cdc.connectors.sqlserver.source.utils.SqlServerTypeUtils.convertFromColumn(SqlServerTypeUtils.java:75) ~[flink-sql-connector-sqlserver-cdc-2.4.1.jar:2.4.1]
    at com.ververica.cdc.connectors.sqlserver.source.utils.SqlServerTypeUtils.fromDbzColumn(SqlServerTypeUtils.java:31) ~[flink-sql-connector-sqlserver-cdc-2.4.1.jar:2.4.1]
    at com.ververica.cdc.connectors.sqlserver.source.dialect.SqlServerChunkSplitter.fromDbzColumn(SqlServerChunkSplitter.java:144) ~[flink-sql-connector-sqlserver-cdc-2.4.1.jar:2.4.1]
    at com.ververica.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter.isEvenlySplitColumn(JdbcSourceChunkSplitter.java:118) ~[flink-sql-connector-oracle-cdc-2.4.1.jar:2.4.1]
    at com.ververica.cdc.connectors.sqlserver.source.dialect.SqlServerChunkSplitter.splitTableIntoChunks(SqlServerChunkSplitter.java:194) ~[flink-sql-connector-sqlserver-cdc-2.4.1.jar:2.4.1]
    at com.ververica.cdc.connectors.sqlserver.source.dialect.SqlServerChunkSplitter.generateSplits(SqlServerChunkSplitter.java:95) ~[flink-sql-connector-sqlserver-cdc-2.4.1.jar:2.4.1]
    at com.ververica.cdc.connectors.base.source.assigner.SnapshotSplitAssigner.getNext(SnapshotSplitAssigner.java:178) ~[flink-sql-connector-oracle-cdc-2.4.1.jar:2.4.1]
    at com.ververica.cdc.connectors.base.source.assigner.HybridSplitAssigner.getNext(HybridSplitAssigner.java:137) ~[flink-sql-connector-oracle-cdc-2.4.1.jar:2.4.1]
    at com.ververica.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator.assignSplits(IncrementalSourceEnumerator.java:174) ~[flink-sql-connector-oracle-cdc-2.4.1.jar:2.4.1]
    at com.ververica.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator.handleSplitRequest(IncrementalSourceEnumerator.java:97) ~[flink-sql-connector-oracle-cdc-2.4.1.jar:2.4.1]
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.handleRequestSplitEvent(SourceCoordinator.java:557) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$handleEventFromOperator$3(SourceCoordinator.java:284) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$10(SourceCoordinator.java:458) ~[flink-dist-1.17.1.jar:1.17.1]
    ... 8 more

Are you willing to submit a PR?

GOODBOY008 commented 1 year ago

@shuxian6688 This has already been fixed in master; just compile it locally.
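
For readers following along: the exception reports jdbcType '-5', which is the value of java.sql.Types.BIGINT, a case the method quoted in the issue does handle. So the 2.4.1 jar presumably resolved the type some other way. The thread does not say how. The sketch below assumes, purely for illustration, a lookup keyed on the type name, and contrasts it with a lookup keyed on the JDBC type code. The class TypeLookupSketch and both methods are hypothetical, and plain strings stand in for Flink's DataTypes so the example runs without any connector dependency.

```java
import java.sql.Types;
import java.util.Locale;

public class TypeLookupSketch {

    // Hypothetical name-keyed lookup (an assumption, not the 2.4.1 source):
    // only exact type names match, so "bigint identity" falls to default.
    public static String byTypeName(String typeName) {
        switch (typeName.toLowerCase(Locale.ROOT)) {
            case "bigint":
                return "BIGINT";
            case "int":
            case "smallint":
            case "tinyint":
                return "INT";
            default:
                throw new UnsupportedOperationException(
                        "Unsupported type name: " + typeName);
        }
    }

    // Code-keyed lookup in the style of the method quoted in the issue:
    // an identity column still reports the JDBC code of its base type.
    public static String byJdbcType(int jdbcType) {
        switch (jdbcType) {
            case Types.BIGINT:
                return "BIGINT";
            case Types.INTEGER:
            case Types.SMALLINT:
            case Types.TINYINT:
                return "INT";
            default:
                throw new UnsupportedOperationException(
                        "Unsupported JDBC type: " + jdbcType);
        }
    }

    public static void main(String[] args) {
        // The values reported in the stack trace above.
        String typeName = "bigint identity";
        int jdbcType = -5; // equal to Types.BIGINT

        System.out.println("by JDBC type: " + byJdbcType(jdbcType));
        try {
            byTypeName(typeName);
        } catch (UnsupportedOperationException e) {
            System.out.println("by type name: " + e.getMessage());
        }
    }
}
```

Under that assumption, keying on the JDBC type code is the more robust choice, since modifiers such as identity change the reported type name but not the code.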