apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] [seatunnel-engine] SeaTunnel Flink engine application mode reads a file that does not exist #7321

Open Adamyuanyuan opened 1 month ago

Adamyuanyuan commented 1 month ago

Search before asking

What happened

When submitting a SeaTunnel job in Flink Application mode, the same error always occurs:

  1. Same as https://github.com/apache/seatunnel/issues/6622.
  2. The configuration file path is confirmed to be correct.
  3. Flink Session mode runs normally.

SeaTunnel Version

2.3.6

SeaTunnel Config

```hocon
env {
  # You can set SeaTunnel environment configuration here
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 2000
}

source {
  # This is an example source plugin, only for testing and demonstrating the source plugin feature
  FakeSource {
    parallelism = 1
    result_table_name = "fake"
    row.num = 16
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }

  # If you would like to get more information about how to configure SeaTunnel and see the full list of source plugins,
  # please go to https://seatunnel.apache.org/docs/category/source-v2
}

sink {
  Console {
  }

  # If you would like to get more information about how to configure SeaTunnel and see the full list of sink plugins,
  # please go to https://seatunnel.apache.org/docs/category/sink-v2
}
```

### Running Command

```shell
${FLINK_HOME}/bin/flink run-application -d \
  -c org.apache.seatunnel.core.starter.flink.SeaTunnelFlink \
  -t yarn-application \
  /mnt/apache-seatunnel-2.3.6/starter/seatunnel-flink-15-starter.jar \
  --config hdfs:///user/root/config/v2.streaming1.conf \
  --name test1fake

${FLINK_HOME}/bin/flink run-application -d \
  -c org.apache.seatunnel.core.starter.flink.SeaTunnelFlink \
  -t yarn-application \
  /mnt/apache-seatunnel-2.3.6/starter/seatunnel-flink-15-starter.jar \
  --config config/v2.streaming1.conf \
  --name test1fake
```
### Error Exception

```log
2024-08-06 10:11:59,841 ERROR org.apache.seatunnel.core.starter.SeaTunnel [] -
===============================================================================
2024-08-06 10:11:59,841 ERROR org.apache.seatunnel.core.starter.SeaTunnel [] - Fatal Error,
2024-08-06 10:11:59,841 ERROR org.apache.seatunnel.core.starter.SeaTunnel [] - Please submit bug report in https://github.com/apache/seatun
2024-08-06 10:11:59,841 ERROR org.apache.seatunnel.core.starter.SeaTunnel [] - Reason:ErrorCode:[COMMON-22], ErrorDescription:[SeaTunnel re
2024-08-06 10:11:59,842 ERROR org.apache.seatunnel.core.starter.SeaTunnel [] - Exception StackTrace:org.apache.seatunnel.common.exception.S
        at org.apache.seatunnel.common.exception.CommonError.fileNotExistFailed(CommonError.java:86)
        at org.apache.seatunnel.core.starter.utils.FileUtils.checkConfigExist(FileUtils.java:66)
        at org.apache.seatunnel.core.starter.flink.command.FlinkTaskExecuteCommand.execute(FlinkTaskExecuteCommand.java:50)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at org.apache.seatunnel.core.starter.flink.SeaTunnelFlink.main(SeaTunnelFlink.java:34)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
```
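The failure originates in `FileUtils.checkConfigExist`, which runs wherever `main()` executes. In application mode that is a cluster container, so a path that exists on the submitting client need not exist there. A minimal shell illustration of the mismatch (the paths below are made up for the demonstration, not taken from this issue):

```shell
# Create a config file "on the client" and check it the way a
# local-filesystem existence check would.
CONF=/tmp/demo-seatunnel.conf
touch "$CONF"
[ -f "$CONF" ] && echo "client: found $CONF"

# "On the cluster container" the same client-local path does not exist,
# so the identical check fails and the job aborts.
[ -f /mnt/client-only/v2.streaming1.conf ] || echo "cluster: file not found"
```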

### Zeta or Flink or Spark Version

flink-1.18.1

### Java or Scala Version

_No response_

### Screenshots

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
Carl-Zhou-CN commented 1 month ago

I tried to solve it, but should our focus be on supporting shared storage for the configuration, or on letting Flink distribute that configuration?

@hailin0 @TyrantLucifer @EricJoy2048 @Hisoka-X @liugddx

Hisoka-X commented 1 month ago

> Flink distribute that configuration

I prefer this one.

hanhanzhang commented 1 week ago

> I tried to solve it, but should our focus be on supporting shared storage for the configuration, or on letting Flink distribute that configuration?
>
> @hailin0 @TyrantLucifer @EricJoy2048 @Hisoka-X @liugddx

Support for shared storage; this is also usable by the other engines.

hanhanzhang commented 1 week ago

> > Flink distribute that configuration
>
> I prefer this one

Spark has the ability to distribute files; Flink doesn't have that, does it?

Carl-Zhou-CN commented 1 week ago

> > > Flink distribute that configuration
> >
> > I prefer this one
>
> Spark has the ability to distribute files; Flink doesn't have that, does it?

Use YARN's file distribution capability: `-Dyarn.ship-files`.
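For reference, a sketch of what a submission using `yarn.ship-files` could look like; the paths here are illustrative assumptions, not the final design:

```shell
# Build a submit command that ships the job config into the YARN
# containers, where the starter's local-filesystem check runs.
FLINK_HOME=${FLINK_HOME:-/opt/flink}
CONFIG=/usr/seatunnel/test.conf

CMD="${FLINK_HOME}/bin/flink run-application -t yarn-application \
  -D yarn.ship-files=${CONFIG} \
  -c org.apache.seatunnel.core.starter.flink.SeaTunnelFlink \
  /mnt/apache-seatunnel-2.3.6/starter/seatunnel-flink-15-starter.jar \
  --config ${CONFIG}"

echo "$CMD"
```

Files listed in `yarn.ship-files` are localized into each container's working directory, so a relative `--config` path can then resolve on the cluster.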

Carl-Zhou-CN commented 1 week ago

@hanhanzhang Are you interested? I can assign it to you

hanhanzhang commented 1 week ago

> @hanhanzhang Are you interested? I can assign it to you

We are already hitting this problem now; please assign it to me, thanks.

Carl-Zhou-CN commented 1 week ago

> > @hanhanzhang Are you interested? I can assign it to you
>
> We already have this problem now, assign it to me, thanks

Thanks for your contribution; we can discuss the various approaches here.

hanhanzhang commented 1 week ago

> > > @hanhanzhang Are you interested? I can assign it to you
> >
> > We already have this problem now, assign it to me, thanks
>
> Thanks for your contribution, we can discuss various ways here

I tried to solve this problem in YARN deployment mode, using yarn ship files to deliver the configuration file, but that exposes new problems: task plugins also need to be delivered to the cluster, e.g. as yarn ship archives. If there are many plugins, uploading them takes a long time, task startup becomes slow, and the plugin discovery module needs to be modified. In my opinion, application mode needs more extensive changes. I'm wondering whether introducing a storage module is the better way; what do you think?

Hisoka-X commented 1 week ago

> I'm thinking about whether introducing storage modules is the best way

I think we should give users the choice. By default, the jars are shipped to the cluster each time; if users need to, they can point to a pre-uploaded jar location via configuration (like `yarn.provided.lib.dirs`).
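The opt-in described above could look roughly like this; the HDFS paths and jar names are assumptions for illustration only:

```shell
# Sketch: jars pre-uploaded once to shared storage; Flink then references
# them from there instead of re-shipping them on every submission.
LIB_DIRS="hdfs:///seatunnel/lib;hdfs:///seatunnel/connectors"

CMD="flink run-application -t yarn-application \
  -D yarn.provided.lib.dirs=${LIB_DIRS} \
  -c org.apache.seatunnel.core.starter.flink.SeaTunnelFlink \
  seatunnel-flink-15-starter.jar --config /usr/seatunnel/test.conf"

echo "$CMD"
```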

hanhanzhang commented 1 week ago

> > I'm thinking about whether introducing storage modules is the best way
>
> I think we should give users the choice. By default, each time the jar will be shipped to the cluster. If the user needs it, they can point to a pre-uploaded jar location via configuration (like `yarn.provided.lib.dirs`).

Yes, this reduces jar upload time.

Carl-Zhou-CN commented 1 week ago

We can look at capabilities like Spark's `--jars` to see if Flink can support something similar.

Hisoka-X commented 1 week ago

> We can look at capabilities like Spark's `--jars` to see if Flink can support it

We already do the same thing for Flink in https://github.com/apache/seatunnel/blob/90cd46f50ac2f01e11ac5b7002688ea6c657cc82/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractFlinkRuntimeEnvironment.java#L235

The problem is that we need to upload it every time.

Hisoka-X commented 1 week ago

> I tried to solve this problem in yarn deployment mode, using the yarn ship file to solve the configuration file delivery

I think we should tackle this problem first. Could you create a PR for this? @hanhanzhang

Carl-Zhou-CN commented 1 week ago

@hanhanzhang Can you provide the Flink shell command that is being printed now?

hanhanzhang commented 1 week ago

> @hanhanzhang Can you provide the flink shell command that you are printing now?

```shell
[root@localhost 2.3.7]# ${SEATUNNEL_HOME}/bin/start-seatunnel-flink-15-connector-v2.sh \
  --config /usr/seatunnel/test.conf \
  --deploy-mode run-application \
  --target yarn-application \
  --yarnqueue default \
  --yarnjobManagerMemory 1024 \
  --yarntaskManagerMemory 1024

Execute SeaTunnel Flink Job: ${FLINK_HOME}/bin/flink run-application \
  --target yarn-application \
  -D "yarn.ship-files=/usr/seatunnel/test.conf;/usr/seatunnel/2.3.7/connectors" \
  --yarnqueue default \
  --yarnjobManagerMemory 1024 \
  --yarntaskManagerMemory 1024 \
  -c org.apache.seatunnel.core.starter.flink.SeaTunnelFlink \
  /usr/seatunnel/2.3.7/starter/seatunnel-flink-15-starter.jar \
  --master yarn-application \
  --config /usr/seatunnel/test.conf \
  --name SeaTunnel
```

hanhanzhang commented 1 week ago

> > We can look at capabilities like Spark's `--jars` to see if Flink can support it
>
> We already do the same thing for Flink in https://github.com/apache/seatunnel/blob/90cd46f50ac2f01e11ac5b7002688ea6c657cc82/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractFlinkRuntimeEnvironment.java#L235
>
> The problem is that we need to upload it every time.

Yes.

> > I tried to solve this problem in yarn deployment mode, using the yarn ship file to solve the configuration file delivery
>
> I think we should face this problem first. Could you create a PR for this? @hanhanzhang

I'm still trying to work it out: yarn ship files are added to the task classpath, but `yarn.provided.lib.dirs` should be used for the Flink lib. @Hisoka-X