gjoseph92 / stackstac

Turn a STAC catalog into a dask-based xarray
https://stackstac.readthedocs.io
MIT License

stackstac.stack to support one time coordinate per unique datetime? #66

Open thuydotm opened 3 years ago

thuydotm commented 3 years ago

Please see below an example running on the Planetary Computer using Esri 10m Land Cover data, where each STAC item is derived from a mosaic of many images. The output is a 4D cube whose time dimension has length 4, with the time coordinate 2020-06-01T00:00:00 repeated four times.

The documentation clearly states that `time` will be equal in length to the number of items you pass in, and indexed by STAC Item datetime. But it would feel more natural for the DataArray to have one time coordinate per unique datetime in the STAC items. Would stackstac.stack support this feature?

import numpy as np
import planetary_computer as pc
import pystac_client
import stackstac

catalog = pystac_client.Client.open(
    "https://planetarycomputer.microsoft.com/api/stac/v1/"
)
point = {"type": "Point", "coordinates": [-97.807733, 33.2133019]}

io_lulc_search = catalog.search(collections=["io-lulc"], intersects=point)
io_lulc_items = [pc.sign(item).to_dict() for item in io_lulc_search.get_items()]
data = stackstac.stack(io_lulc_items, assets=["data"], epsg=3857)

data.shape, np.unique(data.time)

Output: ((4, 1, 185098, 134598), array(['2020-06-01T00:00:00.000000000'], dtype='datetime64[ns]'))

TomAugspurger commented 3 years ago

As a bit of context, the STAC items in this global IO-LULC dataset all have the same timestamp. AFAICT, if you wanted to load 100 of these items with stackstac, memory usage would be ~100x larger than necessary, since the array would be (100, 1, width, height), rather than (1, 1, width, height).

gjoseph92 commented 3 years ago

What about stackstac.mosaic(data)? It would certainly give you the output you're looking for. But it would probably use more memory than necessary, since you'd briefly create 100 layers of unnecessary NaNs before flattening them (as Tom said above). However, these NaNs might not actually take 100x the memory, since we use this broadcast trick optimization for the out-of-bounds chunks: https://github.com/gjoseph92/stackstac/blob/2c799e3f0075942cbb76a99f96517d4fe15bc98a/stackstac/to_dask.py#L176-L178

I haven't tested the memory usage; I'd be curious what actually happens.
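For anyone curious, here's a minimal standalone sketch of what that broadcast trick does (not the exact code from to_dask.py): np.broadcast_to produces a read-only view whose strides are all zero, so a chunk-shaped "array" of NaNs is backed by a single 8-byte scalar until something forces a real copy.

import numpy as np

# Looks like a full (items, band, y, x) chunk, but no per-element memory is allocated.
nan_chunk = np.broadcast_to(np.nan, (100, 1, 4096, 4096))
print(nan_chunk.strides)          # (0, 0, 0, 0): every element aliases one scalar
print(nan_chunk.flags.writeable)  # False; it's a read-only view

# Materializing it would allocate the full ~12.5 GiB:
# real_chunk = np.ascontiguousarray(nan_chunk)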

it's expected that the dataarray should have one time coordinate per unique datetime in the STAC items. Would stackstac.stack support this feature?

We could think about it, but it might be tricky to implement during the stacking itself. If a chunk of the array could be sourced from multiple GDAL datasets, that would require rethinking a lot of the dask graph generation logic.

I might prefer to implement this through optimized mosaic logic instead. The thing we want to skip is generating that all-NaN array for out-of-bounds chunks when we know it will immediately get dropped by a mosaic. There's already logic that avoids even opening an asset for spatial chunks that don't overlap with that asset: https://github.com/gjoseph92/stackstac/blob/2c799e3f0075942cbb76a99f96517d4fe15bc98a/stackstac/prepare.py#L233-L238

If we did #13, I have a feeling mosaic could look at each spatial chunk, pluck out only the assets that overlap with that spatial chunk, and mosaic only those together. However, I'd want to test the current mosaic performance, because between the broadcast trick and short-circuiting the dataset open, performance might already be pretty good.
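As a rough illustration of that idea (purely hypothetical helpers, not stackstac's actual prepare/mosaic code), the per-chunk pruning could boil down to a plain bounding-box intersection test:

from typing import List, NamedTuple

class Bounds(NamedTuple):
    left: float
    bottom: float
    right: float
    top: float

def overlaps(a: Bounds, b: Bounds) -> bool:
    # True if the two bounding boxes intersect at all.
    return a.left < b.right and a.right > b.left and a.bottom < b.top and a.top > b.bottom

def assets_for_chunk(chunk_bounds: Bounds, asset_bounds: List[Bounds]) -> List[int]:
    # Hypothetical helper: indices of the assets worth opening and mosaicking
    # for one spatial chunk; everything else would only contribute all-NaN layers.
    return [i for i, b in enumerate(asset_bounds) if overlaps(chunk_bounds, b)]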


So if mosaic performance is good (or can be good), I might be in favor of a flatten_dates kwarg to stack, or even a separate function, which is just a convenience for arr.groupby("time").apply(stackstac.mosaic) or something like that.
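Until then, a user-level version of that convenience might look roughly like this (a sketch, assuming stackstac.mosaic accepts a dim= argument naming the dimension to flatten; check the signature in your version):

import stackstac
import xarray as xr

def flatten_times(arr: xr.DataArray) -> xr.DataArray:
    # Hypothetical convenience: mosaic together all items that share a datetime,
    # so "time" ends up with one coordinate per unique datetime.
    return arr.groupby("time").map(stackstac.mosaic, dim="time")

# e.g. flat = flatten_times(stackstac.stack(signed_items, epsg=3857))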

TomAugspurger commented 3 years ago

So if mosaic performance is good (or can be good), I might be in favor of a flatten_dates kwarg to stack, or even a separate function, which is just a convenience for arr.groupby("time").apply(stackstac.mosaic) or something like that.

👍 either of those sound perfect to me.

gjoseph92 commented 3 years ago

@TomAugspurger or @thuydotm, any interest in trying out mosaic on these cases and seeing how performance is right now?

TomAugspurger commented 3 years ago

I think we can conclude that stackstac.stack(...).pipe(stackstac.mosaic) works well enough. I was able to build a DataArray with this property (all the same datetime) whose size in memory would be 860.26 GiB. I ran it through stackstac.mosaic, which cut things down to 215.06 GiB, and then persisted the result on a cluster with ~256GB of memory. Here's a performance report.

https://gistcdn.githack.com/TomAugspurger/587bd0e00860b52be9c3830e3bbca35c/raw/17323077f808cfae02ef7dad79d1648cd8591499/stackstac-mosaic.html

There is a decent amount of communication, but I'm not sure how concerned to be about that... IIUC, mosaic might need to transfer chunks from one worker to another. I do have a theoretical (unconfirmed) concern that the broadcasting trick with np.nan doesn't play well with distributed's worker assignment. When distributed is in decide_worker, do we know what it thinks the size of the all-NaN chunk is? Hopefully it's the reduced size, and not the size of the full array.

Here's the code I ran:

import stackstac
from pystac_client import Client
import planetary_computer as pc

bounds = (120.9, 14.8, 120.91, 14.81)

catalog = Client.open("https://planetarycomputer.microsoft.com/api/stac/v1")

search = catalog.search(collections=["io-lulc"], bbox=bounds)

# Check how many items were returned
items = list(search.get_items())
print(f"Returned {len(items)} Items")

signed_items = [pc.sign(item).to_dict() for item in items]

ds = stackstac.stack(signed_items, epsg=32650, chunksize=4096)

# cluster
from dask_gateway import GatewayCluster

cluster = GatewayCluster()
cluster.scale(28)
client = cluster.get_client()

ds2 = stackstac.mosaic(ds).persist()

gjoseph92 commented 3 years ago

Nice to not see any spilling to disk in that performance report!

IIUC, mosaic might need to transfer chunks from one worker to another

It definitely would, just like any other reduction (mean, sum, etc.). I haven't looked at the graphs it makes yet, but they may be a little weird: it's not a tree-reduce graph, but conceptually more like functools.reduce, since a mosaic is order-dependent (you need the mosaic of the previous layers before you can compute the next one). There might be a way to improve the parallelism by redesigning it to be tree-like while preserving order. This isn't memory-related, just another optimization idea. (Not that it looks like that task stream needs any more parallelism?)
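To illustrate what "tree-like while preserving order" could mean, here's a toy NumPy sketch (not mosaic's actual implementation): the pairwise combine is order-dependent but associative, so a balanced tree of combines gives the same result as a sequential functools.reduce, as long as layer order is preserved within each pair.

import numpy as np
from functools import reduce

def combine(top, bottom):
    # Later layers only fill pixels where earlier layers are NaN,
    # so the operation is order-dependent but still associative.
    return np.where(np.isnan(top), bottom, top)

layers = [np.full((2, 2), np.nan), np.ones((2, 2)), np.full((2, 2), 2.0)]

sequential = reduce(combine, layers)                          # functools.reduce-style
pairwise = combine(combine(layers[0], layers[1]), layers[2])  # tree-style, same order
assert np.array_equal(sequential, pairwise)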

I do have a theoretical (unconfirmed) concern that the broadcasting trick with np.nan doesn't play well with distributed's worker assignment

Seems like it does!

In [1]: import numpy as np

In [2]: from dask.sizeof import sizeof

In [3]: trick = np.broadcast_to(np.nan, (2048, 2048))

In [4]: sizeof(trick)
Out[4]: 8

In [5]: full = np.full_like(trick, np.nan)

In [6]: sizeof(full)
Out[6]: 33554432

So hopefully decide_worker would schedule the `where` on the worker that already had one of the real input layers, instead of the one where the all-NaN chunk lived.

One interesting thing to look at would be how the initial chunks of the dataset get prioritized by dask.order. With https://github.com/dask/distributed/pull/4967, if we can get chunks that are spatially next to each other to also have similar priorities, they'll mostly get scheduled to the same worker, which will reduce the transfer necessary for the mosaic. This could actually be an argument for an optimization that skips the all-NaN chunks when building the mosaic graph: I think dask.order would end up prioritizing that graph better, since the irrelevant tasks wouldn't take up space.

RichardScottOZ commented 3 years ago

Thanks! I had been looking at that landcover dataset a few weeks ago and wondering about an approach.

gjoseph92 commented 2 years ago

Following up; is there anything we want to do here?

Do we want to add any convenience functionality:

So if mosaic performance is good (or can be good), I might be in favor of a flatten_dates kwarg to stack, or even a separate function, which is just a convenience for arr.groupby("time").apply(stackstac.mosaic) or something like that.

Or maybe some documentation about this?

Or just close?

RichardScottOZ commented 2 years ago

Convenience functions are good, I think, Gabe.

hrodmn commented 2 years ago

+1 for a flatten_dates arg to stackstac.stack, though it will be important to make sure the raster nodata and stackstac.stack fill_value parameters are being used correctly.
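To make that concern concrete, here's a toy sketch (hypothetical, not stackstac code) of why the flattening step has to know the fill value: a pixel only counts as "empty" if it matches whatever fill_value/nodata was used, and NaN needs a different test than a numeric sentinel.

import numpy as np

def combine(top, bottom, fill_value=np.nan):
    # A pixel is "empty" only if it equals the fill value; NaN needs np.isnan,
    # while a numeric nodata sentinel needs an equality comparison.
    empty = np.isnan(top) if np.isnan(fill_value) else (top == fill_value)
    return np.where(empty, bottom, top)

top = np.array([[0.0, 5.0]])
bottom = np.array([[7.0, 7.0]])
print(combine(top, bottom, fill_value=0))       # [[7. 5.]] (0 treated as nodata)
print(combine(top, bottom, fill_value=np.nan))  # [[0. 5.]] (0 is real data)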

gjoseph92 commented 2 years ago

I'm leaning towards that too. I think that with https://github.com/gjoseph92/stackstac/pull/116, some of the hard part is already done. The process would basically be: