apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] [connector-jdbc] When the column data type is varbinary and all row values of the column are null (or the first row is null), a data type conversion error occurs. #7618

Open line0820 opened 1 month ago

line0820 commented 1 month ago

What happened

When a column's data type is varbinary and every row value in that column is null (or the value in the first row is null), the JDBC sink fails with a data type conversion error.

SeaTunnel Version

2.3.7

SeaTunnel Config

env {
  execution.parallelism = 2
  job.mode = "BATCH"
}
source {
    Jdbc {
        url = "jdbc:sqlserver://host.docker.internal:1433;databaseName=Test_1;encrypt=false;"
        driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
        connection_check_timeout_sec = 100
        user = "erp"
        password = "123456"
        query = "select cast([c1_Guid] as nvarchar(100)) AS [c1_Guid],[c2_Datetime],[c3_Datetime2],cast([c4_Datetimeoffset] as nvarchar(100)) AS [c4_Datetimeoffset],cast([c5_Datetimeoffset] as nvarchar(100)) AS [c5_Datetimeoffset],[c6_Int],[c7_Nvarchar],[c8_Char],[c9_Bit],[c10_Text],[c12_Decimal],[c13_Numeric],[c14_time],[c15_nvarcharMax],[c16_Bigint],[c17_smallint],[c18_real],[c19_float],[c20_varbinary],cast([ModifyDate] as nvarchar(100)) AS [ModifyDate] from [dbo].[Table_Test]"
    }
}

transform {
    # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
    # please go to https://seatunnel.apache.org/docs/transform/sql
}

sink {
  jdbc {
    url = "jdbc:sqlserver://host.docker.internal:1433;databaseName=Test_2;encrypt=false;"
    driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    user = "erp"
    password = "123456"

    database = "Test_2"
    table = "dbo.Table_Test"
    support_upsert_by_query_primary_key_exist = true
    generate_sink_sql = true
    primary_keys = ["c1_Guid"]
  }
}

Running Command

docker
DolphinScheduler 3.2.1 + SeaTunnel 2.3.7
local
jdbc driver: mssql-jdbc-12.8.0.jre8.jar
database: mssql
table:
USE [Test_1]
GO

/****** Object:  Table [dbo].[Table_Test]    Script Date: 2024/9/10 9:02:10 ******/
SET ANSI_NULLS ON
GO

SET QUOTED_IDENTIFIER ON
GO

CREATE TABLE [dbo].[Table_Test](
    [c1_Guid] [uniqueidentifier] NOT NULL,
    [c2_Datetime] [datetime] NOT NULL,
    [c3_Datetime2] [datetime2](7) NULL,
    [c4_Datetimeoffset] [datetimeoffset](7) NULL,
    [c5_Datetimeoffset] [datetimeoffset](4) NULL,
    [c6_Int] [int] NULL,
    [c7_Nvarchar] [nvarchar](50) NULL,
    [c8_Char] [char](10) NULL,
    [c9_Bit] [bit] NULL,
    [c10_Text] [text] NULL,
    [c12_Decimal] [decimal](18, 6) NULL,
    [c13_Numeric] [numeric](18, 0) NULL,
    [c14_time] [time](7) NULL,
    [c15_nvarcharMax] [nvarchar](max) NULL,
    [c16_Bigint] [bigint] NULL,
    [c17_smallint] [smallint] NULL,
    [c18_real] [real] NULL,
    [c19_float] [float] NULL,
    [c20_varbinary] [varbinary](100) NULL,
    [ModifyDate] [datetimeoffset](7) NULL,
 CONSTRAINT [PK_Table_Test] PRIMARY KEY CLUSTERED 
(
    [c1_Guid] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON, OPTIMIZE_FOR_SEQUENTIAL_KEY = OFF) ON [PRIMARY]
) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY]
GO

Error Exception

2024-09-10 08:43:36,441 ERROR [o.a.s.c.s.j.i.JdbcOutputFormat] [st-multi-table-sink-writer-2] - JDBC executeBatch error, retry times = 0
    java.sql.BatchUpdateException: Implicit conversion from data type nvarchar to varbinary is not allowed. Use the CONVERT function to run this query.
        at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.executeBatch(SQLServerPreparedStatement.java:2444) ~[mssql-jdbc-12.8.0.jre8.jar:?]
        at org.apache.seatunnel.shade.com.zaxxer.hikari.pool.ProxyStatement.executeBatch(ProxyStatement.java:127) ~[connector-jdbc-2.3.7.jar:2.3.7]
        at org.apache.seatunnel.shade.com.zaxxer.hikari.pool.HikariProxyPreparedStatement.executeBatch(HikariProxyPreparedStatement.java) ~[connector-jdbc-2.3.7.jar:2.3.7]
        at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.FieldNamedPreparedStatement.executeBatch(FieldNamedPreparedStatement.java:534) ~[connector-jdbc-2.3.7.jar:2.3.7]
        at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:51) ~[connector-jdbc-2.3.7.jar:2.3.7]
        at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.BufferReducedBatchStatementExecutor.executeBatch(BufferReducedBatchStatementExecutor.java:89) ~[connector-jdbc-2.3.7.jar:2.3.7]
        at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:172) ~[connector-jdbc-2.3.7.jar:2.3.7]
        at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:136) ~[connector-jdbc-2.3.7.jar:2.3.7]
        at org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcSinkWriter.prepareCommit(JdbcSinkWriter.java:131) ~[connector-jdbc-2.3.7.jar:2.3.7]
        at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.lambda$prepareCommit$4(MultiTableSinkWriter.java:217) ~[seatunnel-starter.jar:2.3.7]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_402]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_402]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_402]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_402]
        at java.lang.Thread.run(Thread.java:750) [?:1.8.0_402]
    2024-09-10 08:43:36,441 WARN  [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=885686863849848833, pipelineId=1, taskGroupId=30000}] - [localhost]:5802 [seatunnel-35394] [5.1] Exception in org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask@2621d0be
    java.lang.RuntimeException: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException: ErrorCode:[COMMON-10], ErrorDescription:[Flush data operation that in sink connector failed]
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:292) ~[seatunnel-starter.jar:2.3.7]
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:74) ~[seatunnel-starter.jar:2.3.7]
        at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39) ~[seatunnel-starter.jar:2.3.7]
        at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27) ~[seatunnel-starter.jar:2.3.7]
        at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:70) ~[seatunnel-starter.jar:2.3.7]
        at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:50) ~[seatunnel-starter.jar:2.3.7]
        at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:51) ~[seatunnel-starter.jar:2.3.7]
        at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73) ~[seatunnel-starter.jar:2.3.7]
        at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168) ~[seatunnel-starter.jar:2.3.7]
        at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78) ~[seatunnel-starter.jar:2.3.7]
        at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:717) [seatunnel-starter.jar:2.3.7]
        at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1039) [seatunnel-starter.jar:2.3.7]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_402]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_402]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_402]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_402]
        at java.lang.Thread.run(Thread.java:750) [?:1.8.0_402]
    Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException: ErrorCode:[COMMON-10], ErrorDescription:[Flush data operation that in sink connector failed]
        at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.prepareCommit(MultiTableSinkWriter.java:234) ~[seatunnel-starter.jar:2.3.7]
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:208) ~[seatunnel-starter.jar:2.3.7]
        ... 16 more
    Caused by: java.util.concurrent.ExecutionException: org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException: ErrorCode:[COMMON-10], ErrorDescription:[Flush data operation that in sink connector failed]
        at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:1.8.0_402]
        at java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[?:1.8.0_402]
        at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.prepareCommit(MultiTableSinkWriter.java:232) ~[seatunnel-starter.jar:2.3.7]
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:208) ~[seatunnel-starter.jar:2.3.7]
        ... 16 more
    Caused by: org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException: ErrorCode:[COMMON-10], ErrorDescription:[Flush data operation that in sink connector failed]
        at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:142) ~[connector-jdbc-2.3.7.jar:2.3.7]
        at org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcSinkWriter.prepareCommit(JdbcSinkWriter.java:131) ~[connector-jdbc-2.3.7.jar:2.3.7]
        at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.lambda$prepareCommit$4(MultiTableSinkWriter.java:217) ~[seatunnel-starter.jar:2.3.7]
        ... 5 more
    Caused by: java.sql.BatchUpdateException: Implicit conversion from data type nvarchar to varbinary is not allowed. Use the CONVERT function to run this query.
        at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.executeBatch(SQLServerPreparedStatement.java:2444) ~[mssql-jdbc-12.8.0.jre8.jar:?]
        at org.apache.seatunnel.shade.com.zaxxer.hikari.pool.ProxyStatement.executeBatch(ProxyStatement.java:127) ~[connector-jdbc-2.3.7.jar:2.3.7]
        at org.apache.seatunnel.shade.com.zaxxer.hikari.pool.HikariProxyPreparedStatement.executeBatch(HikariProxyPreparedStatement.java) ~[connector-jdbc-2.3.7.jar:2.3.7]
        at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.FieldNamedPreparedStatement.executeBatch(FieldNamedPreparedStatement.java:534) ~[connector-jdbc-2.3.7.jar:2.3.7]
        at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:51) ~[connector-jdbc-2.3.7.jar:2.3.7]
        at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.BufferReducedBatchStatementExecutor.executeBatch(BufferReducedBatchStatementExecutor.java:89) ~[connector-jdbc-2.3.7.jar:2.3.7]
        at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:172) ~[connector-jdbc-2.3.7.jar:2.3.7]
        at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:136) ~[connector-jdbc-2.3.7.jar:2.3.7]
        at org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcSinkWriter.prepareCommit(JdbcSinkWriter.java:131) ~[connector-jdbc-2.3.7.jar:2.3.7]
        at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.lambda$prepareCommit$4(MultiTableSinkWriter.java:217) ~[seatunnel-starter.jar:2.3.7]
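
The server message above says that a null value reached the varbinary column typed as nvarchar. One plausible explanation, not verified against the connector-jdbc source, is that a null parameter is bound without the JDBC type of the target column, so the value is sent as a character type. The Java sketch below shows the general JDBC technique of binding nulls with an explicit type through `PreparedStatement.setNull`. The class `NullBindingSketch`, its helper methods, and the type-name table are hypothetical illustrations and are not SeaTunnel code.

```java
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical helper for illustration; not part of SeaTunnel or mssql-jdbc.
final class NullBindingSketch {
    // Assumed mapping from SQL Server column type names to java.sql.Types codes.
    private static final Map<String, Integer> JDBC_TYPES = new HashMap<>();

    static {
        JDBC_TYPES.put("varbinary", Types.VARBINARY);
        JDBC_TYPES.put("nvarchar", Types.NVARCHAR);
        JDBC_TYPES.put("int", Types.INTEGER);
        JDBC_TYPES.put("bigint", Types.BIGINT);
        JDBC_TYPES.put("bit", Types.BIT);
    }

    // Looks up the JDBC type code for a column type name, ignoring case.
    static int jdbcTypeFor(String columnType) {
        Integer type = JDBC_TYPES.get(columnType.toLowerCase(Locale.ROOT));
        if (type == null) {
            throw new IllegalArgumentException("Unmapped column type: " + columnType);
        }
        return type;
    }

    // Binds a value; a null is bound with the column's own JDBC type
    // so the server does not have to guess the parameter type.
    static void bind(PreparedStatement ps, int index, Object value, String columnType)
            throws SQLException {
        if (value == null) {
            ps.setNull(index, jdbcTypeFor(columnType));
        } else {
            ps.setObject(index, value);
        }
    }

    public static void main(String[] args) {
        // Prints whether "varbinary" resolves to Types.VARBINARY.
        System.out.println(jdbcTypeFor("varbinary") == Types.VARBINARY);
    }
}
```

With a helper like this, a null for `c20_varbinary` would be bound as `Types.VARBINARY` rather than left for the driver to infer. Whether that matches the actual fix needed in the connector is an open question for the maintainers.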

Zeta or Flink or Spark Version

No response

Java or Scala Version

No response

Screenshots

No response

line0820 commented 1 month ago

In addition, when a column is of type datetimeoffset, the time zone part is dropped. Casting the column to a character type in the source query (as the nvarchar(100) casts in the config above do) avoids this.
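
To illustrate why the cast helps, the Java sketch below parses a datetimeoffset value carried as text and contrasts it with the same value after conversion to a plain local timestamp, which has no offset field. The input string format is an assumption about how SQL Server renders `datetimeoffset(7)` when cast to nvarchar, and the class `DatetimeOffsetSketch` is hypothetical; it does not show how SeaTunnel itself reads the column.

```java
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical illustration; not SeaTunnel code.
final class DatetimeOffsetSketch {
    // Assumed text form of a datetimeoffset(7) value after CAST(... AS nvarchar(100)):
    // date, time with 7 fractional digits, then an offset such as +08:00.
    static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSS xxx");

    // Parses the text form and keeps the UTC offset.
    static OffsetDateTime parse(String text) {
        return OffsetDateTime.parse(text, FORMAT);
    }

    public static void main(String[] args) {
        OffsetDateTime withOffset = parse("2024-09-10 09:02:10.1234567 +08:00");
        // A local timestamp type cannot hold the offset, so it is lost here.
        LocalDateTime withoutOffset = withOffset.toLocalDateTime();
        System.out.println(withOffset);
        System.out.println(withoutOffset);
    }
}
```

Carrying the value as a string keeps the offset in the text itself, so it is not lost on the way to the sink.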

github-actions[bot] commented 3 weeks ago

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in the next 7 days if no further activity occurs.