google / Xee

An Xarray extension for Google Earth Engine
Apache License 2.0
234 stars 28 forks source link

FeatureCollection integration with Dask Dataframes #153

Closed alxmrs closed 4 weeks ago

alxmrs commented 4 months ago

I wrote this a while ago and thought I would share it in case there is capacity to add it to EE’s Python team roadmap.

Design doc: https://docs.google.com/document/d/1Ltl6XrZ_uGD2J7OW1roUsxhpFxx5QvNeInjuONIkmL8/edit

I think Xee is a good home for this, though another package (say, dask-ee) would be good, too.

aazuspan commented 3 months ago

I just came across this and it lines up with some old experiments that I recently wrote up for grabbing FCs in parallel with Dask.

I used the same from_map approach outlined in the design doc, although my method for grabbing chunks was a little more crude - relying on geopandas to parse features returned from getInfo. I didn't try to tackle the question of optimal chunking, but I did look at some optimizations like column projection and eagerly grabbing metadata that might be of interest.

alxmrs commented 3 months ago

Amazing! That we’ve identified the same solution means it probably is most fitting for the problem. Thanks so much Aaron.

Would you like to take your approach and add it to Xee? Or, start a new package? I’d be happy to lend a hand. It seems like you’ve figured out the nitty gritty.

aazuspan commented 3 months ago

Yeah, I'd be happy to write up an implementation for Xee (maybe xee.dataframe.read_fc?). Your design doc will be a big help, but there are a few specific design questions from there I'd love your thoughts on to get started:

computeFeatures or getInfo

The deferreds will make the underlying calls to ee.data.computeFeatures() that will produce Pandas Dataframes.

I haven't done any rigorous testing, but empirically I've found getInfo runs consistently faster than computeFeatures, regardless of page size. The main advantage I see of computeFeatures is decoupling the IO chunk size (i.e. page size) from the Dask chunk size. Do you think that's critical, or are there other strong reasons to stick with computeFeatures?

Optimizing chunk size

The tricky part, IIUC, will be calculating the appropriate FC shards.

I'm open to any ideas here. My inclination would be to put a hard limit at 5000 features but otherwise leave this up to the user to tweak since optimal chunk size will be so dataset dependent. In terms of avoiding data limits, I don't see any reliable way to estimate bytes per chunk without grabbing features, so again I'd lean towards making that a user responsibility. If you see an opportunity to automatically optimize though, I'm 100% in favor.

ee.Initialize

To actually use a dask cluster, the EE Team needs to make ee.Initialize() pickleable. Right now, they are working on eliminating the need for this call altogether.

My hacky solution to initializing workers was to shove ee.Initialize into the mapped function. I didn't run into any pickling issues there, so maybe stick with that for the time being?

alxmrs commented 3 months ago

I'd be happy to write up an implementation for Xee

Wow, thank you so so much! I know at least one project where this would be a game changer: https://github.com/wildlife-dynamics/ecoscope.

cc: @walljcg

maybe xee.dataframe.read_fc?

Ah, the hardest part of our jobs — naming. I have a few thoughts here:

Do you think that's critical, or are there other strong reasons to stick with computeFeatures?

No, this is not critical inherently. To the end user, this implementation detail will be hidden. Only the performance characteristics will be noticed. Thus, I wouldn’t sweat the decision too much — it seems reversible. It would be prudent to get input from an EE platform engineer to see if they know something we don’t wrt performance, given they have capacity.

FWIW, I haven’t used the FC API too much. My work with EE primarily involves rasters. Your experience trumps my doc’s suppositions.

The main advantage I see of computeFeatures is decoupling the IO chunk size (i.e. page size) from the Dask chunk size

Hmm. In my experience with rasters, we do want to separate these two concepts. IO is limited by the EE API, whereas users determine the characteristics of the dask scheduler (i.e. CPU-bound computation). The most performant system for a lot of use cases, in my estimation, will be getting the max IO chucks EE will allow as well as even bigger Dask chunks (given memory optimized VMs and a lot of RAM per worker).

Let’s see how this shapes out in your PR. We can discuss the these details there.

My inclination would be to put a hard limit at 5000 features but otherwise leave this up to the user to tweak

That sounds perfect. I totally agree.

I don't see any reliable way to estimate bytes per chunk without grabbing features

