j3-signalroom / apache_flink-kickstarter

Examples of Apache Flink® applications showcasing the DataStream API, Table API in Java and Python, and Flink SQL, featuring AWS, GitHub, Terraform, Streamlit, and Apache Iceberg.

`ModuleNotFoundError: No module named 'aws_services'`. #208

Closed. j3-signalroom closed this issue 2 months ago.

j3-signalroom commented 2 months ago
root@1a9ede850708:/opt/flink/python_apps# flink run -py kickstarter/flight_importer_app.py --aws_s3_bucket tf_snowflake_user --pyFiles kickstarter/python_scripts.zip
Traceback (most recent call last):
  File "/opt/flink/python_apps/kickstarter/flight_importer_app.py", line 12, in <module>
    from helper.kafka_properties import get_kafka_properties
  File "/opt/flink/python_apps/kickstarter/helper/kafka_properties.py", line 8, in <module>
    from aws_services import AwsServices
ModuleNotFoundError: No module named 'aws_services'
org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
        at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:569)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:356)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:223)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:113)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:1026)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1270)
        at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$10(CliFrontend.java:1367)
        at java.base/java.security.AccessController.doPrivileged(AccessController.java:712)
        at java.base/javax.security.auth.Subject.doAs(Subject.java:439)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
        at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1367)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1335)
Caused by: java.lang.RuntimeException: Python process exits with code: 1
        at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
        ... 17 more
j3-signalroom commented 2 months ago

I resolved the problem by referencing the helper subpackage in the import statement.
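
A minimal sketch of the change, assuming `aws_services.py` lives in the `kickstarter/helper/` package alongside `kafka_properties.py` (which is what the traceback paths suggest):

```python
# kickstarter/helper/kafka_properties.py

# Before (fails: 'aws_services' is not a top-level module on the job's import path):
# from aws_services import AwsServices

# After (reference the helper subpackage explicitly):
from helper.aws_services import AwsServices
```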

j3-signalroom commented 2 months ago
(screenshot attached in the original comment)
j3-signalroom commented 2 months ago

I used an incorrect return type hint on get_kafka_properties. (Reminder to self: do not let Copilot choose the hint without making sure it is right!)
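
For reference, a hedged sketch of the kind of signature involved; the parameter names and the return type below are assumptions reconstructed from the call site in the traceback, not the repository's actual code:

```python
from pyflink.table import StreamTableEnvironment

def get_kafka_properties(tbl_env: StreamTableEnvironment,
                         for_consumer: bool,
                         s3_bucket_name: str) -> dict[str, str]:
    """Return Kafka client properties as a plain dict.

    The return annotation has to describe what the function actually returns;
    an auto-suggested hint that does not match misleads callers and type checkers.
    """
    ...
```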

j3-signalroom commented 2 months ago
root@eaa1d928e755:/opt/flink/python_apps# flink run -py kickstarter/flight_importer_app.py --aws_s3_bucket tf_snowflake_user --pyFiles kickstarter/python_scripts.zip

 Kafka Property Table Schema:--->
(
  `property_key` STRING,
  `property_value` STRING
)

 Kafka Property Table Data:--->
Job has been submitted with JobID 1596da45d93c6cae30111f3e3aa05e05
Traceback (most recent call last):
  File "/opt/flink/python_apps/kickstarter/flight_importer_app.py", line 99, in <module>
    main(known_args)
  File "/opt/flink/python_apps/kickstarter/flight_importer_app.py", line 35, in main
    consumer_properties = get_kafka_properties(tbl_env, True, args.s3_bucket_name)
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/flink/python_apps/kickstarter/helper/kafka_properties.py", line 101, in get_kafka_properties
    func_results.execute().print()
  File "/opt/flink/opt/python/pyflink.zip/pyflink/table/table_result.py", line 219, in print
  File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 146, in deco
  File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o106.print.
: java.lang.RuntimeException: Failed to fetch next result
        at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:129)
        at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:100)
        at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:247)
        at org.apache.flink.table.utils.print.TableauStyle.print(TableauStyle.java:120)
        at org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:163)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:569)
        at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.io.IOException: Failed to fetch job execution result
        at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:187)
        at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:123)
        at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:126)
        ... 15 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 1596da45d93c6cae30111f3e3aa05e05)
        at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
        at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2096)
        at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:185)
        ... 17 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 1596da45d93c6cae30111f3e3aa05e05)
        at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:130)
        at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:646)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
        at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
        at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:302)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
        at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
        at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$35(RestClusterClient.java:901)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
        at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
        at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:302)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
        at java.base/java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:614)
        at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1163)
        at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        ... 1 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
        at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:128)
        ... 23 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
        at org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:219)
        at org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailureAndReport(ExecutionFailureHandler.java:166)
        at org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:121)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:281)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:272)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:265)
        at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:787)
        at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:764)
        at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
        at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:515)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:569)
        at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:318)
        at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
        at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:316)
        at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:229)
        at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:88)
        at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:174)
        at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
        at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
        at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
        at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
        at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
        at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
        at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
        at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
        at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
        at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
        at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
        at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
        at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
        at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
        at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
        at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
        at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.completeProcessing(SourceStreamTask.java:368)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:340)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:124)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.advanceToEndOfEventTime(SourceStreamTask.java:176)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.endData(StreamTask.java:692)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.lambda$completeProcessing$0(SourceStreamTask.java:365)
        at org.apache.flink.util.function.FunctionUtils.lambda$asCallable$5(FunctionUtils.java:126)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
        at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:101)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:414)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:383)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:368)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
        at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.RuntimeException: Error while waiting for BeamPythonFunctionRunner flush
        at org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.invokeFinishBundle(AbstractExternalPythonFunctionOperator.java:107)
        at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.processWatermark(AbstractPythonFunctionOperator.java:189)
        at org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:122)
        ... 18 more
Caused by: java.lang.RuntimeException: Failed to close remote bundle
        at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:423)
        at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:407)
        at org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.lambda$invokeFinishBundle$0(AbstractExternalPythonFunctionOperator.java:86)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        ... 1 more
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 1: Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 295, in _execute
    response = task()
               ^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 370, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 629, in do_instruction
    return getattr(self, request_type)(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 660, in process_bundle
    bundle_processor = self.bundle_processor_cache.get(
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 491, in get
    processor = bundle_processor.BundleProcessor(
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 903, in __init__
    self.ops = self.create_execution_tree(self.process_bundle_descriptor)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 982, in create_execution_tree
    return collections.OrderedDict([(
                                   ^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 985, in <listcomp>
    get_operation(transform_id))) for transform_id in sorted(
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 818, in wrapper
    result = cache[args] = func(*args)
                           ^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 964, in get_operation
    transform_consumers = {
                          ^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 965, in <dictcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 965, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
          ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 818, in wrapper
    result = cache[args] = func(*args)
                           ^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 969, in get_operation
    return transform_factory.create_operation(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1266, in create_operation
    return creator(self, transform_id, transform_proto, payload, consumers)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 58, in create_table_function
    return _create_user_defined_function_operation(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 218, in _create_user_defined_function_operation
    return beam_operation_cls(
           ^^^^^^^^^^^^^^^^^^^
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 234, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.__init__
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 130, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 241, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.generate_operation
  File "/usr/local/lib/python3.11/site-packages/pyflink/fn_execution/table/operations.py", line 150, in __init__
    super(TableFunctionOperation, self).__init__(serialized_fn)
  File "/usr/local/lib/python3.11/site-packages/pyflink/fn_execution/table/operations.py", line 86, in __init__
    self.func, self.user_defined_funcs = self.generate_func(serialized_fn)
                                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/pyflink/fn_execution/table/operations.py", line 160, in generate_func
    operation_utils.extract_user_defined_function(serialized_fn.udfs[0])
  File "/usr/local/lib/python3.11/site-packages/pyflink/fn_execution/utils/operation_utils.py", line 148, in extract_user_defined_function
    user_defined_func = pickle.loads(user_defined_function_proto.payload)
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/pyflink/fn_execution/pickle.py", line 29, in loads
    return cloudpickle.loads(payload)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
ModuleNotFoundError: No module named 'helper'

        at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
        at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
        at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:61)
        at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:504)
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:607)
        at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:421)
        ... 7 more
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 1: Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 295, in _execute
    response = task()
               ^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 370, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 629, in do_instruction
    return getattr(self, request_type)(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 660, in process_bundle
    bundle_processor = self.bundle_processor_cache.get(
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 491, in get
    processor = bundle_processor.BundleProcessor(
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 903, in __init__
    self.ops = self.create_execution_tree(self.process_bundle_descriptor)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 982, in create_execution_tree
    return collections.OrderedDict([(
                                   ^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 985, in <listcomp>
    get_operation(transform_id))) for transform_id in sorted(
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 818, in wrapper
    result = cache[args] = func(*args)
                           ^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 964, in get_operation
    transform_consumers = {
                          ^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 965, in <dictcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 965, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
          ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 818, in wrapper
    result = cache[args] = func(*args)
                           ^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 969, in get_operation
    return transform_factory.create_operation(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1266, in create_operation
    return creator(self, transform_id, transform_proto, payload, consumers)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 58, in create_table_function
    return _create_user_defined_function_operation(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 218, in _create_user_defined_function_operation
    return beam_operation_cls(
           ^^^^^^^^^^^^^^^^^^^
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 234, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.__init__
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 130, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 241, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.generate_operation
  File "/usr/local/lib/python3.11/site-packages/pyflink/fn_execution/table/operations.py", line 150, in __init__
    super(TableFunctionOperation, self).__init__(serialized_fn)
  File "/usr/local/lib/python3.11/site-packages/pyflink/fn_execution/table/operations.py", line 86, in __init__
    self.func, self.user_defined_funcs = self.generate_func(serialized_fn)
                                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/pyflink/fn_execution/table/operations.py", line 160, in generate_func
    operation_utils.extract_user_defined_function(serialized_fn.udfs[0])
  File "/usr/local/lib/python3.11/site-packages/pyflink/fn_execution/utils/operation_utils.py", line 148, in extract_user_defined_function
    user_defined_func = pickle.loads(user_defined_function_proto.payload)
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/pyflink/fn_execution/pickle.py", line 29, in loads
    return cloudpickle.loads(payload)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
ModuleNotFoundError: No module named 'helper'

        at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:180)
        at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:160)
        at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:262)
        at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
        at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
        at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:332)
        at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:315)
        at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:834)
        at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
        ... 3 more

org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
        at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:569)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:356)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:223)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:113)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:1026)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1270)
        at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$10(CliFrontend.java:1367)
        at java.base/java.security.AccessController.doPrivileged(AccessController.java:712)
        at java.base/javax.security.auth.Subject.doAs(Subject.java:439)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
        at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1367)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1335)
Caused by: java.lang.RuntimeException: Python process exits with code: 1
        at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
        ... 17 more
j3-signalroom commented 2 months ago

The code still fails when the UDTF calls functions defined in other modules: the `ModuleNotFoundError: No module named 'helper'` above comes from the Python worker, which cannot import the `helper` package when the pickled function is deserialized. So, to resolve the matter, I moved all of the code into a single module.
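
Consolidating everything into one module sidesteps the worker-side import, but for the record, this error generally means the Python UDF worker processes cannot import the package the pickled function refers to. A hedged sketch of the usual alternative, shipping the package to the workers via PyFlink's dependency-management API (the path is an assumption based on this repository's layout):

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

tbl_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Make the 'helper' package importable inside the Python UDF workers so that
# cloudpickle can resolve it when the UDTF is deserialized. The zip (or directory)
# must contain 'helper/' at its top level for 'from helper...' to resolve.
tbl_env.add_python_file("/opt/flink/python_apps/kickstarter/python_scripts.zip")
```

The `--pyFiles kickstarter/python_scripts.zip` option already passed to `flink run` serves the same purpose, provided the zip's internal layout matches the import paths used in the code.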