Open nitinlkoin1984 opened 2 years ago
Similar issue is reported here https://issues.apache.org/jira/browse/BEAM-11957
The following keeps executing and does not stop ...
@nitinlkoin1984 That sounds like Kubernetes restarting the job over and over again. Could you please check that?
2022/11/01 22:13:16 Failed to obtain provisioning information: failed to dial server at localhost:43225
The error you are seeing in the SDK container logs hints at the core of the problem. The SDK harness expects to talk to the Spark worker on localhost. This is by design: if the communication involved expensive network traffic, performance would be unacceptable.
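One way to confirm this from inside the SDK container is to check whether the endpoints passed to the boot command actually accept TCP connections. The following is a minimal sketch using only the Python standard library; the helper names `parse_endpoints` and `can_connect` are hypothetical and not part of Beam, and the arguments are copied from the log in this issue.

```python
import re
import socket


def parse_endpoints(boot_args):
    """Extract {flag_name: (host, port)} from '--xxx_endpoint=host:port' arguments."""
    endpoints = {}
    for arg in boot_args:
        match = re.fullmatch(r"--(\w+_endpoint)=([^:]+):(\d+)", arg)
        if match:
            name, host, port = match.groups()
            endpoints[name] = (host, int(port))
    return endpoints


def can_connect(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Arguments copied from the SDK container log in this issue.
BOOT_ARGS = [
    "--id=1-1",
    "--logging_endpoint=localhost:43541",
    "--artifact_endpoint=localhost:44461",
    "--provision_endpoint=localhost:43225",
    "--control_endpoint=localhost:35475",
]

if __name__ == "__main__":
    for name, (host, port) in parse_endpoints(BOOT_ARGS).items():
        status = "reachable" if can_connect(host, port) else "NOT reachable"
        print(f"{name} {host}:{port} is {status}")
```

If the SDK container runs on a different machine than the Spark worker, every endpoint would be expected to show as not reachable, which matches the "failed to dial server" error.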
This is also an issue when testing the SDK harness with Spark on Docker on a Mac (or Windows). Docker doesn't support host networking on these platforms, hence communication between containers using localhost isn't possible.
There are two environment options to allow that for testing purposes, see https://github.com/apache/beam/issues/23440#issuecomment-1271567347. However, that obviously limits you to using a single worker only.
I recommend you build a custom Spark image that also runs the SDK harness inside the same image. Otherwise you would need a custom operator to run worker pods that contain both a Spark worker and the SDK harness so that they can communicate via localhost.
This requires better documentation (#23438)!
What is the point of environment_type=EXTERNAL if it can only communicate through localhost? If I wanted to communicate through localhost, there is the DOCKER or PROCESS environment_type. If remote communication is not allowed by design, then EXTERNAL mode should be removed outright. It seems to me like your SDK Harness container needs to be configured correctly, specifically by setting the provision_endpoint to the externally accessible IP address of the Spark worker container, instead of localhost.
@angoenka would you be able to explain? I'm not very familiar with the SDK harness, particularly any design decisions in that area.
What is the purpose of the SDK service? Is the service used to upload Python dependencies to the Spark cluster, or is the actual code executed on the SDK service?
Also, does it mean that each Spark worker node should have its own Beam SDK service? Our Spark cluster is in a Kubernetes-based environment; how should we deploy the SDK service?
Is there a difference between how the SDK service should be deployed when running on a Flink vs. a Spark cluster?
At least on the RDD-based Spark runner (i.e. not Dataset/Structured Streaming):
What is the purpose of SDK service?
All Beam pipelines are converted into a Java Spark RDD pipeline. If you are writing your DoFns in Python, Java RDDs cannot execute your Python code. So the SDK Harness contains your Python environment, and Spark hands your Python logic to it for execution.
does it mean that each spark worker node should have its own beam sdk service?
Spark workers communicate with SDK Harnesses via the gRPC Fn API. It's better to deploy them on the same host as the Spark worker in order to minimize network IO (data has to be sent back and forth between worker and SDK Harness for processing). You can deploy them on the same node as a Docker container or a process (see the --environment_type option). However, --environment_type EXTERNAL has its own advantages, as the SDK Harness does not have to share resources (such as CPU and memory) with the Spark worker.
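To make the three deployment choices concrete, here is a rough sketch of the flag pairs for each environment type. The helper `environment_flags` is hypothetical, and the image tag, boot path, and address are placeholders rather than values taken from this issue.

```python
import json


def environment_flags(environment_type, environment_config):
    """Build the two portable-runner flags that select where the SDK harness runs."""
    allowed = {"DOCKER", "PROCESS", "EXTERNAL"}
    if environment_type not in allowed:
        raise ValueError(f"unsupported environment_type: {environment_type}")
    return [
        f"--environment_type={environment_type}",
        f"--environment_config={environment_config}",
    ]


# DOCKER: the runner starts the SDK harness container itself on the worker's host.
# The image tag below is a placeholder.
docker_flags = environment_flags("DOCKER", "apache/beam_python3.9_sdk:2.42.0")

# PROCESS: the runner launches the harness boot executable as a local process.
# The path below is a placeholder.
process_flags = environment_flags(
    "PROCESS", json.dumps({"command": "/opt/apache/beam/boot"})
)

# EXTERNAL: the harness (worker pool) is already running and is reached at host:port.
external_flags = environment_flags("EXTERNAL", "localhost:50000")
```

Any of these lists can be passed to `PipelineOptions(...)` together with `--runner` and `--job_endpoint`, in the same way as the options shown in the original report.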
Thanks for answering @cozos 👍
Our Spark cluster is in a Kubernetes-based environment; how should we deploy the SDK service?
The general idea is to deploy the SDK harness as a sidecar to each Spark worker node. Have a look at sidecars in the K8s Spark operator.
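As a rough illustration of the sidecar idea, the sketch below builds a plain Kubernetes Pod manifest as a Python dict, with a Spark worker container and an SDK harness container started in worker pool mode. This is not the Spark operator's own resource format; the function name, pod name, and image names are assumptions. Because containers in one pod share a network namespace, the two can reach each other on localhost.

```python
import json


def worker_pod_with_sidecar(spark_image, sdk_image, pool_port=50000):
    """Return a Pod manifest (as a dict) with a Spark worker and an SDK harness sidecar."""
    return {
        "apiVersion": "v1",
        "kind": "Pod",
        "metadata": {"name": "spark-worker-with-beam-sdk"},  # placeholder name
        "spec": {
            "containers": [
                {"name": "spark-worker", "image": spark_image},
                {
                    "name": "beam-sdk-harness",
                    "image": sdk_image,
                    # Start the SDK container as a worker pool that waits for work.
                    "args": ["--worker_pool"],
                    "ports": [{"containerPort": pool_port}],
                },
            ]
        },
    }


if __name__ == "__main__":
    # Both image names are placeholders for whatever images you actually build.
    pod = worker_pod_with_sidecar("my-registry/spark-worker:3.1.2",
                                  "apache/beam_python3.9_sdk:2.42.0")
    print(json.dumps(pod, indent=2))
```

With a layout like this, the pipeline would be submitted with `--environment_type=EXTERNAL` and `--environment_config=localhost:50000`, so each Spark worker talks to the harness in its own pod.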
Is there a difference between how sdk service should be deployed while running on Flink vs Spark cluster?
It's more or less the same. Have a look at Spotify's guide on how to run Beam with the Flink operator and SDK harness sidecars. That should also help to better understand how to use Spark with sidecars.
Thanks for the reply.
One of our requirements is to perform large-scale parallel/distributed ML inference. Our ML models need GPUs to do quick inference. I was wondering how the data is transferred between the Spark worker node and the SDK service. It seems like converting data and sending it back and forth between the Spark worker and the SDK service may involve a lot of overhead. Will a native Spark job be faster than a Beam job? Is there a performance hit when we write jobs in Beam instead of native Spark?
I am evaluating Beam because it provides the RunInference API for large-scale distributed ML inference. I cannot find such functionality in native Spark. I will have to write my own custom component if I have to go with Spark.
Where is the inference code executed? Is it executed in the SDK harness service? If so, can that service use the underlying GPUs? Also, can I run any PyTorch and Hugging Face Transformers model using RunInference?
Where is the inference code executed? Is it executed in the SDK harness service?
Yes, it is executed in the SDK Harness, which in your case is a Docker container.
If so, can that service use the underlying GPUs? Also, can I run any PyTorch and Hugging Face Transformers model using RunInference?
You need to make dependencies such as pytorch available in the Docker container. The same goes for GPUs: you need to install CUDA drivers and whatnot. You also need to make the GPUs accessible from Docker (I don't know how; probably here: https://docs.docker.com/config/containers/resource_constraints/#gpu). See the instructions on how to build a container here: https://beam.apache.org/documentation/runtime/environments/#custom-containers
Seems like converting data and sending it back and forth between the Spark worker and the SDK service may involve a lot of overhead
Sending data back and forth through the Fn API involves serialization/deserialization and sending your data through the transport/network layer so yes there is overhead.
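To get a feel for the serialization part of that overhead, here is a small stand-in experiment. It is only an illustration: it uses `pickle` in a single process rather than Beam coders and gRPC, it involves no network at all, and the function names are made up for this sketch.

```python
import pickle
import time


def process_in_place(elements, fn):
    """Apply fn directly, with no serialization (roughly the native-runner case)."""
    return [fn(x) for x in elements]


def process_with_round_trip(elements, fn):
    """Encode and decode every element and every result, as a cross-process hop would."""
    results = []
    for x in elements:
        payload = pickle.dumps(x)            # runner side: encode the element
        received = pickle.loads(payload)     # harness side: decode the element
        reply = pickle.dumps(fn(received))   # harness side: encode the result
        results.append(pickle.loads(reply))  # runner side: decode the result
    return results


def timed(func, *args):
    """Return (result, elapsed_seconds) for a single call."""
    start = time.perf_counter()
    result = func(*args)
    return result, time.perf_counter() - start


if __name__ == "__main__":
    data = list(range(200_000))
    _, direct = timed(process_in_place, data, lambda x: x + 1)
    _, hopped = timed(process_with_round_trip, data, lambda x: x + 1)
    print(f"direct: {direct:.3f}s, with serialization round trip: {hopped:.3f}s")
```

The absolute numbers depend on the machine and say nothing about Beam itself; the point is only that per-element encode/decode work is added on top of the user function.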
Will a native Spark job be faster than a Beam job? Is there a performance hit when we write jobs in Beam instead of native Spark?
You'd have to benchmark your pipeline, but my guess would be that native Spark is faster. In addition to the Fn API overhead, if you can use the higher-level Spark APIs (Spark SQL, DataFrame/Dataset), Spark can apply additional optimizations (vectorization, codegen) to your transforms, whereas Beam transforms/DoFns are a black box and cannot be optimized.
What happened?
I have deployed a Spark v3.1.2 cluster on Kubernetes. My Beam job server and Beam SDK container are running on 2 separate Linux virtual machines. The following keeps executing and does not stop:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

op = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--environment_type=EXTERNAL",
    "--environment_config=vm2-hostname::50000",
    "--artifact_endpoint=localhost:8098",
])

with beam.Pipeline(options=op) as p:
    p | beam.Create([1, 2, 3, 10]) | beam.Map(lambda x: x+1) | beam.Map(print)
The Docker logs for the SDK container show the following error:
Starting worker with command ['/opt/apache/beam/boot', '--id=1-1', '--logging_endpoint=localhost:43541', '--artifact_endpoint=localhost:44461', '--provision_endpoint=localhost:43225', '--control_endpoint=localhost:35475']
E1101 22:11:16.787804592 22 fork_posix.cc:76] Other threads are currently calling into gRPC, skipping fork() handlers
2022/11/01 22:13:16 Failed to obtain provisioning information: failed to dial server at localhost:43225 caused by: context deadline exceeded
Issue Priority
Priority: 3
Issue Component
Component: sdk-py-harness