apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

Python Beam SDK Harness hangs when installing pip packages #20973

Open damccorm opened 2 years ago

damccorm commented 2 years ago

When running a Beam pipeline with Flink as the backend, the Python SDK harness hangs when trying to install pip packages. Tested using Flink 1.10.3.

Images used: 

apache/beam_python3.7_sdk:2.28.0

apache/flink:1.10.3

Beam args used are:

"--runner=FlinkRunner", "–flink_version=1.10", //same with 1.13 "--flink_master=http://flink-jobmanager.default:8081", f"--artifacts_dir=/mnt/flink", "--environment_type=EXTERNAL", "--environment_config=localhost:50000",


Specifically, this was tested by running a TFX pipeline, which gets submitted and registered as it should, but the SDK Harness hangs when installing:

2021/03/10 12:16:20 Initializing python harness: /opt/apache/beam/boot --id=1-1 --logging_endpoint=localhost:39795 --artifact_endpoint=localhost:34095 --provision_endpoint=localhost:42999 --control_endpoint=localhost:38129
2021/03/10 12:16:20 Found artifact: tfx_ephemeral-0.27.0.tar.gz
2021/03/10 12:16:20 Found artifact: extra_packages.txt
2021/03/10 12:16:20 Installing setup packages ...
2021/03/10 12:16:20 Installing extra package: tfx_ephemeral-0.27.0.tar.gz

and nothing else is shown, regardless of how long it is left. I can manually install the TFX package by exec'ing into the container in < 3 min.

The Flink task manager then idles, periodically logging:

2021-03-10 11:29:26,287 INFO org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory - Still waiting for startup of environment from localhost:50000 for worker id 1-1

Helm charts attached below.

Imported from Jira BEAM-11959. Original Jira may contain additional context. Reported by: ConverJens.

nika-qubit commented 2 years ago

@damccorm Hi Danny, out of curiosity, how did you handle the extra pip dependencies in your use case? Do you have to pip freeze into a requirements file and somehow stage it to the Flink job server through artifacts_dir?

damccorm commented 2 years ago

Hey @KevinGG, I didn't actually report this issue; I just imported it from Jira. I believe @ConverJens reported it, and there's more detail in the linked Jira in the description.

ConverJens commented 2 years ago

@KevinGG Beam has several options to propagate dependencies; in this case we used the --extra_package flag. We had the same behaviour when submitting setup.py or requirements.txt files, though.
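
Roughly, the submission side looks like this; the paths are placeholders, and normally you'd use just one of the three options:

```python
# Sketch of the three common ways the Python SDK stages extra
# dependencies; file paths here are hypothetical placeholders.
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=FlinkRunner",
    "--flink_master=http://flink-jobmanager.default:8081",
    # 1. A locally built source distribution, installed on each worker:
    "--extra_package=tfx_ephemeral-0.27.0.tar.gz",
    # 2. Or a pip requirements file:
    # "--requirements_file=requirements.txt",
    # 3. Or a setup.py for a multi-file local package:
    # "--setup_file=./setup.py",
])
```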

nika-qubit commented 2 years ago

> @KevinGG Beam has several options to propagate dependencies; in this case we used the --extra_package flag. We had the same behaviour when submitting setup.py or requirements.txt files, though.

I don't believe I know the answer. I'm running into other issues when sending a job with a requirements file to a remotely started Flink cluster executing in the DOCKER environment. I don't understand how a requirements file is staged from job submission to later execution in containers, so I'm trying to see if you've had similar issues.
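
Concretely, a sketch of the kind of submission I mean; the master endpoint and SDK image are placeholders taken from this thread, not my exact setup:

```python
# Hedged sketch of submitting with the DOCKER environment, where
# environment_config names the SDK container image to run workers in.
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=FlinkRunner",
    "--flink_master=http://flink-jobmanager.default:8081",
    "--environment_type=DOCKER",
    "--environment_config=apache/beam_python3.7_sdk:2.28.0",
    "--requirements_file=requirements.txt",  # hypothetical path
])
```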

ConverJens commented 2 years ago

@KevinGG When you specify a requirements file when starting your Beam job, Beam compresses your file and sends it to the Flink job master. The job master then distributes it to the worker nodes.

But if you have an issue with this, I would encourage you to open a new issue with more information.