apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Flink SQL No FileSystem for scheme s3 #4297

Closed ngk2009 closed 2 years ago

ngk2009 commented 2 years ago


Describe the problem you faced

I am using Hudi 0.10 with Flink 1.13.2. Flink itself has been configured successfully for S3 access, and Hudi storage on HDFS/Alluxio works normally, but using S3 as the Hudi storage fails with the errors below.

To Reproduce

Steps to reproduce the behavior:

Flink setup:

1. `cd plugins/`
2. `mkdir s3-fs-hadoop`
3. `cd ..`
4. `cp opt/flink-s3-fs-hadoop-1.13.2.jar plugins/s3-fs-hadoop/`
5. In `flink-conf.yaml`:

   ```yaml
   s3.access-key: xxx
   s3.secret-key: xxx
   s3.endpoint: xxx.xxx.xxx.xxx:7480
   s3.path.style.access: true
   s3.ssl.enabled: false
   ```
6. Copy `aws-java-sdk-bundle-1.11.134.jar` and `hadoop-aws-3.0.0-cdh6.3.0.jar` into the Flink `lib` directory and set the Hadoop classpath: `export HADOOP_CLASSPATH=$(hadoop classpath)`
7. In the Flink SQL Client:

   ```sql
   CREATE TABLE hudi_demo(
     id BIGINT PRIMARY KEY NOT ENFORCED,
     name STRING,
     birthday TIMESTAMP(3),
     ts TIMESTAMP(3),
     `partition` VARCHAR(20)
   ) PARTITIONED BY (`partition`) WITH (
     'connector' = 'hudi',
     'table.type' = 'MERGE_ON_READ',
     'path' = 's3://lakehouse/hudi/demo1/'
   );
   ```
8. Run a query: `select * from hudi_demo;`

Expected behavior

The query/insert should execute, but it fails with: `org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"`

Stacktrace

```
org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException: Could not start RpcEndpoint jobmanager_2.
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:610) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:180) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.13.2.jar:1.13.2]
    at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.13.2.jar:1.13.2]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.13.2.jar:1.13.2]
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.13.2.jar:1.13.2]
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.13.2.jar:1.13.2]
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.13.2.jar:1.13.2]
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.13.2.jar:1.13.2]
Caused by: org.apache.flink.runtime.jobmaster.JobMasterException: Could not start the JobMaster.
    at org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:385) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:181) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:605) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    ... 18 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to start the operator coordinators
    at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:90) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:592) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:955) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:873) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:383) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:181) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:605) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    ... 18 more
Caused by: org.apache.hudi.exception.HoodieIOException: Failed to get instance of org.apache.hadoop.fs.FileSystem
    at org.apache.hudi.common.fs.FSUtils.getFs(FSUtils.java:104) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
    at org.apache.hudi.util.StreamerUtil.tableExists(StreamerUtil.java:266) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
    at org.apache.hudi.util.StreamerUtil.initTableIfNotExists(StreamerUtil.java:236) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.start(StreamWriteOperatorCoordinator.java:164) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:194) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:85) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:592) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:955) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:873) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:383) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:181) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:605) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    ... 18 more
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3215) ~[hadoop-common-3.0.0-cdh6.3.0.jar:?]
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3235) ~[hadoop-common-3.0.0-cdh6.3.0.jar:?]
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:123) ~[hadoop-common-3.0.0-cdh6.3.0.jar:?]
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3286) ~[hadoop-common-3.0.0-cdh6.3.0.jar:?]
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3254) ~[hadoop-common-3.0.0-cdh6.3.0.jar:?]
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:478) ~[hadoop-common-3.0.0-cdh6.3.0.jar:?]
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361) ~[hadoop-common-3.0.0-cdh6.3.0.jar:?]
    at org.apache.hudi.common.fs.FSUtils.getFs(FSUtils.java:102) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
    at org.apache.hudi.util.StreamerUtil.tableExists(StreamerUtil.java:266) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
    at org.apache.hudi.util.StreamerUtil.initTableIfNotExists(StreamerUtil.java:236) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.start(StreamWriteOperatorCoordinator.java:164) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:194) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:85) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:592) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:955) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:873) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:383) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:181) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:605) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
```

danny0405 commented 2 years ago

Did you set up the S3 config in the Hadoop core-site file? And did you package the S3 classes into the bundle jar?

ngk2009 commented 2 years ago

> Did you set up the S3 config in the Hadoop core-site file? And did you package the S3 classes into the bundle jar?

Thank you for your reply. I did not find anything in the Flink guide about setting up core-site.xml, and I do not plan to use a Hadoop environment; I just want to build data lake analytics on top of S3 storage. The bundle jar is one I found in the Hadoop jars folder, so I cannot confirm whether the S3 classes are packaged in it. As for whether Hadoop's S3 dependencies are missing, I think that can be ruled out, because I use S3 storage for Flink's state.backend and it runs successfully on YARN.
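One quick way to check whether a bundle jar actually contains the S3A filesystem classes (using the standard JDK `jar` tool; the grep pattern below is just one way to look for them) is:

```sh
# List the bundle's entries and look for the Hadoop S3A FileSystem implementation;
# an empty result suggests the S3 classes are not packaged inside the bundle.
jar tf hudi-flink-bundle_2.11-0.10.0.jar | grep -i "fs/s3a/S3AFileSystem"
```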

danny0405 commented 2 years ago

Flink uses its own plugin mechanism to support filesystems other than HDFS. Hudi adapts to different DFSs by extending the Hadoop FileSystem interface directly.

york-yu-ctw commented 2 years ago

I have some tips:

  1. put `flink-s3-fs-hadoop` into `/opt/flink/lib`
  2. add `hadoop-hdfs-client`, `hadoop-aws`, and `hadoop-mapreduce-client-core` into `/opt/flink/lib` as well
  3. if there is an AWS credential issue, try setting up `core-site.xml` (I had this issue when using Hudi 0.9.0 and Flink 1.12.2); a sketch follows below
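As a concrete illustration of tip 3, a minimal `core-site.xml` sketch for S3A against an S3-compatible endpoint might look like the following. The property names are standard Hadoop S3A keys, but the values are placeholders taken from this thread, and the exact set needed may differ per deployment:

```xml
<configuration>
  <!-- Map the s3a:// scheme to the S3A implementation shipped in hadoop-aws -->
  <property>
    <name>fs.s3a.impl</name>
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  </property>
  <property>
    <name>fs.s3a.endpoint</name>
    <value>http://xxx.xxx.xxx.xxx:7480</value>
  </property>
  <property>
    <name>fs.s3a.access.key</name>
    <value>xxx</value>
  </property>
  <property>
    <name>fs.s3a.secret.key</name>
    <value>xxx</value>
  </property>
  <property>
    <name>fs.s3a.path.style.access</name>
    <value>true</value>
  </property>
  <property>
    <name>fs.s3a.connection.ssl.enabled</name>
    <value>false</value>
  </property>
</configuration>
```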
ngk2009 commented 2 years ago

> Flink uses its own plugin mechanism to support filesystems other than HDFS. Hudi adapts to different DFSs by extending the Hadoop FileSystem interface directly.

How can I solve it? Thanks.

ngk2009 commented 2 years ago

> I have some tips:
>
> 1. put `flink-s3-fs-hadoop` into `/opt/flink/lib`
> 2. add `hadoop-hdfs-client`, `hadoop-aws`, and `hadoop-mapreduce-client-core` into `/opt/flink/lib` as well
> 3. if there is an AWS credential issue, try setting up `core-site.xml` (I had this issue when using Hudi 0.9.0 and Flink 1.12.2)

Unfortunately, I tried your method but it did not succeed. I copied those jar packages into both the /opt/flink/lib and $FLINK_HOME/lib directories and then restarted the YARN session, which gave: `ERROR org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - Error while running the Flink session. java.lang.NoSuchFieldError: IGNORE_CLIENT_LOCALITY`

This may be caused by the Hadoop version that Flink depends on; I am trying to recompile flink-fs-hadoop. If Hudi did not rely on Hadoop's FileSystem and just used the S3 SDK, the dependencies would not be so heavy.

danny0405 commented 2 years ago

> Flink uses its own plugin mechanism to support filesystems other than HDFS. Hudi adapts to different DFSs by extending the Hadoop FileSystem interface directly.
>
> How can I solve it? Thanks.

Hudi does depend on the Hadoop FileSystem interface. What we need to do is add the AWS S3 FileSystem implementation classes to the classpath, and their specific configuration should also be set in the Hadoop Configuration; you can look at StreamerUtil.getHadoopConf to see how we fetch the Hadoop configuration in the Flink pipeline.
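As a rough, standalone illustration of the failure path in the stack trace above (this is a sketch, not Hudi's actual code), a Hadoop `FileSystem` for an `s3://` path can only be resolved if an implementation for the `s3` scheme is discoverable, either from the classpath or from an `fs.s3.impl` entry in the Hadoop `Configuration` that Hudi builds:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class S3SchemeCheck {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical settings: map the bare "s3" scheme to the S3A implementation
    // from hadoop-aws and point it at an S3-compatible endpoint.
    conf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
    conf.set("fs.s3a.endpoint", "http://xxx.xxx.xxx.xxx:7480");
    conf.set("fs.s3a.path.style.access", "true");

    Path basePath = new Path("s3://lakehouse/hudi/demo1/");
    // Without a registered implementation for "s3", this is the call that fails
    // with UnsupportedFileSystemException: No FileSystem for scheme "s3".
    FileSystem fs = basePath.getFileSystem(conf);
    System.out.println("Resolved filesystem: " + fs.getClass().getName());
  }
}
```

Note that Flink loads filesystem plugins such as flink-s3-fs-hadoop in isolated plugin classloaders, which is consistent with what is reported in this thread: Flink's own S3 state backend works, while the Hadoop `FileSystem` lookup issued by Hudi's coordinator still cannot see an implementation for the `s3` scheme.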

ngk2009 commented 2 years ago

> > Flink uses its own plugin mechanism to support filesystems other than HDFS. Hudi adapts to different DFSs by extending the Hadoop FileSystem interface directly.
> >
> > How can I solve it? Thanks.
>
> Hudi does depend on the Hadoop FileSystem interface. What we need to do is add the AWS S3 FileSystem implementation classes to the classpath, and their specific configuration should also be set in the Hadoop Configuration; you can look at StreamerUtil.getHadoopConf to see how we fetch the Hadoop configuration in the Flink pipeline.

Hi, I found that you changed the class loader in org.apache.hudi.sink.StreamWriteOperatorCoordinator (https://github.com/apache/hudi/pull/4042): `Thread.currentThread().setContextClassLoader(getClass().getClassLoader())`. Will only the classpath of the hudi-bundle jar be used? Will the classes under flink/lib then fail to load, causing the S3 scheme not to be found? So which package containing "the aws s3 FileSystem impl codes in the classpath" should be added to the Flink lib? The jars currently in the Flink lib are:

- hadoop-aws-3.0.0-cdh6.3.0.jar
- hadoop-common-3.0.0-cdh6.3.0.jar
- hadoop-hdfs-client-3.0.0-cdh6.3.0.jar
- hadoop-mapreduce-client-core-3.0.0-cdh6.3.0.jar
- flink-s3-fs-hadoop-1.13.3.jar
- aws-java-sdk-s3-1.11.836.jar
- hudi-aws-0.10.0.jar
- hudi-flink-bundle_2.11-0.10.0.jar

ngk2009 commented 2 years ago

s3:// cannot be used now, while s3a:// can. In addition, with flink-s3-fs-hadoop under Flink, the task always stays in the INITIALIZING state when submitted, and the Hadoop dependencies plus core-site.xml are still required. That dependency is too heavy, so I switched to using alluxio:// to write the data to an S3 UFS. Thanks for the help.
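For completeness: `hadoop-aws` registers its `S3AFileSystem` implementation for the `s3a` scheme only, which is why `s3a://` paths resolve while bare `s3://` paths do not. The bare scheme can usually be mapped onto the same implementation with one extra `core-site.xml` entry (a sketch using the standard Hadoop key; untested against this particular setup):

```xml
<!-- Map the bare s3:// scheme onto the S3A implementation from hadoop-aws,
     so paths like s3://lakehouse/hudi/demo1/ resolve like s3a:// paths. -->
<property>
  <name>fs.s3.impl</name>
  <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>
```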