apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] [Seatunnel-transform-v2] org.apache.seatunnel.common.utils.SeaTunnelException: table S3SourceTransform not found #7872

Open WZH-hub opened 5 days ago

WZH-hub commented 5 days ago

Search before asking

What happened

Description

SeaTunnel version 2.3.7, Spark version 3.4.3. I can read from S3, transform the table, and then sink the transformed table to the console or S3; this works well in local mode. But when I run in cluster mode, I can see that the spark-submit is correct, yet I get an error saying that SeaTunnel cannot find the source table produced by the transform. When I delete the transform part, it works well.

SeaTunnel Version

2.3.7

SeaTunnel Config

env {
    job.mode = BATCH
    spark.app.name = SeaTunnelOnSpark
    spark.executor.cores = 1
    spark.executor.memory = 1g
    spark.kubernetes.driver.label.azkaban-project-name = ${azkaban-project-name}
    spark.kubernetes.driver.annotation.azkaban-job-info = ${azkaban-job-info}
    spark.kubernetes.driver.label.user = ${hadoop_proxy_user}
    spark.kubernetes.container.image = ${image}
}

source {
    S3File {
        result_table_name = "S3Source"
        file_format_type = "text"
        path = "XXX.txt"
        fs.s3a.endpoint = "XXX"
        fs.s3a.endpoint.region = "XXX"
        access_key = "XXX"
        secret_key = "XXX"
        bucket = "XXX"
        field_delimiter = "#"
        read_columns = ["name", "age","card"]
        schema {
            fields {
                name = string
                age = int
                card = int
            }
        }
    }
}

transform {
    Filter {
        source_table_name = "S3Source"
        result_table_name = "S3SourceTransform"
        include_fields = [name, card]
  }
}

sink {
    Console {
        source_table_name = "S3SourceTransform"
    }
}

Running Command

./bin/start-seatunnel-spark-3-connector-v2.sh \
    --name "seatunnel-spark-demo"    \
    --deploy-mode cluster \
    -i azkaban-project-name=$(XXX) \
    -i azkaban-job-info=${XXX} \
    -i hadoop_proxy_user=${XXX} \
    -i image=$XXX \
    --config $json_file

Error Exception

24/10/18 03:57:16 INFO AbstractPluginDiscovery: Load SeaTunnelSink Plugin from connectors
24/10/18 03:57:16 ERROR SeaTunnel: 

===============================================================================

24/10/18 03:57:16 ERROR SeaTunnel: Fatal Error, 

24/10/18 03:57:16 ERROR SeaTunnel: Please submit bug report in https://github.com/apache/seatunnel/issues

24/10/18 03:57:16 ERROR SeaTunnel: Reason:Run SeaTunnel on spark failed 

24/10/18 03:57:16 ERROR SeaTunnel: Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: Run SeaTunnel on spark failed
        at org.apache.seatunnel.core.starter.spark.command.SparkTaskExecuteCommand.execute(SparkTaskExecuteCommand.java:62)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at org.apache.seatunnel.core.starter.spark.SeaTunnelSpark.main(SeaTunnelSpark.java:35)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.base/java.lang.reflect.Method.invoke(Unknown Source)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1020)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:192)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:215)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1111)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1120)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.seatunnel.common.utils.SeaTunnelException: table S3SourceTransform not found
        at org.apache.seatunnel.core.starter.spark.execution.SparkAbstractPluginExecuteProcessor.lambda$fromSourceTable$1(SparkAbstractPluginExecuteProcessor.java:82)
        at java.base/java.util.Optional.orElseThrow(Unknown Source)
        at org.apache.seatunnel.core.starter.spark.execution.SparkAbstractPluginExecuteProcessor.fromSourceTable(SparkAbstractPluginExecuteProcessor.java:79)
        at org.apache.seatunnel.core.starter.spark.execution.SinkExecuteProcessor.execute(SinkExecuteProcessor.java:99)
        at org.apache.seatunnel.core.starter.spark.execution.SparkExecution.execute(SparkExecution.java:71)
        at org.apache.seatunnel.core.starter.spark.command.SparkTaskExecuteCommand.execute(SparkTaskExecuteCommand.java:60)
        ... 14 more

24/10/18 03:57:16 ERROR SeaTunnel: 
===============================================================================

24/10/18 03:57:16 INFO SparkContext: SparkContext is stopping with exitCode 0.
24/10/18 03:57:16 INFO SparkUI: Stopped Spark web UI at http://seatunnel-spark-demo-c296aa929dc56850-driver-svc.mb-md8.svc:4040
24/10/18 03:57:16 INFO KubernetesClusterSchedulerBackend: Shutting down all executors
24/10/18 03:57:16 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down
24/10/18 03:57:16 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed.
24/10/18 03:57:17 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
24/10/18 03:57:17 INFO MemoryStore: MemoryStore cleared
24/10/18 03:57:17 INFO BlockManager: BlockManager stopped
24/10/18 03:57:17 INFO BlockManagerMaster: BlockManagerMaster stopped
24/10/18 03:57:17 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
24/10/18 03:57:17 INFO SparkContext: Successfully stopped SparkContext
Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: Run SeaTunnel on spark failed
        at org.apache.seatunnel.core.starter.spark.command.SparkTaskExecuteCommand.execute(SparkTaskExecuteCommand.java:62)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at org.apache.seatunnel.core.starter.spark.SeaTunnelSpark.main(SeaTunnelSpark.java:35)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.base/java.lang.reflect.Method.invoke(Unknown Source)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1020)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:192)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:215)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1111)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1120)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.seatunnel.common.utils.SeaTunnelException: table S3SourceTransform not found
        at org.apache.seatunnel.core.starter.spark.execution.SparkAbstractPluginExecuteProcessor.lambda$fromSourceTable$1(SparkAbstractPluginExecuteProcessor.java:82)
        at java.base/java.util.Optional.orElseThrow(Unknown Source)
        at org.apache.seatunnel.core.starter.spark.execution.SparkAbstractPluginExecuteProcessor.fromSourceTable(SparkAbstractPluginExecuteProcessor.java:79)
        at org.apache.seatunnel.core.starter.spark.execution.SinkExecuteProcessor.execute(SinkExecuteProcessor.java:99)
        at org.apache.seatunnel.core.starter.spark.execution.SparkExecution.execute(SparkExecution.java:71)
        at org.apache.seatunnel.core.starter.spark.command.SparkTaskExecuteCommand.execute(SparkTaskExecuteCommand.java:60)
        ... 14 more
24/10/18 03:57:17 INFO ShutdownHookManager: Shutdown hook called
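
For context, the exception is thrown by the sink's table lookup in `SparkAbstractPluginExecuteProcessor.fromSourceTable`: the sink asks a registry for the dataset registered under its `source_table_name`, and fails when nothing was registered under that name, which suggests the Filter transform's output was never registered in cluster mode. A simplified sketch of that lookup (not the actual SeaTunnel source; class and method names here are illustrative):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Simplified model of how a SeaTunnel-style execute processor wires
// result_table_name -> source_table_name. Illustrative only.
public class TableRegistrySketch {
    private final Map<String, Object> registeredTables = new HashMap<>();

    // A source or transform registers its output under result_table_name.
    public void register(String resultTableName, Object dataset) {
        registeredTables.put(resultTableName, dataset);
    }

    // A transform or sink resolves its input by source_table_name;
    // a missing registration fails just like
    // "table S3SourceTransform not found".
    public Object fromSourceTable(String sourceTableName) {
        return Optional.ofNullable(registeredTables.get(sourceTableName))
                .orElseThrow(() -> new IllegalStateException(
                        "table " + sourceTableName + " not found"));
    }

    public static void main(String[] args) {
        TableRegistrySketch registry = new TableRegistrySketch();
        registry.register("S3Source", "rows-from-s3");
        // If the Filter transform never runs (as appears to happen in
        // cluster mode), "S3SourceTransform" is never registered:
        try {
            registry.fromSourceTable("S3SourceTransform");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In other words, the sink's lookup itself is behaving as designed; the question is why the transform's result table is absent from the registry when the same config runs under `--deploy-mode cluster`.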

Zeta or Flink or Spark Version

Spark 3.4.3 image

Java or Scala Version

jdk1.8.0_131

Screenshots

No response

Are you willing to submit PR?

Code of Conduct

WZH-hub commented 2 days ago

Can anyone help with this?