If we can get something like a PyArrow schema on the entire collection (via getInfo()), then I bet we could produce a “good enough” estimate of the bytes per page. This is worthy of investigation after the MVP. For now, I agree, we can delegate this to the user. Later, it would be nice to calculate this up-front, to make an educated guess as a default. (I took the same approach with the Xarray extension).

My hacky solution to initializing workers was to shove ee.Initialize into the mapped function.

I think this is a great solution. In fact, @KMarkert just used this tactic in Xee proper. It was off my radar when I wrote the doc, and I think we should adopt the pattern for dataframes as well. Let’s use Kel’s idioms in the function. As I understand it, both the pickle solution and this “hack” are short term solutions since the EE team intends to get rid of ee.Initialize() altogether.

Thanks again, Aaron.

alxmrs commented 3 months ago

my method for grabbing chunks was a little more crude - relying on geopandas to parse features returned from getInfo

Is this crude? I thought it was elegant. Though, I don't know if we want to add that dependency.

aazuspan commented 3 months ago

Thanks for the detailed feedback, Alex! This is all very helpful. Some responses below, but I also wanted to revisit the question of having this be integral to Xee vs. packaged separately.

I'm now leaning towards a separate package, both in the interest of modularity (e.g. using read_ee without the required geospatial dependencies of Xee) and reducing any maintenance burden on the Earth Engine team. If you're still open to that option, how do you feel about using dask-ee for the package name, as you discussed in the design doc? Your logic of following the dask-bigquery example seems the most straightforward to me, but I don't want to take the name you suggested and claim it on PyPI unless you're fully on board with what. I'm happy to consider other names if you'd rather keep that open. In any case, I would still be excited for you to engage with the package at whatever level you're available and interested.

To mimic BQ, doesn’t it make sense to read_ee? EE is the container for a table just as BQ is.

I'm convinced - read_ee makes sense and is consistent with precedent. I could potentially see loading Image Collection metadata as a dataframe through the same API, so keeping the function type-agnostic and dispatching internally sounds like a good approach.

In fact, @KMarkert just used this tactic in Xee proper. It was off my radar when I wrote the doc, and I think we should adopt the pattern for dataframes as well. Let’s use Kel’s idioms in the function.

Sounds great. I wasn't sure how the approach would scale, so the fact that it's working internally in Xee is reassuring.

Though, I don't know if we want to add that dependency.

Agreed! Parsing the GeoJSON with Pandas shouldn't be a problem. I'm imagining geopandas as an optional dependency hidden behind an as_geodataframe (or similar) flag to read_ee, but I'm open to other ideas.

alxmrs commented 3 months ago

I'm now leaning towards a separate package, both in the interest of modularity (e.g. using read_ee without the required geospatial dependencies of Xee) and reducing any maintenance burden on the Earth Engine team.

That makes a lot of sense to me. I started a repo over here for this purpose, but it is just boilerplate right now.

https://github.com/alxmrs/dee

I’d be happy to change the license/authorship to make us jointly own this repo.

how do you feel about using dask-ee for the package name, as you discussed in the design doc?

I think dask-ee is a better name for the project than dee, especially given that I can’t publish dee on PyPI due to name similarities. If you wanted to use the above repo, I can change the name. Otherwise, feel free to create something new!

I could potentially see loading Image Collection metadata as a dataframe through the same API, so keeping the function type-agnostic and dispatching internally sounds like a good approach.

This is interesting! This hasn’t occurred to me before. What is the use case for a dataframe of IC metadata? What types of queries would folks do on this?

I'm imagining geopandas as an optional dependency hidden behind an as_geodataframe (or similar) flag to read_ee, but I'm open to other ideas.

Consider making the optional dependency dask-geopandas: https://dask-geopandas.readthedocs.io/en/stable/

Cheers, Aaron.

aazuspan commented 3 months ago

Great, I'm happy working with your existing repo. I'll make a PR there once I have an MVP together.

What is the use case for a dataframe of IC metadata? What types of queries would folks do on this?

It's not something I've ever needed to do, but I could see it being helpful for granule-level analysis of cloud cover or data coverage. Definitely not a high priority feature, though.

Consider making the optional dependency dask-geopandas: https://dask-geopandas.readthedocs.io/en/stable/

Yes, this looks perfect.

alxmrs commented 4 weeks ago

As discussed in this issue, the integration now exists: https://github.com/alxmrs/dask-ee