euroargodev / argopy

A python library for Argo data beginners and experts
https://argopy.readthedocs.io
European Union Public License 1.2

Asynchronous, parallel/concurrent data fetching from a single Argo data server ? #365

Open gmaze opened 2 months ago

gmaze commented 2 months ago

This design may already be implemented in the test_data CLI, which is used to populate the test data served by the mocked HTTP servers in CI.

However, I wonder whether we should also do this when fetching a large number of files from one of the GDAC servers (https and s3).

The fsspec http store is already asynchronous, but I don't quite understand how parallelisation is implemented for multi-file downloads:

fs = fsspec.filesystem("http")
out = fs.cat([url1, url2, url3])  # fetches data concurrently

Our current option is to use multithreading through the parallel option of the data fetcher, which is handled in httpstore.open_mfdataset. With this design, we apply the pre/post-processing of Argo data to chunks in parallel, but that is different from downloading in parallel and then processing in parallel (possibly with another mechanism).

For example: https://stackoverflow.com/questions/57126286/fastest-parallel-requests-in-python


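import asyncio
import time

import aiohttp


async def get(url, session):
    """Fetch one URL and return its body as bytes (None on failure)."""
    try:
        async with session.get(url=url) as response:
            resp = await response.read()
            print("Successfully got url {} with resp of length {}.".format(url, len(resp)))
            return resp
    except Exception as e:
        print("Unable to get url {} due to {}.".format(url, e.__class__))


async def main(urls):
    # One shared session; gather() runs all requests concurrently
    async with aiohttp.ClientSession() as session:
        ret = await asyncio.gather(*(get(url, session) for url in urls))
    print("Finalized all. Return is a list of len {} outputs.".format(len(ret)))


# `websites` is assumed to be a newline-separated string of URLs defined beforehand
urls = websites.split("\n")
start = time.time()
asyncio.run(main(urls))
end = time.time()
print("Took {:.2f} s.".format(end - start))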
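
The distinction made above (download in parallel, then process in parallel) can be sketched with the standard library alone. This is only a minimal sketch under stated assumptions, not argopy's or fsspec's implementation: `fetch` and `process` are hypothetical callables supplied by the caller, and `download_all`, `process_all`, `fetch_then_process` and the default limits are illustrative names and values.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


async def download_all(urls, fetch, max_concurrency=8):
    """Phase 1: download every URL concurrently, at most `max_concurrency` at once.

    `fetch` is a hypothetical coroutine function: `await fetch(url)` returns bytes.
    """
    semaphore = asyncio.Semaphore(max_concurrency)

    async def bounded_fetch(url):
        # The semaphore caps the number of requests in flight
        async with semaphore:
            return await fetch(url)

    # gather() returns results in the same order as `urls`
    return await asyncio.gather(*(bounded_fetch(url) for url in urls))


def process_all(payloads, process, max_workers=4):
    """Phase 2: apply `process` to each downloaded payload in a thread pool."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Executor.map() also preserves input order
        return list(pool.map(process, payloads))


def fetch_then_process(urls, fetch, process):
    """Run the two phases one after the other and return the processed results."""
    payloads = asyncio.run(download_all(urls, fetch))
    return process_all(payloads, process)
```

In this thread's context, `fetch` could wrap an aiohttp `session.get(...)` call like the `get` coroutine above, and `process` could be whatever turns raw bytes into a dataset. Both are assumptions about how the pieces might be wired together.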

gmaze commented 2 months ago

One small test:

import fsspec
import numpy as np
import xarray as xr
from argopy import ArgoIndex
from argopy.stores import httpstore

idx = ArgoIndex(index_file='bgc-s').load().search_wmo_cyc(6903091, np.arange(1,45))
urls = [idx.host + "/dac/" + str(f) for f in idx.search['file']]

Method 1:

%%time
fs = fsspec.filesystem("http")
out = fs.cat(urls)  # fetches data concurrently
results = []
for url in out:
    results.append(xr.open_dataset(out[url]))
>>> CPU times: user 1.2 s, sys: 240 ms, total: 1.44 s
>>> Wall time: 6.95 s

Method 2:

%%time
results = httpstore().open_mfdataset(urls, concat=False)
>>> CPU times: user 1.52 s, sys: 255 ms, total: 1.78 s
>>> Wall time: 5.3 s

gmaze commented 2 months ago

What takes time is creating the xarray datasets, not downloading the data, so maybe this is not where to look for performance improvements.
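
One way to check this more directly would be to time the download and the dataset creation separately, rather than timing the whole cell. The helper below is a minimal sketch: `time_phases` is a hypothetical name, and `download` and `parse` are placeholders for whatever callables are being compared.

```python
import time


def time_phases(urls, download, parse):
    """Measure the wall time of the download and parse phases separately.

    `download(urls)` is assumed to return a dict mapping each URL to bytes.
    `parse(data)` is assumed to turn one bytes payload into a result object.
    """
    t0 = time.perf_counter()
    raw = download(urls)
    t1 = time.perf_counter()
    results = [parse(data) for data in raw.values()]
    t2 = time.perf_counter()
    timings = {"download": t1 - t0, "parse": t2 - t1}
    return results, timings
```

With Method 1 above, this would presumably be called as `time_phases(urls, fsspec.filesystem("http").cat, xr.open_dataset)`, giving one number for the concurrent fetch and one for building the datasets.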