nsidc / earthaccess

Python Library for NASA Earthdata APIs
https://earthaccess.readthedocs.io/
MIT License
402 stars 81 forks source link

Document distributed workflow, e.g. "dask delayed for local cluster on AWS EC2 instance *at scale*" tutorial #499

Open DeanHenze opened 6 months ago

DeanHenze commented 6 months ago

Hi all,

Dean here, currently on the Data Publication Team at PO.DAAC. This is my first GitHub issue so please feel free to let me know how I can improve this post. I have a basic case of using earthaccess with dask that I am wondering if you all want to accommodate in future versions.

I start by searching for some MUR granules on PO.DAAC, which are simple gridded netcdf files with a variable for sea surface temperature

import earthaccess
import xarray as xr

earthaccess.login()
granule_info = earthaccess.search_data(short_name="MUR25-JPL-L4-GLOB-v04.2", count=10)

which works fine. Then I define a function to open a file and compute mean SST for one MUR granule:

def sstmean_1file(gran_info_single): # Takes an earthaccess.results.DataGranule object.
    fileobj = earthaccess.open([gran_info_single])[0]
    data = xr.open_dataset(fileobj)
    return data['analysed_sst'].mean().item()

Calling this function with the following code works fine both on my laptop and an AWS EC2 instance:

print(sstmean_1file(granule_info[0]))

Next, using this function with dask delayed works fine on my laptop with the following code:

from dask.distributed import Client
from dask import delayed
import dask.array as da

# Start cluster:
client = Client(n_workers=2, threads_per_worker=1)

# Process several granules in parallel using Dask:
sstmean_1file_parallel = delayed(sstmean_1file)
tasks = [sstmean_1file_parallel(gi) for gi in granule_info[:2]]
results = da.compute(*tasks) 
print(results)

However, it fails with the following output when I run it from an EC2 instance:

2024-03-21 21:08:07,913 - distributed.worker - WARNING - Compute Failed
Key:       sstmean_1file-732177b9-01ce-49e6-b3ee-043b12d06117
Function:  sstmean_1file
args:      (Collection: {'Version': '4.2', 'ShortName': 'MUR25-JPL-L4-GLOB-v04.2'}
Spatial coverage: {'HorizontalSpatialDomain': {'Geometry': {'BoundingRectangles': [{'WestBoundingCoordinate': -180, 'SouthBoundingCoordinate': -90, 'EastBoundingCoordinate': 180, 'NorthBoundingCoordinate': 90}]}}}
Temporal coverage: {'RangeDateTime': {'EndingDateTime': '2002-09-01T21:00:00.000Z', 'BeginningDateTime': '2002-08-31T21:00:00.000Z'}}
Size(MB): 1.8629217147827148
Data: ['https://archive.podaac.earthdata.nasa.gov/podaac-ops-cumulus-protected/MUR25-JPL-L4-GLOB-v04.2/20020901090000-JPL-L4_GHRSST-SSTfnd-MUR25-GLOB-v02.0-fv04.2.nc'])
kwargs:    {}
Exception: 'AttributeError("\'NoneType\' object has no attribute \'open\'")'

2024-03-21 21:08:07,915 - distributed.worker - WARNING - Compute Failed
Key:       sstmean_1file-2bb2c549-d507-484d-a45d-f805bf90cf1c
Function:  sstmean_1file
args:      (Collection: {'Version': '4.2', 'ShortName': 'MUR25-JPL-L4-GLOB-v04.2'}
Spatial coverage: {'HorizontalSpatialDomain': {'Geometry': {'BoundingRectangles': [{'WestBoundingCoordinate': -180, 'SouthBoundingCoordinate': -90, 'EastBoundingCoordinate': 180, 'NorthBoundingCoordinate': 90}]}}}
Temporal coverage: {'RangeDateTime': {'EndingDateTime': '2002-09-02T21:00:00.000Z', 'BeginningDateTime': '2002-09-01T21:00:00.000Z'}}
Size(MB): 1.8608589172363281
Data: ['https://archive.podaac.earthdata.nasa.gov/podaac-ops-cumulus-protected/MUR25-JPL-L4-GLOB-v04.2/20020902090000-JPL-L4_GHRSST-SSTfnd-MUR25-GLOB-v02.0-fv04.2.nc'])
kwargs:    {}
Exception: 'AttributeError("\'NoneType\' object has no attribute \'open\'")'

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
Cell In[2], line 36
     34 sstmean_1file_parallel = delayed(sstmean_1file)
     35 tasks = [sstmean_1file_parallel(gi) for gi in granule_info[:2]]
