azavea / noaa-hydro-data

NOAA Phase 2 Hydrological Data Processing
11 stars 3 forks source link

Write forum post asking about how to convert Zarr to Parquet #89

Closed lewfish closed 1 year ago

vlulla commented 2 years ago

In our attempts to convert the NWM retrospective zarr data to parquet we've run into a snag. We have been unable to convert the zarr to parquet using JupyterHub on a dask cluster. We have run into issues with the jupyter kernel crashing, workers getting killed (in dask-worker), and cancelled errors (in dask-scheduler). It appears that these errors are from the different parts of the libraries/systems we are using.

The jupyter kernel crashes are due to the kernel running out of memory! Initially, we started with a small jupyterhub instance (8GiB) which crashed the kernel for even very small selections/subsets (even a month would crash the kernel...we could only select 14 days worth of data for the 122_256 feature_ids that are in the subuset)! But now that Justin has provisioned a larger instance (16GiB) on which to run the jupyterhub we can make selections/subsets over larger time horizon (3 years without any problems). However, there still appears to be the central problem (very aptly articulated by Justin) that the statement ds.sel(time=slice('1990-01-01','1994-12-31')).to_pandas() runs in the jupyter notebook and not on the cluster! This defeats the whole purpose of using a distributed cluster in the first place!

I have also learned, after reading the documentation more carefully, that the ds.to_dask_dataframe() function does a transpose of the data creating a long table. Whereas, we need a wide table (i.e., each feature_id is its own column) as recommended by participants from the ESIP conference (Terence can provide more context/details). Anyhow, we can achieve such a wide dataframe using the ds.to_pandas function which does not alter the dimensions (also from carefully reading the documentation). It appears that both of these functions materialize a pandas dataframe in the jupyter notebook kernel, instead of generating this dataframe on the cluster, which I suspect were the cause of the kernel crashes I experienced. Here are the source code definitions for the respective functions:

Workaround

I have been able to generate the wide parquets, by year, for our zarr subset by running this code on a r5a.4xlarge ec2 instance. Lewis thinks this is unfeasible, and I agree with him, because our subset is only for 10 years and one huc 2 boundary which yielded a subset of 80 gb. The complete dataset is for 40 years and all the huc 2 boundaries which is about 8 TB! Trying to process this full dataset on anything but a distributed cluster is going to be an exercise in frustration.

Areas to explore

lewfish commented 2 years ago

I would expect to_pandas and to_dataframe to use up memory on the notebook instance since Pandas dataframes are non-lazy, in-memory objects.

Now that we've exhausted a couple of workarounds, do you want to just ask the question in the xArray forum about how to convert from Zarr to Parquet? You could link to this, https://github.com/pydata/xarray/issues/6811, and https://dask.discourse.group/t/workers-dont-have-promised-key-error-and-delayed-computation/936 to show what we've tried so far.

vlulla commented 2 years ago

Good idea Lewis! I had already asked this question on the xarray community discussion forum ... https://github.com/pydata/xarray/discussions/6905 but there has been no response yet. I suspect that the terseness of the question might have something to do with the lack of response. Anyways, what are the norms for editing already mentioned forum questions? Is it alright to edit the original question with these additional links/context? Or is it better to add these as a comment to that question? I'm leaning towards adding a comment.

lewfish commented 2 years ago

Ok, somehow I missed that you posted that. I would add another comment in the thread with the additional information.

vlulla commented 2 years ago

Ok, done. I hope we get some discussion going!