apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] [Connector-V2-Maxcompute] Maxcompute sink can't map field #7164

Closed panpan2019 closed 3 months ago

panpan2019 commented 3 months ago

What happened

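Writing to MaxCompute fails with a type-conversion exception (Cannot cast java.lang.String to java.lang.Byte). After reviewing the code, I found that the MaxCompute sink connector does not map the incoming fields to the sink table's columns when writing data. In the job below, the only upstream field is c7 (VARCHAR), but the failing cast targets java.lang.Byte, which matches the sink's first column, c1 (TINYINT); the value appears to be written by position rather than by field name.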
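
To make the suspected failure mode concrete, here is a minimal sketch in plain Java. It is not the connector's code and does not use the SeaTunnel or ODPS SDK APIs: the class FieldMappingSketch, its two methods, and the simplified column-to-class table are all hypothetical. Only the TINYINT to java.lang.Byte pairing is taken from the stack trace in this report; the other pairings are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the sink write path; not the SeaTunnel or ODPS SDK API.
class FieldMappingSketch {

    // First seven columns of fake_sink and the Java class each slot accepts.
    // Only TINYINT -> Byte is confirmed by the stack trace; the rest are simplified assumptions.
    static final Map<String, Class<?>> SINK_COLUMNS = new LinkedHashMap<>();
    static {
        SINK_COLUMNS.put("c1", Byte.class);    // TINYINT
        SINK_COLUMNS.put("c2", Short.class);   // SMALLINT
        SINK_COLUMNS.put("c3", Integer.class); // INT
        SINK_COLUMNS.put("c4", Long.class);    // BIGINT
        SINK_COLUMNS.put("c5", Float.class);   // FLOAT
        SINK_COLUMNS.put("c6", Double.class);  // DOUBLE
        SINK_COLUMNS.put("c7", String.class);  // VARCHAR(10), simplified to String here
    }

    // Positional write: the i-th incoming value goes into the i-th sink column.
    static Object[] writeByPosition(List<Object> values) {
        Object[] record = new Object[SINK_COLUMNS.size()];
        List<Class<?>> types = new ArrayList<>(SINK_COLUMNS.values());
        for (int i = 0; i < values.size(); i++) {
            // Class.cast throws ClassCastException when the value's type does not match.
            record[i] = types.get(i).cast(values.get(i));
        }
        return record;
    }

    // Name-based write: each incoming field is placed in the sink column with the same name.
    static Object[] writeByName(List<String> fieldNames, List<Object> values) {
        Object[] record = new Object[SINK_COLUMNS.size()];
        List<String> sinkNames = new ArrayList<>(SINK_COLUMNS.keySet());
        for (int i = 0; i < fieldNames.size(); i++) {
            String name = fieldNames.get(i);
            int target = sinkNames.indexOf(name);
            if (target < 0) {
                throw new IllegalArgumentException("No sink column named " + name);
            }
            record[target] = SINK_COLUMNS.get(name).cast(values.get(i));
        }
        return record;
    }

    public static void main(String[] args) {
        // Same shape as the job in this report: a single upstream field, c7, holding a string.
        List<String> names = Arrays.asList("c7");
        List<Object> values = Arrays.<Object>asList("abc");
        try {
            writeByPosition(values);
        } catch (ClassCastException e) {
            System.out.println("positional write failed: " + e.getMessage());
        }
        System.out.println("name-based write: " + Arrays.toString(writeByName(names, values)));
    }
}
```

With a single upstream field c7, the positional variant tries to cast the string into the slot for c1 and fails, while the name-based variant places it in the slot for c7 and leaves the other columns null. Whether the actual fix should resolve columns by name in this way is for the maintainers to decide; the sketch only shows why the two strategies behave differently for this job.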

SeaTunnel Version

2.3.2

SeaTunnel Config

{
    "env" : {
        "execution.parallelism" : 2,
        "job.mode" : "BATCH",
        "shade.identifier" : "base64"
    },
    "source" : [
        {
            "accessId" : "***",
            "accesskey" : "***",
            "driver" : "com.aliyun.odps.jdbc.OdpsDriver",
            "endpoint" : "http://*******/api",
            "parallelism" : 1,
            "plugin_name" : "Maxcompute",
            "project" : "JH_SHZT_SJCJZJ",
            "query" : "select `c7` from fake_source",
            "result_table_name" : "Maxcompute_source_4",
            "table_name" : "fake_source"
        }
    ],
    "transform" : [
        {
            "field_mapper" : {
                "c7" : "c7"
            },
            "plugin_name" : "FieldMapper",
            "result_table_name" : "FieldMapper_transform_0",
            "source_table_name" : [
                "Maxcompute_source_4"
            ]
        }
    ],
    "sink" : [
        {
            "accessId" : "***",
            "accesskey" : "***",
            "driver" : "com.aliyun.odps.jdbc.OdpsDriver",
            "endpoint" : "http://***/api",
            "generate_sink_sql" : true,
            "overwrite" : true,
            "plugin_name" : "Maxcompute",
            "project" : "JH_SHZT_SJCJZJ",
            "source_table_name" : [
                "FieldMapper_transform_0"
            ],
            "table_name" : "fake_sink"
        }
    ]
}

CREATE TABLE JH_SHZT_SJCJZJ.fake_sink (
    c1 TINYINT,
    c2 SMALLINT,
    c3 INT,
    c4 BIGINT,
    c5 FLOAT,
    c6 DOUBLE,
    c7 VARCHAR(10),
    c8 CHAR(10),
    c9 STRING,
    c10 DATE,
    c11 DATETIME,
    c12 TIMESTAMP,
    c13 BOOLEAN,
    c14 BINARY,
    c15 MAP<STRING,STRING>,
    c16 ARRAY<INT>,
    c17 STRUCT<s1:STRING,s2:INT,s3:ARRAY<FLOAT>>
);

-- Drop table

-- DROP TABLE JH_SHZT_SJCJZJ.fake_source;

CREATE TABLE JH_SHZT_SJCJZJ.fake_source (
    c1 TINYINT,
    c2 SMALLINT,
    c3 INT,
    c4 BIGINT,
    c5 FLOAT,
    c6 DOUBLE,
    c7 VARCHAR(10),
    c8 CHAR(10),
    c9 STRING,
    c10 DATE,
    c11 DATETIME,
    c12 TIMESTAMP,
    c13 BOOLEAN,
    c14 BINARY,
    c15 MAP<STRING,STRING>,
    c16 ARRAY<INT>,
    c17 STRUCT<s1:STRING,s2:INT,s3:ARRAY<FLOAT>>
);

Running Command

./seatunnel.sh --config xxx.json

Error Exception

2024-07-09 11:11:14 [pool-3-thread-2] [INFO] 2024-07-09 11:11:14,738 ERROR org.apache.seatunnel.core.starter.SeaTunnel - 
2024-07-09 11:11:14 [pool-3-thread-2] [INFO] ===============================================================================
2024-07-09 11:11:14 [pool-3-thread-2] [INFO] 
2024-07-09 11:11:14 [pool-3-thread-2] [INFO] 
2024-07-09 11:11:14 [pool-3-thread-2] [INFO] 
2024-07-09 11:11:14 [pool-3-thread-2] [INFO] Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
2024-07-09 11:11:14 [pool-3-thread-2] [INFO]    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:188)
2024-07-09 11:11:14 [pool-3-thread-2] [INFO]    at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
2024-07-09 11:11:14 [pool-3-thread-2] [INFO]    at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
2024-07-09 11:11:14 [pool-3-thread-2] [INFO] Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.RuntimeException: java.lang.ClassCastException: Cannot cast java.lang.String to java.lang.Byte
2024-07-09 11:11:14 [pool-3-thread-2] [INFO]    at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:208)
2024-07-09 11:11:14 [pool-3-thread-2] [INFO]    at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:59)
2024-07-09 11:11:14 [pool-3-thread-2] [INFO]    at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39)
2024-07-09 11:11:14 [pool-3-thread-2] [INFO]    at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27)
2024-07-09 11:11:14 [pool-3-thread-2] [INFO]    at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:76)
2024-07-09 11:11:14 [pool-3-thread-2] [INFO]    at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:51)
2024-07-09 11:11:14 [pool-3-thread-2] [INFO]    at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:52)
2024-07-09 11:11:14 [pool-3-thread-2] [INFO]    at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73)
2024-07-09 11:11:14 [pool-3-thread-2] [INFO]    at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:165)
2024-07-09 11:11:14 [pool-3-thread-2] [INFO]    at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78)
2024-07-09 11:11:14 [pool-3-thread-2] [INFO]    at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:613)
2024-07-09 11:11:14 [pool-3-thread-2] [INFO]    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
2024-07-09 11:11:14 [pool-3-thread-2] [INFO]    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
2024-07-09 11:11:14 [pool-3-thread-2] [INFO]    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
2024-07-09 11:11:14 [pool-3-thread-2] [INFO]    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
2024-07-09 11:11:14 [pool-3-thread-2] [INFO]    at java.lang.Thread.run(Thread.java:750)
2024-07-09 11:11:14 [pool-3-thread-2] [INFO] Caused by: java.lang.ClassCastException: Cannot cast java.lang.String to java.lang.Byte
2024-07-09 11:11:14 [pool-3-thread-2] [INFO]    at java.lang.Class.cast(Class.java:3369)
2024-07-09 11:11:14 [pool-3-thread-2] [INFO]    at com.aliyun.odps.data.OdpsTypeTransformer.transform(OdpsTypeTransformer.java:208)
2024-07-09 11:11:14 [pool-3-thread-2] [INFO]    at com.aliyun.odps.data.ArrayRecord.set(ArrayRecord.java:117)
2024-07-09 11:11:14 [pool-3-thread-2] [INFO]    at org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeTypeMapper.getMaxcomputeRowData(MaxcomputeTypeMapper.java:81)
2024-07-09 11:11:14 [pool-3-thread-2] [INFO]    at org.apache.seatunnel.connectors.seatunnel.maxcompute.sink.MaxcomputeWriter.write(MaxcomputeWriter.java:78)
2024-07-09 11:11:14 [pool-3-thread-2] [INFO]    at org.apache.seatunnel.connectors.seatunnel.maxcompute.sink.MaxcomputeWriter.write(MaxcomputeWriter.java:43)
2024-07-09 11:11:14 [pool-3-thread-2] [INFO]    at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:203)
2024-07-09 11:11:14 [pool-3-thread-2] [INFO]    ... 15 more
2024-07-09 11:11:14 [pool-3-thread-2] [INFO] 
2024-07-09 11:11:14 [pool-3-thread-2] [INFO]    at org.apache.seatunnel.engine.client.job.ClientJobProxy.waitForJobComplete(ClientJobProxy.java:122)
2024-07-09 11:11:14 [pool-3-thread-2] [INFO]    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:181)
2024-07-09 11:11:14 [pool-3-thread-2] [INFO]    ... 2 more

Zeta or Flink or Spark Version

Zeta 2.3.2

Java or Scala Version

JDK 8

Screenshots

No response

panpan2019 commented 3 months ago

mark