---> 36 results = da.compute(*tasks) 
     37 print(results)

File /opt/coiled/env/lib/python3.11/site-packages/dask/base.py:628, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    625     postcomputes.append(x.__dask_postcompute__())
    627 with shorten_traceback():
--> 628     results = schedule(dsk, keys, **kwargs)
    630 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

Cell In[2], line 20, in sstmean_1file()
     18 def sstmean_1file(gran_info_single): # Takes an earthaccess.results.DataGranule object.
     19     #earthaccess.login() # intentionally commented out
---> 20     fileobj = earthaccess.open([gran_info_single])[0]
     21     data = xr.open_dataset(fileobj)
     22     return data['analysed_sst'].mean().item()

File /opt/coiled/env/lib/python3.11/site-packages/earthaccess/api.py:212, in open()
    200 """Returns a list of fsspec file-like objects that can be used to access files
    201 hosted on S3 or HTTPS by third party libraries like xarray.
    202 
   (...)
    209     a list of s3fs "file pointers" to s3 files.
    210 """
    211 provider = _normalize_location(provider)
--> 212 results = earthaccess.__store__.open(granules=granules, provider=provider)
    213 return results

AttributeError: 'NoneType' object has no attribute 'open'

A couple follow up points

  1. If I instead have sstmean_1file() take a file-like object (e.g. the output from earthaccess.open()) rather than a earthaccess.results.DataGranule object, then the dask delayed code works even on an EC2 instance. In fact I posted a tutorial on our PO.DAAC Cookbook with does this: https://podaac.github.io/tutorials/notebooks/Advanced_cloud/basic_dask.html
  2. However, I think the function as I've written it here should also work. I imagine there are cases where it is better to have the function take the granule info rather than the file-like object.

Thanks all for the fantastic package, let me know what you think.

Dean

andypbarrett commented 6 months ago

It looks as if you are trying to calculate a time series of global SST; for each time step calculate the mean analysed_sst over lat and lon.

You shouldn't need to use dask explicitly for this because xarray uses dask for these tasks. You may be trying to do something different. If so, I apologize for suggesting "I wouldn't start from here" instead of answering your question.

That said, a more complicated workflow might require dask.delayed.

import earthaccess
import xarray as xr
%%time
earthaccess.login()
granule_info = earthaccess.search_data(short_name="MUR25-JPL-L4-GLOB-v04.2", count=10)

files = earthaccess.open(granule_info)
data = xr.open_mfdataset(files)
mean_sst = data.analysed_sst.mean(dim=['lat','lon'])

mean_sst.plot()
Granules found: 7869
Opening 10 granules, approx size: 0.02 GB
using endpoint: https://archive.podaac.earthdata.nasa.gov/s3credentials
QUEUEING TASKS | : 100%
10/10 [00:00<00:00, 463.19it/s]
PROCESSING TASKS | : 100%
10/10 [00:00<00:00, 2.78it/s]
COLLECTING RESULTS | : 100%
10/10 [00:00<00:00, 1290.28it/s]
CPU times: user 1.31 s, sys: 60.5 ms, total: 1.37 s
Wall time: 6.03 s

The search, access, lazy-load and computation (plotting initiates a compute on the task graph) take 6 s.

mean_sst.values
array([287.01715, 287.01108, 287.0128 , 287.01627, 287.0176 , 287.02246,
       287.0267 , 287.0374 , 287.04288, 287.03943], dtype=float32)

The MUR files are loaded lazily and held as dask.arrays

<xarray.Dataset>
Dimensions:           (time: 10, lat: 720, lon: 1440)
Coordinates:
  * time              (time) datetime64[ns] 2002-09-01T09:00:00 ... 2002-09-1...
  * lat               (lat) float32 -89.88 -89.62 -89.38 ... 89.38 89.62 89.88
  * lon               (lon) float32 -179.9 -179.6 -179.4 ... 179.4 179.6 179.9
