[X] I have searched the issues and found no similar issues.
What happened
Synchronizing data from multiple MSSQL tables to Kafka fails with an error.
SeaTunnel Version
2.3.5
SeaTunnel Config
env {
# You can set engine configuration here
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
}
source {
# This is an example source plugin **only for testing and demonstrating the source plugin feature**
SqlServer-CDC {
base-url = "jdbc:sqlserver://localhost:1433;databaseName=test_01"
username = "sa"
password = "password"
database-names = ["test_01"]
table-names = ["test_01.dbo.table_a","test_01.dbo.table_b"]
startup.mode = "initial"
}
}
transform {
}
sink {
Kafka {
bootstrap.servers = "xxxx"
topic = "SYNC-TEST"
kafka.request.timeout.ms = 60000
semantics = EXACTLY_ONCE
}
}
Running Command
./bin/seatunnel.sh --config ./config/test.config -e local
Error Exception
2024-05-16 21:08:04,646 ERROR [o.a.k.c.p.KafkaProducer ] [BlockingWorker-TaskGroupLocation{jobId=843474790923632641, pipelineId=1, taskGroupId=90000}] - [Producer clientId=producer-SeaTunnel4023-1, transactionalId=SeaTunnel4023-1] Interrupted while joining ioThread
java.lang.InterruptedException: null
at java.lang.Object.wait(Native Method) ~[?:1.8.0_391]
at java.lang.Thread.join(Thread.java:1265) ~[?:1.8.0_391]
at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1265) ~[?:?]
at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1242) ~[?:?]
at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1218) ~[?:?]
at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaTransactionSender.close(KafkaTransactionSender.java:119) ~[?:?]
at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaSinkWriter.close(KafkaSinkWriter.java:140) ~[?:?]
at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.close(SinkFlowLifeCycle.java:158) ~[seatunnel-starter.jar:2.3.5]
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$close$5(SeaTunnelTask.java:330) ~[seatunnel-starter.jar:2.3.5]
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) [?:1.8.0_391]
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) [?:1.8.0_391]
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) [?:1.8.0_391]
at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291) [?:1.8.0_391]
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731) [?:1.8.0_391]
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_391]
at java.util.concurrent.ForkJoinPool.helpComplete(ForkJoinPool.java:1881) [?:1.8.0_391]
at java.util.concurrent.ForkJoinPool.externalHelpComplete(ForkJoinPool.java:2478) [?:1.8.0_391]
at java.util.concurrent.ForkJoinTask.externalAwaitDone(ForkJoinTask.java:324) [?:1.8.0_391]
at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:405) [?:1.8.0_391]
at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734) [?:1.8.0_391]
at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160) [?:1.8.0_391]
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174) [?:1.8.0_391]
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233) [?:1.8.0_391]
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418) [?:1.8.0_391]
at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583) [?:1.8.0_391]
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.close(SeaTunnelTask.java:327) [seatunnel-starter.jar:2.3.5]
at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:720) [seatunnel-starter.jar:2.3.5]
at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1004) [seatunnel-starter.jar:2.3.5]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_391]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_391]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_391]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_391]
at java.lang.Thread.run(Thread.java:750) [?:1.8.0_391]
2024-05-16 21:08:04,646 INFO [o.a.k.c.p.KafkaProducer ] [BlockingWorker-TaskGroupLocation{jobId=843474790923632641, pipelineId=1, taskGroupId=90000}] - [Producer clientId=producer-SeaTunnel4023-1, transactionalId=SeaTunnel4023-1] Proceeding to force close the producer since pending requests could not be completed within timeout 9223372036854775807 ms.
2024-05-16 21:08:04,647 INFO [o.a.k.c.m.Metrics ] [BlockingWorker-TaskGroupLocation{jobId=843474790923632641, pipelineId=1, taskGroupId=90000}] - Metrics scheduler closed
2024-05-16 21:08:04,647 INFO [o.a.k.c.m.Metrics ] [BlockingWorker-TaskGroupLocation{jobId=843474790923632641, pipelineId=1, taskGroupId=90000}] - Closing reporter org.apache.kafka.common.metrics.JmxReporter
2024-05-16 21:08:04,647 INFO [o.a.k.c.m.Metrics ] [BlockingWorker-TaskGroupLocation{jobId=843474790923632641, pipelineId=1, taskGroupId=90000}] - Metrics reporters closed
2024-05-16 21:08:04,647 INFO [o.a.k.c.u.AppInfoParser ] [BlockingWorker-TaskGroupLocation{jobId=843474790923632641, pipelineId=1, taskGroupId=90000}] - App info kafka.producer for producer-SeaTunnel4023-1 unregistered
2024-05-16 21:08:04,665 INFO [c.h.i.i.NodeExtension ] [main] - [localhost]:5801 [seatunnel-253591] [5.1] Destroying node NodeExtension.
2024-05-16 21:08:04,666 INFO [c.h.i.i.Node ] [main] - [localhost]:5801 [seatunnel-253591] [5.1] Hazelcast Shutdown is completed in 28 ms.
2024-05-16 21:08:04,666 INFO [c.h.c.LifecycleService ] [main] - [localhost]:5801 [seatunnel-253591] [5.1] [localhost]:5801 is SHUTDOWN
2024-05-16 21:08:04,666 INFO [s.c.s.s.c.ClientExecuteCommand] [main] - Closed HazelcastInstance ......
2024-05-16 21:08:04,666 INFO [s.c.s.s.c.ClientExecuteCommand] [main] - Closed metrics executor service ......
2024-05-16 21:08:04,666 ERROR [o.a.s.c.s.SeaTunnel ] [main] -
===============================================================================
2024-05-16 21:08:04,666 ERROR [o.a.s.c.s.SeaTunnel ] [main] - Fatal Error,
2024-05-16 21:08:04,666 ERROR [o.a.s.c.s.SeaTunnel ] [main] - Please submit bug report in https://github.com/apache/seatunnel/issues
2024-05-16 21:08:04,666 ERROR [o.a.s.c.s.SeaTunnel ] [main] - Reason:SeaTunnel job executed failed
2024-05-16 21:08:04,667 ERROR [o.a.s.c.s.SeaTunnel ] [main] - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:202)
at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: com.hazelcast.nio.serialization.HazelcastSerializationException: java.lang.NegativeArraySizeException
at com.hazelcast.internal.serialization.impl.SerializationUtil.handleException(SerializationUtil.java:111)
at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toObject(AbstractSerializationService.java:274)
at com.hazelcast.spi.impl.NodeEngineImpl.toObject(NodeEngineImpl.java:385)
at com.hazelcast.collection.impl.queue.QueueProxyImpl.drainTo(QueueProxyImpl.java:134)
at org.apache.seatunnel.engine.server.task.flow.ShuffleSourceFlowLifeCycle.collect(ShuffleSourceFlowLifeCycle.java:94)
at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73)
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78)
at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:703)
at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1004)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.NegativeArraySizeException
at org.apache.seatunnel.api.table.type.SeaTunnelRow.<init>(SeaTunnelRow.java:38)
at org.apache.seatunnel.engine.server.serializable.RecordSerializer.read(RecordSerializer.java:75)
at org.apache.seatunnel.engine.server.serializable.RecordSerializer.read(RecordSerializer.java:33)
at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:44)
at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toObject(AbstractSerializationService.java:268)
... 13 more
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:194)
... 2 more
2024-05-16 21:08:04,667 ERROR [o.a.s.c.s.SeaTunnel ] [main] -
===============================================================================
Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:202)
at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: com.hazelcast.nio.serialization.HazelcastSerializationException: java.lang.NegativeArraySizeException
at com.hazelcast.internal.serialization.impl.SerializationUtil.handleException(SerializationUtil.java:111)
at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toObject(AbstractSerializationService.java:274)
at com.hazelcast.spi.impl.NodeEngineImpl.toObject(NodeEngineImpl.java:385)
at com.hazelcast.collection.impl.queue.QueueProxyImpl.drainTo(QueueProxyImpl.java:134)
at org.apache.seatunnel.engine.server.task.flow.ShuffleSourceFlowLifeCycle.collect(ShuffleSourceFlowLifeCycle.java:94)
at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73)
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78)
at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:703)
at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1004)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.NegativeArraySizeException
at org.apache.seatunnel.api.table.type.SeaTunnelRow.<init>(SeaTunnelRow.java:38)
at org.apache.seatunnel.engine.server.serializable.RecordSerializer.read(RecordSerializer.java:75)
at org.apache.seatunnel.engine.server.serializable.RecordSerializer.read(RecordSerializer.java:33)
at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:44)
at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toObject(AbstractSerializationService.java:268)
... 13 more
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:194)
... 2 more
2024-05-16 21:08:09,127 INFO [r.IncrementalSourceSplitReader] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=843474790923632641, pipelineId=1, taskGroupId=30000}] - Close current fetcher org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceScanFetcher
2024-05-16 21:08:09,138 INFO [i.d.j.JdbcConnection ] [pool-26-thread-1] - Connection gracefully closed
2024-05-16 21:08:09,139 INFO [i.d.j.JdbcConnection ] [pool-27-thread-1] - Connection gracefully closed
2024-05-16 21:08:09,140 INFO [o.a.s.c.s.c.s.r.f.SplitFetcher] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=843474790923632641, pipelineId=1, taskGroupId=30000}] - Split fetcher 0 exited.
2024-05-16 21:08:14,521 INFO [r.IncrementalSourceSplitReader] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=843474790923632641, pipelineId=1, taskGroupId=30000}] - Close current fetcher org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceScanFetcher
2024-05-16 21:08:14,525 INFO [i.d.j.JdbcConnection ] [pool-28-thread-1] - Connection gracefully closed
2024-05-16 21:08:14,527 INFO [i.d.j.JdbcConnection ] [pool-29-thread-1] - Connection gracefully closed
2024-05-16 21:08:14,527 INFO [o.a.s.c.s.c.s.r.f.SplitFetcher] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=843474790923632641, pipelineId=1, taskGroupId=30000}] - Split fetcher 0 exited.
2024-05-16 21:08:18,585 INFO [o.a.k.c.m.Metrics ] [ForkJoinPool.commonPool-worker-5] - Metrics scheduler closed
2024-05-16 21:08:18,585 INFO [o.a.k.c.m.Metrics ] [ForkJoinPool.commonPool-worker-5] - Closing reporter org.apache.kafka.common.metrics.JmxReporter
2024-05-16 21:08:18,585 INFO [o.a.k.c.m.Metrics ] [ForkJoinPool.commonPool-worker-5] - Metrics reporters closed
2024-05-16 21:08:18,585 INFO [o.a.k.c.u.AppInfoParser ] [ForkJoinPool.commonPool-worker-5] - App info kafka.producer for producer-SeaTunnel6694-1 unregistered
2024-05-16 21:08:18,597 INFO [o.a.k.c.m.Metrics ] [ForkJoinPool.commonPool-worker-3] - Metrics scheduler closed
2024-05-16 21:08:18,597 INFO [o.a.k.c.m.Metrics ] [ForkJoinPool.commonPool-worker-3] - Closing reporter org.apache.kafka.common.metrics.JmxReporter
2024-05-16 21:08:18,597 INFO [o.a.k.c.m.Metrics ] [ForkJoinPool.commonPool-worker-3] - Metrics reporters closed
2024-05-16 21:08:18,597 INFO [o.a.k.c.u.AppInfoParser ] [ForkJoinPool.commonPool-worker-3] - App info kafka.producer for producer-SeaTunnel3841-1 unregistered
2024-05-16 21:08:29,196 INFO [o.a.k.c.m.Metrics ] [ForkJoinPool.commonPool-worker-4] - Metrics scheduler closed
2024-05-16 21:08:29,196 INFO [o.a.k.c.m.Metrics ] [ForkJoinPool.commonPool-worker-4] - Closing reporter org.apache.kafka.common.metrics.JmxReporter
2024-05-16 21:08:29,196 INFO [o.a.k.c.m.Metrics ] [ForkJoinPool.commonPool-worker-4] - Metrics reporters closed
2024-05-16 21:08:29,197 INFO [o.a.k.c.u.AppInfoParser ] [ForkJoinPool.commonPool-worker-4] - App info kafka.producer for producer-SeaTunnel4066-1 unregistered
2024-05-16 21:08:29,197 INFO [o.a.k.c.m.Metrics ] [ForkJoinPool.commonPool-worker-2] - Metrics scheduler closed
2024-05-16 21:08:29,197 INFO [o.a.k.c.m.Metrics ] [ForkJoinPool.commonPool-worker-2] - Closing reporter org.apache.kafka.common.metrics.JmxReporter
2024-05-16 21:08:29,198 INFO [o.a.k.c.m.Metrics ] [ForkJoinPool.commonPool-worker-2] - Metrics reporters closed
2024-05-16 21:08:29,198 INFO [o.a.k.c.u.AppInfoParser ] [ForkJoinPool.commonPool-worker-2] - App info kafka.producer for producer-SeaTunnel7334-1 unregistered
2024-05-16 21:08:34,594 INFO [o.a.k.c.m.Metrics ] [ForkJoinPool.commonPool-worker-6] - Metrics scheduler closed
2024-05-16 21:08:34,595 INFO [o.a.k.c.m.Metrics ] [ForkJoinPool.commonPool-worker-6] - Closing reporter org.apache.kafka.common.metrics.JmxReporter
2024-05-16 21:08:34,595 INFO [o.a.k.c.m.Metrics ] [ForkJoinPool.commonPool-worker-6] - Metrics reporters closed
2024-05-16 21:08:34,596 INFO [o.a.k.c.u.AppInfoParser ] [ForkJoinPool.commonPool-worker-6] - App info kafka.producer for producer-SeaTunnel3209-1 unregistered
2024-05-16 21:08:34,595 INFO [o.a.k.c.m.Metrics ] [ForkJoinPool.commonPool-worker-1] - Metrics scheduler closed
2024-05-16 21:08:34,598 INFO [o.a.k.c.m.Metrics ] [ForkJoinPool.commonPool-worker-1] - Closing reporter org.apache.kafka.common.metrics.JmxReporter
2024-05-16 21:08:34,598 INFO [o.a.k.c.m.Metrics ] [ForkJoinPool.commonPool-worker-1] - Metrics reporters closed
2024-05-16 21:08:34,599 INFO [o.a.k.c.u.AppInfoParser ] [ForkJoinPool.commonPool-worker-1] - App info kafka.producer for producer-SeaTunnel5301-1 unregistered
2024-05-16 21:08:34,604 INFO [s.c.s.s.c.ClientExecuteCommand] [ForkJoinPool.commonPool-worker-1] - run shutdown hook because get close signal
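The innermost cause in the trace above is a java.lang.NegativeArraySizeException raised in the SeaTunnelRow constructor while RecordSerializer.read was deserializing a record. In Java this exception is thrown only when an array is allocated with a negative length, so the field count reaching the constructor was presumably negative. The trace does not show why. The following is a minimal sketch of that failure mode, not SeaTunnel's actual code: the class NegativeAritySketch and its methods are hypothetical names, and reading the field count as a plain int from the stream is an assumption made for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical illustration only; this is not the SeaTunnel RecordSerializer.
public class NegativeAritySketch {

    // Assumed wire layout: a 4-byte field count comes first, then the fields.
    // Only the count is read here, and it sizes the row's field array.
    static Object[] readRow(DataInputStream in) throws IOException {
        int arity = in.readInt();
        // Allocating with a negative length throws NegativeArraySizeException.
        return new Object[arity];
    }

    // Writes a single int so that readRow has something to consume.
    static byte[] encodeArity(int arity) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(arity);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Returns true when reading a row with this field count fails in the
    // same way as the trace, false when the allocation succeeds.
    static boolean failsWithNegativeArraySize(int arity) {
        byte[] payload = encodeArity(arity);
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload))) {
            readRow(in);
            return false;
        } catch (NegativeArraySizeException e) {
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(failsWithNegativeArraySize(2));
        System.out.println(failsWithNegativeArraySize(-1));
    }
}
```

If this reading is right, checking what field count is written for rows of table_a and table_b, and whether the reading side expects the same layout, may help narrow down the problem.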
Zeta or Flink or Spark Version
Zeta
Java or Scala Version
No response
Screenshots
No response