j3-signalroom / apache_flink-kickstarter

Examples of Apache Flink® applications showcasing the DataStream API, Table API in Java and Python, and Flink SQL, featuring AWS, GitHub, Terraform, Streamlit, and Apache Iceberg.
https://linkedin.com/in/jeffreyjonathanjennings
MIT License

`ModuleNotFoundError: No module named 'kafka_client_properties_lookup'`. #185

Closed by j3-signalroom 2 months ago

j3-signalroom commented 2 months ago
The Flink app stopped while reading the custom data source stream because of the following error:

```
An error occurred while calling o58.hasNext.
: java.lang.RuntimeException: Failed to fetch next result
        at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:129)
        at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:100)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:569)
        at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.io.IOException: Failed to fetch job execution result
        at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:187)
        at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:123)
        at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:126)
        ... 12 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 682d2d1de56a433232797c7488c1f3b8)
        at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
        at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2096)
        at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:185)
        ... 14 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 682d2d1de56a433232797c7488c1f3b8)
        at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:130)
        at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:646)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
        at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
        at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:302)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
        at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
        at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$35(RestClusterClient.java:901)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
        at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
        at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:302)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
        at java.base/java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:614)
        at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1163)
        at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        ... 1 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
        at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:128)
        ... 23 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
        at org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:219)
        at org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailureAndReport(ExecutionFailureHandler.java:166)
        at org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:121)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:281)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:272)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:265)
        at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:787)
        at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:764)
        at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
        at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:515)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:569)
        at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:318)
        at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
        at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:316)
        at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:229)
        at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:88)
        at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:174)
        at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
        at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
        at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
        at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
        at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
        at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
        at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
        at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
        at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
        at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
        at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
        at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
        at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
        at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
        at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
        at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
        at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.completeProcessing(SourceStreamTask.java:368)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:340)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:124)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.advanceToEndOfEventTime(SourceStreamTask.java:176)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.endData(StreamTask.java:692)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.lambda$completeProcessing$0(SourceStreamTask.java:365)
        at org.apache.flink.util.function.FunctionUtils.lambda$asCallable$5(FunctionUtils.java:126)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
        at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:101)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:414)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:383)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:368)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
        at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.RuntimeException: Error while waiting for BeamPythonFunctionRunner flush
        at org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.invokeFinishBundle(AbstractExternalPythonFunctionOperator.java:107)
        at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.processWatermark(AbstractPythonFunctionOperator.java:189)
        at org.apache.flink.streaming.api.operators.python.process.ExternalPythonProcessOperator.processWatermark(ExternalPythonProcessOperator.java:117)
        at org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:122)
        ... 18 more
Caused by: java.lang.RuntimeException: Failed to close remote bundle
        at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:423)
        at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:407)
        at org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.lambda$invokeFinishBundle$0(AbstractExternalPythonFunctionOperator.java:86)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        ... 1 more
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 1: Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 295, in _execute
    response = task()
               ^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 370, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 629, in do_instruction
    return getattr(self, request_type)(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 660, in process_bundle
    bundle_processor = self.bundle_processor_cache.get(
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 491, in get
    processor = bundle_processor.BundleProcessor(
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 903, in __init__
    self.ops = self.create_execution_tree(self.process_bundle_descriptor)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 982, in create_execution_tree
    return collections.OrderedDict([(
                                   ^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 985, in <listcomp>
    get_operation(transform_id))) for transform_id in sorted(
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 818, in wrapper
    result = cache[args] = func(*args)
                           ^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 964, in get_operation
    transform_consumers = {
                          ^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 965, in <dictcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 965, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
          ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 818, in wrapper
    result = cache[args] = func(*args)
                           ^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 969, in get_operation
    return transform_factory.create_operation(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1266, in create_operation
    return creator(self, transform_id, transform_proto, payload, consumers)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 132, in create_data_stream_keyed_process_function
    return _create_user_defined_function_operation(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 218, in _create_user_defined_function_operation
    return beam_operation_cls(
           ^^^^^^^^^^^^^^^^^^^
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 234, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.__init__
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 130, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 239, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.generate_operation
  File "/usr/local/lib/python3.11/site-packages/pyflink/fn_execution/datastream/process/operations.py", line 83, in __init__
    ) = extract_stateless_function(
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/pyflink/fn_execution/datastream/process/operations.py", line 176, in extract_stateless_function
    user_defined_func = pickle.loads(user_defined_function_proto.payload)
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/pyflink/fn_execution/pickle.py", line 29, in loads
    return cloudpickle.loads(payload)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
ModuleNotFoundError: No module named 'kafka_client_properties_lookup'

        at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
        at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
        at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:61)
        at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:504)
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:607)
        at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:421)
        ... 7 more
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 1: Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 295, in _execute
    response = task()
               ^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 370, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 629, in do_instruction
    return getattr(self, request_type)(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 660, in process_bundle
    bundle_processor = self.bundle_processor_cache.get(
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 491, in get
    processor = bundle_processor.BundleProcessor(
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 903, in __init__
    self.ops = self.create_execution_tree(self.process_bundle_descriptor)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 982, in create_execution_tree
    return collections.OrderedDict([(
                                   ^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 985, in <listcomp>
    get_operation(transform_id))) for transform_id in sorted(
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 818, in wrapper
    result = cache[args] = func(*args)
                           ^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 964, in get_operation
    transform_consumers = {
                          ^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 965, in <dictcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 965, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
          ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 818, in wrapper
    result = cache[args] = func(*args)
                           ^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 969, in get_operation
    return transform_factory.create_operation(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1266, in create_operation
    return creator(self, transform_id, transform_proto, payload, consumers)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 132, in create_data_stream_keyed_process_function
    return _create_user_defined_function_operation(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 218, in _create_user_defined_function_operation
    return beam_operation_cls(
           ^^^^^^^^^^^^^^^^^^^
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 234, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.__init__
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 130, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 239, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.generate_operation
  File "/usr/local/lib/python3.11/site-packages/pyflink/fn_execution/datastream/process/operations.py", line 83, in __init__
    ) = extract_stateless_function(
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/pyflink/fn_execution/datastream/process/operations.py", line 176, in extract_stateless_function
    user_defined_func = pickle.loads(user_defined_function_proto.payload)
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/pyflink/fn_execution/pickle.py", line 29, in loads
    return cloudpickle.loads(payload)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
ModuleNotFoundError: No module named 'kafka_client_properties_lookup'

        at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:180)
        at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:160)
        at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:262)
        at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
        at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
        at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:332)
        at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:315)
        at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:834)
        at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
        ... 3 more

org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
        at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:569)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:356)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:223)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:113)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:1026)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1270)
        at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$10(CliFrontend.java:1367)
        at java.base/java.security.AccessController.doPrivileged(AccessController.java:712)
        at java.base/javax.security.auth.Subject.doAs(Subject.java:439)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
        at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1367)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1335)
Caused by: java.lang.RuntimeException: Python process exits with code: 1
        at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
        ... 17 more
```
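
The bottom of the traceback is the Beam SDK harness (the separate Python worker process) failing to re-import `kafka_client_properties_lookup` while unpickling the user-defined function, which means the module is importable where the job was submitted but was never shipped to the workers. A minimal sketch of the usual remedy, assuming the module is a plain `.py` file sitting next to the entry-point script (the path shown is illustrative):

```python
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

# Attach the local module so every Python worker can import it when
# cloudpickle deserializes the UDF inside the Beam SDK harness.
# (Illustrative path; point it at the real kafka_client_properties_lookup.py.)
env.add_python_file("kafka_client_properties_lookup.py")
```

The same thing can be done at submission time with `flink run --python <entry.py> --pyFiles kafka_client_properties_lookup.py`, or through the `python.files` configuration option.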
j3-signalroom commented 2 months ago

I am going to use Java in this case (see below):

The statement `from pyflink.java_gateway import get_gateway` is used in PyFlink, the Python API for Apache Flink, to import the `get_gateway` function from the `pyflink.java_gateway` module.

**What Does `get_gateway` Do?**

- **Java-Python Bridge:** PyFlink relies on Py4J, a library that enables Python programs to communicate with a Java virtual machine (JVM). The `get_gateway` function returns a `JavaGateway` object, which acts as a bridge between Python and the JVM where Flink is running.
- **Access to Java Classes and Methods:** With the `JavaGateway` object, you can invoke Java methods and access Java classes directly from your Python code. This is particularly useful for advanced use cases where you need functionality that is available in the Java API but not exposed in the Python API.

**When to Use `get_gateway`**

- **Advanced Configurations:** If you need to configure Flink in ways that aren't supported by the standard PyFlink APIs.
- **Custom Extensions:** When you're implementing custom connectors, formats, or functions that require interaction with Java classes.
- **Debugging:** For troubleshooting issues that require access to the underlying Java objects.

**Example Usage**

```python
from pyflink.java_gateway import get_gateway

# Get the Java Gateway
gateway = get_gateway()

# Access a Java class (e.g., org.apache.flink.api.java.ExecutionEnvironment)
ExecutionEnvironment = gateway.jvm.org.apache.flink.api.java.ExecutionEnvironment

# Use the Java class
env = ExecutionEnvironment.getExecutionEnvironment()
```

**Important Considerations**

- **Complexity:** Direct interaction with the Java gateway can make your code more complex and harder to maintain.
- **Compatibility:** Ensure that the Java classes and methods you are accessing are compatible with the version of Flink you are using (see the version-check sketch below).
- **Performance:** Cross-language calls can introduce overhead. Use them judiciously to minimize performance impacts.

**Conclusion**

The `get_gateway` function is a powerful tool in PyFlink for advanced users who need direct access to Java functionality within Apache Flink. It provides flexibility but should be used with caution due to the added complexity and potential performance implications.
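
On the compatibility point, here is a minimal sketch of one way to confirm which Flink version the gateway's JVM is actually running before relying on version-specific classes. It uses `org.apache.flink.runtime.util.EnvironmentInformation` from flink-runtime; treat it as an illustrative check rather than a required step:

```python
from pyflink.java_gateway import get_gateway

gateway = get_gateway()

# Ask the JVM side which Flink version it is running; useful before touching
# classes whose package or signature changed between releases.
flink_version = gateway.jvm.org.apache.flink.runtime.util.EnvironmentInformation.getVersion()
print(f"JVM-side Flink version: {flink_version}")
```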