Data variables:
    analysed_sst      (time, lat, lon) float32 dask.array<chunksize=(1, 720, 1440), meta=np.ndarray>
    analysis_error    (time, lat, lon) float32 dask.array<chunksize=(1, 720, 1440), meta=np.ndarray>
    mask              (time, lat, lon) float32 dask.array<chunksize=(1, 720, 1440), meta=np.ndarray>
    sea_ice_fraction  (time, lat, lon) float32 dask.array<chunksize=(1, 720, 1440), meta=np.ndarray>
    sst_anomaly       (time, lat, lon) float32 dask.array<chunksize=(1, 720, 1440), meta=np.ndarray>
Attributes: (12/54)
    Conventions:                CF-1.7, ACDD-1.3
    title:                      Daily 0.25-degree MUR SST, Final product
    summary:                    A low-resolution version of the MUR SST analy...
    keywords:                   Oceans > Ocean Temperature > Sea Surface Temp...
    keywords_vocabulary:        NASA Global Change Master Directory (GCMD) Sc...
    standard_name_vocabulary:   NetCDF Climate and Forecast (CF) Metadata Con...
    ...                         ...
    publisher_name:             GHRSST Project Office
    publisher_url:              https://www.ghrsst.org/
    publisher_email:            gpc@ghrsst.org
    file_quality_level:         3
    metadata_link:              http://podaac.jpl.nasa.gov/ws/metadata/datase...
    acknowledgment:             Please acknowledge the use of these data with...
andypbarrett commented 6 months ago

