apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] [SqlServer-CDC] STREAMING job with "startup.mode = earliest" failed #7398

Closed xiaohundun closed 2 months ago

xiaohundun commented 3 months ago

What happened

A STREAMING job configured with "startup.mode = earliest" fails with the error below.

LOG:

2024-08-14 11:02:13,015 ERROR [i.d.p.ErrorHandler            ] [debezium-reader-0] - Producer failure
java.lang.ArrayIndexOutOfBoundsException: Index 1 out of bounds for length 1
        at io.debezium.connector.sqlserver.Lsn.compareTo(Lsn.java:149) ~[?:?]
        at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.executeIteration(SqlServerStreamingChangeEventSource.java:228) ~[?:?]
        at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:150) ~[?:?]
        at org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.reader.fetch.transactionlog.SqlServerTransactionLogFetchTask$TransactionLogSplitReadTask.execute(SqlServerTransactionLogFetchTask.java:168) ~[?:?]
        at org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.reader.fetch.transactionlog.SqlServerTransactionLogFetchTask.execute(SqlServerTransactionLogFetchTask.java:75) ~[?:?]
        at org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:106) ~[?:?]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
        at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
        at java.lang.Thread.run(Thread.java:840) [?:?]

SeaTunnel Version

2.3.6

SeaTunnel Config

seatunnel:
  engine:
    classloader-cache-mode: true
    history-job-expire-minutes: 1440
    backup-count: 1
    queue-type: blockingqueue
    print-execution-info-interval: 60
    print-job-metrics-info-interval: 60
    slot-service:
      dynamic-slot: true
    checkpoint:
      interval: 300000
      timeout: 60000
      storage:
        type: hdfs
        max-retained: 3
        plugin-config:
          namespace: /tmp/seatunnel/checkpoint_snapshot
          storage.type: hdfs
          fs.defaultFS: file:///tmp/ # Ensure that the directory has write permission

Running Command

Submitted the job with the following configuration:

{
    "env": {
        "job.name": "dbo_INV_Invtory_Batch_reverse",
        "job.mode": "STREAMING",
        "parallelism": 1,
        "checkpoint.interval": 300000
    },
    "source": [
        {
            "plugin_name": "SqlServer-CDC",
            "exactly_once": "true",
            "result_table_name": "dbo_INV_Invtory_Batch_reverse",
            "username": "sa",
            "password": "",
            "startup.mode": "earliest",
            "database-names": [
                "gemini"
            ],
            "table-names": [
                "gemini.dbo.INV_Invtory_Batch"
            ],
            "table-names-config": [
                {
                    "table": "gemini.dbo.INV_Invtory_Batch",
                    "primaryKeys": [
                        "id"
                    ]
                }
            ],
            "base-url": "jdbc:sqlserver://?:?;encrypt=true;trustServerCertificate=true;databaseName=?;"
        }
    ],
    "sink": [
        {
            "plugin_name": "Jdbc",
            "source_table_name": "dbo_INV_Invtory_Batch_reverse",
            "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
            "url": "jdbc:sqlserver://?:?;encrypt=true;trustServerCertificate=true;databaseName=?;",
            "user": "?",
            "password": "?",
            "generate_sink_sql": "true",
            "database": "?",
            "table": "dbo.INV_Invtory_Batch_reverse",
            "primary_keys": [
                "id"
            ]
        }
    ]
}

Error Exception

Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 1 out of bounds for length 1
        at io.debezium.connector.sqlserver.Lsn.compareTo(Lsn.java:149) ~[?:?]
        at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.executeIteration(SqlServerStreamingChangeEventSource.java:228) ~[?:?]
        at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:150) ~[?:?]
        at org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.reader.fetch.transactionlog.SqlServerTransactionLogFetchTask$TransactionLogSplitReadTask.execute(SqlServerTransactionLogFetchTask.java:168) ~[?:?]
        at org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.reader.fetch.transactionlog.SqlServerTransactionLogFetchTask.execute(SqlServerTransactionLogFetchTask.java:75) ~[?:?]
        at org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:106) ~[?:?]
        ... 5 more

Zeta or Flink or Spark Version

No response

Java or Scala Version

No response

Screenshots

No response

xiaohundun commented 3 months ago

[image attached]

liunaijie commented 3 months ago

Linking a PR for the same type of issue: https://github.com/apache/seatunnel/pull/7381

xiaohundun commented 3 months ago

Quoting liunaijie: "link same type pr #7381"

Thank you for replying. I think these are two different issues. In SQL Server CDC earliest mode, the IncrementalSplitAssigner gets a startupOffset of 00 from org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.offset.LsnOffsetFactory. That value then causes the error when LSNs are compared at line 228 of SqlServerStreamingChangeEventSource (the Lsn.compareTo frame in the stack trace).
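
To illustrate why a one-byte offset such as 00 could produce "Index 1 out of bounds for length 1", here is a minimal, self-contained sketch. It is a simplified model and not Debezium's actual Lsn code: the class LsnCompareSketch and the methods parse, naiveCompare and paddedCompare are hypothetical names, and the sample LSN 00000027:00000ac0:0002 is made up for the example. The assumption is that the comparison walks both byte arrays using the length of only one of them.

```java
// Simplified model of an LSN comparison; NOT the Debezium implementation.
final class LsnCompareSketch {

    // Parses a colon-separated hex string into unsigned byte values.
    // "00" becomes a 1-element array; a full LSN becomes 10 elements.
    public static int[] parse(String lsn) {
        String hex = lsn.replace(":", "");
        int[] bytes = new int[hex.length() / 2];
        for (int i = 0; i < bytes.length; i++) {
            bytes[i] = Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        return bytes;
    }

    // Assumed failure pattern: iterates over a.length and indexes b blindly,
    // so it throws when b is shorter and the leading bytes are equal.
    public static int naiveCompare(int[] a, int[] b) {
        for (int i = 0; i < a.length; i++) {
            int diff = a[i] - b[i];
            if (diff != 0) {
                return diff;
            }
        }
        return 0;
    }

    // One possible defensive variant (an illustration, not the project's fix):
    // treat missing trailing bytes of the shorter value as zero.
    public static int paddedCompare(int[] a, int[] b) {
        int n = Math.max(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int x = i < a.length ? a[i] : 0;
            int y = i < b.length ? b[i] : 0;
            if (x != y) {
                return Integer.compare(x, y);
            }
        }
        return 0;
    }

    public static void main(String[] args) {
        int[] current = parse("00000027:00000ac0:0002"); // made-up full LSN
        int[] startup = parse("00");                     // offset from the report
        try {
            naiveCompare(current, startup);
        } catch (ArrayIndexOutOfBoundsException e) {
            System.out.println("naiveCompare failed: " + e.getMessage());
        }
        System.out.println("paddedCompare result: " + paddedCompare(current, startup));
    }
}
```

If this model matches what happens in the connector, then either producing a full-length minimum LSN for earliest mode or comparing defensively would avoid the exception. Which of the two is appropriate is for the maintainers to decide.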

xiaohundun commented 3 months ago

Could someone take a look at this issue?

xiaohundun commented 3 months ago

Another small issue: the startup.mode options in the source code differ from both the option description and the documentation on the official website.

[two images attached]