Open jbusecke opened 7 months ago
Ok I got an example running here (successful submission log [here]()). This is running on LEAP dataflow, but it might not be too hard to run locally? I was a bit hesitant for now since it will cache about 100GB of data. I guess I'll wait for a few hours and see if it gets stuck!
From there I can probably test/probe a bit more to reduce the complexity further.
Ok it has been running for about an hour, and has scaled up to ~6, then back to 3 workers, but they all seem to idle around after a while
FWIW the console indicates that this is stuck on the rechunk stage
But from experience I know that these graphical representations can often be misleading.
I think this is a good candidate! Will report back after a few hours.
Some strategies to further simplify this:
In the other hanging recipe, it seemed like about three months of daily data would result in hanging.
I'll see tomorrow if there is an upper bound to it not hanging in this case, but I am now convinced this is representative of a 'stall'. The job has been running for 4 hours and is basically doing nothing AFAICT:
If (and that's a big if, see above) we can believe the graph diagram, it is actually stuck on the group by key stage 🤔
I am moving the example to a separate repo to keep things more neat.
Also I just cancelled the job from yesterday after it had been running for 19 hours without really doing anything... For later reference: job link (not publicly accessible atm).
Currently trying out some variations of my failcase here. Will report back on what I find.
Ok this is interesting. It seems that even only using 3 files (of 33) is enough to trigger this! If that holds true this would make this example a lot more minimal.
Got 4 examples in the oven
I'll have to wait ~1 hour to be sure things are stuck.
Ok it seems that the complex rechunking structure has something to do with the problem (as I previously suspected), when running the recipe with chunks along the time axis only both recipes succeeded!
This also gives us an approximate time that things "should take"? So from now on I will assume that any recipe with runtime of many hours is actually stuck.
I am also calling that the short example is in fact stuck!
That is nice and reduces the size of the problem considerably!
Next I will try to run the test_short_dynamic_chunking locally. Onwards.
I went in a different direction and I think I am now able to reproduce the problem without my dynamic rechunking. Simply setting the chunking 'orthogonal' (all chunks along lon, while concatenating along time) reproduces the behavior with only 2! files.
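To make the 'orthogonal' case a bit more concrete, here is a back-of-the-envelope sketch in plain Python (not pangeo-forge-recipes code). It assumes each input file is one contiguous block along the concat dimension and counts how many files a single target chunk overlaps along that dimension. The function name and all sizes are hypothetical.

```python
def files_per_target_chunk(n_files: int, steps_per_file: int, target_time_chunk: int) -> int:
    """Largest number of input files that one target chunk overlaps along time.

    Assumes every file holds `steps_per_file` consecutive time steps and that
    target chunks start at index 0 (hypothetical, simplified layout).
    """
    total = n_files * steps_per_file
    chunk = min(target_time_chunk, total)
    worst = 0
    for start in range(0, total, chunk):
        stop = min(start + chunk, total)
        first_file = start // steps_per_file
        last_file = (stop - 1) // steps_per_file
        worst = max(worst, last_file - first_file + 1)
    return worst


n_files, steps_per_file = 2, 100  # hypothetical sizes

# 'time_only': target chunks are smaller than one file along time
time_only = files_per_target_chunk(n_files, steps_per_file, target_time_chunk=50)

# 'lon_only': a single target chunk spans the whole time axis
lon_only = files_per_target_chunk(n_files, steps_per_file, target_time_chunk=200)

print(time_only, lon_only)
```

In the 'lon_only' layout every target chunk needs data from every input file, which is what makes it a full rechunk.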
Notes on cases to test:
@norlandrhagen and I are in the middle of a pair coding session. I will jot down some notes (and edit later if needed).
We are currently trying to run two recipes locally
'time_only': only time chunks (this succeeded on dataflow)
'lon_only': only lon chunks (this requires a full rechunk and is hanging on dataflow)
Ok I just ran both examples with the direct runner via pgf-runner (on the LEAP Jupyterhub with 16 cores) like this (Thx to @ranchodeluxe for the instructions):
Install dependencies
mamba create -n runner0102 python=3.11 -y
conda activate runner0102
pip install pangeo-forge-runner==0.10.2 --no-cache-dir
Create a config.py:
c.Bake.prune = 0
c.Bake.bakery_class = 'pangeo_forge_runner.bakery.local.LocalDirectBakery'
c.LocalDirectBakery.num_workers = 16
BUCKET_PREFIX = "/home/jovyan/AREAS/PGF/hanging_bug/output"
c.TargetStorage.fsspec_class = "fsspec.implementations.local.LocalFileSystem"
c.TargetStorage.root_path = f"{BUCKET_PREFIX}/{{job_name}}/output"
c.InputCacheStorage.root_path = f"{BUCKET_PREFIX}/cache/"
c.InputCacheStorage.fsspec_class = "fsspec.implementations.local.LocalFileSystem"
I then ran 'time_only' with:
pangeo-forge-runner bake \
--repo=https://github.com/jbusecke/pgf_cmip_hanging_mre \
--ref=main \
--feedstock-subdir='feedstock' \
--Bake.job_name=test_short_time_only_chunks \
--Bake.recipe_id=test_short_time_only_chunks \
-f config.py
and 'lon_only' with:
pangeo-forge-runner bake \
--repo=https://github.com/jbusecke/pgf_cmip_hanging_mre \
--ref=main \
--feedstock-subdir='feedstock' \
--Bake.job_name=test_short_lon_only_chunks \
--Bake.recipe_id=test_short_lon_only_chunks \
-f config.py
These both worked! So I am not sure if my problem is in fact the same problem that @norlandrhagen and @ranchodeluxe encountered in #710.
I wonder if somebody can run this on flink to add another datapoint to our fail/runner matrix?
Ok so with input from @cisaacstern I tried the same thing except I used a gcs bucket for cache and output
c.Bake.prune = 0
c.Bake.bakery_class = 'pangeo_forge_runner.bakery.local.LocalDirectBakery'
c.LocalDirectBakery.num_workers = 16
BUCKET_PREFIX = "gs://leap-scratch/jbusecke/pgf-hanging-bug-testing"
c.TargetStorage.fsspec_class = "gcsfs.GCSFileSystem"
c.TargetStorage.root_path = f"{BUCKET_PREFIX}/{{job_name}}/output"
c.InputCacheStorage.fsspec_class = "gcsfs.GCSFileSystem"
c.InputCacheStorage.root_path = f"{BUCKET_PREFIX}/cache/"
which also did not fail
Ok @norlandrhagen and I just wrapped up. We devised a few new recipe_ids to test some other things:
test_short_lon_only_chunks_load: This is the same as test_short_lon_only_chunks but sets load=True in OpenWithXarray.
test_short_time_only_chunks_load (PR, Job): The above really surprised me, so I wanted to double-check whether load=True affects the case that previously worked in the same way. THIS ALSO FAILS. I am completely confused by that.
test_full_nowrite (PR, Job): Just to confirm that only opening the files lazily does not cause an error. This indeed succeeded.
I'll pick this up next week, but any feedback is very welcome.
From the worker logs of the OG test_short_dynamic_chunks:
Operation ongoing in bundle process_bundle-2974329341843856250-42 for PTransform{name=Creating CMIP6.CMIP.CMCC.CMCC-ESM2.historical.r1i1p1f1.3hr.pr.gn.v20210114|OpenURLWithFSSpec|OpenWithXarray|Preprocessor|StoreToZarr|ConsolidateDimensionCoordinates|ConsolidateMetadata/StoreToZarr/Rechunk/MapTuple(combine_fragments)-ptransform-46, state=process-msecs} for at least 5827.01 seconds without outputting or completing.
Current Traceback:
File "/usr/local/lib/python3.10/threading.py", line 973, in _bootstrap
self._bootstrap_inner()
File "/usr/local/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
self.run()
File "/usr/local/lib/python3.10/site-packages/apache_beam/utils/thread_pool_executor.py", line 53, in run
self._work_item.run()
File "/usr/local/lib/python3.10/site-packages/apache_beam/utils/thread_pool_executor.py", line 37, in run
self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 385, in task
self._execute(
File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 311, in _execute
response = task()
File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 386, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 650, in do_instruction
return getattr(self, request_type)(
File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 688, in process_bundle
bundle_processor.process_bundle(instruction_id))
File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1113, in process_bundle
input_op_by_transform_id[element.transform_id].process_encoded(
File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/bundle_processor.py", line 237, in process_encoded
self.output(decoded_value)
File "/tmp/cf09f9f279ec355c109713fd5ed97e51c62f0daeb39deea8ba13555949327808qx0kye0e/lib/python3.10/site-packages/apache_beam/transforms/core.py", line 2040, in <lambda>
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/pangeo_forge_recipes/rechunking.py", line 240, in combine_fragments
ds_combined = xr.combine_nested(dsets_to_concat, concat_dim=concat_dims_sorted)
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/combine.py", line 577, in combine_nested
return _nested_combine(
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/combine.py", line 356, in _nested_combine
combined = _combine_nd(
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/combine.py", line 232, in _combine_nd
combined_ids = _combine_all_along_first_dim(
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/combine.py", line 267, in _combine_all_along_first_dim
new_combined_ids[new_id] = _combine_1d(
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/combine.py", line 290, in _combine_1d
combined = concat(
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/concat.py", line 250, in concat
return _dataset_concat(
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/concat.py", line 631, in _dataset_concat
combined_var = concat_vars(
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/variable.py", line 2925, in concat
return Variable.concat(variables, dim, positions, shortcut, combine_attrs)
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/variable.py", line 1693, in concat
data = duck_array_ops.concatenate(arrays, axis=axis)
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/duck_array_ops.py", line 356, in concatenate
return _concatenate(as_shared_dtype(arrays), axis=axis)
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/duck_array_ops.py", line 232, in as_shared_dtype
arrays = [asarray(x, xp=xp) for x in scalars_or_arrays]
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/duck_array_ops.py", line 232, in <listcomp>
arrays = [asarray(x, xp=xp) for x in scalars_or_arrays]
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/duck_array_ops.py", line 219, in asarray
return data if is_duck_array(data) else xp.asarray(data)
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/indexing.py", line 693, in __array__
return np.asarray(self.get_duck_array(), dtype=dtype)
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/indexing.py", line 696, in get_duck_array
self._ensure_cached()
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/indexing.py", line 690, in _ensure_cached
self.array = as_indexable(self.array.get_duck_array())
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/indexing.py", line 664, in get_duck_array
return self.array.get_duck_array()
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/indexing.py", line 551, in get_duck_array
array = self.array[self.key]
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/backends/h5netcdf_.py", line 51, in __getitem__
return indexing.explicit_indexing_adapter(
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/indexing.py", line 858, in explicit_indexing_adapter
result = raw_indexing_method(raw_key.tuple)
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/backends/h5netcdf_.py", line 58, in _getitem
return array[key]
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/h5netcdf/core.py", line 347, in __getitem__
return self._h5ds[key]
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/h5py/_hl/dataset.py", line 758, in __getitem__
return self._fast_reader.read(args)
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/fsspec/spec.py", line 1856, in readinto
data = self.read(out.nbytes)
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/fsspec/spec.py", line 1846, in read
out = self.cache._fetch(self.loc, self.loc + length)
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/fsspec/caching.py", line 189, in _fetch
self.cache = self.fetcher(start, end) # new block replaces old
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/gcsfs/core.py", line 1921, in _fetch_range
return self.gcsfs.cat_file(self.path, start=start, end=end)
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/fsspec/asyn.py", line 118, in wrapper
return sync(self.loop, func, *args, **kwargs)
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/fsspec/asyn.py", line 91, in sync
if event.wait(1):
File "/usr/local/lib/python3.10/threading.py", line 607, in wait
signaled = self._cond.wait(timeout)
File "/usr/local/lib/python3.10/threading.py", line 324, in wait
gotit = waiter.acquire(True, timeout)
Given this beautiful traceback I say the next steps to reproduce are:
combine_fragments to mimic that data read pattern
fsspec caching
@moradology found this: https://github.com/fsspec/gcsfs/issues/379
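The last frames of the traceback show the worker thread sitting in a loop that polls `event.wait(1)` while an async read is supposed to complete on another thread. As a rough illustration of that sync-over-async pattern, here is a simplified stand-in written from scratch (it is not fsspec's actual code). The names `sync`, `fast_read` and `stalled_read`, the poll interval and the timeout are all assumptions made for this sketch.

```python
import asyncio
import threading


def start_background_loop() -> asyncio.AbstractEventLoop:
    """Run an event loop forever on a daemon thread."""
    loop = asyncio.new_event_loop()
    threading.Thread(target=loop.run_forever, daemon=True).start()
    return loop


def sync(loop, coro_func, *args, timeout=None, poll=0.05):
    """Block the calling thread until the coroutine finishes on `loop`.

    Without a timeout this polls forever if the coroutine never completes,
    which is how a stalled read can look like a hung worker.
    """
    event = threading.Event()
    result = {}

    async def runner():
        try:
            result["value"] = await coro_func(*args)
        except Exception as exc:
            result["error"] = exc
        finally:
            event.set()

    asyncio.run_coroutine_threadsafe(runner(), loop)
    waited = 0.0
    while not event.wait(poll):
        waited += poll
        if timeout is not None and waited >= timeout:
            raise TimeoutError(f"no result after {waited:.2f}s")
    if "error" in result:
        raise result["error"]
    return result["value"]


async def fast_read(nbytes):
    await asyncio.sleep(0.01)  # pretend network latency
    return b"x" * nbytes


async def stalled_read(nbytes):
    await asyncio.sleep(3600)  # pretend the response never arrives
    return b""


loop = start_background_loop()
data = sync(loop, fast_read, 4)

try:
    sync(loop, stalled_read, 4, timeout=0.2)
    timed_out = False
except TimeoutError:
    timed_out = True

print(data, timed_out)
```

With `timeout=None` the second call would never return, so an explicit timeout is one way to turn a silent stall into a visible error while debugging.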
I just confirmed that the problem is not resolved when only running on one worker.
@moradology and I just stumbled upon some output that might be a bug in the groupby logic of rechunk. We will continue on this later and report back.
Ok I think @moradology and I have honed in on the problem earlier today, and I was able to produce what I think is unintended behavior from the rechunking stage locally. How and why that would only stall on dataflow still eludes me, but I think regardless this is actually a pretty significant bug that needs to be dealt with before I can move on.
Ok let me back up. So my main hypothesis is that the combining of fragments (subfragments?) is not actually working properly!
I have evidence from the above example but I think it is more instructive to look at this new tiny example that shares the main attribute with the CMIP test examples: Target chunks that are orthogonal to the concat dim, requiring a complete rechunk.
Take these two example datasets:
import xarray as xr
a = xr.DataArray(
    [[0, 1], [2, 3]],
    dims=['x', 'time'],
    coords={'x': [0, 1], 'time': [0, 1]},
).to_dataset(name='var').chunk({'time': 1})
b = xr.DataArray(
    [[4, 5], [6, 7]],
    dims=['x', 'time'],
    coords={'x': [0, 1], 'time': [2, 3]},
).to_dataset(name='var').chunk({'time': 1})
I am saving both of those files locally as netcdfs
a.to_netcdf('/home/jovyan/AREAS/PGF/hanging_bug/simple_example/file_a.nc')
b.to_netcdf('/home/jovyan/AREAS/PGF/hanging_bug/simple_example/file_b.nc')
and 'simulate' the desired output zarr dataset:
ds_out = xr.concat(
    [
        xr.open_dataset(path) for path in [
            '/home/jovyan/AREAS/PGF/hanging_bug/simple_example/file_a.nc',
            '/home/jovyan/AREAS/PGF/hanging_bug/simple_example/file_b.nc',
        ]
    ],
    dim='time',
)
ds_out.chunk({'x':1, 'time':-1})
Note how this bad boy has ONLY TWO output chunks!
I then set up some boilerplate feedstock (I can make another repo if folks want to run this locally) with the following files:
feedstock/meta.yaml:
title: "Simple Test"
description: ""
recipes:
  - id: test
    object: "recipe:recipe"
provenance:
  providers:
    - name: "Zenodo"
      description: "Zenodo"
      roles:
        - host
      url: https://zenodo.org/records/10261274
    - name: "TBD"
      description: "TODO"
      roles:
        - producer
        - licensor
  license: "unknown"
maintainers:
  - name: "Julius Busecke"
    orcid: 0000-0001-8571-865X
    github: jbusecke
feedstock/recipe.py:
import apache_beam as beam
from pangeo_forge_recipes.patterns import pattern_from_file_sequence
from pangeo_forge_recipes.transforms import (
    ConsolidateDimensionCoordinates,
    ConsolidateMetadata,
    OpenURLWithFSSpec,
    OpenWithXarray,
    StoreToZarr,
)

# The two local netCDF files written above
urls = [
    "/home/jovyan/AREAS/PGF/hanging_bug/simple_example/file_a.nc",
    "/home/jovyan/AREAS/PGF/hanging_bug/simple_example/file_b.nc",
]
pattern = pattern_from_file_sequence(urls, concat_dim='time')
recipe = (
    beam.Create(pattern.items())
    | OpenURLWithFSSpec()
    # do not specify file type to accommodate both ncdf3 and ncdf4
    | OpenWithXarray(xarray_open_kwargs={'use_cftime': True})
    | StoreToZarr(
        store_name='simple_test.zarr',
        combine_dims=pattern.combine_dim_keys,
        target_chunks={'x': 1},
    )
    | ConsolidateDimensionCoordinates()
    | ConsolidateMetadata()
)
and requirements.txt:
pangeo-forge-esgf==0.1.1
pangeo-forge-recipes==0.10.6
dynamic-chunks==0.0.2
gcsfs
apache-beam[gcp]
I also have a config_local.py in the root directory:
c.Bake.prune = 0
c.Bake.bakery_class = 'pangeo_forge_runner.bakery.local.LocalDirectBakery'
c.LocalDirectBakery.num_workers = 16
BUCKET_PREFIX = "/home/jovyan/AREAS/PGF/hanging_bug/simple_example/output"
c.TargetStorage.fsspec_class = "fsspec.implementations.local.LocalFileSystem"
c.TargetStorage.root_path = f"{BUCKET_PREFIX}/{{job_name}}/output"
c.InputCacheStorage.root_path = f"{BUCKET_PREFIX}/cache/"
c.InputCacheStorage.fsspec_class = "fsspec.implementations.local.LocalFileSystem"
Not fully reproducible yet, but should be enough to get an idea.
I then run the following in the terminal
mamba create -n runner0102 python=3.11 -y
conda activate runner0102
pip install pangeo-forge-runner==0.10.2 --no-cache-dir
cd /home/jovyan/AREAS/PGF/hanging_bug/simple_example # need to fix all these local paths...
pangeo-forge-runner bake \
--repo=. \
--ref=main \
--feedstock-subdir='feedstock' \
--Bake.job_name=simple_test \
--Bake.recipe_id=test \
-f config_local.py
The recipe runs with the following output (This is ugly AF and only for completeness. I'll fish out the important parts below):
So this 'finishes' but the output looks wrong!
There are 4! output chunks.
xr.open_dataset(
    '/home/jovyan/AREAS/PGF/hanging_bug/simple_example/output/simple_test/output/simple_test.zarr',
    engine='zarr',
    chunks={},
)
Digging a bit further into the log output from above I see something that Nathan and I were wondering about earlier:
"Combining group = ((\'time\', 1), (\'x\', 1)), containing fragments = [({Dimension(name=\'time\', operation=<CombineOp.CONCAT: 2>): IndexedPosition(value=2, indexed=True, dimsize=4), Dimension(name=\'x\', operation=<CombineOp.CONCAT: 2>): IndexedPosition(value=1, indexed=True, dimsize=2)}, <xarray.Dataset> Size: 40B\nDimensions: (x: 1, time: 2)\nCoordinates:\n * x (x) int64 8B 1\n * time (time) int64 16B 2 3\nData variables:\n var (x, time) int64 16B ...)]" instruction_id: "bundle_82" transform_id: "Create|OpenURLWithFSSpec|OpenWithXarray|StoreToZarr|ConsolidateDimensionCoordinates|ConsolidateMetadata/StoreToZarr/Rechunk/MapTuple(combine_fragments)"
If I read this correctly this means that this is not actually combining any of the (sub)fragments, since it only contains a single fragment to begin with. This should not happen in this case.
I drew up a sketch of what I expected to happen in this case.
I think the expected grouping is exactly two fragments (which differ in time) for each chunk in x.
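The expected grouping can be sketched in plain Python without Beam or xarray. This is only a toy model of the idea (group fragments by the target chunk they land in), not the pangeo-forge-recipes implementation. The key layout, the `split_file` helper and the dict-based "fragments" are all hypothetical, and the target chunks assume the full time axis is requested explicitly.

```python
from collections import defaultdict

# Toy layout from this example: two files, each with x=2 and time=2,
# concatenated along time to a total of time=4.
file_time_offsets = [0, 2]
x_size = 2
target_chunks = {"x": 1, "time": 4}  # assumed: one chunk spans all of time


def split_file(time_offset):
    """Yield (group_key, fragment) pairs for one input file.

    A fragment is described only by where it sits in the full array.
    """
    for x_index in range(x_size):
        key = (
            ("x", x_index // target_chunks["x"]),
            ("time", time_offset // target_chunks["time"]),
        )
        yield key, {"x": x_index, "time_offset": time_offset}


# Collect all fragments that belong to the same target chunk
groups = defaultdict(list)
for offset in file_time_offsets:
    for key, fragment in split_file(offset):
        groups[key].append(fragment)

for key, fragments in sorted(groups.items()):
    print(key, [f["time_offset"] for f in fragments])
```

In this model there are two groups (one per chunk in x), and each holds two fragments, one from each file.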
What seems to happen instead is that it writes the smallest unit (is this a fragment or subfragment - I am very confused by the terminology) as chunks to the target zarr store.
This will result in a TON of writes of tiny chunks in certain scenarios, which I guess could at least explain why jobs are slow and maybe how the risk of some random hang gets greatly increased?
phew this was a long one, but in summary I think that something is broken in the groupby/combine_fragments logic which does not actually produce the correct groups/lists/tuples of fragments that should be combined in an output chunk.
nice write up and graphics @jbusecke 🥳 , well explained
Nice digging Julius!
I think that something is broken in the groupby/combine_fragments logic which does not actually produce the correct groups/lists/tuples of fragments that should be combined in an output chunk.
The logic of these functions is unit-tested relatively thoroughly in test_rechunking.py. If you think there is a bug here, the next step would be to eliminate the complexity of the recipe execution and just isolate the logical error via a unit test.
In general, we try to follow the beam-recommended practices for our PTransforms:
Expose large, non-trivial, reusable sequential bits of the transform’s code, which others might want to reuse in ways you haven’t anticipated, as a regular function or class library. The transform should simply wire this logic together.
That means we shouldn't have to run beam code at all to debug and diagnose the sort of issue you're describing. You should be able to take your toy example and turn it into a simple test.
Happy to help with this.
Target chunks that are orthogonal to the concat dim, requiring a complete rechunk
Also, FWIW, the rechunking algorithm in Pangeo Forge is really not optimized for this extreme case. It means basically splitting up every pixel of a dataset into an individual Xarray Dataset. However, it should definitely work, even if the performance is very poor.
What seems to happen instead is that it writes the smallest unit (is this a fragment or subfragment - I am very confused by the terminology) as chunks to the target zarr store.
This is the expected behavior for target_chunks={'x': 1}.
Have you tried explicitly specifying the time chunks, i.e. target_chunks={'x': 1, 'time': 4}?
Oh yikes! Then I might have jumped down the wrong rabbit hole 😱. I'll double-check this asap. In any case let me roll back some of the strong language above, since I think I might have been overly immersed in my dynamic chunking function (CMIP) world, where the behavior is to return len(dim) for any dimension that is not specified. Faster subway, I need to find out hahaha.
Ohhh damn, @rabernat was right. I was making a wrong assumption here. Testing with target_chunks={'x': 1, 'time': 4} gives 2 resulting chunks as expected!
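The arithmetic behind the 4 versus 2 chunks can be written down in a few lines. This sketch only models what was observed in this toy example: it assumes that a dimension missing from target_chunks keeps the size of the per-file fragments (2 time steps here) rather than len(dim). The helper name and that fallback rule are assumptions, not the library's documented behavior.

```python
import math


def count_output_chunks(dim_sizes, target_chunks, fallback_chunks):
    """Number of chunks in the output array.

    Dimensions missing from `target_chunks` use `fallback_chunks`
    (assumed here to be the per-file fragment size).
    """
    n_chunks = 1
    for dim, size in dim_sizes.items():
        chunk = target_chunks.get(dim, fallback_chunks[dim])
        n_chunks *= math.ceil(size / chunk)
    return n_chunks


dim_sizes = {"x": 2, "time": 4}       # full concatenated dataset
per_file_sizes = {"x": 2, "time": 2}  # each input file holds 2 time steps

partial = count_output_chunks(dim_sizes, {"x": 1}, per_file_sizes)
explicit = count_output_chunks(dim_sizes, {"x": 1, "time": 4}, per_file_sizes)

print(partial, explicit)
```

Under that assumption, specifying only x yields 2 x 2 = 4 chunks, while adding 'time': 4 yields 2 x 1 = 2.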
Well that is very good (in the sense that we did not have a big bug in recipes!) but also sets us back in finding the reason for this bug.
The only thing I think I can say at this point is that the issue seems to be somehow triggered by producing a lot of fragments or grouping them (since these jobs seem to generally get stuck on a read, not a write, I am less suspicious of the steps after generating / combining fragments).
Welp, onwards...tomorrow.
FWIW I think it would be helpful to keep working on a schematic like the one above and label the following terms in there:
I might have missed/double counted some terms, but having those enumerated might help with future debugging/refactoring.
Target chunks that are orthogonal to the concat dim, requiring a complete rechunk
Also, FWIW, the rechunking algorithm in Pangeo Forge is really not optimized for this extreme case. It means basically splitting up every pixel of a dataset into an individual Xarray Dataset. However, it should definitely work, even if the performance is very poor.
Point well taken. I think that none of my dynamic rechunkings will ever be this extreme, but a full 'pixel-level' rechunk should still work as you say, and stress testing this might be helpful here to reveal the (still mysterious) underlying reason for my stalls.
I think the ultimate point of confusion is the logger entry coming from this line.
Am I wrong to assume that this should receive all the fragments that should 'land' in a given target chunk, and thus in our case here it should display more than one fragment to actually combine? There is basically a special case where all the rechunking does is 'splitting' into smaller fragments and then each one is written into a target chunk, but that should not be the case here (anymore 😆).
For my little toy example (updated version with explicit chunks) I get:
"Combining group = ((\'time\', 1), (\'x\', 1)), containing fragments = [({Dimension(name=\'time\', operation=<CombineOp.CONCAT: 2>): IndexedPosition(value=2, indexed=True, dimsize=4), Dimension(name=\'x\', operation=<CombineOp.CONCAT: 2>): IndexedPosition(value=1, indexed=True, dimsize=2)}, <xarray.Dataset> Size: 40B\nDimensions: (x: 1, time: 2)\nCoordinates:\n * x (x) int64 8B 1\n * time (time) int64 16B 2 3\nData variables:\n var (x, time) int64 16B ...)]"
Which I interpret as only a single fragment?
This would be helpful to get some clarification on before I continue on this.
Which I interpret as only a single fragment?
That is the correct interpretation. There is a list of fragments of length 1. Is that from the previous example (target_chunks={'x': 1}) or the updated version (target_chunks={'x': 1, 'time': 4})?
It is from the updated version with explicit chunks for all existing dimensions!
We saw similar one fragment lists in the more complex recipes (even though I still need to double check there to exclude my error from above). Seeing only a single fragment in there is what led me down the path of making the toy example above in the first place.
I'll look further into the keys and grouping when I have the time.
It is from the updated version with explicit chunks for all existing dimensions!
But then how is the dataset being written correctly? Could you show the full logging output?
I'd still encourage you to remove the PTransforms and Pipelines from the mix to focus on the core rechunking logic, which is quite simple:
@rabernat can you confirm that the basic assumption here is correct:
In the case sketched out above + target_chunks={'x':1, 'time':4} we would expect a list of 2 fragments to be combined to be shown in that log statement?
can you confirm that the basic assumption here is correct:
Not without a simpler reproducer that just involves the functions split_fragment and combine_fragments. 😉 What you're saying sounds generally correct, but there is so much complexity around the full end-to-end recipe that I hesitate to make any claims with certainty.
I am sorry, there must be something clearly wrong with the way I am testing this locally (on the LEAP hub). I just reran the example and we get the expected (2 fragment) logging call:
"Combining group = ((\'x\', 0),), containing fragments = [({Dimension(name=\'time\', operation=<CombineOp.CONCAT: 2>): IndexedPosition(value=2, indexed=True, dimsize=4), Dimension(name=\'x\', operation=<CombineOp.CONCAT: 2>): IndexedPosition(value=0, indexed=True, dimsize=2)}, <xarray.Dataset> Size: 40B\nDimensions: (x: 1, time: 2)\nCoordinates:\n * x (x) int64 8B 0\n * time (time) int64 16B 2 3\nData variables:\n var (x, time) int64 16B ...), ({Dimension(name=\'time\', operation=<CombineOp.CONCAT: 2>): IndexedPosition(value=0, indexed=True, dimsize=4), Dimension(name=\'x\', operation=<CombineOp.CONCAT: 2>): IndexedPosition(value=0, indexed=True, dimsize=2)}, <xarray.Dataset> Size: 40B\nDimensions: (x: 1, time: 2)\nCoordinates:\n * x (x) int64 8B 0\n * time (time) int64 16B 0 1\nData variables:\n var (x, time) int64 16B ...)]"
Somehow things are persisting across running the recipe multiple times from a local repo, which might be related to git or how things are saved/not saved on the hub 😡 . Strong point for what you said above about testing without beam!
I am very sorry for all the noise and loss of focus. I think I have convinced myself that the rechunking logic works as intended here and that the issue lies elsewhere.
I just wanted to reset and confirm that my (still complex) reproducer recipe test_short_lon_only_chunks, now with explicit chunks for all dimensions
target_chunks = {'lon': 10, 'time': 29200, 'bnds': 2, 'lat': 192} # this only splits the array on lon, all other chunks are == len(dim)
still hangs!
The pertinent log output is here:
Operation ongoing in bundle process_bundle-764632957537901303-1759 for PTransform{name=Creating CMIP6.CMIP.CMCC.CMCC-ESM2.historical.r1i1p1f1.3hr.pr.gn.v20210114|OpenURLWithFSSpec|OpenWithXarray|Preprocessor|StoreToZarr|ConsolidateDimensionCoordinates|ConsolidateMetadata/StoreToZarr/Rechunk/MapTuple(combine_fragments)-ptransform-46, state=process-msecs} for at least 1175.68 seconds without outputting or completing.
Current Traceback:
File "/usr/local/lib/python3.10/threading.py", line 973, in _bootstrap
self._bootstrap_inner()
File "/usr/local/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
self.run()
File "/usr/local/lib/python3.10/site-packages/apache_beam/utils/thread_pool_executor.py", line 53, in run
self._work_item.run()
File "/usr/local/lib/python3.10/site-packages/apache_beam/utils/thread_pool_executor.py", line 37, in run
self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 385, in task
self._execute(
File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 311, in _execute
response = task()
File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 386, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 650, in do_instruction
return getattr(self, request_type)(
File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 688, in process_bundle
bundle_processor.process_bundle(instruction_id))
File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1113, in process_bundle
input_op_by_transform_id[element.transform_id].process_encoded(
File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/bundle_processor.py", line 237, in process_encoded
self.output(decoded_value)
File "/tmp/cf09f9f279ec355c109713fd5ed97e51c62f0daeb39deea8ba13555949327808yktlp6da/lib/python3.10/site-packages/apache_beam/transforms/core.py", line 2040, in <lambda>
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/pangeo_forge_recipes/rechunking.py", line 240, in combine_fragments
ds_combined = xr.combine_nested(dsets_to_concat, concat_dim=concat_dims_sorted)
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/combine.py", line 577, in combine_nested
return _nested_combine(
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/combine.py", line 356, in _nested_combine
combined = _combine_nd(
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/combine.py", line 232, in _combine_nd
combined_ids = _combine_all_along_first_dim(
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/combine.py", line 267, in _combine_all_along_first_dim
new_combined_ids[new_id] = _combine_1d(
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/combine.py", line 290, in _combine_1d
combined = concat(
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/concat.py", line 250, in concat
return _dataset_concat(
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/concat.py", line 631, in _dataset_concat
combined_var = concat_vars(
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/variable.py", line 2925, in concat
return Variable.concat(variables, dim, positions, shortcut, combine_attrs)
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/variable.py", line 1693, in concat
data = duck_array_ops.concatenate(arrays, axis=axis)
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/duck_array_ops.py", line 356, in concatenate
return _concatenate(as_shared_dtype(arrays), axis=axis)
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/duck_array_ops.py", line 232, in as_shared_dtype
arrays = [asarray(x, xp=xp) for x in scalars_or_arrays]
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/duck_array_ops.py", line 232, in <listcomp>
arrays = [asarray(x, xp=xp) for x in scalars_or_arrays]
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/duck_array_ops.py", line 219, in asarray
return data if is_duck_array(data) else xp.asarray(data)
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/indexing.py", line 693, in __array__
return np.asarray(self.get_duck_array(), dtype=dtype)
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/indexing.py", line 696, in get_duck_array
self._ensure_cached()
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/indexing.py", line 690, in _ensure_cached
self.array = as_indexable(self.array.get_duck_array())
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/indexing.py", line 664, in get_duck_array
return self.array.get_duck_array()
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/indexing.py", line 557, in get_duck_array
array = array.get_duck_array()
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/coding/variables.py", line 75, in get_duck_array
return self.func(self.array.get_duck_array())
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/indexing.py", line 551, in get_duck_array
array = self.array[self.key]
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/backends/h5netcdf_.py", line 51, in __getitem__
return indexing.explicit_indexing_adapter(
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/indexing.py", line 858, in explicit_indexing_adapter
result = raw_indexing_method(raw_key.tuple)
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/backends/h5netcdf_.py", line 58, in _getitem
return array[key]
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/h5netcdf/core.py", line 347, in __getitem__
return self._h5ds[key]
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/h5py/_hl/dataset.py", line 758, in __getitem__
return self._fast_reader.read(args)
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/fsspec/spec.py", line 1856, in readinto
data = self.read(out.nbytes)
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/fsspec/spec.py", line 1846, in read
out = self.cache._fetch(self.loc, self.loc + length)
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/fsspec/caching.py", line 189, in _fetch
self.cache = self.fetcher(start, end) # new block replaces old
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/gcsfs/core.py", line 1921, in _fetch_range
return self.gcsfs.cat_file(self.path, start=start, end=end)
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/fsspec/asyn.py", line 118, in wrapper
return sync(self.loop, func, *args, **kwargs)
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/fsspec/asyn.py", line 91, in sync
if event.wait(1):
File "/usr/local/lib/python3.10/threading.py", line 607, in wait
signaled = self._cond.wait(timeout)
File "/usr/local/lib/python3.10/threading.py", line 324, in wait
gotit = waiter.acquire(True, timeout)
I interpret this as being stuck on reading something, but I would love to hear what others think. Yesterday @moradology and I were wondering which parts of the cached files this line actually reads:
ds_combined = xr.combine_nested(dsets_to_concat, concat_dim=concat_dims_sorted)
Does this eagerly load the data to be combined, or just read the metadata at this point?
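One way to answer this empirically is to log which byte ranges a file object is asked for while the combine step runs. The traceback above shows h5py calling readinto on the fsspec file, so a wrapper would need to cover both read and readinto. The following is a minimal standard-library sketch; ReadLogger is a hypothetical helper (not part of fsspec, h5py, or xarray), and whether a given backend accepts such a wrapper in place of the real file object is an assumption that would need checking.

```python
import io


class ReadLogger(io.BufferedIOBase):
    """Hypothetical wrapper that records (offset, nbytes) for every read."""

    def __init__(self, raw):
        self._raw = raw
        self.reads = []  # list of (start_offset, bytes_returned)

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, offset, whence=io.SEEK_SET):
        return self._raw.seek(offset, whence)

    def tell(self):
        return self._raw.tell()

    def read(self, size=-1):
        start = self._raw.tell()
        data = self._raw.read(size)
        self.reads.append((start, len(data)))
        return data

    def readinto(self, buffer):
        # Fill the caller's buffer through read() so the access is logged too.
        view = memoryview(buffer).cast("B")
        data = self.read(len(view))
        view[: len(data)] = data
        return len(data)


# Toy usage with an in-memory "file": a small header read, then a data read.
f = ReadLogger(io.BytesIO(bytes(range(100))))
header = f.read(8)
f.seek(50)
chunk = bytearray(10)
f.readinto(chunk)
print(f.reads)
```

If the logged ranges stay small and cluster near the start of the file, only metadata is being read. Large ranges covering the variable data would indicate an eager load.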
Is the issue that we are trying to read the same metadata from 'too many' processes? Or do we simply need to allow fsspec to fail and retry? Or should we disable caching?
One more possibly important piece of information is that this does not seem to be just a single read getting hung up. At least that is how I interpret all of these log messages, which have similar content to the above but different 'bundles':
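The traceback above ends in a one-second polling wait inside a sync-over-async helper. The sketch below imitates that general pattern with the standard library only, to illustrate the "fail and retry" idea: without an overall timeout the caller polls forever when the coroutine never finishes, while a timeout turns the stall into an error that can be retried. This is not fsspec's code; run_sync, read_with_retry, and their parameters are hypothetical names chosen for illustration.

```python
import asyncio
import threading

# Background event loop in its own thread, similar in spirit to how an async
# filesystem can serve synchronous callers. All names here are hypothetical.
_loop = asyncio.new_event_loop()
threading.Thread(target=_loop.run_forever, daemon=True).start()


def run_sync(coro_fn, timeout=None, poll=1.0):
    """Run coro_fn() on the background loop and block until it finishes."""
    done = threading.Event()
    result = {}

    async def runner():
        try:
            result["value"] = await coro_fn()
        except Exception as exc:  # pass failures back to the calling thread
            result["error"] = exc
        finally:
            done.set()

    future = asyncio.run_coroutine_threadsafe(runner(), _loop)
    waited = 0.0
    # With timeout=None this loop polls forever if the coroutine never
    # finishes, which from outside looks like a thread parked in wait().
    while not done.wait(poll):
        waited += poll
        if timeout is not None and waited >= timeout:
            future.cancel()  # asks the loop to cancel the coroutine as well
            raise TimeoutError(f"no result after {waited:.1f} s")
    if "error" in result:
        raise result["error"]
    return result["value"]


def read_with_retry(coro_fn, attempts=3, timeout=5.0, poll=1.0):
    """Retry a read that timed out instead of waiting on it indefinitely."""
    for attempt in range(1, attempts + 1):
        try:
            return run_sync(coro_fn, timeout=timeout, poll=poll)
        except TimeoutError:
            if attempt == attempts:
                raise
```

Whether a retry actually helps depends on why the read stalls. If every attempt blocks on the same lock or exhausted connection pool, a retry only repeats the stall, but the timeout at least surfaces it as an error rather than a silent hang.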
The frustrating thing about being back at this end of the equation is that this example only fails on Dataflow (@ranchodeluxe was able to run it on Flink, and I ran it locally), which makes debugging very hard.
I would love nothing more than to dig deeper into this today, but unfortunately I have too many other things on my plate. Maybe a little breather is also appropriate; this issue is a frustrating one.
Very curious if others have ideas how to proceed.
Just a quick update here: I have been testing whether setting copy_to_local and/or load on OpenWithXarray also solves the problem, but so far I was not successful. I am reporting on some of the issues in https://github.com/pangeo-forge/pangeo-forge-recipes/issues/722
Ok so some preliminary conclusions: At least for my case, setting copy_to_local=True does not fix the problem, it just moves it?
I was able to get around the issues with the local disk by specifying a larger attached disk size (see runner PR), but now instead of errors I am seeing all the usual stalling behavior again:
and warning messages like this:
Operation ongoing in bundle process_bundle-557078944501211849-694 for PTransform{name=Creating CMIP6.CMIP.CMCC.CMCC-ESM2.historical.r1i1p1f1.3hr.pr.gn.v20210114|OpenURLWithFSSpec|OpenWithXarray|Preprocessor|StoreToZarr/OpenWithXarray/Open with Xarray-ptransform-87, state=process-msecs} for at least 1486.46 seconds without outputting or completing.
Current Traceback:
File "/usr/local/lib/python3.10/threading.py", line 973, in _bootstrap
self._bootstrap_inner()
File "/usr/local/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
self.run()
File "/usr/local/lib/python3.10/site-packages/apache_beam/utils/thread_pool_executor.py", line 53, in run
self._work_item.run()
File "/usr/local/lib/python3.10/site-packages/apache_beam/utils/thread_pool_executor.py", line 37, in run
self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 371, in task
self._execute(
File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 297, in _execute
response = task()
File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 372, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 625, in do_instruction
return getattr(self, request_type)(
File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 663, in process_bundle
bundle_processor.process_bundle(instruction_id))
File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1056, in process_bundle
input_op_by_transform_id[element.transform_id].process_encoded(
File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/bundle_processor.py", line 237, in process_encoded
self.output(decoded_value)
File "/tmp/ab3ce7b5c6a409796cee8c934cce7cfd4b3d5546a1c4bd716293176fd8cfc1a1ikgl03o0/lib/python3.10/site-packages/apache_beam/transforms/core.py", line 2040, in <lambda>
File "/tmp/ab3ce7b5c6a409796cee8c934cce7cfd4b3d5546a1c4bd716293176fd8cfc1a1ikgl03o0/lib/python3.10/site-packages/pangeo_forge_recipes/transforms.py", line 233, in <lambda>
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/pangeo_forge_recipes/openers.py", line 233, in open_with_xarray
_copy_btw_filesystems(url_or_file_obj, target_opener)
File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/pangeo_forge_recipes/storage.py", line 33, in _copy_btw_filesystems
with output_opener as target:
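Since many of these warnings share the same shape, it can help to tally them by transform and bundle to see whether one step or many are stalled. The sketch below parses messages in the format of the "Operation ongoing in bundle ..." line quoted above. The regular expression is inferred from that single example, so it is an assumption that other Beam versions or runners format the message identically, and summarize_stalls is a hypothetical helper name.

```python
import re
from collections import defaultdict

# Pattern inferred from the one warning quoted in this thread (an assumption).
STALL_RE = re.compile(
    r"Operation ongoing in bundle (?P<bundle>\S+) "
    r"for PTransform\{name=(?P<name>.*), state=(?P<state>[^}]+)\} "
    r"for at least (?P<seconds>[\d.]+) seconds"
)


def summarize_stalls(messages):
    """Group stall warnings by the last step of the transform name."""
    summary = defaultdict(lambda: {"bundles": set(), "max_seconds": 0.0})
    for message in messages:
        match = STALL_RE.search(message)
        if match is None:
            continue  # not a stall warning
        step = match["name"].rsplit("/", 1)[-1]
        entry = summary[step]
        entry["bundles"].add(match["bundle"])
        entry["max_seconds"] = max(entry["max_seconds"], float(match["seconds"]))
    return dict(summary)
```

Feeding the exported worker log lines into summarize_stalls gives, per step, the set of affected bundle ids and the longest reported wait, which makes it easier to tell a single stuck read from a systematic stall.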
I am not sure whether this is the same deadlocking issue in fsspec. Unfortunately, for now it seems that copy_to_local is not a solution to the issue.
EDIT: As per the conversation with @rabernat over in https://github.com/pangeo-forge/pangeo-forge-recipes/issues/722, I tried setting both copy_to_local=True and load=True and got something that I understand even less:
This one sounds curious:
Failed to read inputs in the data plane.
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/data_plane.py", line 652, in _read_inputs
for elements in elements_iterator:
File "/usr/local/lib/python3.10/site-packages/grpc/_channel.py", line 541, in __next__
return self._next()
File "/usr/local/lib/python3.10/site-packages/grpc/_channel.py", line 967, in _next
raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
status = StatusCode.UNKNOWN
details = "Exception iterating requests!"
debug_error_string = "None"
>
Definitely out of my wheelhouse, but maybe @ranchodeluxe or @moradology have an idea?
Yeah, this is one of the things that can make distributed systems extremely difficult to debug. The seemingly meaningless trace here is a symptom of the generality and abstraction at the heart of their execution model. Your functions and data have to get passed around inside something capable of executing them in various contexts. A rough translation of what this looks like to me is: 'status was set to cancelled while in the process of iterating through the data we need to operate over'. There could well be an underlying trace further up, or one that was swallowed, but I wouldn't be shocked to see this as output from killing the master process.
As discussed in the last community meeting and on the call with @moradology and @ranchodeluxe today, I will try to boil down one of the CMIP6 feedstock examples that I observed to 'hang' (not completing within several hours, even though the caching was already complete!).
The dataset's unique identifier is 'CMIP6.CMIP.CMCC.CMCC-ESM2.historical.r1i1p1f1.3hr.pr.gn.v20210114' and the file URLs are:
A stripped-down recipe to reproduce this could look like this:
(to be revised, just leaving this here as a starting point)
@ranchodeluxe do you mind sending me the steps you use to run this on a jupyterhub?
Will fill this in as I go along.