apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[SqlServer-CDC] CDC fails when the table has more than 128 fields #6658

Open bulolo opened 7 months ago

bulolo commented 7 months ago

Search before asking

What happened

When the number of fields exceeds 128, CDC does not work. With up to 127 fields (127 inclusive), CDC works normally.

SeaTunnel Version

2.3.4

SeaTunnel Config

env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  SqlServer-CDC {
    username = "sa"
    password = "XXX"
    startup.mode="initial"
    exactly_once=true
    database-names = ["JUNYAO_TEST"]
    table-names = ["JUNYAO_TEST.wmwhse2.ORDERDETAIL", "JUNYAO_TEST.wmwhse3.ORDERDETAIL"]
    base-url = "jdbc:sqlserver://192.168.103.XXX:1433;databaseName=JUNYAO_TEST"
  }
}

sink {
  StarRocks {
    nodeUrls=[
        "192.168.103.XXX:8040"
    ]
    batch_max_rows = 1024
    table="ORDERDETAIL"
    database="scm"
    base-url="jdbc:mysql://192.168.103.XXX:9030"
    password="XXX"
    username="root"
    enable_upsert_delete = true
    schema_save_mode="ERROR_WHEN_SCHEMA_NOT_EXIST"
    data_save_mode="DROP_DATA"
  }
}

Running Command

./bin/seatunnel.sh --config ./example/wms-sqlserver2sr-orderdetail -e local

Error Exception

Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:199)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: com.hazelcast.nio.serialization.HazelcastSerializationException: java.lang.NegativeArraySizeException
        at com.hazelcast.internal.serialization.impl.SerializationUtil.handleException(SerializationUtil.java:111)
        at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toObject(AbstractSerializationService.java:274)
        at com.hazelcast.spi.impl.NodeEngineImpl.toObject(NodeEngineImpl.java:385)
        at com.hazelcast.collection.impl.queue.QueueProxyImpl.drainTo(QueueProxyImpl.java:134)
        at org.apache.seatunnel.engine.server.task.flow.ShuffleSourceFlowLifeCycle.collect(ShuffleSourceFlowLifeCycle.java:94)
        at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73)
        at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
        at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78)
        at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:643)
        at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:944)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.NegativeArraySizeException
        at org.apache.seatunnel.api.table.type.SeaTunnelRow.<init>(SeaTunnelRow.java:38)
        at org.apache.seatunnel.engine.server.serializable.RecordSerializer.read(RecordSerializer.java:75)
        at org.apache.seatunnel.engine.server.serializable.RecordSerializer.read(RecordSerializer.java:33)
        at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:44)
        at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toObject(AbstractSerializationService.java:268)
        ... 13 more

        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:191)
        ... 2 more
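
The trace above ends in a NegativeArraySizeException thrown from SeaTunnelRow.<init> while RecordSerializer.read is decoding a record. One reading that fits this, and fits the 127-field limit reported above, is that the field count travels as a single signed byte, whose maximum value is 127. This is an assumption drawn from the stack trace, not something confirmed in this thread. The sketch below illustrates the effect in isolation; ArityByteDemo and its methods are hypothetical names and are not SeaTunnel's actual serializer.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for a row serializer; NOT SeaTunnel's RecordSerializer.
final class ArityByteDemo {

    // Stores the field count in one byte; the cast keeps only the low 8 bits.
    static byte[] writeArity(int arity) {
        return ByteBuffer.allocate(1).put((byte) arity).array();
    }

    // Reads the count back as a signed byte, so 128 is decoded as -128.
    static int readAritySigned(byte[] bytes) {
        return ByteBuffer.wrap(bytes).get();
    }

    // Reads the same byte as unsigned, which preserves counts up to 255.
    static int readArityUnsigned(byte[] bytes) {
        return Byte.toUnsignedInt(ByteBuffer.wrap(bytes).get());
    }

    // Returns true if allocating a field array of the decoded size throws.
    static boolean allocationFails(int decodedArity) {
        try {
            Object[] fields = new Object[decodedArity];
            return false;
        } catch (NegativeArraySizeException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        for (int arity : new int[] {127, 128, 184}) {
            byte[] wire = writeArity(arity);
            int signed = readAritySigned(wire);
            System.out.println(arity + " fields: signed read = " + signed
                    + ", unsigned read = " + readArityUnsigned(wire)
                    + ", allocation fails = " + allocationFails(signed));
        }
    }
}
```

If this is indeed the cause, reading the byte as unsigned, or writing the count as an int, would avoid the negative array size. The fix adopted upstream may differ.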

Zeta or Flink or Spark Version

No response

Java or Scala Version

No response

Screenshots

No response

Are you willing to submit PR?

Code of Conduct

NickCodeJourney commented 7 months ago

Please provide the table's DDL.

bulolo commented 7 months ago

Please provide the table's DDL.


-- Table structure for ORDERDETAIL


IF EXISTS (SELECT * FROM sys.all_objects WHERE object_id = OBJECT_ID(N'[wmwhse2].[ORDERDETAIL]') AND type IN ('U'))
  DROP TABLE [wmwhse2].[ORDERDETAIL]
GO

CREATE TABLE [wmwhse2].[ORDERDETAIL] (
  [SERIALKEY] int IDENTITY(1,1) NOT NULL,
  [WHSEID] nvarchar(30) COLLATE Chinese_PRC_CI_AS DEFAULT user_name() NULL,
  [ORDERKEY] nvarchar(10) COLLATE Chinese_PRC_CI_AS NOT NULL,
  [ORDERLINENUMBER] nvarchar(5) COLLATE Chinese_PRC_CI_AS NOT NULL,
  [ORDERDETAILSYSID] int NULL,
  [EXTERNORDERKEY] nvarchar(32) COLLATE Chinese_PRC_CI_AS DEFAULT '' NOT NULL,
  [EXTERNLINENO] nvarchar(20) COLLATE Chinese_PRC_CI_AS DEFAULT '' NOT NULL,
  [SKU] nvarchar(50) COLLATE Chinese_PRC_CI_AS DEFAULT '' NOT NULL,
  [STORERKEY] nvarchar(15) COLLATE Chinese_PRC_CI_AS DEFAULT '' NOT NULL,
  [MANUFACTURERSKU] nvarchar(50) COLLATE Chinese_PRC_CI_AS DEFAULT '' NOT NULL,
  [RETAILSKU] nvarchar(50) COLLATE Chinese_PRC_CI_AS DEFAULT '' NOT NULL,
  [ALTSKU] nvarchar(50) COLLATE Chinese_PRC_CI_AS DEFAULT '' NOT NULL,
  [ORIGINALQTY] decimal(22,5) DEFAULT 0 NOT NULL,
  [OPENQTY] decimal(22,5) DEFAULT 0 NOT NULL,
  [SHIPPEDQTY] decimal(22,5) DEFAULT 0 NOT NULL,
  [ADJUSTEDQTY] decimal(22,5) DEFAULT 0 NOT NULL,
  [QTYPREALLOCATED] decimal(22,5) DEFAULT 0 NOT NULL,
  [QTYALLOCATED] decimal(22,5) DEFAULT 0 NOT NULL,
  [QTYPICKED] decimal(22,5) DEFAULT 0 NOT NULL,
  [UOM] nvarchar(10) COLLATE Chinese_PRC_CI_AS DEFAULT '' NOT NULL,
  [PACKKEY] nvarchar(50) COLLATE Chinese_PRC_CI_AS DEFAULT 'STD' NOT NULL,
  [PICKCODE] nvarchar(10) COLLATE Chinese_PRC_CI_AS DEFAULT '' NOT NULL,
  [CARTONGROUP] nvarchar(10) COLLATE Chinese_PRC_CI_AS DEFAULT '' NOT NULL,
  [LOT] nvarchar(10) COLLATE Chinese_PRC_CI_AS DEFAULT '' NOT NULL,
  [ID] nvarchar(50) COLLATE Chinese_PRC_CI_AS DEFAULT '' NOT NULL,
  [FACILITY] nvarchar(20) COLLATE Chinese_PRC_CI_AS DEFAULT '' NOT NULL,
  [STATUS] nvarchar(10) COLLATE Chinese_PRC_CI_AS DEFAULT '02' NOT NULL,
  [UNITPRICE] float(53) DEFAULT 0 NOT NULL,
  [TAX01] float(53) DEFAULT 0 NOT NULL,
  [TAX02] float(53) DEFAULT 0 NOT NULL,
  [EXTENDEDPRICE] float(53) DEFAULT 0 NOT NULL,
  [UPDATESOURCE] nvarchar(10) COLLATE Chinese_PRC_CI_AS DEFAULT '0' NOT NULL,
  [LOTTABLE01] nvarchar(50) COLLATE Chinese_PRC_CI_AS DEFAULT '' NOT NULL,
  [LOTTABLE02] nvarchar(50) COLLATE Chinese_PRC_CI_AS DEFAULT '' NOT NULL,
  [LOTTABLE03] nvarchar(50) COLLATE Chinese_PRC_CI_AS DEFAULT '' NOT NULL,
  [LOTTABLE04] datetime NULL,
  [LOTTABLE05] datetime NULL,
  [LOTTABLE06] nvarchar(50) COLLATE Chinese_PRC_CI_AS DEFAULT '' NOT NULL,
  [LOTTABLE07] nvarchar(50) COLLATE Chinese_PRC_CI_AS DEFAULT '' NOT NULL,
  [LOTTABLE08] nvarchar(50) COLLATE Chinese_PRC_CI_AS DEFAULT '' NOT NULL,
  [LOTTABLE09] nvarchar(50) COLLATE Chinese_PRC_CI_AS DEFAULT '' NOT NULL,
  [LOTTABLE10] nvarchar(50) COLLATE Chinese_PRC_CI_AS DEFAULT '' NOT NULL,
  [EFFECTIVEDATE] datetime NULL,
  [FORTE_FLAG] nvarchar(6) COLLATE Chinese_PRC_CI_AS DEFAULT 'I' NOT NULL,
  [TARIFFKEY] nvarchar(10) COLLATE Chinese_PRC_CI_AS NULL,
  [SUSR1] nvarchar(30) COLLATE Chinese_PRC_CI_AS NULL,
  [SUSR2] nvarchar(30) COLLATE Chinese_PRC_CI_AS NULL,
  [SUSR3] nvarchar(30) COLLATE Chinese_PRC_CI_AS NULL,
  [SUSR4] nvarchar(30) COLLATE Chinese_PRC_CI_AS NULL,
  [SUSR5] nvarchar(30) COLLATE Chinese_PRC_CI_AS NULL,
  [NOTES] nvarchar(2000) COLLATE Chinese_PRC_CI_AS NULL,
  [WORKORDERKEY] nvarchar(10) COLLATE Chinese_PRC_CI_AS DEFAULT '' NULL,
  [ALLOCATESTRATEGYKEY] nvarchar(10) COLLATE Chinese_PRC_CI_AS NULL,
  [PREALLOCATESTRATEGYKEY] nvarchar(10) COLLATE Chinese_PRC_CI_AS NULL,
  [ALLOCATESTRATEGYTYPE] nvarchar(10) COLLATE Chinese_PRC_CI_AS NULL,
  [SKUROTATION] nvarchar(10) COLLATE Chinese_PRC_CI_AS DEFAULT 'Lot' NULL,
  [SHELFLIFE] int DEFAULT 0 NULL,
  [ROTATION] nvarchar(10) COLLATE Chinese_PRC_CI_AS NULL,
  [PALLET_ID] nvarchar(50) COLLATE Chinese_PRC_CI_AS DEFAULT '' NULL,
  [SUB_FLAG] nvarchar(1) COLLATE Chinese_PRC_CI_AS DEFAULT '' NULL,
  [PRODUCT_WEIGHT] float(53) DEFAULT 0 NOT NULL,
  [PRODUCT_CUBE] float(53) DEFAULT 0 NOT NULL,
  [ORIGCASEQTY] decimal(22,5) DEFAULT 0 NOT NULL,
  [ORIGPALLETQTY] decimal(22,5) DEFAULT 0 NOT NULL,
  [OKTOSUBSTITUTE] int DEFAULT 0 NOT NULL,
  [ISSUBSTITUTE] int DEFAULT 0 NOT NULL,
  [ORIGINALSKU] nvarchar(50) COLLATE Chinese_PRC_CI_AS NULL,
  [ORIGINALLINENUMBER] nvarchar(5) COLLATE Chinese_PRC_CI_AS NULL,
  [SHIPGROUP01] nvarchar(1) COLLATE Chinese_PRC_CI_AS DEFAULT 'N' NOT NULL,
  [SHIPGROUP02] nvarchar(1) COLLATE Chinese_PRC_CI_AS DEFAULT 'N' NOT NULL,
  [SHIPGROUP03] nvarchar(1) COLLATE Chinese_PRC_CI_AS DEFAULT 'N' NOT NULL,
  [ACTUALSHIPDATE] datetime NULL,
  [INTERMODALVEHICLE] nvarchar(10) COLLATE Chinese_PRC_CI_AS NULL,
  [PICKINGINSTRUCTIONS] nvarchar(255) COLLATE Chinese_PRC_CI_AS NULL,
  [CARTONBREAK] nvarchar(10) COLLATE Chinese_PRC_CI_AS DEFAULT '' NULL,
  [CARTONQTYBREAK] int NULL,
  [QTYINTRANSIT] decimal(22,5) DEFAULT 0 NOT NULL,
  [OPPREQUEST] nvarchar(1) COLLATE Chinese_PRC_CI_AS DEFAULT 0 NULL,
  [WPRELEASED] nvarchar(1) COLLATE Chinese_PRC_CI_AS DEFAULT '0' NOT NULL,
  [EXTERNALLOT] nvarchar(100) COLLATE Chinese_PRC_CI_AS NULL,
  [BUYERPO] nvarchar(50) COLLATE Chinese_PRC_CI_AS NULL,
  [GenerateContainerDetail] nvarchar(1) COLLATE Chinese_PRC_CI_AS DEFAULT '0' NOT NULL,
  [LABELNAME] nvarchar(20) COLLATE Chinese_PRC_CI_AS NULL,
  [STDSSCCLABELNAME] nvarchar(20) COLLATE Chinese_PRC_CI_AS NULL,
  [STDGTINLABELNAME] nvarchar(20) COLLATE Chinese_PRC_CI_AS NULL,
  [RFIDSSCCLABELNAME] nvarchar(20) COLLATE Chinese_PRC_CI_AS NULL,
  [RFIDGTINLABELNAME] nvarchar(20) COLLATE Chinese_PRC_CI_AS NULL,
  [EXTERNALLOCSEQUENCE] nvarchar(20) COLLATE Chinese_PRC_CI_AS NULL,
  [MINSHIPPERCENT] int DEFAULT '0' NOT NULL,
  [LINETYPE] nvarchar(1) COLLATE Chinese_PRC_CI_AS NULL,
  [COMPONENTQTY] decimal(22,5) NULL,
  [COMPONENTREFERENCE] nvarchar(50) COLLATE Chinese_PRC_CI_AS NULL,
  [OQCREQUIRED] nvarchar(1) COLLATE Chinese_PRC_CI_AS DEFAULT '0' NOT NULL,
  [OQCAUTOADJUST] nvarchar(1) COLLATE Chinese_PRC_CI_AS DEFAULT '0' NOT NULL,
  [ORDERDETAILID] nvarchar(32) COLLATE Chinese_PRC_CI_AS NULL,
  [LOTTABLE11] datetime NULL,
  [LOTTABLE12] datetime NULL,
  [SOURCEVERSION] nvarchar(22) COLLATE Chinese_PRC_CI_AS NULL,
  [REFERENCETYPE] nvarchar(64) COLLATE Chinese_PRC_CI_AS NULL,
  [REFERENCEDOCUMENT] nvarchar(64) COLLATE Chinese_PRC_CI_AS NULL,
  [REFERENCELOCATION] nvarchar(64) COLLATE Chinese_PRC_CI_AS NULL,
  [REFERENCEVERSION] nvarchar(20) COLLATE Chinese_PRC_CI_AS NULL,
  [REFERENCELINE] nvarchar(20) COLLATE Chinese_PRC_CI_AS NULL,
  [CUBICMETER] decimal(22,5) DEFAULT 0 NULL,
  [HUNDREDWEIGHT] decimal(22,5) DEFAULT 0 NULL,
  [StageLoc] nvarchar(20) COLLATE Chinese_PRC_CI_AS NULL,
  [FULFILLQTY] decimal(22,5) DEFAULT 0 NOT NULL,
  [DAPICKSORT] nvarchar(1) COLLATE Chinese_PRC_CI_AS DEFAULT '1' NULL,
  [SHORTSHIPREASON] nvarchar(10) COLLATE Chinese_PRC_CI_AS NULL,
  [ORIGLAYERQTY] decimal(22,5) DEFAULT 0 NOT NULL,
  [ReferenceAccountingEntity] nvarchar(64) COLLATE Chinese_PRC_CI_AS NULL,
  [ReferenceScheduleLine] nvarchar(20) COLLATE Chinese_PRC_CI_AS NULL,
  [BASEORDERKEY] nvarchar(10) COLLATE Chinese_PRC_CI_AS NULL,
  [CONVERTEDQTY] decimal(22,5) NULL,
  [PROCESSEDQTY] decimal(22,5) NULL,
  [QTYTOPROCESS] decimal(22,5) NULL,
  [SEQUENCE] decimal(18) NULL,
  [REQUISITIONDOCUMENT] nvarchar(64) COLLATE Chinese_PRC_CI_AS NULL,
  [REQUISITIONACCOUNTINGENTITY] nvarchar(64) COLLATE Chinese_PRC_CI_AS NULL,
  [REQUISITIONLOCATION] nvarchar(64) COLLATE Chinese_PRC_CI_AS NULL,
  [REQUISITIONVERSION] nvarchar(20) COLLATE Chinese_PRC_CI_AS NULL,
  [REQUISITIONLINE] nvarchar(20) COLLATE Chinese_PRC_CI_AS NULL,
  [REQUISITIONSCHEDULELINE] nvarchar(20) COLLATE Chinese_PRC_CI_AS NULL,
  [PURCHASEORDERDOCUMENT] nvarchar(64) COLLATE Chinese_PRC_CI_AS NULL,
  [PURCHASEORDERACCOUNTINGENTITY] nvarchar(64) COLLATE Chinese_PRC_CI_AS NULL,
  [PURCHASEORDERLOCATION] nvarchar(64) COLLATE Chinese_PRC_CI_AS NULL,
  [PURCHASEORDERVERSION] nvarchar(20) COLLATE Chinese_PRC_CI_AS NULL,
  [PURCHASEORDERLINE] nvarchar(20) COLLATE Chinese_PRC_CI_AS NULL,
  [PURCHASEORDERSCHEDULELINE] nvarchar(20) COLLATE Chinese_PRC_CI_AS NULL,
  [SALESORDERDOCUMENT] nvarchar(64) COLLATE Chinese_PRC_CI_AS NULL,
  [SALESORDERACCOUNTINGENTITY] nvarchar(64) COLLATE Chinese_PRC_CI_AS NULL,
  [SALESORDERLOCATION] nvarchar(64) COLLATE Chinese_PRC_CI_AS NULL,
  [SALESORDERVERSION] nvarchar(20) COLLATE Chinese_PRC_CI_AS NULL,
  [SALESORDERLINE] nvarchar(20) COLLATE Chinese_PRC_CI_AS NULL,
  [SALESORDERSCHEDULELINE] nvarchar(20) COLLATE Chinese_PRC_CI_AS NULL,
  [IDREQUIRED] nvarchar(18) COLLATE Chinese_PRC_CI_AS NULL,
  [KITORDERASN] nvarchar(10) COLLATE Chinese_PRC_CI_AS NULL,
  [PALLETESTIMATE] decimal(22,5) NULL,
  [BACKFLUSHINDICATOR] nvarchar(1) COLLATE Chinese_PRC_CI_AS DEFAULT '0' NULL,
  [ALLOCLOTLIMIT] nvarchar(1) COLLATE Chinese_PRC_CI_AS NULL,
  [LASTSHIPPEDDATE] datetime NULL,
  [LABELSPRINTED] nvarchar(1) COLLATE Chinese_PRC_CI_AS DEFAULT '0' NOT NULL,
  [NEWALLOCATIONSTRATEGY] nvarchar(10) COLLATE Chinese_PRC_CI_AS NULL,
  [ITEMCLASS] nvarchar(10) COLLATE Chinese_PRC_CI_AS NULL,
  [RUNINQTY] decimal(22,5) DEFAULT 0 NULL,
  [RUNOUTQTY] decimal(22,5) DEFAULT 0 NULL,
  [FGQTYRECEIVED] decimal(22,5) DEFAULT 0 NULL,
  [RUNINUOM] nvarchar(10) COLLATE Chinese_PRC_CI_AS DEFAULT '' NULL,
  [RUNOUTUOM] nvarchar(10) COLLATE Chinese_PRC_CI_AS DEFAULT '' NULL,
  [EXTERNALSHIPREQUESTQTY] decimal(22,5) NULL,
  [BATCHORDERNUMBER] nvarchar(10) COLLATE Chinese_PRC_CI_AS NULL,
  [BATCHORDERLINE] nvarchar(5) COLLATE Chinese_PRC_CI_AS NULL,
  [ORIGORDERNUMBER] nvarchar(10) COLLATE Chinese_PRC_CI_AS NULL,
  [ORIGORDERLINE] nvarchar(5) COLLATE Chinese_PRC_CI_AS NULL,
  [ADDDATE] datetime DEFAULT getutcdate() NOT NULL,
  [ADDWHO] nvarchar(256) COLLATE Chinese_PRC_CI_AS DEFAULT user_name() NOT NULL,
  [EDITDATE] datetime DEFAULT getutcdate() NOT NULL,
  [EDITWHO] nvarchar(256) COLLATE Chinese_PRC_CI_AS DEFAULT user_name() NOT NULL,
  [rtxcrmlinenumber] varchar(15) COLLATE Chinese_PRC_CI_AS NULL,
  [rtxdocument] nvarchar(10) COLLATE Chinese_PRC_CI_AS NULL,
  [rtxsapso] nvarchar(20) COLLATE Chinese_PRC_CI_AS NULL,
  [rtxsapsoline] nvarchar(10) COLLATE Chinese_PRC_CI_AS NULL,
  [rtxuom] nvarchar(30) COLLATE Chinese_PRC_CI_AS NULL,
  [rtxumwrk] nvarchar(50) COLLATE Chinese_PRC_CI_AS NULL,
  [rtxumlog] nvarchar(50) COLLATE Chinese_PRC_CI_AS NULL,
  [RTXCRMORDERKEY] nvarchar(30) COLLATE Chinese_PRC_CI_AS NULL,
  [RTXROTATION1] nvarchar(10) COLLATE Chinese_PRC_CI_AS DEFAULT '1' NOT NULL,
  [RTXSKUROTATION1] nvarchar(10) COLLATE Chinese_PRC_CI_AS DEFAULT 'Lottable04' NOT NULL,
  [RTXROTATION2] nvarchar(10) COLLATE Chinese_PRC_CI_AS DEFAULT '1' NOT NULL,
  [RTXSKUROTATION2] nvarchar(10) COLLATE Chinese_PRC_CI_AS DEFAULT 'Lottable03' NOT NULL,
  [RTX_EDITFLAG_SAP] nvarchar(5) COLLATE Chinese_PRC_CI_AS DEFAULT '' NOT NULL,
  [RTX_QUEUE_SAP] int DEFAULT 0 NOT NULL,
  [RTX_DATE_SAP] date NULL,
  [RTXCOSTCENTER] nvarchar(50) COLLATE Chinese_PRC_CI_AS NULL,
  [rtxzshrq] numeric(22,5) NULL,
  [rtxcurrency] nvarchar(20) COLLATE Chinese_PRC_CI_AS NULL,
  [rtxtariff] nvarchar(20) COLLATE Chinese_PRC_CI_AS NULL,
  [rtxfreight] numeric(22,5) NULL,
  [RTXSAPPOLINE] nvarchar(20) COLLATE Chinese_PRC_CI_AS NULL,
  [RTXINVOICENUMBER] nvarchar(20) COLLATE Chinese_PRC_CI_AS NULL,
  [C_POSITION] varchar(20) COLLATE Chinese_PRC_CI_AS DEFAULT ' ' NULL,
  [rtxbusinessoperator] varchar(30) COLLATE Chinese_PRC_CI_AS NULL,
  [rtxboffice] varchar(50) COLLATE Chinese_PRC_CI_AS NULL,
  [rtxcrmline] varchar(30) COLLATE Chinese_PRC_CI_AS NULL
)
GO

bulolo commented 7 months ago

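Please provide the table's DDL.

Make sure there are multiple tables. Once the field count exceeds 128, the error occurs. I found this by gradually reducing the number of fields down to 128; with 128 fields or fewer, both tables sync normally.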

bulolo commented 7 months ago

https://dev-files-1253767413.cos.ap-guangzhou.myqcloud.com/temp/ORDERDETAIL.sql https://dev-files-1253767413.cos.ap-guangzhou.myqcloud.com/temp/ORDERDETAIL2.sql

bulolo commented 7 months ago

https://dev-files-1253767413.cos.ap-guangzhou.myqcloud.com/temp/ORDERDETAIL.sql https://dev-files-1253767413.cos.ap-guangzhou.myqcloud.com/temp/ORDERDETAIL2.sql

These two files contain the tables and data I am syncing.

NickCodeJourney commented 7 months ago

What is the database version?

bulolo commented 7 months ago

What is the database version?

2019

Carl-Zhou-CN commented 7 months ago

@bulolo Can you try setting result_table_name and source_table_name?

bulolo commented 7 months ago

@bulolo Can you try setting result_table_name and source_table_name?

The result is the same:

env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  SqlServer-CDC {
    username = "sa"
    password = "Password!"
    startup.mode="initial"
    exactly_once=true
    result_table_name = "temp_orderdetail"
    database-names = ["SCPRD"]
    table-names = ["SCPRD.wmwhse2.ORDERDETAIL", "SCPRD.wmwhse3.ORDERDETAIL"]
    base-url = "jdbc:sqlserver://192.168.103.XXX:1433;databaseName=SCPRD"
  }
}

sink {
  StarRocks {
    source_table_name = "temp_orderdetail"
    nodeUrls=[
        "192.168.103.XXX:8040"
    ]
    batch_max_rows = 1024
    table="ORDERDETAIL"
    database="scm"
    base-url="jdbc:mysql://192.168.103.XXX:9030"
    password="XXX"
    username="root"
    enable_upsert_delete = true
    schema_save_mode="ERROR_WHEN_SCHEMA_NOT_EXIST"
    data_save_mode="DROP_DATA"
  }
}

2024-04-13 16:58:51,434 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:199)
    at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
    at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: com.hazelcast.nio.serialization.HazelcastSerializationException: java.lang.NegativeArraySizeException
    at com.hazelcast.internal.serialization.impl.SerializationUtil.handleException(SerializationUtil.java:111)
    at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toObject(AbstractSerializationService.java:274)
    at com.hazelcast.spi.impl.NodeEngineImpl.toObject(NodeEngineImpl.java:385)
    at com.hazelcast.collection.impl.queue.QueueProxyImpl.drainTo(QueueProxyImpl.java:134)
    at org.apache.seatunnel.engine.server.task.flow.ShuffleSourceFlowLifeCycle.collect(ShuffleSourceFlowLifeCycle.java:94)
    at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73)
    at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
    at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78)
    at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:643)
    at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:944)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.NegativeArraySizeException
    at org.apache.seatunnel.api.table.type.SeaTunnelRow.<init>(SeaTunnelRow.java:38)
    at org.apache.seatunnel.engine.server.serializable.RecordSerializer.read(RecordSerializer.java:75)
    at org.apache.seatunnel.engine.server.serializable.RecordSerializer.read(RecordSerializer.java:33)
    at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:44)
    at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toObject(AbstractSerializationService.java:268)
    ... 13 more

    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:191)
    ... 2 more

2024-04-13 16:58:51,434 ERROR [o.a.s.c.s.SeaTunnel           ] [main] -
===============================================================================

Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:199)
    at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
    at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: com.hazelcast.nio.serialization.HazelcastSerializationException: java.lang.NegativeArraySizeException
    at com.hazelcast.internal.serialization.impl.SerializationUtil.handleException(SerializationUtil.java:111)
    at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toObject(AbstractSerializationService.java:274)
    at com.hazelcast.spi.impl.NodeEngineImpl.toObject(NodeEngineImpl.java:385)
    at com.hazelcast.collection.impl.queue.QueueProxyImpl.drainTo(QueueProxyImpl.java:134)
    at org.apache.seatunnel.engine.server.task.flow.ShuffleSourceFlowLifeCycle.collect(ShuffleSourceFlowLifeCycle.java:94)
    at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73)
    at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
    at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78)
    at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:643)
    at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:944)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.NegativeArraySizeException
    at org.apache.seatunnel.api.table.type.SeaTunnelRow.<init>(SeaTunnelRow.java:38)
    at org.apache.seatunnel.engine.server.serializable.RecordSerializer.read(RecordSerializer.java:75)
    at org.apache.seatunnel.engine.server.serializable.RecordSerializer.read(RecordSerializer.java:33)
    at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:44)
    at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toObject(AbstractSerializationService.java:268)
    ... 13 more

    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:191)
    ... 2 more
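
bulolo commented 7 months ago

@Carl-Zhou-CN @NickCodeJourney @jbonofre I found the problem: it is a StarRocks issue. Using the Console sink works, which means the StarRocks sink cannot be written to when the field count exceeds 128. What is even stranger is that with a single table, more than 128 fields works, but with multiple tables, more than 128 fields fails.

table-names = ["SCPRD.wmwhse2.ORDERDETAIL"]  # more than 128 fields: syncs to StarRocks
table-names = ["SCPRD.wmwhse2.ORDERDETAIL", "SCPRD.wmwhse3.ORDERDETAIL"]  # more than 128 fields: fails to sync to StarRocks

I also have a question. Transform is not supported with a multi-table source, so when only 5 fields from the upstream SqlServer-CDC need to reach the downstream StarRocks, how can that be done?

Caused by: org.apache.seatunnel.engine.common.exception.JobDefineCheckException: Adding transform to multi-table source is not supported.
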
env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  SqlServer-CDC {
    username = "sa"
    password = "Password!"
    startup.mode="initial"
    exactly_once=true
    result_table_name = "temp_orderdetail"
    database-names = ["SCPRD"]
    table-names = ["SCPRD.wmwhse2.ORDERDETAIL", "SCPRD.wmwhse3.ORDERDETAIL"]
    base-url = "jdbc:sqlserver://192.168.103.113:1433;databaseName=SCPRD"
  }
}

transform {
  Sql {
    source_table_name = "temp_orderdetail"
    result_table_name = "temp_orderdetail2"
    query = "select ORDERKEY from temp_orderdetail"
  }
}

sink {
  console {
    source_table_name = "temp_orderdetail2"
  }
}

Carl-Zhou-CN commented 6 months ago

@bulolo When a source reads multiple tables, source_table_name and result_table_name only indicate the upstream and downstream relationship between plugins. In the SQL Transform, the table name referenced in the SELECT statement should therefore be the name of a table obtained from the source, not the value of source_table_name.

bulolo commented 6 months ago

table-names = ["SCPRD.wmwhse2.ORDERDETAIL", "SCPRD.wmwhse3.ORDERDETAIL"]

If the source has two tables, how does the Transform handle the SELECT?

env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  SqlServer-CDC {
    username = "sa"
    password = "Password!"
    startup.mode="initial"
    exactly_once=true
    result_table_name = "temp_orderdetail"
    database-names = ["SCPRD"]
    table-names = ["SCPRD.wmwhse2.ORDERDETAIL", "SCPRD.wmwhse3.ORDERDETAIL"]
    base-url = "jdbc:sqlserver://192.168.103.113:1433;databaseName=SCPRD"
  }
}

transform {
  Sql {
    source_table_name = "temp_orderdetail"
    result_table_name = "temp_orderdetail2"
    query = "select ORDERKEY from ORDERDETAIL"  # Is this right, given the source tables ["SCPRD.wmwhse2.ORDERDETAIL", "SCPRD.wmwhse3.ORDERDETAIL"]?
  }
}

sink {
  console {
    source_table_name = "temp_orderdetail2"
  }
}

Carl-Zhou-CN commented 6 months ago

table-names = ["SCPRD.wmwhse2.ORDERDETAIL", "SCPRD.wmwhse3.ORDERDETAIL"]

If the source has two tables, how does the Transform handle the SELECT?

env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  SqlServer-CDC {
    username = "sa"
    password = "Password!"
    startup.mode="initial"
    exactly_once=true
    result_table_name = "temp_orderdetail"
    database-names = ["SCPRD"]
    table-names = ["SCPRD.wmwhse2.ORDERDETAIL", "SCPRD.wmwhse3.ORDERDETAIL"]
    base-url = "jdbc:sqlserver://192.168.103.113:1433;databaseName=SCPRD"
  }
}

transform {
  Sql {
    source_table_name = "temp_orderdetail"
    result_table_name = "temp_orderdetail2"
    query = "select ORDERKEY from ORDERDETAIL"  # Is this right, given the source tables ["SCPRD.wmwhse2.ORDERDETAIL", "SCPRD.wmwhse3.ORDERDETAIL"]?
  }
}

sink {
  console {
    source_table_name = "temp_orderdetail2"
  }
}

Sorry, this is not supported yet

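Carl-Zhou-CN commented 6 months ago

What is even stranger is that with a single table, more than 128 fields works, but with multiple tables, more than 128 fields fails.

This is probably because the StarRocks sink does not support multi-table sink.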

cuijunle commented 6 months ago

I ran into the same bug.