google / Xee

An Xarray extension for Google Earth Engine
Apache License 2.0

Documentation on how to initialize/authenticate on distributed cluster #99

Closed jterry64 closed 6 months ago

jterry64 commented 10 months ago

When I try to run xarray.open_dataset with chunks set and an active distributed dask cluster, it correctly distributes the chunks to workers. But then it fails because each individual worker needs to call ee.Initialize().

Is this a supported use case, and is there documentation or examples on how to set up the authentication/initialization across the whole cluster?

naschmitz commented 10 months ago

Thanks for your question, @jterry64.

We don't have any documentation or examples because running xee across a distributed cluster is something we haven't quite figured out ourselves yet.

As you probably know, the EE client libraries are a bit weird in the sense that methods are patched in after we've made a request to the EE backend. We're working on some changes to eliminate the need for initialization by statically shipping most methods with the Python client libraries, but they won't be ready until Q1 of next year. We'll update this issue and our documentation when those changes are released.

alxmrs commented 10 months ago

Nate, in the meantime: do you think it would be possible to make Initialize pickleable? That may address the issues with both Dask and Beam.

KMarkert commented 9 months ago

I think there are two problems at play here: 1. initialization of the ee modules and 2. authorization for a worker to make EE calls. The static client library methods will only solve issue one and I am not sure how providing a pickled object will work for the authorized calls.

I think we can add a check, whenever xee makes a call to EE, for whether the module is initialized, and call ee.Initialize() if it is not. This could be enabled by an "auto initialize" flag when creating a dataset, which a user should only set for distributed jobs.

@naschmitz @alxmrs thoughts?
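
A minimal sketch of the "auto initialize" idea, assuming a guard that runs an initialization callable at most once per process before the first request. `LazyInitializer`, `fake_initialize`, and `fetch_chunk` are hypothetical names for illustration, not part of xee or the EE client; `fake_initialize` stands in for ee.Initialize so the sketch runs without credentials. It only covers the initialization half of the problem: the worker would still need credentials available to it.

```python
import threading


class LazyInitializer:
    """Runs an initialization callable at most once per process, on first use."""

    def __init__(self, initialize, **init_kwargs):
        self._initialize = initialize  # assumption: something like ee.Initialize
        self._init_kwargs = init_kwargs
        self._lock = threading.Lock()
        self._done = False

    def ensure(self):
        # Fast path: skip the lock once initialization has happened.
        if self._done:
            return
        with self._lock:
            # Re-check under the lock so concurrent callers initialize only once.
            if not self._done:
                self._initialize(**self._init_kwargs)
                self._done = True


# Stand-in for ee.Initialize; records each call so we can see how often it ran.
calls = []


def fake_initialize(project=None):
    calls.append(project)


guard = LazyInitializer(fake_initialize, project="my-project")


def fetch_chunk(chunk_id):
    guard.ensure()  # hypothetical hook: would run before every EE request
    return f"pixels for chunk {chunk_id}"


# Several threads request chunks at once; initialization still runs once.
threads = [threading.Thread(target=fetch_chunk, args=(i,)) for i in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Because the guard holds a lock, it would have to be created inside each worker process (for example at module level) rather than shipped from the client.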

jterry64 commented 9 months ago

Thanks for the answers! Good to know this is on the roadmap already.

In the meantime, it seems like the library is just making a separate ee.computePixels request per chunk. Since it's not really doing much processing locally (it's mostly happening on GEE servers), I think I could just try to run it locally if I could make Xee send the GEE requests in parallel (up to like the 6000/min limit for the high throughput endpoint).

Is there any good guidance on how I could do this? Would I just need to make a LocalCluster with many threads per worker? If possible I'd like to be hitting close to the maximum request quota if I'm requesting a dataset with a lot of chunks. But maybe I'm misunderstanding how it processes chunks under the hood.

noahgolmant commented 8 months ago

@jterry64 the current implementation is already multithreaded, so not sure a LocalCluster will help things. Reading the dataset into numpy is parallelized with the concurrent module, and saving to zarr saves each chunk in parallel with a threaded Dask scheduler (which is the default behavior of xarray).

Multiprocessing on a local cluster fails because the EE initialized session data is not pickleable; I think that is the same reason the distributed version fails today. Running locally still requires some RAM, since computePixels materializes each chunk's data on the machine before saving it to zarr.

ljstrnadiii commented 7 months ago

Even if we get Earth Engine sessions to pickle and dask to "work", are there any concerns with nested threading and keeping a handle on the number of requests per minute?

Since dask workers use threading by default, and each chunk then uses the concurrent threading module to make the computePixels call, it might be a bit hard to manage the number of requests, since dask doesn't directly support a maximum number of tasks, etc.
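
One way to bound the request rate under nested thread pools is a single limiter shared by every thread in the process. The following is a sketch under that assumption, not something xee or dask provides: `RateLimiter`, `fake_request`, and `load_chunk` are hypothetical names, and the outer and inner pools only imitate dask worker threads and per-chunk request threads.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class RateLimiter:
    """Spaces calls at least 1 / rate_per_sec seconds apart across all threads."""

    def __init__(self, rate_per_sec):
        self._interval = 1.0 / rate_per_sec
        self._lock = threading.Lock()
        self._next_slot = time.monotonic()

    def acquire(self):
        # Reserve the next free time slot under the lock, then sleep outside it.
        with self._lock:
            now = time.monotonic()
            slot = max(self._next_slot, now)
            self._next_slot = slot + self._interval
        delay = slot - now
        if delay > 0:
            time.sleep(delay)


limiter = RateLimiter(rate_per_sec=50)  # one limiter for the whole process
stamps = []
stamps_lock = threading.Lock()


def fake_request(tile):
    # Stand-in for a single computePixels call.
    limiter.acquire()
    with stamps_lock:
        stamps.append(time.monotonic())
    return tile


def load_chunk(chunk):
    # Inner pool: imitates the per-chunk request threads.
    with ThreadPoolExecutor(max_workers=4) as inner:
        return list(inner.map(fake_request, [(chunk, t) for t in range(3)]))


# Outer pool: imitates the scheduler's worker threads.
with ThreadPoolExecutor(max_workers=4) as outer:
    results = list(outer.map(load_chunk, range(4)))

elapsed = max(stamps) - min(stamps)
```

This only limits requests within one process. On a distributed cluster each worker would have its own limiter, so the per-worker rate would have to be set to the overall quota divided by the number of workers.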

alxmrs commented 7 months ago

I haven’t tested this out on Dask, but I do have experience running it with xbeam. This is also a multi-threaded, concurrent environment (it uses Dask threads, Beam/Dataflow VMs, and concurrent.futures). Xee works here (and should on a Dask cluster) since it implements retries with exponential backoff.

The cost is that we find out late when a pipeline has a logical error, but the benefit is that we get all the data in a parallel way without explicit throttling.
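
For readers unfamiliar with the pattern, here is a generic sketch of retries with exponential backoff and jitter. It is not Xee's actual retry code; `call_with_backoff`, `QuotaError`, and `flaky_request` are hypothetical names, and the delays are kept tiny so the example runs quickly.

```python
import random
import time


def call_with_backoff(fn, max_attempts=5, base_delay=0.01, max_delay=1.0,
                      retry_on=(Exception,), sleep=time.sleep):
    """Call fn(), retrying on failure with exponentially growing, jittered delays."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except retry_on:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            # Delay doubles each attempt, capped at max_delay.
            delay = min(max_delay, base_delay * 2 ** attempt)
            # Jitter spreads out retries from many concurrent callers.
            sleep(delay * random.uniform(0.5, 1.0))


class QuotaError(Exception):
    """Stand-in for a 'too many requests' error from a remote service."""


attempts = {"n": 0}


def flaky_request():
    # Fails on the first two calls, then succeeds.
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise QuotaError("too many requests")
    return "pixels"


delays = []  # record the delays instead of actually sleeping
result = call_with_backoff(flaky_request, retry_on=(QuotaError,),
                           sleep=delays.append)
```

The late-failure cost mentioned above shows up here too: a request that can never succeed is only reported after all attempts and their delays have been spent.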

ljstrnadiii commented 7 months ago

Makes sense! I didn't know dask threads are used with xbeam; I haven't used it yet.

I have definitely hit limits where, even with retries and exponential backoff, it fails. The causes were longer-running EE tasks such as calling median over an image collection, or hitting our quota for concurrent connections or for read requests. It's hard to know in advance how far you can push these limits, as I'm sure you already know!

I try to control for that with large spatial mosaics by first calling to_zarr with compute=False and then submitting batches of chunks using to_zarr with region={...} where the number of batches is tuned per dataset to a number that does not seem to put too much pressure on our quotas. Submitting these tasks to dask or flyte or beam with some sort of dynamic concurrency might be the most robust, but sounds rather complicated.
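
The batching step described above can be sketched as follows, assuming regions are `{dim: slice}` dicts aligned to the chunk grid. `chunk_regions` and `in_batches` are hypothetical helpers, and the dimension names and sizes are made up; the xarray calls appear only in comments and are not executed here.

```python
from itertools import islice, product


def chunk_regions(sizes, chunks):
    """Yield one {dim: slice} region per chunk of the grid."""
    per_dim = []
    for dim, size in sizes.items():
        step = chunks[dim]
        per_dim.append([(dim, slice(start, min(start + step, size)))
                        for start in range(0, size, step)])
    for combo in product(*per_dim):
        yield dict(combo)


def in_batches(iterable, batch_size):
    """Group an iterable into lists of at most batch_size items."""
    it = iter(iterable)
    while batch := list(islice(it, batch_size)):
        yield batch


sizes = {"lat": 10, "lon": 8}   # hypothetical dataset dimensions
chunks = {"lat": 4, "lon": 4}   # hypothetical chunk sizes
batches = list(in_batches(chunk_regions(sizes, chunks), batch_size=4))

# With xarray, the write might then look roughly like this:
#   ds.to_zarr(store, compute=False)          # write metadata only
#   for batch in batches:
#       for region in batch:                  # or submit the batch concurrently
#           ds.isel(region).to_zarr(store, region=region)
# Variables that do not span the region's dimensions may need to be
# dropped before the region write.
```

The batch size is the knob to tune per dataset: each batch bounds how many chunk requests are in flight before the next batch is submitted.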

Another small thing that could possibly help is the TODO of surfacing the pool size arg as a backend arg for the ThreadExecutor. I am considering picking up that task!

naschmitz commented 6 months ago

I'm going to close this discussion since we've put out an example running an Xee pipeline on Dataflow.