uncleguanghui / pyflink_learn

Learning documents for PyFlink: a series of small hands-on exercises to help you get started with PyFlink quickly

2_udf: error #6

Closed FankLi closed 3 years ago

FankLi commented 3 years ago

Running `flink run -m localhost:8081 -py batch.py` fails with the following error:

`2021-01-04 20:03:03 org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78) at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192) at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185) at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179) at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:503) at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:386) at sun.reflect.GeneratedMethodAccessor79.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at 
akka.actor.Actor$class.aroundReceive(Actor.scala:517) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at akka.actor.ActorCell.invoke(ActorCell.scala:561) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at akka.dispatch.Mailbox.run(Mailbox.scala:225) at akka.dispatch.Mailbox.exec(Mailbox.scala:235) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.lang.RuntimeException: Failed to create stage bundle factory! at org.apache.flink.python.AbstractPythonFunctionRunner.createStageBundleFactory(AbstractPythonFunctionRunner.java:197) at org.apache.flink.python.AbstractPythonFunctionRunner.open(AbstractPythonFunctionRunner.java:164) at org.apache.flink.table.runtime.runners.python.scalar.AbstractGeneralPythonScalarFunctionRunner.open(AbstractGeneralPythonScalarFunctionRunner.java:65) at org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator$ProjectUdfInputPythonScalarFunctionRunner.open(AbstractStatelessFunctionOperator.java:186) at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:143) at org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.open(AbstractStatelessFunctionOperator.java:131) at org.apache.flink.table.runtime.operators.python.scalar.AbstractPythonScalarFunctionOperator.open(AbstractPythonScalarFunctionOperator.java:88) at org.apache.flink.table.runtime.operators.python.scalar.AbstractRowDataPythonScalarFunctionOperator.open(AbstractRowDataPythonScalarFunctionOperator.java:80) at 
org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.open(RowDataPythonScalarFunctionOperator.java:64) at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at java.lang.Thread.run(Thread.java:748) Caused by: java.io.IOException: Failed to execute the command: python -c import pyflink;import os;print(os.path.join(os.path.abspath(os.path.dirname(pyflink.__file__)), 'bin')) output: Traceback (most recent call last): File "<string>", line 1, in <module> ModuleNotFoundError: No module named 'pyflink'

at org.apache.flink.python.util.PythonEnvironmentManagerUtils.execute(PythonEnvironmentManagerUtils.java:198)
at org.apache.flink.python.util.PythonEnvironmentManagerUtils.getPythonUdfRunnerScript(PythonEnvironmentManagerUtils.java:141)
at org.apache.flink.python.env.ProcessPythonEnvironmentManager.createEnvironment(ProcessPythonEnvironmentManager.java:179)
at org.apache.flink.python.AbstractPythonFunctionRunner.createPythonExecutionEnvironment(AbstractPythonFunctionRunner.java:249)
at org.apache.flink.table.runtime.runners.python.AbstractPythonStatelessFunctionRunner.createExecutableStage(AbstractPythonStatelessFunctionRunner.java:158)
at org.apache.flink.python.AbstractPythonFunctionRunner.createStageBundleFactory(AbstractPythonFunctionRunner.java:195)
... 16 more`
uncleguanghui commented 3 years ago

The last line already shows the error: `ModuleNotFoundError: No module named 'pyflink'`. This means pyflink is not installed; please check whether pyflink is installed correctly.
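One way to check this is to rerun the probe from the stack trace by hand. The sketch below is a minimal, hypothetical helper (`probe_pyflink` is not part of Flink or PyFlink): it resolves the bare `python` command on `PATH` and runs the same one-liner against it. It assumes the script is started from the same shell environment as the Flink TaskManager; if the cluster was started elsewhere, its `PATH` may differ and the result may not match.

```python
import shutil
import subprocess

# The same one-liner that appears in the stack trace.
PROBE = (
    "import pyflink;import os;"
    "print(os.path.join(os.path.abspath(os.path.dirname(pyflink.__file__)), 'bin'))"
)


def probe_pyflink(python_cmd="python"):
    """Return (resolved_path, ok, output) for the given interpreter command.

    Hypothetical helper for illustration only.
    """
    resolved = shutil.which(python_cmd)
    if resolved is None:
        return None, False, f"{python_cmd!r} not found on PATH"
    result = subprocess.run(
        [resolved, "-c", PROBE],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,
    )
    ok = result.returncode == 0
    return resolved, ok, (result.stdout if ok else result.stderr).strip()


if __name__ == "__main__":
    path, ok, output = probe_pyflink()
    print("interpreter:", path)
    print("pyflink importable:", ok)
    print(output)
```

If the printed interpreter is not the one inside the conda environment, the `ModuleNotFoundError` likely comes from a different Python than the one `pip list` describes.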

FankLi commented 3 years ago

I created the virtual environment with `conda create --name pyflink_learn python=3.6` and ran the command inside that environment: `flink run -m localhost:8081 -py batch.py`

Environment details are as follows: `(pyflink_learn) xiaodeMBP:5_online_machine_learning xiaoli$ pip list
Package Version
apache-beam 2.19.0
apache-flink 1.11.2
avro-python3 1.9.1
backports.shutil-get-terminal-size 1.0.0
certifi 2020.12.5
chardet 4.0.0
click 7.1.2
cloudpickle 1.2.2
crcmod 1.7
cssselect2 0.4.1
dill 0.3.1.1
docopt 0.6.2
Faker 5.3.0
fastavro 0.21.24
Flask 1.1.2
Flask-Cors 3.0.9
future 0.18.2
grpcio 1.34.0
hdfs 2.5.8
httplib2 0.12.0
idna 2.10
itsdangerous 1.1.0
Jinja2 2.11.2
joblib 1.0.0
jsonpickle 1.2
kafka-python 2.0.2
lxml 4.6.2
MarkupSafe 1.1.1
mock 2.0.0
numpy 1.19.4
oauth2client 3.0.0
opencv-python 4.5.1.48
pandas 0.25.3
pbr 5.5.1
Pillow 8.1.0
pip 20.3.3
protobuf 3.14.0
py4j 0.10.8.1
pyarrow 0.15.1
pyasn1 0.4.8
pyasn1-modules 0.2.8
pydot 1.4.1
pyflink 1.0
pymongo 3.11.2
pyparsing 2.4.7
python-dateutil 2.8.0
pytz 2020.5
redis 3.5.3
reportlab 3.5.58
reprint 0.5.2
requests 2.25.1
rsa 4.6
scikit-learn 0.24.0
scipy 1.5.4
setuptools 51.0.0.post20201207
six 1.15.0
svglib 1.0.1
text-unidecode 1.3
threadpoolctl 2.1.0
tinycss2 1.1.0
typing 3.7.4.3
typing-extensions 3.7.4.3
urllib3 1.26.2
webencodings 0.5.1
Werkzeug 1.0.1
wheel 0.36.2`

FankLi commented 3 years ago

The virtual environment already has these installed: apache-flink 1.11.2 and pyflink 1.0

uncleguanghui commented 3 years ago

How did you install the pyflink module? The problem is probably there; you could try uninstalling it.

My pip list only contains apache-flink 1.11.2.
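The check suggested here can be scripted against `pip list` output. The sketch below is illustrative only: `parse_pip_list` and `has_stray_pyflink` are hypothetical helper names, and the idea that a separate `pyflink` distribution interferes with `apache-flink` is the suspicion raised in this comment, not an established fact.

```python
def parse_pip_list(text):
    """Map lower-cased package name -> version from `pip list` output."""
    packages = {}
    for line in text.splitlines():
        parts = line.split()
        # Skip the header row, the dashed separator, and anything malformed.
        if len(parts) != 2 or parts[0] == "Package" or set(parts[0]) == {"-"}:
            continue
        packages[parts[0].lower()] = parts[1]
    return packages


def has_stray_pyflink(text):
    """True if a separate `pyflink` distribution sits next to apache-flink."""
    packages = parse_pip_list(text)
    return "apache-flink" in packages and "pyflink" in packages


SAMPLE = """\
Package      Version
------------ -------
apache-flink 1.11.2
pyflink      1.0
"""

print(has_stray_pyflink(SAMPLE))  # True
```

If it reports `True`, running `pip uninstall pyflink` and then reinstalling `apache-flink` gives a clean baseline for further debugging.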

FankLi commented 3 years ago

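I created a new virtual environment that has no pyflink, only apache-flink 1.11.2, and it still reports the same error. I have extracted the key part of the error message below. Do you have any ideas?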

Caused by: java.io.IOException: Failed to execute the command: python -c import pyflink;import os;print(os.path.join(os.path.abspath(os.path.dirname(pyflink.__file__)), 'bin'))

uncleguanghui commented 3 years ago

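After thinking it over, I still believe this is an environment problem. I also manage my environments with conda on a MBP and have not run into your issue...

If nothing else works, you can try packaging the virtual environment (its name is known to be pyflink_learn) and then specifying which Python interpreter to use at run time.

Step 1: Find the directory that holds the current virtual environment. Run `which python`, which prints something like /Users/xxxx/miniconda3/envs/pyflink_learn/bin/python, then change to the envs directory: `cd /Users/xxxx/miniconda3/envs`

Step 2: Package the virtual environment. Run `zip -r venv.zip pyflink_learn` to pack the virtual environment into venv.zip, then run `pwd` to get the current directory; the full path of the archive is that directory followed by /venv.zip.

Step 3: Run the flink script with the additional pyarch and pyexec parameters, which tell flink which python interpreter to use, as follows: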

flink run -m localhost:8081 \
  -pyarch xxxx/venv.zip \
  -pyexec xxxx/venv.zip/pyflink_learn/bin/python3 \
  -py batch.py
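Steps 1 and 2 above can be sketched in Python. This is a minimal illustration, not part of the repository: `packaging_plan` is a hypothetical helper, the archive name `venv.zip` follows the comment above, and the sketch assumes the conda layout `.../envs/<env_name>/bin/python`. It only derives the directory, the zip command, and the archive path; it does not run anything.

```python
from pathlib import PurePosixPath


def packaging_plan(python_path, archive_name="venv.zip"):
    """Derive the cd target, zip command, and archive path from `which python` output.

    Hypothetical helper; assumes the layout .../envs/<env_name>/bin/python.
    """
    interpreter = PurePosixPath(python_path)
    env_dir = interpreter.parent.parent  # .../envs/<env_name>
    envs_dir = env_dir.parent            # .../envs
    return {
        "cd": str(envs_dir),
        "zip": f"zip -r {archive_name} {env_dir.name}",
        "archive": str(envs_dir / archive_name),
    }


if __name__ == "__main__":
    # The placeholder path from step 1; substitute the real `which python` output.
    plan = packaging_plan("/Users/xxxx/miniconda3/envs/pyflink_learn/bin/python")
    for step, value in plan.items():
        print(f"{step}: {value}")
```

The `archive` value is what step 2 obtains by combining the `pwd` output with the file name.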
PGMonster commented 3 years ago

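I also hit a runtime error here, roughly saying that the python command could not be found. Following the suggestion above, I changed the command `flink run -py batch.py` to `flink run -pyexec /opt/miniconda3/bin/python -py batch.py`, that is, I specified the Python path explicitly, and it then ran normally. Thanks a lot!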
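The same fix can be generated instead of typed by hand. The sketch below is a hypothetical helper (`flink_run_command` is not a Flink API) that builds the `flink run` argument list with `-pyexec` set to the absolute path of whichever interpreter runs the script. It assumes the script is executed with the conda environment's own Python and that this path is also valid on the machine where the job's Python workers run.

```python
import shlex
import sys


def flink_run_command(script, python_exec=sys.executable, jobmanager=None):
    """Build a `flink run` argument list that pins the Python interpreter.

    Hypothetical helper; only uses the -m, -pyexec and -py options
    that already appear in this thread.
    """
    cmd = ["flink", "run"]
    if jobmanager:
        cmd += ["-m", jobmanager]
    cmd += ["-pyexec", python_exec, "-py", script]
    return cmd


if __name__ == "__main__":
    args = flink_run_command("batch.py", jobmanager="localhost:8081")
    # Print a copy-pastable shell command; pass `args` to subprocess.run to execute it.
    print(" ".join(shlex.quote(part) for part in args))
```

Using `sys.executable` avoids relying on whatever the bare `python` command resolves to on `PATH`.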

FankLi commented 3 years ago

Resolved. Thank you very much for the patient answers.

uncleguanghui commented 3 years ago

Glad I could help everyone.