google / xarray-beam

Distributed Xarray with Apache Beam
https://xarray-beam.readthedocs.io
Apache License 2.0

Adding support for `MergeDim`s and Split Variables to `FilePatternToChunks` transform. #39

Closed: alxmrs closed this 6 months ago

alxmrs commented 2 years ago

Fixes #29 and #38.
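
For context, here is a rough sketch of the kind of file pattern this PR is meant to support. The `ConcatDim`/`MergeDim` usage follows pangeo-forge-recipes; the `FilePatternToChunks` import path and call signature are assumptions based on this PR, and the bucket URL and variable names are made up:

```python
import apache_beam as beam
from pangeo_forge_recipes.patterns import ConcatDim, FilePattern, MergeDim

from xarray_beam._src.pangeo_forge import FilePatternToChunks


def make_url(time: str, variable: str) -> str:
    # Hypothetical layout: one GRIB 2 file per (variable, time) pair.
    return f'gs://my-bucket/era5/{variable}/{time}.grib2'


pattern = FilePattern(
    make_url,
    ConcatDim('time', keys=['2020-01-01', '2020-01-02']),
    MergeDim('variable', keys=['u10', 'v10']),  # merged as split variables
)

with beam.Pipeline() as pipeline:
    chunks = pipeline | FilePatternToChunks(pattern)
```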

alxmrs commented 2 years ago

I'm testing this PR end-to-end with a script that uses this file pattern for a GRIB 2 dataset (via the cfgrib backend in xarray), deployed on GCP's Dataflow. Right now I'm hitting what appears to be a deadlock. My pipeline ends with this error:

Root cause: The worker lost contact with the service.

Traces in the logs show threads waiting to acquire a lock, though it's unclear whether the dataset is simply large and the operation is just taking a long time.

log 1

```
"Operation ongoing for over 1665.73 seconds in state process-msecs in step FilePatternToChunks/FlatMapTuple(_open_chunks)-ptransform-764 without returning. Current Traceback:
  File "/usr/local/lib/python3.8/threading.py", line 890, in _bootstrap
    self._bootstrap_inner()
  File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.8/site-packages/apache_beam/utils/thread_pool_executor.py", line 53, in run
    self._work_item.run()
  File "/usr/local/lib/python3.8/site-packages/apache_beam/utils/thread_pool_executor.py", line 37, in run
    self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 356, in task
    self._execute(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 284, in _execute
    response = task()
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 357, in
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 601, in do_instruction
    return getattr(self, request_type)(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 639, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 996, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 222, in process_encoded
    self.output(decoded_value)
  File "/Users/alxrsngrtn/Github/xarray-beam/xarray_beam/_src/pangeo_forge.py", line 152, in _open_chunks
    with self._open_dataset(path) as dataset:
  File "/usr/local/lib/python3.8/contextlib.py", line 113, in __enter__
    return next(self.gen)
  File "/usr/local/lib/python3.8/site-packages/xarray_beam/_src/pangeo_forge.py", line 137, in _open_dataset
    local_file = fsspec.open_local(
  File "/usr/local/lib/python3.8/site-packages/fsspec/core.py", line 487, in open_local
    with of as files:
  File "/usr/local/lib/python3.8/site-packages/fsspec/core.py", line 184, in __enter__
    self.files = fs.open_many(self)
  File "/usr/local/lib/python3.8/site-packages/fsspec/implementations/cached.py", line 394, in
    return lambda *args, **kw: getattr(type(self), item).__get__(self)(
  File "/usr/local/lib/python3.8/site-packages/fsspec/implementations/cached.py", line 503, in open_many
    self.fs.get(downpath, downfn)
  File "/usr/local/lib/python3.8/site-packages/fsspec/asyn.py", line 91, in wrapper
    return sync(self.loop, func, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/fsspec/asyn.py", line 59, in sync
    if event.wait(1):
  File "/usr/local/lib/python3.8/threading.py", line 558, in wait
    signaled = self._cond.wait(timeout)
  File "/usr/local/lib/python3.8/threading.py", line 306, in wait
    gotit = waiter.acquire(True, timeout)
```

log 2

```
Operation ongoing for over 743.75 seconds in state process-msecs in step FilePatternToChunks/FlatMapTuple(_open_chunks)-ptransform-224 without returning. Current Traceback:
  File "/usr/local/lib/python3.8/threading.py", line 890, in _bootstrap
    self._bootstrap_inner()
  File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.8/site-packages/apache_beam/utils/thread_pool_executor.py", line 53, in run
    self._work_item.run()
  File "/usr/local/lib/python3.8/site-packages/apache_beam/utils/thread_pool_executor.py", line 37, in run
    self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 356, in task
    self._execute(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 284, in _execute
    response = task()
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 357, in
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 601, in do_instruction
    return getattr(self, request_type)(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 639, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 996, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 222, in process_encoded
    self.output(decoded_value)
  File "/Users/alxrsngrtn/Github/xarray-beam/xarray_beam/_src/pangeo_forge.py", line 185, in _open_chunks
    new_key, chunk.compute(num_workers=num_threads)
  File "/usr/local/lib/python3.8/site-packages/xarray/core/dataset.py", line 1031, in compute
    return new.load(**kwargs)
  File "/usr/local/lib/python3.8/site-packages/xarray/core/dataset.py", line 865, in load
    evaluated_data = da.compute(*lazy_data.values(), **kwargs)
  File "/usr/local/lib/python3.8/site-packages/dask/base.py", line 570, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/dask/threaded.py", line 79, in get
    results = get_async(
  File "/usr/local/lib/python3.8/site-packages/dask/local.py", line 506, in get_async
    for key, res_info, failed in queue_get(queue).result():
  File "/usr/local/lib/python3.8/site-packages/dask/local.py", line 134, in queue_get
    return q.get()
  File "/usr/local/lib/python3.8/queue.py", line 170, in get
    self.not_empty.wait()
  File "/usr/local/lib/python3.8/threading.py", line 302, in wait
    waiter.acquire()
```

This seems to me like an instance of https://github.com/pydata/xarray/issues/4591. For now, I'm going to experiment with changing the scheduler to use a single thread in the compute call inside `_open_chunks()`.
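
A minimal sketch of that experiment, assuming the chunk is a dask-backed `xarray.Dataset` like the one materialized in `_open_chunks()` above; passing `scheduler='synchronous'` through `compute()` forces dask's single-threaded scheduler instead of the default thread pool:

```python
import numpy as np
import xarray as xr

# Stand-in for one lazily opened chunk; in the real pipeline this comes
# from self._open_dataset(path) inside _open_chunks().
chunk = xr.Dataset({'t2m': (('time',), np.arange(4.0))}).chunk({'time': 2})

# scheduler='synchronous' is forwarded to dask.compute(), so the chunk is
# evaluated on the calling thread and cannot contend with Beam's own
# worker thread pool.
materialized = chunk.compute(scheduler='synchronous')
```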

alxmrs commented 2 years ago

I should mention: the Dataflow diagnostics for the run above are showing unresponsive threads, which makes a deadlock scenario more plausible.

```
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.UNAVAILABLE
    details = "keepalive watchdog timeout"
    debug_error_string = "{"created":"@1630579134.284653312","description":"Error received from peer ipv6:[::1]:12371","file":"src/core/lib/surface/call.cc","file_line":1062,"grpc_message":"keepalive watchdog timeout","grpc_status":14}"
>
  at _next (/usr/local/lib/python3.8/site-packages/grpc/_channel.py:803)
  at __next__ (/usr/local/lib/python3.8/site-packages/grpc/_channel.py:416)
  at run (/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py:251)
  at main (/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker_main.py:182)
```

shoyer commented 2 years ago

Deadlocking seems plausible, but this is different from pydata/xarray#4591, which describes a serialization failure.
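
To make that distinction concrete (a hypothetical sketch, not code from this PR): a serialization failure like pydata/xarray#4591 would surface when a dataset is pickled between Beam stages, whereas the hang above happens later, inside `compute()`:

```python
import pickle

import numpy as np
import xarray as xr

# Small in-memory dataset standing in for one opened chunk.
ds = xr.Dataset({'t2m': (('time',), np.arange(3.0))})

# A serialization bug would raise here, when Beam pickles the element to
# ship it between stages; a deadlock never raises, it just hangs.
restored = pickle.loads(pickle.dumps(ds))
assert restored.identical(ds)
```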

shoyer commented 6 months ago

This has gone stale.