apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] [spark] The op field conflicts with the internal field of the Seatunnel Spark engine and cannot be read from the data source. #7392

Open lizc9 opened 3 months ago

lizc9 commented 3 months ago

Search before asking

What happened

When I read the op field from JSON and define it as string type, I get the following error:

java.lang.ClassCastException: java.lang.Byte cannot be cast to org.apache.spark.unsafe.types.UTF8String

This happens because the following code defines the op column as byte type:

org.apache.seatunnel.translation.spark.serialization.InternalRowConverter#convert(org.apache.seatunnel.api.table.type.SeaTunnelRow, org.apache.seatunnel.api.table.type.SeaTunnelRowType)
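To illustrate the mismatch, here is a minimal self-contained Java sketch (hypothetical class, not actual SeaTunnel code): the Spark translation layer stores the CDC row kind in the op column as a Byte, while the user-declared schema says string, so the read path's cast fails exactly as in the stack trace.

```java
// Minimal sketch (hypothetical class, not SeaTunnel code) of the type mismatch:
// the converter writes the CDC row kind into "op" as a Byte, but the declared
// schema says string, so the read path casts and throws ClassCastException.
public class OpFieldMismatch {

    // Simulates reading the "op" column with the user-declared string type.
    static String readOp() {
        Object opValue = (byte) 3; // converter stores the row kind as a byte code
        try {
            return (String) opValue; // mirrors the Byte -> UTF8String cast in Spark
        } catch (ClassCastException e) {
            return "ClassCastException: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(readOp());
    }
}
```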

SeaTunnel Version

2.3.6

SeaTunnel Config

# Defining the runtime environment
env {
  parallelism = 1
  job.mode = "BATCH"
}
source {
  Kafka {
    topic = "test"
    bootstrap.servers = "localhost:9092"
    consumer.group = "seatunnel"
    format = "json"
    kafka.config = {
      max.poll.records = 1000
      auto.offset.reset = "earliest"
      enable.auto.commit = "false"
    },
    schema = {
        columns = [
            {
              name = before
              type = string
              nullable = true
            },
            {
              name = after
              type = string
              nullable = true
            },
            {
              name = source
              type = string
              nullable = true
            },
            {
              name = op
              type = string
              nullable = true
            },
            {
              name = ts_ms
              type = bigint
              nullable = true
            }
        ]
    }
  }
}

transform {
}

sink {
  Console {
  }
}

Running Command

./bin/start-seatunnel-spark-3-connector-v2.sh --master "local[4]" --deploy-mode client --config ./config/batch.conf

Error Exception

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (local executor driver): java.lang.ClassCastException: java.lang.Byte cannot be cast to org.apache.spark.unsafe.types.UTF8String
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getUTF8String(rows.scala:46)
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getUTF8String$(rows.scala:46)
    at org.apache.spark.sql.catalyst.expressions.SpecificInternalRow.getUTF8String(SpecificInternalRow.scala:193)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:32)
    at org.apache.seatunnel.core.starter.spark.execution.TransformExecuteProcessor$TransformIterator.hasNext(TransformExecuteProcessor.java:174)
    at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:45)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:435)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1538)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:480)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:381)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Zeta or Flink or Spark Version

Spark: 3.3

Java or Scala Version

java 1.8

Screenshots


Are you willing to submit PR?

Code of Conduct

zhilinli123 commented 3 months ago

Can you provide a message data case? I'll follow up.

Carl-Zhou-CN commented 2 months ago

Hi @lizc9, previously op was a reserved field, but it has now been opened up; you can refer to the latest dev branch.

lizc9 commented 2 months ago

> Can you provide a message data case? I'll follow up.

Kafka key:

{
  "id": "c-ecaae896-0bb8-3a37-a142-ef8b7e662900"
}

kafka content:

{
  "before": {
    "id": "c-ecaae896-0bb8-3a37-a142-ef8b7e662900",
    "ingestion_time": 1723018622139909,
    "city": "unknown"
  },
  "after": {
    "id": "c-ecaae896-0bb8-3a37-a142-ef8b7e662900",
    "ingestion_time": 1723018622139909,
    "city": "unknown"
  },
  "source": {
    "version": "2.0.1.Final",
    "connector": "postgresql",
    "name": "polar",
    "ts_ms": 1723018690396,
    "snapshot": "false",
    "db": "prod_datastore01",
    "sequence": "[\"343153176654624\",\"343153176654624\"]",
    "schema": "public",
    "table": "clips",
    "txId": 2982233777,
    "lsn": 343153176654624,
    "xmin": null
  },
  "op": "u",
  "ts_ms": 1723018690754,
  "transaction": null
}

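For reference, the op field in the Debezium envelope above carries a one-character string ("u" for update), which is why declaring it as string in the schema is the natural choice. A rough Java sketch (naive string scanning for illustration only, not a real JSON parser) pulling the value out of such a message:

```java
public class DebeziumOp {

    // Naive extraction (illustration only, not a JSON parser): finds the value
    // of the top-level "op" key by scanning for the quotes around it.
    static String extractOp(String json) {
        int key = json.indexOf("\"op\"");
        int colon = json.indexOf(':', key);
        int firstQuote = json.indexOf('"', colon + 1);
        int secondQuote = json.indexOf('"', firstQuote + 1);
        return json.substring(firstQuote + 1, secondQuote);
    }

    public static void main(String[] args) {
        String envelope = "{\"op\": \"u\", \"ts_ms\": 1723018690754}";
        System.out.println(extractOp(envelope)); // prints "u"
    }
}
```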
lizc9 commented 2 months ago

> Hi @lizc9, previously op was a reserved field, but it has now been opened up; you can refer to the latest dev branch.

Thanks, may I ask in which SeaTunnel version this feature will be released? Is there a release plan?