Some experimetation with the workflow I described above reveals that this is relatively efficient for 100 to 1000 granules (6 s to 3'ish minutes) but runs into memory issues for larger numbers of files. For the ~7000 granules returned by the search on a 15 GB instance I crash the kernel.

betolink commented 6 months ago

Hi Dean, thanks for opening this issue!

This is happening because local instances are not shared automatically in Dask distributed computations. When we distribute the function to the workers the authenticated earthaccess instance is not copied over so we need a way to also send either the instance or credentials to the workers.

I wrote a small notebook to illustrate how to forward the auth variables https://gist.github.com/betolink/9a96c8cb283d6f37f3c5ebe3c24c5b70

I think in the near future this won't be necessary and earthaccess should deal with this case as well. Let us know if this worked for you!

DeanHenze commented 6 months ago

Some experimetation with the workflow I described above reveals that this is relatively efficient for 100 to 1000 granules (6 s to 3'ish minutes) but runs into memory issues for larger numbers of files. For the ~7000 granules returned by the search on a 15 GB instance I crash the kernel.

Hi Andy thanks for getting back to me! I understand your response, if a time series of SST mean were the end goal then using Xarray in the way you show would be the way to go. This is a toy example, and I have more complicated cases where dask delayed is necessary.

Hi Dean, thanks for opening this issue!

This is happening because local instances are not shared automatically in Dask distributed computations. When we distribute the function to the workers the authenticated earthaccess instance is not copied over so we need a way to also send either the instance or credentials to the workers.

I wrote a small notebook to illustrate how to forward the auth variables https://gist.github.com/betolink/9a96c8cb283d6f37f3c5ebe3c24c5b70

I think in the near future this won't be necessary and earthaccess should deal with this case as well. Let us know if this worked for you!

Hi Luis, very cool I'll give this a try! Thank you.

DeanHenze commented 6 months ago

Being my first Github issue, not sure how this works. Should I close the issue for now? Sounds like you already have plans on getting to it.

betolink commented 6 months ago

I think we should keep it open until one of the proposed solutions works for this use case.

andypbarrett commented 6 months ago

I don't think this approach is scaling. I ran @betolink gist code for 1000 granules and got a KeyError. I suspect this is because there are 1000 hits on CMR. See below.

I also tried earthaccess.open on the full granule list. And then loop through the fileobjects. This runs into memory issues for granules > 4000 on a 15 GB instance. This also happens for the approach @betolink describes above.

It is not clear to me why this is happening yet. But I suspect memory is not getting released once the aggregation (averaging) step is done. I even used a context manager on xarray.open_dataset but the same memory issue occurs.

QUEUEING TASKS | : 100%|██████████| 1/1 [00:00<00:00, 1889.33it/s]
PROCESSING TASKS | : 100%|██████████| 1/1 [00:00<00:00,  7.74it/s]
COLLECTING RESULTS | : 100%|██████████| 1/1 [00:00<00:00, 28149.69it/s]
2024-03-25 18:28:50,840 - distributed.worker - WARNING - Compute Failed
Key:       sstmean_1file-9ecf331c-da16-4d4c-98f1-0e372c8153a1
Function:  sstmean_1file
args:      (Collection: {'Version': '4.2', 'ShortName': 'MUR25-JPL-L4-GLOB-v04.2'}
Spatial coverage: {'HorizontalSpatialDomain': {'Geometry': {'BoundingRectangles': [{'WestBoundingCoordinate': -180, 'SouthBoundingCoordinate': -90, 'EastBoundingCoordinate': 180, 'NorthBoundingCoordinate': 90}]}}}
Temporal coverage: {'RangeDateTime': {'EndingDateTime': '2003-01-28T21:00:00.000Z', 'BeginningDateTime': '2003-01-27T21:00:00.000Z'}}
Size(MB): 1.8240442276000977
Data: ['https://archive.podaac.earthdata.nasa.gov/podaac-ops-cumulus-protected/MUR25-JPL-L4-GLOB-v04.2/20030128090000-JPL-L4_GHRSST-SSTfnd-MUR25-GLOB-v02.0-fv04.2.nc'])
kwargs:    {}
Exception: "KeyError('accessKeyId')"

---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
Cell In[7], line 1
----> 1 results = da.compute(*tasks)

File /srv/conda/envs/notebook/lib/python3.10/site-packages/dask/base.py:665, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    662     postcomputes.append(x.__dask_postcompute__())
    664 with shorten_traceback():
--> 665     results = schedule(dsk, keys, **kwargs)
    667 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

Cell In[5], line 3, in sstmean_1file()
      1 def sstmean_1file(gran_info_single):
      2     earthaccess.login()
----> 3     fileobj = earthaccess.open([gran_info_single])[0]
      4     data = xr.open_dataset(fileobj)
      5     return data['analysed_sst'].mean().item()

File /srv/conda/envs/notebook/lib/python3.10/site-packages/earthaccess/api.py:217, in open()
    207 """Returns a list of fsspec file-like objects that can be used to access files
    208 hosted on S3 or HTTPS by third party libraries like xarray.
    209 
   (...)
    214     a list of s3fs "file pointers" to s3 files.
    215 """
    216 provider = _normalize_location(provider)
--> 217 results = earthaccess.__store__.open(granules=granules, provider=provider)
    218 return results

File /srv/conda/envs/notebook/lib/python3.10/site-packages/earthaccess/store.py:308, in open()
    299 """Returns a list of fsspec file-like objects that can be used to access files
    300 hosted on S3 or HTTPS by third party libraries like xarray.
    301 
   (...)
    305     a list of s3fs "file pointers" to s3 files.
    306 """
    307 if len(granules):
--> 308     return self._open(granules, provider)
    309 return []

File /srv/conda/envs/notebook/lib/python3.10/site-packages/multimethod/__init__.py:315, in __call__()
    313 func = self[tuple(func(arg) for func, arg in zip(self.type_checkers, args))]
    314 try:
--> 315     return func(*args, **kwargs)
    316 except TypeError as ex:
    317     raise DispatchError(f"Function {func.__code__}") from ex

File /srv/conda/envs/notebook/lib/python3.10/site-packages/earthaccess/store.py:351, in _open_granules()
    349 if endpoint is not None:
    350     print(f"using endpoint: {endpoint}")
--> 351     s3_fs = self.get_s3fs_session(endpoint=endpoint)
    352 else:
    353     print(f"using provider: {provider}")

File /srv/conda/envs/notebook/lib/python3.10/site-packages/earthaccess/store.py:259, in get_s3fs_session()
    255     # Include new credentials in the cache
    256     self._s3_credentials[location] = now, creds
    258 return s3fs.S3FileSystem(
--> 259     key=creds["accessKeyId"],
    260     secret=creds["secretAccessKey"],
    261     token=creds["sessionToken"],
    262 )

KeyError: 'accessKeyId'
DeanHenze commented 6 months ago

I think we should keep it open until one of the proposed solutions works for this use case.

I tried out your solution (auth_env() function) for both the toy case and a more complicated case, and it worked. However I haven't tried scaling yet. I will see if I run into the same issues as Andy.

betolink commented 6 months ago

Just keep in mind that for Dask we should try to match the worker number to the available CPUs. The total memory of the VM will be divided equally for each worker, so it's good to have the "workers memory" panel open to monitor memory utilization.

DeanHenze commented 6 months ago

I am running into errors when scaling, but I think those are connected with worker memory release issues, rather than the one in this ticket.

betolink commented 6 months ago

Nice! (the part of the original issue being resolved) but this should be documented. I wonder if we can have a "earthaccess for distributed workflows" tutorial. FWIW, I managed to run through the whole MUR collection in under 10 minutes using a small 14GB RAM instance! the trick was to use the implicit login() and trigger the memory collection manually.

import gc

def sstmean_1file(gran_info_single):
    fileobj = earthaccess.open([gran_info_single])[0]
    data = xr.open_dataset(fileobj)
    mean = data['analysed_sst'].mean().item()
    del fileobj, data
    gc.collect()
    return mean
DeanHenze commented 6 months ago

FWIW, I managed to run through the whole MUR collection in under 10 minutes using a small 14GB RAM instance! the trick was to use the implicit login() and trigger the memory collection manually.

Woohoo! I was also able to process the entire record using the memory collection, took a little longer at 15 minutes but still sounds great to me.

Nice! (the part of the original issue being resolved) but this should be documented. I wonder if we can have a "earthaccess for distributed workflows" tutorial.

I think I have that somewhat started with the Cookbook tutorial (of course you probably want your own one). I will be sure to update it with some notes about applying the auth_env() function and manual memory collection.

One thing when doing a full scale analysis is the output from earthaccess becomes very long. Maybe there could be an option to suppress the output?

betolink commented 6 months ago

Maybe there could be an option to suppress the output?

Certainly, that could be implemented (probably upstream to pqdm)

andypbarrett commented 6 months ago

I ran into the same issue with output. Adding the cell magic %%capture can eliminate that output.

betolink commented 6 months ago

For some reason %%capture didn't work with Dask output.

DeanHenze commented 6 months ago

Another update, garbage collection does not seem to be cutting it for my more complicated case. I have the Dask task graph up and watch the memory per work slowly accumulate as they churn through the files, until finally crashing. Wondering if for the toy case above, the memory per worker was also accumulating, but just slowly enough that the analysis could finish before it became an issue. I'll rerun the toy example above and check out the task graph while running.

betolink commented 6 months ago

If you share your notebook maybe we can take a look at what's happening.

DeanHenze commented 6 months ago

If you share your notebook maybe we can take a look at what's happening.

Here is the more complicated case (I think that's what you are referring to?) https://github.com/DeanHenze/podaac-dpub/blob/main/cowvr_animation.ipynb. Although you won't be able to run the notebook because it uses a data set that is currently restricted.

mfisher87 commented 6 months ago

This is my first GitHub issue so please feel free to let me know how I can improve this post.

While I have little to contribute (aside from re-titling this issue), I do have some feedback for you. This is one of the best first GitHub issues I've seen! :heart: Welcome to the community and thanks so much for your contribution!

DeanHenze commented 5 months ago

I ran another test with the toy example to see if garbage collection and memory cleanup were working, and I am not sure they are. I think it's just that the files and computations are small enough that it doesn't cause any issues. I ran sstmean_1file () on a local cluster (with the del fileobj, data and gc.collect() lines) and monitored the memory per worker. Here is the memory of one worker both after ~2600 files processed, and ~7700 files processed, you can see the majority of memory used is "unmanaged memory", and it increases:

with_gc_1 with_gc_2

I then repeated this run with a new kernel and cluster but removed the del fileobj, data and gc.collect() lines, and it looks like the total and unmanaged memory are the same (in fact it's a little lower for the ~7700 files):

without_gc_1 without_gc_2

So I'm not sure those lines are working, and I imagine that for my more complex case where the files are much larger, the unmanaged memory gets too high.

betolink commented 5 months ago

I'll try to take a look this week, we are of course entering dask/xarray territory, I think having this workflow figured out will benefit all earthaccess users. FWIW, unmanaged memory is not a solved issue with Dask and the manual garbage collection is for what Python can see. @DeanHenze

DeanHenze commented 5 months ago

@betolink , totally understood and agreed, this seems far out of earthaccess's field of view, and not expecting any solutions from you! Just figured I'd follow up with you since you posted the bit about garbage collection earlier in the conversation.