Thanks for posting this. A quick thought before I dig into it more: the Dask Array median docs mention that the current algorithm rechunks the reduced axis (time) to a single chunk. Since the data starts out with chunks of `(1, 1, y_chunks, x_chunks)`, it's a challenging computation: we have to shuffle the entire dataset.
I could imagine a few possible alternatives:

- `dask.array.percentile` (?)
- A windowed read that keeps the full timeseries in memory

I can try to spend a bit of time on the second item later in the week. But the basic idea is to have a function that takes a list of STAC items (or COGs?) and a window. It would then read that window out of each STAC COG and return a DataArray that's contiguous in time but fits in memory:
```python
def read_window(cogs, window):
    # or use stackstac.stack?
    datasets = [rioxarray.open_rasterio(cog).rio.isel_window(window) for cog in cogs]
    # somehow set time?
    return xr.concat(datasets, dim="time")
```
So you would need pretty small chunks in x / y so that the entire timeseries fits in memory. Then the median computation should be embarrassingly parallel and will run much more smoothly.
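To make the shape of that computation concrete, here's a toy sketch with synthetic NumPy arrays standing in for the per-date windows (no rasterio/STAC involved; `median_composite` is an illustrative name, not part of any library). Each window's median needs no data from any other window, which is what makes it embarrassingly parallel:

```python
import numpy as np

def median_composite(windows):
    """Stack per-date (y, x) windows along a new time axis and take the
    per-pixel median, ignoring NaNs (e.g. cloud-masked pixels)."""
    stacked = np.stack(windows, axis=0)   # (time, y, x)
    return np.nanmedian(stacked, axis=0)  # (y, x)

# Three fake dates for one small window; NaN marks a cloudy pixel.
w1 = np.array([[1.0, np.nan], [3.0, 4.0]])
w2 = np.array([[2.0, 5.0], [3.0, 6.0]])
w3 = np.array([[3.0, 7.0], [3.0, 8.0]])
composite = median_composite([w1, w2, w3])
# composite[0, 0] is the median of (1, 2, 3) -> 2.0
# composite[0, 1] ignores the NaN -> median of (5, 7) -> 6.0
```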
That said, the fact that you're seeing these internal Dask errors indicates a problem that might be worth pursuing if / when we come across a problem that can't be worked around like this.
Hi Tom. Thanks for your response. Following on your suggestion I replaced the call to `median` with `mean`, just to see if this resulted in a faster, more reliable run. Short answer: no. The first run completed, and ran in roughly the same amount of time as the successful run of the `median` computation (RUN 2 in the original email). (Actually the same amount of time between the call to `persist` and the finish -- which is what matters -- I'm not interested in cluster spinup time for this purpose.) The second one errored out the same way as RUN 3 above, with "set changed size during iteration". So from an admittedly small sample size, I'd say the problem is not specifically the median.
Your point about setting up the computation to be more efficiently 'chunkable' makes a lot of sense, but I think it may be somewhat confounded by the intrinsic overlap of neighboring Sentinel-2 tiles. That is, based on eyeballing the graph of the original program for smaller areas, and trying this with and without the call to groupby/stackstac.mosaic, the graph looks a lot more complicated because of the "cascading overlaps" that happen from the mosaicing. But I have little experience reading or tweaking dask graphs, so I'm not sure that's really a problem. If it is part of the problem, I'd be fine with dropping the 'overlap areas' from the Sentinel tiles in the original stack to simplify, if I could figure out an efficient way to do that.
Finally -- as regards your more detailed suggestion of rewriting the query -- I will have to do a lot more reading of the code for stackstac in order to even begin to take a shot at that. Happy for any further input there if you find the time.
Cheers
I have an initial prototype that I'll share later today. It's looking somewhat promising but will need a bit of benchmarking and maybe a bit of work on your part to adapt it to your specific workflow.
https://nbviewer.org/gist/TomAugspurger/c027408ae923222fab2a45d5109d114a / https://gist.github.com/TomAugspurger/c027408ae923222fab2a45d5109d114a has the prototype, but it has issues:
Hi Tom Thanks for this. I ran it on a static 24 core cluster and, interestingly, the computation is quite a bit slower for the 'good' solution than for the 'bad' one. (80% more run time, and 50% more total compute for the 'good' one). I recognize that as you scale the computation up, the shuffles in the 'bad' solution grow non-linearly, so the 'good' solution may become better than the 'bad' one at larger scale. (I have yet to test). Nonetheless, some additional thoughts:
One more complication: I note that your test applies only to a single Sentinel-2 tile (in space), tile 10TET:

```python
import numpy as np
print("Sentinel 2 Unique Tiles:", np.unique([i.properties['s2:mgrs_tile'] for i in items]))
# Sentinel 2 Unique Tiles: ['10TET']
```
I bring this up because in your test you can safely ignore the fact that Sentinel-2 tiles overlap each other. When you use a larger area for the search, you get multiple overlapping tiles among the items. In my original test I ensured that the same pixel does not appear twice in the median by doing a groupby/map(stackstac.mosaic) in the original call to stack. This would make the graph messier, I think.
I was thinking to try to apply your test to my area of interest, but need to consider this overlapping-tile issue. Can I just add a groupby/mosaic in somewhere? I have a feeling it's more complicated than that, but when reading your code I get a bit lost in the stackstac, pystac, xr.concat stuff, so trying to make this change myself to address the overlap might not go well. :-) I'd be willing to just ignore the overlapping sections of the neighboring tiles (rather than actually mosaicing them) if I could figure out how to exclude them as part of the workflow.
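For intuition on what the mosaic step is doing with those overlaps, here's a toy NumPy version of a "first valid pixel wins" combine (similar in spirit to stackstac.mosaic's default behavior, though this sketch and the name `first_valid_mosaic` are mine):

```python
import numpy as np

def first_valid_mosaic(layers):
    """Combine overlapping layers by taking, per pixel, the first
    non-NaN value in the given order."""
    out = np.full_like(layers[0], np.nan)
    for layer in layers:
        out = np.where(np.isnan(out), layer, out)
    return out

# Two overlapping tiles; NaN marks where a tile has no coverage.
a = np.array([[1.0, 2.0], [np.nan, np.nan]])
b = np.array([[9.0, 9.0], [3.0, 4.0]])
mosaic = first_valid_mosaic([a, b])
# Where both tiles have data, the first one wins: mosaic[0, 0] == 1.0
# Where only the second has data: mosaic[1, 0] == 3.0
```

This is why a pixel in an overlap region is counted once in the composite rather than twice in the median.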
@MikeBeller the "Set size changed during iteration" error was caused by https://github.com/dask/distributed/issues/5552 and fixed in https://github.com/gjoseph92/stackstac/pull/95, which is released in stackstac==0.2.2. Upgrading should eliminate this error; it's a red herring here.
My guess is that the groupby('day').apply(stackstac.mosaic) is the problem here. Xarray's groupby can lead to some convoluted dask graphs, and mosaic already produces a convoluted graph as it is. Since you're flattening all the data through a median anyway, pre-flattening by day should be nearly superfluous. What happens if you remove it? (I hope you don't get banding/striping where the tiles overlap.) If it turns out you need this, but it is the performance problem, we should look more at https://github.com/gjoseph92/stackstac/issues/66 to make an optimized way to do this sort of thing.
@TomAugspurger your approach here is an interesting workaround—the stackstac-ception is not something I'd ever considered. I haven't read it that carefully, but I bet there's an easier way to do this.
Just doing median("time"), say, dask wants to rechunk from (time=1, band=1, y=2048, x=2048) to (time=36, band=1, y=256, x=256). So what if you initially tell stackstac to use chunksize=256, and ensure that the median uses 256 as well? Then you have a much simpler rechunking operation, where you're just vertically stacking each 256x256 window -- basically the same as your workaround -- prior to taking the median. I'd imagine that eliminating the spatial rechunking would make the operation much simpler.
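To see why the spatial rechunk is expensive, a bit of toy arithmetic using the chunk shapes quoted above (the 36-date depth is just the illustrative number from this thread): with mismatched spatial chunks, every input chunk must be sliced up and shipped to many output tasks, whereas with aligned chunks each input chunk feeds exactly one output task.

```python
# Rechunking 36 dates, each stored as (time=1, band=1, y=2048, x=2048),
# into median-ready chunks of (time=36, band=1, y=256, x=256).
n_dates = 36
in_chunk = 2048   # spatial side of each input chunk
out_chunk = 256   # spatial side of each output chunk

# Mismatched spatial chunks: each input chunk is sliced into 8 x 8 = 64
# pieces, each sent to a different output task -- a full shuffle.
pieces_per_input_mismatched = (in_chunk // out_chunk) ** 2

# Aligned chunks (stackstac loads 256x256 from the start): each input
# chunk feeds exactly one output task, so stacking is a pure concatenation.
pieces_per_input_aligned = 1

print(pieces_per_input_mismatched, pieces_per_input_aligned)
```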
Thanks for chiming in Gabe. I'll look into getting the version on planetary-computer images updated.
I do think it's worth exploring just using stackstac.stack with the small initial chunks. My initial worry is that this would blow up the size of the task graph, but we should actually verify that before going down this rabbit hole.
My guess is that the groupby('day').apply(stackstac.mosaic) is the problem here.
Thanks @gjoseph92 this makes a lot of sense. I was concerned that the double-counting of certain overlapping pixels might produce weird effects. But it's certainly worth a try. I will give it a shot.
I gave this a quick try, and using chunksize=256 with stackstac makes 10 million tasks for the initial dataset. So yeah, that's not going to work.
It seems like we are fighting two battles: the first is that stackstac is not organized to provide the data in small-area-large-time order, and the second is that large dask graphs (above a few million tasks?) end up swamping the scheduler node. (True?)
And yet our problem is actually embarrassingly parallel in the space dimension. So one simpler-to-implement idea is to take the smaller tile approach, but split the area of interest into 16 large tiles that are run serially through dask, and the final image assembled at the end. like:
```python
from dask.distributed import wait

for i in range(num_sub_areas):
    sub_area_persisted[i] = sub_area[i].persist()
    wait(sub_area_persisted[i])
```
num_sub_areas would need to be small enough to keep the graph size below some threshold. Thoughts?
Mike
our problem is actually embarrassingly parallel in the space dimension
This is exactly what @TomAugspurger was observing, and trying to take advantage of in the stackstac-within-dask approach. I agree that it makes it particularly frustrating that this is so hard to do.
the second is large dask graphs (above a few million tasks?) end up swamping the scheduler node
To me this is the real root problem. My rule of thumb is that anything over 500k tasks is a no-go for the distributed scheduler. The scheduler hangs for minutes just unpacking the graph, and is so overloaded (possibly also due to GC because of the sheer number of Python objects https://github.com/dask/distributed/issues/4987) that it can't respond to workers fast enough, causing enormous memory buildup due to https://github.com/dask/distributed/issues/5223 and https://github.com/dask/distributed/issues/5114, and an unresponsive dashboard.
Obviously if the scheduler could handle millions of tasks, this would all be easy!
stackstac is not organized to provide the data in small-area-large-time order
Yeah, stackstac is currently designed around always having 1 chunk per asset. It was both easier to design this way, and felt important for task-culling reasons (if you sub-select the time/bands dimensions, it would feel pretty silly to load entire files that you don't need!). But it does lead to lots of tasks.
I think it would be both reasonable and feasible to extend the chunksize= argument to allow you to specify a chunking for the time and band dimensions though (https://github.com/gjoseph92/stackstac/issues/106). While dask can't handle graphs this large (not likely to change anytime soon), this would open up a lot of options for doing bigger computations. In your case, you could have stackstac load everything along the time dimension into one chunk, since you need that anyway for the median, which would give you the "split the area of interest into 16 large tiles" behavior you need, while staying nicely in xarray-land.
These seem like excellent future enhancements. Meanwhile to get something working now, I tried a middle ground where I do the time medians on a per-sentinel-2-tile basis. I.e. I group the items returned by the stac search into groups by sentinel tile identifier. Then I compute the medians on a tile by tile basis in serial (to keep the graphs small). This process is quite reliable and reasonably fast.
After computing the per-tile median tiles, I use xr.concat and stackstac.mosaic to combine the persisted tiles. I have a feeling this is not a great strategy, as the resulting mosaic has 188,000 tasks in its DAG, whereas the individual tiles each have only a few hundred or at most a few thousand.
https://gist.github.com/MikeBeller/64cd01bef75e2dfc1b4d46d99a2f0b61
Thoughts on how I can combine the persisted per-tile medians without blowing up the DAG? (This would allow me to do a larger area which is my actual goal. For the example I restricted it to one district instead of the whole AOI.)
@MikeBeller I think part of the problem is that in your stackstac.stack, you're using bounds_latlon=aoi_bounds for every tile. You only load a few items per array, but each array ends up being the same dimensions (the size of your full AOI) and is mostly NaNs. That results in tons of extraneous tasks to produce those chunks of all NaNs -- in fact, after concatenating, I think you'll end up with the exact same number of tasks as when you just loaded all the items into one array.
This is easier to see visually. Both arrays have the same overall footprint: Just data in different places. And the vast majority of that data is NaN:
Also, it looks like xr.concat adds a lot of steps. After removing bounds_latlon and concatenating just two images (the input nanmedian arrays only had one layer each), I get this:
It sounds like xarray isn't actually good at combining/mosaicing many "tiles" of arrays into one big one: https://github.com/pydata/xarray/issues/4213#issuecomment-811135190.
Your best bet will probably be to ensure the tiles don't have any overlap, and are perfectly abutting each other, then combine them using something like xr.combine_nested.
You could do that by splitting your input AOI up into tiles (some simple linspace/meshgrid code, probably). Then for each tile, do a stackstac.stack(all_items, bounds_latlon=tile_bounds). Stackstac will automatically drop the items that fall outside the tile. Then you can persist each tile as you're doing now, and finally try combining them together with xr.combine_nested.
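A minimal sketch of that splitting step (the function name `make_tile_bounds` is mine; the tuples are (min_x, min_y, max_x, max_y), the order stackstac's bounds_latlon expects):

```python
import numpy as np

def make_tile_bounds(aoi_bounds, nx, ny):
    """Split (min_x, min_y, max_x, max_y) into nx * ny abutting tiles."""
    min_x, min_y, max_x, max_y = aoi_bounds
    xs = np.linspace(min_x, max_x, nx + 1)
    ys = np.linspace(min_y, max_y, ny + 1)
    return [
        (xs[i], ys[j], xs[i + 1], ys[j + 1])
        for j in range(ny)
        for i in range(nx)
    ]

tiles = make_tile_bounds((30.0, -2.0, 36.0, 2.0), nx=4, ny=4)
# 16 tiles; horizontal neighbors share an edge exactly, so there's no overlap.
```

Each element could then be passed as bounds_latlon to a separate stackstac.stack call in a for-loop.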
In general, a way to combine overlapping tiles with xarray feels like a pretty basic need for the geospatial community, and something it would be possible to add (xref https://github.com/pangeo-data/cog-best-practices/issues/4).
(Sidenote, @TomAugspurger just want to mention how awesome it is to be able to launch the PC hub, run someone's notebook from a gist, and have everything just work! Amazing to not have to spend any time fiddling with dependencies, launching clusters, or dealing with auth.)
Thanks @gjoseph92 -- you've provided a lot of ideas and references for me to use in taking another stab at it. (And also a couple of new tools I hadn't seen before so thanks for that). Unfortunately sentinel tiles do overlap by 4900km on each side, but maybe there is a way for me to just slice that off the pre-generated median tiles before persisting them.
Cheers
Mike
I mean they overlap by 4900m, not km. Oops.
@MikeBeller I don't think the overlap matters. I'm saying that you don't need to group the STAC items by Sentinel-2 tiles anymore. Just write some linspace code to divide up your AOI into your own tiles that touch each other, and pass in the entire list of STAC items, plus each of those tiles as the bounds_latlon, in a for-loop. Then you'll end up with non-overlapping DataArrays, which hopefully you can combine more efficiently with xr.combine_nested.
Ah -- thanks yes that's helpful. I was hung up on potential reading efficiencies of "one tile at a time", but what you describe is a lot simpler to code up for sure. I will give it a shot.
I tried your suggestion @gjoseph92 of using combine_nested -- took a couple of stabs first using a 2 x 2 grid here:
https://gist.github.com/MikeBeller/2fd43ca9393d775b8190cce7d3be9259
But I ran into a problem where the x coordinates were not lining up between the tiles, so they ended up 'doubling' in the final image. I.e. the output image had twice as many x coordinates as the full-sized image would have if created directly. And you can see from the notebook it's because the coordinates don't exactly line up -- the values on the x index have non-even intervals. Perhaps I just made some kind of simple error in my attempt? More likely, I have a feeling that getting around this issue is what @TomAugspurger was doing in his original example above, where he slices by pixel, but I am having trouble following the steps.
For what it's worth: A second attempt with just vertical strips: https://gist.github.com/MikeBeller/cf2c479d259a01af0791a08735825474 but this one doubles in the Y direction. Bummer.
I reran the prototype in https://github.com/microsoft/PlanetaryComputer/issues/12#issuecomment-989193621 (slightly refactored to https://nbviewer.org/gist/TomAugspurger/7b4d8448c5d4ad29af036043b7527bba). Here's the performance report: https://gistcdn.githack.com/TomAugspurger/8c22e5bbcbf2974f52d10dd073a49b04/raw/afe94ae9b48181edfbe2832c9eabf84fdd6ccc34/good-median.html. As expected, there's zero communication and memory use is stable:
I haven't actually checked the output, and my note above said that there was an off-by-one error somewhere, but this might be an idea worth pursuing a bit more. @MikeBeller are you able to check if that method works on your own use-case? Note that this is on our staging hub with an updated stackstac (0.2.2). I'll try to get that updated today on the production hub.
Edit: opened https://github.com/dask/dask/issues/8526 to discuss whether Dask can do the rechunking in the I/O layer. I'd love to preserve the stackstac.stack(...).median(dim="time") spelling of this operation.
@TomAugspurger thanks for the refactored code. It works for a small area (modulo the off-by-one) but not a larger one, as you can see from the following gist: https://gist.github.com/MikeBeller/22f41bf27b5588840c6b525ad99c4fe0
(The gist is mostly just your notebook with the bbox replaced with a couple of different size options.)
At first look, the boundary between "works" and "doesn't work" seems to be when the AOI spans multiple Sentinel 2 tiles.
It wasn't immediately obvious to me how to adjust for that, and I had limited time to look in to it today, but I wanted to get you feedback in a timely manner.
@TomAugspurger for the off-by-one error, my guess is that you're not accounting for the fact that bounds include the width of the rightmost pixel, so the bounds are one extra step of size resolution larger than the distance between the first and last x coordinate. stackstac.array_bounds will do this for you.
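The arithmetic behind that, with illustrative numbers (and assuming, for this sketch, that x coordinates label the left edge of each pixel; conventions differ between libraries):

```python
# 100 pixels of 10 m resolution starting at x = 500000.
res = 10.0
n = 100
min_x = 500000.0

xs = [min_x + i * res for i in range(n)]  # the array's x coordinates
coord_span = xs[-1] - xs[0]               # 990.0: excludes the last pixel's width
max_x = xs[-1] + res                      # right edge of the rightmost pixel
bounds_width = max_x - min_x              # 1000.0: the true raster width

# Forgetting the trailing `res` makes the computed bounds one pixel too small.
assert bounds_width - coord_span == res
```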
The ValueError: cannot reindex or align along dimension 'time' because the index has duplicate values isn't too surprising for this method. The bigger problem is that different spatial regions will have different numbers of images within them. Taken to the extreme, if you want to mosaic the entire world, you'll obviously have a different number of scenes (and different timestamps) in Japan vs Spain. xr.concat(..., dim="y") wants the arrays to be the same shape in all dimensions except y, but in general they won't be the same shape along time until you do some operation to coerce them to a consistent number of images for every tile.
I think the better solution is to move the median into read_window. So rather than concatenating tiles of shape (T, X, Y) (where T may be different every time), you're concatenating (1, X, Y), which will work.
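A toy version of that fix with NumPy arrays (shapes only; real code would carry coordinates and go through xarray): reduce each tile over time first, and concatenation along y always lines up, no matter how many scenes each tile had.

```python
import numpy as np

def tile_median(tile):
    """Reduce a (T, Y, X) tile over time, keeping a length-1 time axis."""
    return np.nanmedian(tile, axis=0, keepdims=True)  # -> (1, Y, X)

# Two tiles covering adjacent strips, with *different* numbers of scenes.
north = np.random.rand(7, 64, 128)  # 7 scenes
south = np.random.rand(5, 64, 128)  # 5 scenes

# Concatenating the raw tiles along y would fail: 7 != 5 on the time axis.
# After the per-tile median, both are (1, 64, 128) and concat works.
composite = np.concatenate([tile_median(north), tile_median(south)], axis=1)
# composite.shape == (1, 128, 128)
```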
Also just FYI, I've been working on https://github.com/gjoseph92/stackstac/issues/106 which hopefully will be an easier solution for this.
In general though @TomAugspurger I think you're onto something with this approach. The pattern of "split the AOI up into tiles, apply the same embarrassingly-parallel logic to every tile" is very common in this field because it's both conceptually simple and works for most cases. I could see value in a higher-level library that makes it easy to split the AOI into tiles, then create 1 dask task per tile, where the task itself:
@gjoseph92 thanks. I'd like to point out two things that you may wish to take into consideration in building the 'easier solution' you mention:
(1) Sometimes "the embarrassingly parallel logic" is slightly more complicated than a median over time. In my case, I need to mask each band for clouds using the QA band. It might make sense if the user could supply a function to be applied to each tile to create the composite.
(2) To make the solution work for large areas of interest, it would need to still function correctly if tiles are in different UTM zones -- will the indexing calculations work if the calculation has to revert to epsg 4326 ? (or some other solution)
Just randomly chiming in that a colleague programmed this task last year (pre-STAC and pre-Planetary Computer, but using the HLS COGs from blob storage and an AKS instance roughly equivalent to the PC). As for the approach: we processed all images within a year for a single HLS tile, and saved the median of each tile individually to blob storage. I then created a VRT mosaic and transformed it to a single GeoTIFF using GDAL. Yes, it was hacky, but we couldn't come up with another solution. As I recall it took a day or two to produce an annual mosaic for the conterminous U.S. I think we were using a 64 node cluster.
Anyway, the relevant xarray code is here
Hi @MikeBeller @jessjaco @gjoseph92 @TomAugspurger, using the R sits package we have created large-scale mosaics of Sentinel-2 data on a reasonably sized VM (64 cores, 256 GB RAM). Please see the following code, which creates a 16-day time series from a yearly data set of Sentinel-2/2A images over the state of Rondonia, Brazil, which has 40 S2 tiles. The data is about the same size mentioned by @MikeBeller.
After the full data set has been created, you can merge them using GDAL.
```r
#### parameters ####
period <- "P16D"
output_dir <- "/data/cubes/RO_20m_P16D"
if (!dir.exists(output_dir))
  dir.create(output_dir, recursive = TRUE, mode = "0775")
stopifnot(dir.exists(output_dir))
start_date <- "2020-06-01"
end_date <- "2021-09-11"
bands <- c("B02", "B03", "B04", "B05", "B08", "B8A", "B11", "B12", "CLOUD")
tile_names <- c(
  "19LHH", "19LHJ", "20LKM", "20LKN", "20LKP", "20LLM", "20LLN",
  "20LLP", "20LLQ", "20LLR", "20LMM", "20LMN", "20LMP", "20LMQ",
  "20LMR", "20LNL", "20LNM", "20LNN", "20LNP", "20LNQ", "20LNR",
  "20LPL", "20LPM", "20LPN", "20LPP", "20LPQ", "20LQL", "20LQM",
  "20LQN", "20LRN", "20LRM", "20LRL", "20LQK", "20LPR", "20MNS",
  "20MMS", "20LKQ", "19LHK", "19LGK", "20LKR"
)

#### source ####
library(sits)
s2_cube_2021 <- sits_cube(
  source = "MSPC",
  collection = "SENTINEL-2-L2A",
  bands = bands,
  tiles = tile_names,
  start_date = start_date,
  end_date = end_date,
  multicores = 32
)

# regularize and create a time series of two month mosaics
reg_cube <- sits_regularize(
  cube = s2_cube_2021,
  period = period,
  res = 20,
  output_dir = output_dir,
  multicores = 10,
  memsize = 128
)
```
The R code above uses the gdalcubes R package. Our experience is that, even though the Azure VM has 64 cores, we got a better result using 10 cores. We think the reason is that there might be an I/O limit set by the Planetary Computer back-end. In other words, more cores means more I/O requests, which results in blocking by the back-end. In the above case, we obtained a 237,000 km2 data cube with 24 instances in 24 hours.
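The "fewer cores can be faster" observation is consistent with server-side rate limiting. A common client-side mitigation (my sketch, not something sits or the Planetary Computer prescribes) is to cap concurrent requests explicitly instead of sizing concurrency to the CPU count:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

MAX_CONCURRENT_READS = 10  # tune to what the backend tolerates
read_slots = threading.BoundedSemaphore(MAX_CONCURRENT_READS)

def fetch(url):
    """Placeholder for a COG window read; the semaphore caps how many
    requests are in flight regardless of the worker/thread count."""
    with read_slots:
        return len(url)  # stand-in for the actual HTTP read

urls = [f"https://example.com/tile/{i}" for i in range(64)]
with ThreadPoolExecutor(max_workers=64) as pool:  # many workers...
    results = list(pool.map(fetch, urls))         # ...but <= 10 concurrent reads
```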
Closed due to inactivity; feel free to reopen if you would like to continue this discussion.
I am working on creating a mosaic of sentinel data for a sizable area of interest -- around 600km by 400km. The goal is to get a median of the non-cloudy pixels over a 2 month period. My experience is that, with the code I've developed, it's not doable reliably even on a 400-core cluster.
To dig in to this I have a reproducible example of a smaller area -- 1/16 of the above example (roughly 150km x 100km), running on a 100 core cluster. This works about 1 out of 3 runs, otherwise dying with various errors. Below is the code, and then the output of 3 runs. The first died with a missing dependent, the second worked, and the third died with set size changed. (Note as regards my previously filed issue #11 -- this is with a static cluster size where I wait for all workers before proceeding.)
Just to clarify why the code is organized the way it is -- I am applying the median over time of each band, after NaNing out pixels that are identified in the Sentinel-2 SCL band as being clouds, I.e. I'm using pixel-by-pixel cloud removal instead of using a stac query based on overall percentage cloudiness, in order to maximize the information I can get from the tiles. This is probably contributing to the complexity. But it also seems like a very reasonable thing to do.
My questions are
Side note as regards the scheduler: for larger areas (e.g. the full 600km x 400km) I can't even reliably get the computation going, even with 400 cores. I think it's overwhelming the scheduler. Should I try to (can I) increase memory for the scheduler somehow?
Or perhaps the real problem is that the computation is just out of scope unless it's done in a more algorithmically aligned manner.
Very interested in your thoughts.
RUN 1 -- missing dependent
RUN 2 -- successful
RUN 3 -- set changed size error