yuvipanda opened this issue 2 years ago
My current thinking is to disqualify the Spark Runner, try to validate my assumptions about Kinesis Data Analytics, and if not, run this on Flink on k8s (which seems to have decent support) on EKS.
@yuvipanda I did some initial research on options 2 and 3 when we began the Beam refactoring efforts. To simplify development in the near term, @cisaacstern wanted to initially focus our efforts on ensuring that our Beam-based recipes would work correctly with Dataflow, so we paused our work on Beam executor "bakeries" for other cloud providers, but now seems like a good time to start revisiting this.
I agree that the documentation around Kinesis Data Analytics (KDA) as a Flink runner for Python-based Beam pipelines is quite sparse. This presentation seems to claim that KDA is a fully featured managed Flink platform which should be capable of running our portable pipelines, but I have not seen any tangible examples of this in the public space. @batpad is planning to investigate this option this week, and the AWS folks we work with on the ASDI initiative are going to try to put us in contact with some of the KDA team. With @batpad I proposed we
The Flink on k8s landscape seems to be a bit fragmented as well. To avoid complexity (as I'm not a deep k8s person) I was hoping that I might find a straightforward Helm chart, but the operators you listed above seemed to be the only publicly available repos, and the architecture for flink-kubernetes-operator seemed somewhat intimidating for our usage.
If it functions for our use case, I like the simplicity and lack of infrastructure management of a fully managed service like KDA. The one benefit of Flink on k8s is that it would also give us a consistent Beam executor model to use on Azure.
@sharkinsspatial I think investigating KDA to see if it works, and falling back to a flink operator on EKS seems the right way to approach this.
Doing this with Flink + k8s is definitely more complex than a fully managed service, but I want to reassure you that that architecture diagram isn't so bad - JupyterHub is probably just about as complex :)
@sharkinsspatial I was thinking about what we would need to do to make sure it works on KDA:

- Supports running Python with our custom container image (very important here! see the sketch below)

So if we can get these two to work, then we can just use KDA!
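For concreteness, here's a minimal sketch of what that container requirement means on the Beam side; any managed Flink service would need to honor something equivalent to these portable-runner options (the image name is a placeholder, not our actual image):

```python
from apache_beam.options.pipeline_options import PipelineOptions

# Sketch only: the options that tell a portable Beam runner to execute user code
# inside our own container image (image name below is hypothetical).
options = PipelineOptions([
    "--environment_type=DOCKER",
    "--environment_config=ghcr.io/example/pangeo-forge-sdk:latest",
])
```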
Spent a bit of time trying to figure out how this might work. I might be being a bit naive here, just coming into this situation and missing something, but I don't see any way we can run a custom container using KDA.
The way I'm understanding KDA:
You create a Kinesis stream and then setup a Kinesis Data Analytics Application on AWS. When setting this up, you pick a version of Apache Flink it will run. You configure this application by giving it a Kinesis stream to read events / data from, and a path to s3 that contains your application code in either Java or Python. Here is the example for a python application: https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/tree/master/python . It looks like, for example, you need to specify the python packages your app needs with something like https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/tree/master/python/PythonPackages .
With this configured, one could then send messages to the Kinesis stream, and the AWS managed service will run your code against data coming through the stream. I see absolutely no way to specify a custom container image, nor anywhere one would submit jobs the way we currently do with Dataflow.
I feel like I don't completely understand our setup and may be missing something. I could go ahead and create a KDA application on AWS and try things out, but it does seem like a bit of work, and currently I do not see any pathway at all to getting it to do what we need it to.
@sharkinsspatial let me know if it helps to chat.
@batpad That matches my understanding too from reading the docs. But I think you and @sharkinsspatial have more AWS experience. If we can confirm to both your satisfaction that this isn't possible, we can tick that box and move on to EKS.
@charlesa101 @lerms we met at the Beam Summit in Austin this past July after I attended your talk Implementing Cloud Agnostic Machine Learning Workflows With Apache Beam on Kubernetes. As I mentioned then, we are currently running Beam on Dataflow, but would really like to support AWS as well. You two seemed to have this infra really dialed, so I thought I'd reach out in hopes that our team can learn from your experience.
Any insights you could offer would be greatly appreciated! As one question to start, are there any particular open repos in https://github.com/MavenCode which would be helpful for us to take a look at?
Hi @cisaacstern, we ran our setup on Kubernetes with Spark Master/Worker containers created as StatefulSets, and we had a Jobservice pod that connects to the Spark Master to submit our Beam jobs.
The SparkOperator-on-Kubernetes option worked as well, but the version of Spark currently supported by SparkOperator won't work with the Jobservice, which we needed because we wanted to use the PortableRunner.
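(For anyone unfamiliar with that setup: from the client side, submitting through a separately running job service looks roughly like the sketch below. The endpoint and image names are placeholders, not MavenCode's actual values.)

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Sketch of submitting a pipeline to a Beam job service via the PortableRunner.
# The job_endpoint is a hypothetical in-cluster Service address; 8099 is the
# job service's default gRPC port.
options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=beam-jobservice.default.svc.cluster.local:8099",
    "--environment_type=DOCKER",
    "--environment_config=ghcr.io/example/beam-sdk:latest",  # hypothetical SDK image
])

with beam.Pipeline(options=options) as p:
    p | beam.Create([1, 2, 3]) | beam.Map(print)
```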
@lerms and I will open source what we have on our git repo https://github.com/MavenCode in the next few days. Sorry, it's been a bit busy here.
@charlesa101 thanks so much for these insights! I believe we will want to use PortableRunner as well, because we want to use our own Docker images (and I think that's how you do that in Beam).
Thank you so much for being willing to open source your work! If it's not too much to ask, could you post a link to the repo here once it's up? Again, can't thank you enough for sharing this experience.
Sure, you're welcome! We need more contributors and people to kick it around as well to make it better.
Looping in @briannapagan.
Merged #21 that runs on AWS! Here's a tutorial on how to use it: https://pangeo-forge-runner.readthedocs.io/en/latest/tutorial/flink.html.
Need folks to try it out though.
Hey @KevinGG! @pabloem recommended that I reach out to you for questions on Beam's Flink runner for Python. @yuvipanda can probably recreate the problems we're facing better than I can, but to summarize:

- When running a Beam pipeline on Flink (for this project), we notice that only one worker is ever used (parallelism=1)
- We've noticed that the Apache Beam documentation for Flink says there is a parallelism argument we can set, but after investigating for a while, we've found that this isn't supported / respected in the Python Flink runner (which uses the Portable Beam Runner).
- Our Beam pipeline runs in parallel on Dataflow. Here, it makes extensive use of Reshuffle() to specify parallel stages of execution (and to prevent operator fusion). Does Flink treat reshuffles in the same way? We recognize that the problem at hand could be that all operations are fused into one, and thus only one thread of execution is needed. If this is the case, what's the right way to restructure our pipeline?
- When running a Beam pipeline on Flink (for this project), we notice that only one worker is ever used (parallelism=1)
This is true. To be more precise: one task per step.
- We've noticed that the Apache Beam documentation for Flink says there is a parallelism argument we can set, but after investigating for a while, we've found that this isn't supported / respected in the Python Flink runner (which uses the Portable Beam Runner).
If the pipeline option is added (I'm doing it in https://github.com/apache/beam/pull/23271), it will be recognized by the Java job server (flink runner). But one caveat is that the parallelism is applied to the whole pipeline, so all the steps will share the same parallelism. Relatedly, there is also a max_parallelism option.
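For anyone landing here later, a minimal sketch of what this looks like from the Python side once the option is available (the Flink address and image below are placeholders):

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Sketch only: assumes a Beam release that includes the parallelism pipeline
# option from https://github.com/apache/beam/pull/23271.
options = PipelineOptions([
    "--runner=FlinkRunner",
    "--flink_master=localhost:8081",  # placeholder Flink REST endpoint
    "--parallelism=10",               # applied to the whole pipeline (every step)
    "--environment_type=DOCKER",
    "--environment_config=ghcr.io/example/beam-sdk:latest",  # hypothetical image
])

with beam.Pipeline(options=options) as p:
    p | beam.Create(range(100)) | beam.Reshuffle() | beam.Map(print)
```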
- Our Beam pipeline runs in parallel on Dataflow. Here, it makes extensive use of Reshuffle() to specify parallel stages of execution (and to prevent operator fusion). Does Flink treat reshuffles in the same way? We recognize that the problem at hand could be that all operations are fused into one, and thus only one thread of execution is needed. If this is the case, what's the right way to restructure our pipeline?
Yes, and you have to explicitly apply Reshuffle when using Flink; otherwise, elements fanned out will always stay on one Task Manager (worker). On Dataflow, by contrast, the service sometimes knows how to rebalance data onto more VMs without an explicit Reshuffle.
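To make that concrete, here's a small sketch of the pattern (the transform names and fan-out are made up for illustration):

```python
import apache_beam as beam

def expensive_step(item):
    # Stand-in for real per-element work.
    return item

with beam.Pipeline() as p:
    (
        p
        | "Seed" >> beam.Create(["a", "b"])  # a tiny source
        | "FanOut" >> beam.FlatMap(lambda x: [(x, i) for i in range(1000)])
        # Without this Reshuffle, the fanned-out elements stay fused to the task
        # that produced them, so only one Task Manager does the work on Flink.
        | "BreakFusion" >> beam.Reshuffle()
        | "Work" >> beam.Map(expensive_step)
    )
```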
Looks like Beam 2.42.0 was released in October, including the merged PR to support parallelism: https://github.com/apache/beam/pull/23271. So maybe we just need a passthrough here in the runner?
Woohoo, finally success running Beam pipelines at scale on AWS!
We first deployed the Kubernetes cluster running Apache Flink on AWS using @yuvipanda's Terraform deployment method. The environment we used to run the Terraform stuff is described here.
Then, to execute pangeo-forge Beam recipes from the command line, we created this environment:
name: runner
channels:
- conda-forge
dependencies:
- python=3.9.13
- pangeo-forge-recipes=0.10.0
- apache-beam=2.42.0
- pandas<2.0
- s3fs
- pip:
- git+https://github.com/pangeo-forge/pangeo-forge-runner.git@main
with this aws_config.py:
c.TargetStorage.root_path = "s3://esip-qhub/testing/pangeo_forge/{job_name}"
c.TargetStorage.fsspec_class = "s3fs.S3FileSystem"
c.TargetStorage.fsspec_args = { "key":"xxxxxxxxxxxxx", "secret":"xxxxxxxxxxxx", "client_kwargs":{"region_name":"us-west-2"}}
c.InputCacheStorage.fsspec_class = c.TargetStorage.fsspec_class
c.InputCacheStorage.fsspec_args = c.TargetStorage.fsspec_args
c.InputCacheStorage.root_path = "s3://esip-qhub/testing/pangeo-forge/pangeo_forge/inputcache-data/"
c.Bake.bakery_class = "pangeo_forge_runner.bakery.flink.FlinkOperatorBakery"
Example call from the command line:
pangeo-forge-runner bake --repo https://github.com/pforgetest/gpcp-from-gcs-feedstock/ --ref beam-refactor --config aws_config.py --prune --FlinkOperatorBakery.parallelism=10 --Bake.job_name=awstest02
and here you could see the requested parallelism, including the pods spinning up to meet the request.
OMG THIS IS FUCKING AMAZING @rsignell-usgs!!!!!!
Credit where credit is due: it's you and @cisaacstern!!
@rsignell-usgs: I know it's been a couple months. I'm over on this side of town trying to reproduce your success and getting sorta/maybe close with the same recipe but never quite there.
A couple of questions:

- The pangeo-forge-runner command above is using --prune. Did you try it without --prune?
- For me, the task pods are never completing successfully (meaning I expect the Python subprocess to exit with a status code of 0 and then quietly be killed without any traceback or error). Sometimes a job will create zarr output, but my hunch is that StoreToZarr isn't completing successfully b/c tasks running the subcommand python -m apache_beam.runners.worker.sdk_worker_main give me a potentially dreaded Python exited: <nil>
I realized after I wrote this that, from the perspective of the parent Golang process, <nil> might be the success status. So let me compare outputs.
Update: zarr outputs match what the GCP Dataflow integration tests have. Here are some notes about getting this recipe working on EKS and Flink from Oct 11th-17th that might be helpful for future humanoids.
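(In case anyone wants to repeat that comparison, here's a rough sketch; both store paths are hypothetical placeholders and credentials are assumed to be configured.)

```python
import fsspec
import xarray as xr

# Rough sketch of comparing the AWS-produced zarr store against a reference copy.
aws = xr.open_zarr(fsspec.get_mapper("s3://example-bucket/awstest02/output.zarr"))
ref = xr.open_zarr(fsspec.get_mapper("gs://example-bucket/dataflow-reference/output.zarr"))

xr.testing.assert_equal(aws, ref)  # raises if the datasets differ
```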
Most of my time has been spent trying to understand whether jobs/tasks are actually succeeding. The lack of clear job status updates on the kind: FlinkDeployment resource has to be a bug. Regarding my last comment: Python exited: <nil> should convey success from the perspective of the Golang parent process. But even if that's returned, there's no guarantee that it produced zarr output, depending on which branch and version we're using. Here's a breakdown:
| produces zarr output | k8s flinkdeployment status | pangeo-forge-runner branch | flink operator version | flink version | apache-beam version |
|---|---|---|---|---|---|
| no | JOB STATUS: "empty" | main | 1.5.0 | 1.15 | 2.42.0 |
| yes | JOB STATUS: "empty" | unpin-beam | 1.5.0 | 1.16 | 2.[47-51].0 (all versions listed at https://repo.maven.apache.org/maven2/org/apache/beam/beam-runners-flink-1.16/) |
When working with the pangeo-forge-runner@unpin-beam branch and experimenting with custom pangeo-forge-recipes branches and different apache-beam versions, I've noticed the following things about caches and delays (delays in terms of the JobManager being delayed in kicking off the TaskManager):
- You don't want to have multiple versions of apache-beam pip installed, or the apache-beam client (which pangeo-forge-runner delegates to) might choose the wrong jar (~/.apache_beam/cache/jars/*.jar) and SDK image (see the cleanup sketch after this list).
- If you're playing with different dependencies and versions of pangeo-forge-recipes, you also want to make sure the cache where the apache-beam client temporarily downloads packages is clean, otherwise EVERYTHING has to get uploaded to the flink server and then EVERYTHING gets staged on the workers and then it's not clear what's actually running in the workers (again, see the cleanup sketch after this list). In the pangeo-forge-runner cli output (toward the end) you should see a line similar to the one below showing where the cache dir is located on the host, as the value of the --dest flag:

  Executing command: ['/Users/ranchodeluxe/apps/venv_py39/bin/python', '-m', 'pip', 'download', '--dest', '/var/folders/99/ts9mxkwx1n73mbwqvbjztfbh0000gr/T/dataflow-requirements-cache', '-r', '/var/folders/99/ts9mxkwx1n73mbwqvbjztfbh0000gr/T/tmps3_6zbwx/tmp_requirements.txt', '--exists-action', 'i', '--no-deps', '--implementation', 'cp', '--abi', 'cp39', '--platform', 'manylinux2014_x86_64']
- Some of the delay between the JobManager and the TaskManager is b/c the JobManager needs a little more resource juice. Been running with this override: --FlinkOperatorBakery.job_manager_resources='{"cpu": 1.0}'
- Other aspects of the delay seem to be related to the fact that the apache-beam client has to load the job-server jar and all our requirements to the flink server. Not sure if that can be mitigated.
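Here's the cleanup sketch referenced in the bullets above (paths come straight from the notes and the log line; adjust if yours differ):

```python
import glob
import os
import shutil
import tempfile

# 1. Remove Beam's downloaded job-server jars so the client can't pick a stale one.
for jar in glob.glob(os.path.expanduser("~/.apache_beam/cache/jars/*.jar")):
    os.remove(jar)

# 2. Remove the requirements staging cache the Beam client reuses between runs
#    (the --dest value in the log line above lives under the system temp dir).
shutil.rmtree(
    os.path.join(tempfile.gettempdir(), "dataflow-requirements-cache"),
    ignore_errors=True,
)
```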
We wanna run on AWS! Specifically, I'm going to try to demo this running on AWS at a NASA meeting on September 25, so that's the use case :)
On GCP, we have a managed runner with Dataflow, and this solves our problem. nbd.
On AWS, I investigated possible managed Beam runner options. The two were:

1. The Spark runner
2. Kinesis Data Analytics
(1) is only kinda semi-managed - we'll still need to run some additional unmanaged infrastructure to run the Beam jobserver, and that sucks.
(2) is a bit of a mystery, from the inscrutable name (is it only meant for use with Kinesis?) to the complete lack of information on the internet about running beam pipelines with Python on top of this. From what I can gather, it can only run Java pipelines, and there isn't space for the portable runner that'll allow us to run Python.
The other option is to run Apache Flink on Kubernetes, with one of these two Flink operators (https://github.com/apache/flink-kubernetes-operator or https://github.com/GoogleCloudPlatform/flink-on-k8s-operator).
A few questions we need to answer before choosing: