Open jeremyje opened 1 year ago
I totally agree that we need better documentation of portable runners in general. I've been digging through the Python and Go SDK code for days trying to get portable runners to work, and OMG, the official documentation is bad.
It seems like you misunderstood the LOOPBACK environment. Loopback starts a server on your host machine (not the Flink node) and is solely for local debugging. The Beam-generated code running on Flink connects back to your Go code on your machine to run UDFs, so it's bounded by your local machine's memory.

As for the DOCKER environment, the Beam code will try to run the `apache/beam_go_sdk` image on the Flink node, so you need to be able to run Docker on that machine. To run Docker inside Docker containers, you need to set `privileged: true` in your docker-compose config. Although it would be nice to run the SDK container as a sidecar using the EXTERNAL environment option in containerized environments such as k8s, it looks like the Go SDK does not yet provide a worker pool in its SDK harness container, unlike the Python and Java SDKs.

Another important thing to note: you need to mount the job server's artifact directory into the SDK harness containers, as they expect to find artifacts at the same path.
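To make the artifact-sharing point concrete, here is a minimal docker-compose sketch. The service and image names follow the full compose file later in this thread; sharing `/tmp` is an assumption based on the job server's default staging location living under `/tmp`.

```yaml
# Sketch: share one volume between the job server and the SDK
# harness so both see staged artifacts at the same path.
volumes:
  beam-artifacts:

services:
  beam-job-server:
    image: apache/beam_spark3_job_server:2.49.0
    volumes:
      - beam-artifacts:/tmp   # job server stages artifacts here
  beam-python-workers:
    image: apache/beam_python3.10_sdk:2.49.0
    command: [ "--worker_pool" ]
    volumes:
      - beam-artifacts:/tmp   # harness reads them from the same path
```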
I'm not a Beam expert though, so please take these with a grain of salt.
Here's a tip for anyone who stumbles upon similar issues with the Spark portable runner: the official documentation says you should use the `apache/beam_spark_job_server` Docker image with Spark 3.2.x, but that didn't work for me. I used `apache/beam_spark3_job_server` with Spark 3.1.2 instead.
@LibofRelax Do you have any sample configs or basic setup instructions, beyond what was said before, that would be useful? In particular, how are you running the runner image, and which version?

Also, I cannot find a 3.1.2 tag at https://hub.docker.com/r/apache/beam_spark3_job_server/tags. The tags appear to be modeled after the Beam version, like 2.49.0. The same goes for https://hub.docker.com/r/apache/beam_spark_job_server/tags.
@jeremyje It's been a while since I gave up on using Beam in production, so this sample might not be fully correct. I also only used Spark; maybe the Flink runner doesn't have the problems I had.

Here's a sample docker-compose config that kinda worked for me. It runs smoothly until UDF execution. There seems to be a version mismatch between the job server and the worker implementation, even though they have the same version tag: the SDK worker expects a logging endpoint from the job on the Spark worker, which the Spark worker does not seem to expose. You can try it out with `--environment EXTERNAL --environment_config beam-python-workers:50000`.

I suggest you try the docker-in-docker environment option; maybe the default docker image is consistent with the job implementation. I already set the `privileged` flag to true in the compose file for that. If using `--environment DOCKER`, the `beam-python-workers` service won't be needed.
```yaml
version: '3'

volumes:
  tmp:

services:
  spark:
    image: docker.io/bitnami/spark:3.1.2
    environment:
      - SPARK_MODE=master
    ports:
      - "8080:8080"

  spark-worker:
    image: docker.io/bitnami/spark:3.1.2
    privileged: true # To run the Docker SDK harness
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark:7077
      - SPARK_WORKER_MEMORY=4g
      - SPARK_WORKER_CORES=1
      - BEAM_WORKER_POOL_IN_DOCKER_VM=1
      - DOCKER_MAC_CONTAINER=1
    ports:
      - "8081:8081"
      - "8100-8200:8100-8200"
    volumes:
      - tmp:/tmp
      - ./work/spark:/opt/bitnami/spark/work

  beam-python-workers:
    image: apache/beam_python3.10_sdk:2.49.0
    command: [ "--worker_pool" ]
    environment:
      - RUN_PYTHON_SDK_IN_DEFAULT_ENVIRONMENT=1
    volumes:
      - tmp:/tmp

  beam-job-server:
    image: apache/beam_spark3_job_server:2.49.0
    command: [ "--spark-master-url=spark://spark:7077" ]
    ports:
      - "4040:4040" # Spark job UI on the driver
      - "8099:8099" # Job endpoint
      - "8098:8098" # Artifact endpoint
    volumes:
      - tmp:/tmp
    depends_on:
      - spark
      - spark-worker
```
> Here's a tip for anyone who stumbles upon similar issues with the Spark portable runner: the official documentation says you should use the `apache/beam_spark_job_server` Docker image with Spark 3.2.x, but that didn't work for me. I used `apache/beam_spark3_job_server` with Spark 3.1.2 instead.
@LibofRelax thanks a lot for this answer. I spent 2 weeks trying distinct combinations of Spark and Beam versions without success.
With your tip, it worked!
Has there been any work on this? I'm struggling to understand how to make Beam work through the Flink task manager on Kubernetes. Obviously, with Kubernetes, `DOCKER` is not an option, only `EXTERNAL`.
With Python, it looks like you run `apache/beam_python_sdk --worker_pool` as a sidecar inside the Flink task manager pod. But the Beam boot command in `apache/beam_go_sdk` doesn't have this flag. It's also unclear what you're supposed to pass as arguments (like `--id`) to this command.
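For comparison, the Python sidecar pattern being described looks roughly like this. This is a sketch only: the pod layout, image tags, and port are assumptions, and the pipeline would be submitted with `--environment_type=EXTERNAL --environment_config=localhost:50000`.

```yaml
# Sketch of a Flink task manager pod with the Python SDK harness
# worker pool as a sidecar. Names, tags, and the port are illustrative.
apiVersion: v1
kind: Pod
metadata:
  name: flink-taskmanager
spec:
  containers:
    - name: taskmanager
      image: flink:1.16
      args: ["taskmanager"]
    - name: beam-worker-pool
      image: apache/beam_python3.10_sdk:2.49.0
      args: ["--worker_pool"]
      ports:
        - containerPort: 50000  # worker pool service endpoint
```

The Go SDK's boot entrypoint has no equivalent `--worker_pool` mode, which is exactly the gap being asked about here.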
There are zero examples of doing this with Go. Even after several years of apparent Beam + Go support, nobody seems to have posted any code examples, and there's just a tiny handful of StackOverflow posts without any solutions.

> it looks like the Go SDK does not yet provide a worker pool in its SDK harness container, unlike the Python and Java SDKs.

Is this a prerequisite, then? Are there plans to work on it?
What happened?
Request to update the instructions on how to run the Apache Beam portable runner on Flink, and to consider Go. Also, test that the instructions work on newer versions of Flink (1.17 or 1.16) with updated runner docker images.

I followed the instructions on running Apache Flink with the Apache Beam 2.46.0 Go SDK. I'm hitting all sorts of issues and have not gotten my pipelines to work based on the instructions given at:
At best, I am able to bring up a Flink instance with a pipeline that processes a limited amount of data (~200 MB) and then seizes up without any feedback. This pipeline runs in `environment_type=LOOPBACK` mode. I'm starting to think there's some weird compatibility issue, as I'd really prefer to run in the default `environment_type=DOCKER` mode. It's not clear whether this mode is supported, given that the example only goes up to Flink 1.14, which is no longer listed on the Flink site. It'd be nice to get updated documentation for running Beam on a Flink cluster and have it target a newer version, ~Flink 1.17.
These are the steps I've taken:

Run the pipeline with the following parameters:

Only `-environment_type=LOOPBACK` has worked for me to submit jobs, but it still fails after processing about 100 MiB of data, even though the memory limits are well above that (the `flink-conf.yaml` below shows `taskmanager.memory.process.size: 20g`). With trying the various ways to run `job-server`, the most I get is a successful job submit in

I've attempted other ways to work around the various errors I've been seeing, such as no docker being available in the `apache/beam_flink1.15_job_server:2.46.0` container.

Versions

Attempt with `docker-compose` on the job server:

The docker image for the `jeremyje/beam_flink1.15_job_server:2.46.0` image when trying to get docker mode to work.

My `conf/flink-config.yaml`, which mainly exposes all the ports outside of the machine so I can submit jobs from another machine.

Logs from the jobserver.
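A sketch of the kind of `conf/flink-config.yaml` described above: only `taskmanager.memory.process.size: 20g` comes from this report, while the bind-address keys are assumptions about how one would expose Flink's ports beyond localhost.

```yaml
# Sketch: expose Flink's endpoints so jobs can be submitted from
# another machine. Only the memory setting is from this report;
# the rest is illustrative.
taskmanager.memory.process.size: 20g
jobmanager.bind-host: 0.0.0.0
taskmanager.bind-host: 0.0.0.0
rest.bind-address: 0.0.0.0
```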
Issue Priority
Priority: 3 (minor)
Issue Components