j3-signalroom / apache_flink-kickstarter

Examples of Apache Flink® applications showcasing the DataStream API, Table API in Java and Python, and Flink SQL, featuring AWS, GitHub, Terraform, Streamlit, and Apache Iceberg.
https://linkedin.com/in/jeffreyjonathanjennings
MIT License
1 stars 0 forks source link

`py4j.protocol.Py4JJavaError: An error occurred while calling o8.executeSql. : java.lang.IllegalArgumentException: bucket is null/empty`. #245

Closed j3-signalroom closed 2 months ago

j3-signalroom commented 2 months ago
2024-10-03 10:07:57,484 WARN  org.apache.hadoop.fs.FileSystem                              [] - Failed to initialize fileystem s3a://tf_snowflake_user: java.lang.IllegalArgumentException: bucket is null/empty
Traceback (most recent call last):
  File "/opt/flink/python_apps/kickstarter/flight_importer_app.py", line 706, in <module>
    main(known_args)
  File "/opt/flink/python_apps/kickstarter/flight_importer_app.py", line 607, in main
    tbl_env.execute_sql(f"""
  File "/opt/flink/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 837, in execute_sql
  File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 146, in deco
  File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o8.executeSql.
: java.lang.IllegalArgumentException: bucket is null/empty
        at org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkArgument(Preconditions.java:144)
        at org.apache.hadoop.fs.s3a.S3AUtils.propagateBucketOptions(S3AUtils.java:1162)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:426)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3469)
        at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
        at org.apache.iceberg.hadoop.Util.getFs(Util.java:56)
        at org.apache.iceberg.hadoop.HadoopCatalog.initialize(HadoopCatalog.java:112)
        at org.apache.iceberg.CatalogUtil.loadCatalog(CatalogUtil.java:256)
        at org.apache.iceberg.flink.CatalogLoader$HadoopCatalogLoader.loadCatalog(CatalogLoader.java:86)
        at org.apache.iceberg.flink.FlinkCatalog.<init>(FlinkCatalog.java:114)
        at org.apache.iceberg.flink.FlinkCatalogFactory.createCatalog(FlinkCatalogFactory.java:166)
        at org.apache.iceberg.flink.FlinkCatalogFactory.createCatalog(FlinkCatalogFactory.java:139)
        at org.apache.flink.table.factories.FactoryUtil.createCatalog(FactoryUtil.java:485)
        at org.apache.flink.table.catalog.CatalogManager.initCatalog(CatalogManager.java:320)
        at org.apache.flink.table.catalog.CatalogManager.createCatalog(CatalogManager.java:312)
        at org.apache.flink.table.operations.ddl.CreateCatalogOperation.execute(CreateCatalogOperation.java:70)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1107)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:735)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:569)
        at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:840)

org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
        at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:569)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:108)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:1026)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1270)
        at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$10(CliFrontend.java:1367)
        at java.base/java.security.AccessController.doPrivileged(AccessController.java:712)
        at java.base/javax.security.auth.Subject.doAs(Subject.java:439)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878)
        at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1367)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1335)
Caused by: java.lang.RuntimeException: Python process exits with code: 1
        at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
        ... 17 more
j3-signalroom commented 2 months ago

Updated the FLINK properties in the docker-compose:

fs.s3a.access.key: $AWS_ACCESS_KEY_ID
fs.s3a.secret.key: $AWS_SECRET_ACCESS_KEY
fs.s3a.endpoint: s3.$AWS_REGION.amazonaws.com
j3-signalroom commented 2 months ago

Then, I updated the FlinkSQL statement:

tbl_env.execute_sql(f"""
        CREATE CATALOG {catalog_name} WITH (
            'type' = 'iceberg',
            'catalog-type' = 'hadoop',            
            'warehouse' = 's3a://{args.s3_bucket_name}/warehouse'
            );
    """)
j3-signalroom commented 2 months ago
Traceback (most recent call last):
  File "/opt/flink/python_apps/kickstarter/flight_importer_app.py", line 703, in <module>
    main(known_args)
  File "/opt/flink/python_apps/kickstarter/flight_importer_app.py", line 570, in main
    producer_properties = execute_kafka_properties_udtf(tbl_env, False, args.s3_bucket_name)
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/flink/python_apps/kickstarter/flight_importer_app.py", line 505, in execute_kafka_properties_udtf
    for row in result:
  File "/opt/flink/opt/python/pyflink.zip/pyflink/table/table_result.py", line 240, in __next__
  File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 146, in deco
  File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o297.hasNext.
: java.lang.RuntimeException: Failed to fetch next result
        at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:129)
        at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:100)
        at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:247)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:569)
        at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.io.IOException: Failed to fetch job execution result
        at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:187)
        at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:123)
        at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:126)
        ... 13 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 271d5a32474a2cfd6fb27dc103d6d160)
        at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
        at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2096)
        at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:185)
        ... 15 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 271d5a32474a2cfd6fb27dc103d6d160)
        at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:130)
        at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:646)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
        at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
        at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:302)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
        at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
        at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$35(RestClusterClient.java:901)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
        at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
        at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:302)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
        at java.base/java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:614)
        at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1163)
        at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        ... 1 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
        at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:128)
        ... 23 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
        at org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:180)
        at org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:107)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:277)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:268)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:261)
        at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:787)
        at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:764)
        at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
        at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488)
        at jdk.internal.reflect.GeneratedMethodAccessor26.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:569)
        at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309)
        at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
        at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307)
        at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222)
        at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
        at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
        at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
        at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
        at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
        at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
        at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
        at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
        at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
        at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
        at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
        at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
        at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
        at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
        at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
        at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
        at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
        at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
Caused by: java.lang.RuntimeException: Failed to start remote bundle
        at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.startBundle(BeamPythonFunctionRunner.java:378)
        at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.checkInvokeStartBundle(BeamPythonFunctionRunner.java:359)
        at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.process(BeamPythonFunctionRunner.java:338)
        at org.apache.flink.table.runtime.operators.python.table.PythonTableFunctionOperator.processElementInternal(PythonTableFunctionOperator.java:182)
        at org.apache.flink.table.runtime.operators.python.table.PythonTableFunctionOperator.processElementInternal(PythonTableFunctionOperator.java:50)
        at org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.processElement(AbstractStatelessFunctionOperator.java:99)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:425)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:520)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:110)
        at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:100)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:113)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:71)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:338)
Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The direct out-of-memory error has occurred. This can mean two things: either job(s) require(s) a larger size of JVM direct memory or there is a direct memory leak. The direct memory can be allocated by user code or some of its dependencies. In this case 'taskmanager.memory.task.off-heap.size' configuration option should be increased. Flink framework and its dependencies also consume the direct memory, mostly for network communication. The most of network memory is managed by Flink and should not result in out-of-memory error. In certain special cases, in particular for jobs with high parallelism, the framework may require more direct memory which is not managed by Flink. In this case 'taskmanager.memory.framework.off-heap.size' configuration option should be increased. If the error persists then there is probably a direct memory leak in user code or some of its dependencies which has to be investigated and fixed. The task executor has to be shutdown...
        at java.base/java.nio.Bits.reserveMemory(Bits.java:178)
        at java.base/java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:121)
        at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:332)
        at org.apache.beam.vendor.grpc.v1p48p1.io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:649)
        at org.apache.beam.vendor.grpc.v1p48p1.io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:624)
        at org.apache.beam.vendor.grpc.v1p48p1.io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:203)
        at org.apache.beam.vendor.grpc.v1p48p1.io.netty.buffer.PoolArena.tcacheAllocateSmall(PoolArena.java:173)
        at org.apache.beam.vendor.grpc.v1p48p1.io.netty.buffer.PoolArena.allocate(PoolArena.java:134)
        at org.apache.beam.vendor.grpc.v1p48p1.io.netty.buffer.PoolArena.allocate(PoolArena.java:126)
        at org.apache.beam.vendor.grpc.v1p48p1.io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:396)
        at org.apache.beam.vendor.grpc.v1p48p1.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:188)
        at org.apache.beam.vendor.grpc.v1p48p1.io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:124)
        at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.netty.NettyWritableBufferAllocator.allocate(NettyWritableBufferAllocator.java:51)
        at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.MessageFramer.writeKnownLengthUncompressed(MessageFramer.java:226)
        at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.MessageFramer.writeUncompressed(MessageFramer.java:168)
        at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.MessageFramer.writePayload(MessageFramer.java:141)
        at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.AbstractStream.writeMessage(AbstractStream.java:65)
        at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ServerCallImpl.sendMessageInternal(ServerCallImpl.java:171)
        at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ServerCallImpl.sendMessage(ServerCallImpl.java:153)
        at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:380)
        at org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver.onNext(SynchronizedStreamObserver.java:46)
        at org.apache.beam.runners.fnexecution.control.FnApiControlClient.handle(FnApiControlClient.java:94)
        at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor.newBundle(SdkHarnessClient.java:248)
        at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor.newBundle(SdkHarnessClient.java:197)
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.getBundle(DefaultJobBundleFactory.java:569)
        at org.apache.beam.runners.fnexecution.control.StageBundleFactory.getBundle(StageBundleFactory.java:87)
        at org.apache.beam.runners.fnexecution.control.StageBundleFactory.getBundle(StageBundleFactory.java:76)
        at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.startBundle(BeamPythonFunctionRunner.java:368)
        ... 15 more

org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
        at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:569)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:108)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:1026)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1270)
        at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$10(CliFrontend.java:1367)
        at java.base/java.security.AccessController.doPrivileged(AccessController.java:712)
        at java.base/javax.security.auth.Subject.doAs(Subject.java:439)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878)
        at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1367)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1335)
Caused by: java.lang.RuntimeException: Python process exits with code: 1
        at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
        ... 17 more
j3-signalroom commented 2 months ago
WARN  org.apache.hadoop.fs.FileSystem                              [] - Failed to initialize fileystem s3a://tf_snowflake_user/warehouse: java.lang.IllegalArgumentException: bucket is null/empty
Traceback (most recent call last):
  File "/opt/flink/python_apps/kickstarter/flight_importer_app.py", line 215, in <module>
    main(known_args)
  File "/opt/flink/python_apps/kickstarter/flight_importer_app.py", line 119, in main
    tbl_env.execute_sql(f"""
  File "/opt/flink/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 837, in execute_sql
  File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 146, in deco
  File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o8.executeSql.
: java.lang.IllegalArgumentException: bucket is null/empty
        at org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkArgument(Preconditions.java:144)
        at org.apache.hadoop.fs.s3a.S3AUtils.propagateBucketOptions(S3AUtils.java:1162)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:426)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3469)
        at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
        at org.apache.iceberg.hadoop.Util.getFs(Util.java:56)
        at org.apache.iceberg.hadoop.HadoopCatalog.initialize(HadoopCatalog.java:112)
        at org.apache.iceberg.CatalogUtil.loadCatalog(CatalogUtil.java:256)
        at org.apache.iceberg.flink.CatalogLoader$HadoopCatalogLoader.loadCatalog(CatalogLoader.java:86)
        at org.apache.iceberg.flink.FlinkCatalog.<init>(FlinkCatalog.java:114)
        at org.apache.iceberg.flink.FlinkCatalogFactory.createCatalog(FlinkCatalogFactory.java:166)
        at org.apache.iceberg.flink.FlinkCatalogFactory.createCatalog(FlinkCatalogFactory.java:139)
        at org.apache.flink.table.factories.FactoryUtil.createCatalog(FactoryUtil.java:485)
        at org.apache.flink.table.catalog.CatalogManager.initCatalog(CatalogManager.java:320)
        at org.apache.flink.table.catalog.CatalogManager.createCatalog(CatalogManager.java:312)
        at org.apache.flink.table.operations.ddl.CreateCatalogOperation.execute(CreateCatalogOperation.java:70)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1107)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:735)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:569)
        at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:840)

org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
        at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:569)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:108)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:1026)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1270)
        at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$10(CliFrontend.java:1367)
        at java.base/java.security.AccessController.doPrivileged(AccessController.java:712)
        at java.base/javax.security.auth.Subject.doAs(Subject.java:439)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878)
        at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1367)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1335)
Caused by: java.lang.RuntimeException: Python process exits with code: 1
        at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
        ... 17 more
j3-signalroom commented 2 months ago

The problem was simple! The bucket name the code was referencing had underscores instead of dashes.