apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0
8.02k stars 1.82k forks source link

[Bug] [connector-jdbc] Mysql bigint(20) cannot be used as a partition_column #4627

Closed LYL41011 closed 1 year ago

LYL41011 commented 1 year ago

Search before asking

What happened

I want to synchronize data from MySQL to Elasticsearch. Due to the large amount of data, I must use concurrency to increase the synchronization speed. However, when I configure concurrency and use bigint(20) id (mysql ddl is :id bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '物理主键') as the partition column, the synchronization task reports an error

Caused by: org.apache.seatunnel.api.common.PrepareFailException: ErrorCode:[API-01], ErrorDescription:[Configuration item validate failed] - PluginName: jdbc, PluginType: source, Message: org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException: ErrorCode:[COMMON-06], ErrorDescription:[Illegal argument] - id Decimal(20, 0) is not numeric type

SeaTunnel Version

2.3.1

SeaTunnel Config

env {
  execution.parallelism = 10
  job.mode = "BATCH"
  checkpoint.interval = 20000

}

source {

    Jdbc {
        url = "jdbc:mysql://xxx:4491/cis?tinyInt1isBit=false"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 100
        user = "cis_r4"
        password = "xxx"
        query = "select id,biz_no,cust_no,user_no,control_code,memo,opr_user,opr_date,date_created,created_by,date_update,update_by,date_updated,updated_by from u_user_control"
        partition_column = "id"
    }

}
sink {
    Elasticsearch {
        hosts = ["xxx:18064"]
        index = "black_record_test"
        username  = "vocnw_w"
        password  = "xxx"
    }
}

Running Command

../bin/start-seatunnel-flink-13-connector-v2.sh --config mysql2es_test

Error Exception

SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: ErrorCode:[API-01], ErrorDescription:[Configuration item validate failed] - PluginName: jdbc, PluginType: source, Message: org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException: ErrorCode:[COMMON-06], ErrorDescription:[Illegal argument] - id Decimal(20, 0) is not numeric type
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
        at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: org.apache.seatunnel.api.common.PrepareFailException: ErrorCode:[API-01], ErrorDescription:[Configuration item validate failed] - PluginName: jdbc, PluginType: source, Message: org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException: ErrorCode:[COMMON-06], ErrorDescription:[Illegal argument] - id Decimal(20, 0) is not numeric type
        at org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSource.prepare(JdbcSource.java:96)
        at org.apache.seatunnel.core.starter.flink.execution.SourceExecuteProcessor.initializePlugins(SourceExecuteProcessor.java:132)
        at org.apache.seatunnel.core.starter.flink.execution.FlinkAbstractPluginExecuteProcessor.<init>(FlinkAbstractPluginExecuteProcessor.java:67)
        at org.apache.seatunnel.core.starter.flink.execution.SourceExecuteProcessor.<init>(SourceExecuteProcessor.java:58)
        at org.apache.seatunnel.core.starter.flink.execution.FlinkExecution.<init>(FlinkExecution.java:85)
        at org.apache.seatunnel.core.starter.flink.command.FlinkTaskExecuteCommand.execute(FlinkTaskExecuteCommand.java:59)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at org.apache.seatunnel.core.starter.flink.SeaTunnelFlink.main(SeaTunnelFlink.java:34)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
        ... 11 more

Flink or Spark Version

Flink1.13.6

Java or Scala Version

java8

Screenshots

No response

Are you willing to submit PR?

Code of Conduct

zhilinli123 commented 1 year ago

Leave it to me to fix

github-actions[bot] commented 1 year ago

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in next 7 days if no further activity occurs.

github-actions[bot] commented 1 year ago

This issue has been closed because it has not received response for too long time. You could reopen it if you encountered similar problems in the future.