google / Xee

An Xarray extension for Google Earth Engine
Apache License 2.0

Scaling Considerations #115

Closed ljstrnadiii closed 8 months ago

ljstrnadiii commented 9 months ago

Jotting down some ideas to scale out with other approaches than Beam/xbeam.

Goal

Leverage Xarray and its storage backends (most often zarr for us) to write the dataset out to disk. This is often the main theme and is the objective of https://github.com/google/Xee/blob/main/examples/ee_to_zarr.py. We write out in order to get the EE ingest out of the way, which makes it easy to take advantage of the Xarray ecosystem afterwards.

Why

Often we build very large datasets, either in the time dimension, the spatial dimensions, or both. For example, we may want high temporal frequency datasets for time series work, or a large spatial extent for jurisdiction, state, country or even global analysis. I really like the idea of using this package to export scene level data or composites. The former gives us complete control, but can have a massive impact on size. The latter lets us take advantage of EE for monthly/quarterly/annual/etc composites, which greatly reduces the computation and data storage by letting us take advantage of the engine part of EE. This raises the concern for caching [1].

How

The most common approach to scaling Xarray datasets is Dask (for us anyway). In the best case, we can connect to a dask cluster and simply call .to_zarr(). There are two immediate considerations that surface with this package: exporting scene level data will create many nan-chunks [2], and .to_zarr() on a cluster requires that our dataset be picklable [3].

Alternative Approaches

In theory, we simply need to build a "target" dataset with .to_zarr(..., compute=False) and then we can manually write to the underlying chunks with something like .to_zarr(..., region="auto") or .to_zarr(..., region={'x': slice(0, 1024), 'y': slice(0, 1024)}). This allows us to build the dataset in parallel over these regions. I am experimenting with Flyte to write out each region in parallel with map_task, but this could be accomplished in many other ways like Dask, Beam, etc.

This brings up two other considerations: the implications of io_chunks and the thread pool [4], and how that might fare with dask chunks [5]. There is also a query-per-second quota on EE, which is another consideration [6]. Predicting the number of requests for any ImageCollection seems challenging, and we will need to limit the number of concurrent writes somehow. The last consideration is that within each of the processes that write out a region, we need to open the dataset again [7]. How can we efficiently re-open a sub-region of a dataset, assuming we cannot pickle the thing? I currently take the hit and open the dataset in each task/pod/process, then use .isel() to subset the region that corresponds to the task and write it to zarr.
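To make the region idea concrete, here is a rough sketch of how the target store could be tiled into regions that independent tasks then write. The helper names (region_slices, init_target, write_region) are made up for illustration and are not part of Xee or Xarray. The sketch also assumes the dataset is dask-chunked so that compute=False really defers the data writes, and that every task can open the same dataset and store.

```python
import itertools


def region_slices(sizes, tile):
    """Yield region dicts that tile dimensions of the given sizes.

    sizes: mapping of dimension name -> total length, e.g. {"x": 2048}
    tile:  mapping of dimension name -> tile length along that dimension
    """
    dims = list(sizes)
    starts_per_dim = [range(0, sizes[d], tile[d]) for d in dims]
    for starts in itertools.product(*starts_per_dim):
        yield {
            d: slice(s, min(s + tile[d], sizes[d]))
            for d, s in zip(dims, starts)
        }


def init_target(ds, store):
    """Write metadata and coordinates only (hypothetical helper).

    Assumes `ds` is an xarray.Dataset whose data variables are dask-backed,
    so compute=False leaves the actual chunks unwritten.
    """
    ds.to_zarr(store, mode="w", compute=False)


def write_region(ds, store, region):
    """Write one region of `ds` into an existing store (hypothetical helper)."""
    subset = ds.isel(region)
    # With an explicit region, xarray expects every variable written to share
    # at least one dimension with the region, so drop the ones that do not.
    unrelated = [
        name for name, var in subset.variables.items()
        if not set(var.dims) & set(region)
    ]
    subset.drop_vars(unrelated).to_zarr(store, region=region)
```

Each dict yielded by region_slices could then be handed to one task (a Flyte map_task item, a Dask future, a Beam element, etc.), which re-opens the dataset and calls something like write_region on it.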

Open Questions

  1. Is there anything I am grossly misunderstanding?
  2. Does anyone know if there are plans on the EE side to make ee.Initialize picklable (if that is an issue)?
  3. Is there any good way to re-open a subset of the dataset and avoid taking the hit (if there is one)?
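On question 3, one partial mitigation I can think of is to pay the open cost once per process instead of once per region, by caching the opened dataset inside each worker. This does not avoid the first open in each process, so it only helps when a process handles several regions. A minimal sketch, where open_once_per_process, open_ee_dataset and region_subset are hypothetical names, and the ee.Initialize() call assumes credentials are already available on the worker:

```python
import functools


def open_once_per_process(opener):
    """Wrap `opener` so each process opens a given dataset only once.

    The cache lives in process memory, so nothing needs to be pickled and
    every new process still pays for its first call.
    """
    return functools.lru_cache(maxsize=None)(opener)


def _open_ee_dataset(collection_id):
    # Imports are deferred so only workers that actually open data need them.
    import ee
    import xarray as xr

    ee.Initialize()  # assumption: worker already has EE credentials set up
    return xr.open_dataset(f"ee://{collection_id}", engine="ee")


open_ee_dataset = open_once_per_process(_open_ee_dataset)


def region_subset(collection_id, region):
    """Return the sub-region for one task, re-using the cached dataset."""
    return open_ee_dataset(collection_id).isel(region)
```

The cache is keyed on the arguments, so this only works as written when the dataset is identified by something hashable such as a collection ID string.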

Running list of considerations:

  1. How can we cache expensive/compute heavy outputs from ee? Caching at the zarr chunk level is not an option as far as I know. The EE folks recommended we consider exporting to an asset first and then reading from the asset--I like this option, but am not sure how well it would scale or what the mechanism would look like to automatically discover existing assets (can we append with export to asset?). This is also beyond the scope of this package, but will likely be a consideration for others.
  2. From what I understand, exporting scene level data will cause the primary dimension (defaults to time) to reflect the number of unique scenes. This likely means many many nan chunks, which will likely yield too many dask tasks. There is still no good way to build no-ops for nan chunks in dask as far as I understand and this is a topic that shows up all over.
  3. If we want to use Dask to run .to_zarr() out of the box, we would need to pickle the dataset opened on the client side and send it to the workers. I believe that is not possible simply due to the ee.Initialize() constraint. Maybe we can simply call ee.Initialize() in each dask worker? There is likely still a pickle issue for the dataset itself?
  4. io_chunks seems to be what determines how many tasks will be submitted to the ThreadPoolExecutor, which currently has no limit set on the number of threads. As far as I can tell, that means it falls back to the [ThreadPoolExecutor default](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor), which in Python 3.8+ is min(32, os.cpu_count() + 4), rather than the processor count that ProcessPoolExecutor defaults to.
  5. We should then probably avoid specifying dask chunks when not using a remote cluster, as that will create a thread per chunk, which will itself kick off threads per io_chunk. This wasn't intuitive to me at first and seems to be the culprit behind this warning: "Connection pool is full, discarding connection: oauth2.googleapis.com. Connection pool size: 10".
  6. The qps quota is another thing worth considering. We can scale out writing these regions such that there are not too many threads running in each worker/pod/process, but the total number of regions being written in parallel will still affect the total qps. EE folks raise this concern in the section Concurrent Active Requests. I have already seen that even with exponential backoff, retries accumulate and things fail if we exceed our qps quota.
  7. Opening the dataset can likely be a heavy operation as EE needs to be queried for ImageCollection metadata to construct the dataset. We need to know how many scenes there are, the bands and dtypes, etc. The downside of opening a dataset in each process/pod/worker is the potential for open_dataset(...,engine='ee') to take quite some time.
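For considerations 4 to 6, one way to keep the number of simultaneous region writes under control inside a single process is a bounded thread pool plus a semaphore. This is only a sketch using the standard library: ConcurrencyLimiter and write_regions are hypothetical names, write_fn stands in for whatever actually writes a region, and the limits shown are placeholders rather than real EE quota values. It also does nothing to coordinate across pods or workers, so the global qps still has to be budgeted separately.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class ConcurrencyLimiter:
    """Allow at most `max_in_flight` calls to run at the same time."""

    def __init__(self, max_in_flight):
        self._sem = threading.BoundedSemaphore(max_in_flight)
        self._lock = threading.Lock()
        self._active = 0
        self.peak = 0  # highest number of simultaneous calls observed

    def run(self, fn, *args):
        with self._sem:  # blocks while max_in_flight calls are running
            with self._lock:
                self._active += 1
                self.peak = max(self.peak, self._active)
            try:
                return fn(*args)
            finally:
                with self._lock:
                    self._active -= 1


def write_regions(write_fn, regions, max_workers=4, max_in_flight=2):
    """Apply `write_fn` to every region with bounded concurrency.

    Returns the results in input order and the peak concurrency observed.
    """
    limiter = ConcurrencyLimiter(max_in_flight)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        results = list(pool.map(lambda r: limiter.run(write_fn, r), regions))
    return results, limiter.peak
```

Setting max_workers explicitly also avoids relying on the executor's default thread count, and the peak value gives a cheap way to check that the limit is actually being respected.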