Open rabernat opened 1 year ago
I think you will want to use the `InteractiveRunner`:

```python
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
# ...
with beam.Pipeline(runner=InteractiveRunner(), options=beam_options) as p:
    p | transforms
```

e.g. https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/interactive/README.md
Thanks Alex! Unfortunately no change. Core dumps if I try to use `multi_processing` or `multi_threading`.
Hmm... maybe @davidcavazos would know?
What versions of Beam + Python are you running?
Possibly related to https://github.com/pangeo-forge/pangeo-forge-runner/issues/78?
Not sure if it's related, but I'm also getting failures when trying to run a pipeline on a large Linux machine.

Running on `python==3.9.13` with the Beam `InteractiveRunner`:

```python
with beam.Pipeline(runner=InteractiveRunner(), options=beam_options) as p:
    p | transforms
```
```
Traceback (most recent call last):
  File "/opt/conda/envs/nasa-nex/lib/python3.9/runpy.py", line 197, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/opt/conda/envs/nasa-nex/lib/python3.9/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/opt/conda/envs/nasa-nex/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker_main.py", line 363, in <module>
    main(sys.argv)
  File "/opt/conda/envs/nasa-nex/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker_main.py", line 204, in main
    sdk_pipeline_options) = create_harness(os.environ)
  File "/opt/conda/envs/nasa-nex/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker_main.py", line 160, in create_harness
    sdk_harness = SdkHarness(
  File "/opt/conda/envs/nasa-nex/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 194, in __init__
    grpc.channel_ready_future(self._control_channel).result(timeout=60)
  File "/opt/conda/envs/nasa-nex/lib/python3.9/site-packages/grpc/_utilities.py", line 162, in result
    self._block(timeout)
  File "/opt/conda/envs/nasa-nex/lib/python3.9/site-packages/grpc/_utilities.py", line 106, in _block
    raise grpc.FutureTimeoutError()
grpc.FutureTimeoutError
```
@jrmccluskey: have you seen this error before? Is there a quick fix, or is it worth filing an issue upstream?
Not something I'm familiar with, although I wouldn't be surprised if the interactive runner didn't support multithreading.
To be clear, both the interactive runner and the direct runner segfault here when attempting to use either multithreading or multiprocessing.
I double-checked: neither supports multithreading at the moment. You can file a feature request for Beam, but I couldn't tell you whether it would be picked up any time soon.
Thanks for the reply!
This doc page - https://beam.apache.org/documentation/runners/direct/ - strongly suggests that the python direct runner does work:
> `direct_running_mode` can be one of `['in_memory', 'multi_threading', 'multi_processing']`.
Am I misunderstanding something?
I routinely use multithreading on the direct runner (never used interactive) without issue.
Not to mention that we also do this in our integration tests, which are passing! 😄
I strongly suspect the issue here is a beam / python version incompatibility.
@rabernat what beam and python versions are you running when you hit the segfault?
I also wanted to try using Beam for parallelism locally (on HPC). The context is parallelising the opening and resaving of a bunch of netCDF files.

I got it to run using this conda environment:

```yaml
name: beamrunner
channels:
  - conda-forge
dependencies:
  - python=3.9.13
  - pangeo-forge-recipes=0.10.0
  - apache-beam=2.42.0
  - pandas<2.0
  - netcdf4
  - xarray
  - kerchunk
  - zarr
  - ipython
  - jupyterlab
  - ipykernel
  - nb_conda_kernels
  - pip
  - pip:
      - git+https://github.com/pangeo-forge/pangeo-forge-runner.git@main
```
and ran this test of parallelism:

```python
import time

import apache_beam as beam


def square(x):
    return x * x


# Increase the size of the dataset
dataset_size = 1000000  # Adjust this number as needed

# Experiment with different worker counts
worker_counts = [1, 2, 4, 8, 16]

for num_workers in worker_counts:
    print(f"Running with {num_workers} workers and dataset size {dataset_size}:")

    # Generate a larger list of numbers
    input_data = list(range(1, dataset_size + 1))

    # Record the start time
    start_time = time.time()

    with beam.Pipeline(
            runner="DirectRunner",
            options=beam.pipeline.PipelineOptions(
                ["--num_workers", str(num_workers),
                 "--direct_running_mode", "multi_processing"])) as p:
        result = (
            p
            | "CreateInput" >> beam.Create(input_data)  # Generate a list of numbers
            | "SquareNumbers" >> beam.Map(square)  # Square each number in parallel
        )

    # Calculate and print the elapsed time
    end_time = time.time()
    elapsed_time = end_time - start_time
    print(f"Elapsed time with {num_workers} workers: {elapsed_time} seconds")
```
Unfortunately I see no speedup whatsoever; the execution time is constant across worker counts. Either:

(a) my example is not a useful test of parallelism for some reason, or
(b) `direct_running_mode` doesn't actually do anything.
Experimenting with Tom's example myself, my conclusion is that `direct_running_mode` definitely does change the behavior of local execution (single-threaded, multi-processing, etc.), but I agree that in that example multi-processing does not appear to take any less wall time than single-threaded execution. Which is confusing, but perhaps by design, if the only point of that execution mode is to test serialization, as the Beam docs suggest.
> I agree that it does not appear based on that example that multi-processing takes any less wall time than single threaded execution
@TomNicholas and I suspected that if the local DirectRunner's multiprocessing mode did not provide a performance benefit, Beam's DaskRunner would be our next best bet for single-machine scaling. Over in https://github.com/pangeo-forge/pangeo-forge-runner/pull/109, I have a code snippet showing this works for a variation of the simple performance test Tom provides above, and I am working on getting it to run for Pangeo Forge recipes as well.
Thanks very much to Tom for remaining skeptical of the actual performance benefit of the multiprocessing mode. I (seemingly mistakenly) took it for granted that this mode would provide a performance benefit, when it does not appear that it does. It now looks to me like the DaskRunner will be the best option for single-machine scaling.
I'm late to the party but I wonder if the test here is too IO-bound due to serialization/deserialization across processes. One improvement, from this perspective, might be to generate data on the other side of the serialization boundary and do some slightly more expensive work. https://gist.github.com/moradology/89970afb1ce03580a3334c717b21ec09#file-estimate_pi_cpu_bound-py
I have heard through the grapevine that it's possible to use multiprocessing on a big machine with Beam / Pangeo Forge. How do you do it?

I'm trying this from a notebook, but my kernel keeps dying. Same with `direct_running_mode="multi_threading"`.