pytroll / trollflow2

Next generation Trollflow. Trollflow is for batch-processing satellite data using Satpy
https://trollflow2.readthedocs.org/
GNU General Public License v3.0

Add callback functionality for dask-delayed dataset saving #168

Closed: gerritholl closed this 1 year ago

gerritholl commented 2 years ago

Add functionality for a callback function that is called as soon as a file has been written. This could be used to ship out written files as soon as they are done, rather than waiting for all files to be complete.

gerritholl commented 2 years ago

With thanks to @martindurant for the StackOverflow answer at https://stackoverflow.com/a/74354842/974555 pointing out how to do this :)
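
For reference, a minimal, self-contained sketch of that technique outside any pytroll code (the file names and the callback are invented for illustration): each per-file write is wrapped in a further dask.delayed call, so the callback fires as soon as that particular file's task has finished rather than when the whole batch completes.

import dask
from dask import delayed

def on_file_done(result, filename):
    # hypothetical callback: runs as soon as this one file has been written
    print("finished writing", filename)
    return result

@delayed
def write_file(filename, text):
    # stand-in for a real delayed writer such as Scene.save_dataset(..., compute=False)
    with open(filename, "w") as fh:
        fh.write(text)

tasks = []
for i in range(3):
    fn = f"/tmp/example{i:d}.txt"
    tasks.append(delayed(on_file_done)(write_file(fn, "some data"), fn))
dask.compute(tasks)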

codecov[bot] commented 2 years ago

Codecov Report

Merging #168 (6bd0218) into main (ae95cb8) will increase coverage by 0.16%. The diff coverage is 99.29%.

@@            Coverage Diff             @@
##             main     #168      +/-   ##
==========================================
+ Coverage   95.56%   95.73%   +0.16%     
==========================================
  Files          13       13              
  Lines        2821     2954     +133     
==========================================
+ Hits         2696     2828     +132     
- Misses        125      126       +1     
Flag        Coverage Δ
unittests   95.73% <99.29%> (+0.16%) ↑

Flags with carried forward coverage won't be shown.

Files Changed                          Coverage Δ
trollflow2/plugins/__init__.py         94.18% <98.36%> (+0.40%) ↑
trollflow2/tests/test_trollflow2.py    98.51% <100.00%> (+0.10%) ↑

gerritholl commented 2 years ago

Putting on draft as I'm trying to figure out a good way for the callback function to get the information it may need.

gerritholl commented 2 years ago

Works wonderfully in the unit tests, but in the real world, the files to be written are empty when the callbacks are called, at least for GeoTIFF.

And the trollflow2 launcher complains about missing files.

gerritholl commented 2 years ago

It works for some writers but not others. Example:

import os.path
import xarray as xr
import dask.array as da
from dask import delayed
from pyresample import create_area_def
from satpy.tests.utils import make_fake_scene

fake_area = create_area_def("sargasso", 4326, resolution=1, width=5, height=5, center=(0, 0))
fake_scene = make_fake_scene(
    {"dragon_top_height": (dat := xr.DataArray(
        dims=("y", "x"),
        data=da.arange(25).reshape((5, 5)))),
     "penguin_bottom_height": dat,
     "kraken_depth": dat},
    daskify=True,
    area=fake_area)

def on_done(result, fn):
    print("Size for", fn, "is", os.path.getsize(fn))
    return result

objs = []
for i in range(5):
    fn = f"/tmp/test{i:d}.tif"
    obj = fake_scene.save_dataset("dragon_top_height", filename=fn,
        compute=False)
    objs.append(delayed(on_done)(obj, fn))
print(objs)
da.compute(objs)

For .tif, this results in:

Size for /tmp/test0.tif is 0
Size for /tmp/test4.tif is 0
Size for /tmp/test3.tif is 0
Size for /tmp/test1.tif is 0
Size for /tmp/test2.tif is 0

For .png:

Size for /tmp/test3.png is 100
Size for /tmp/test2.png is 100
Size for /tmp/test0.png is 100
Size for /tmp/test4.png is 100
Size for /tmp/test1.png is 100

For .nc:

Size for /tmp/test4.nc is 22742
Size for /tmp/test1.nc is 22742
Size for /tmp/test2.nc is 22742
Size for /tmp/test0.nc is 22742
Size for /tmp/test3.nc is 22742

gerritholl commented 2 years ago

The difference is that for the NetCDF and simple_image writers, the result of save_dataset is a Delayed object. For geotiff, it's a list containing a dask array (the source) and a RIODataset object (the target).
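
A hedged sketch of how a wrapper could cope with both return shapes (the helper name is invented; the geotiff branch mirrors the da.store attempt in the next comment):

import dask.array as da
from dask import delayed
from dask.delayed import Delayed

def wrap_save_result(obj, callback, filename):
    # hypothetical helper: attach a per-file callback to whatever
    # save_dataset(..., compute=False) returned
    if isinstance(obj, Delayed):
        # netCDF / simple_image: already a single Delayed object
        return delayed(callback)(obj, filename)
    # geotiff: a (sources, targets) pair that still needs to be stored
    src, targ = obj
    return delayed(callback)(da.store(src, targ, compute=False), filename)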

gerritholl commented 2 years ago

I also tried to replace

obj = fake_scene.save_dataset("dragon_top_height", filename=fn, compute=False)
objs.append(delayed(on_done)(obj, fn))

with

obj = fake_scene.save_dataset("dragon_top_height", filename=fn, compute=False)
(src, targ) = obj
objs.append(delayed(on_done)(da.store(src, targ, compute=False), fn))

but the problem remains: when on_done is called, the files still have size 0.

gerritholl commented 2 years ago

When I add

targ.close()

it results in

Size for /tmp/test0.tif is 551
Size for /tmp/test2.tif is 551
Size for /tmp/test1.tif is 551
Size for /tmp/test4.tif is 551
Size for /tmp/test3.tif is 551

which corresponds to only the header.

djhoese commented 2 years ago

When you run this code that you tried:

obj = fake_scene.save_dataset("dragon_top_height", filename=fn, compute=False)
(src, targ) = obj
objs.append(delayed(on_done)(da.store(src, targ, compute=False), fn))

Can you add a print to on_done to print out what the argument is? Is it a Delayed object? Or is it None? If it is None then I'm extremely surprised at the size 0 files. Oh but it sounds like you needed to close the file for it to flush everything to disk, maybe?

gerritholl commented 2 years ago

The value of result is [None, None]:

Size for /tmp/test4.tif is 551
[None, None]
Size for /tmp/test3.tif is 551
[None, None]
Size for /tmp/test2.tif is 551
[None, None]
Size for /tmp/test0.tif is 551
[None, None]
Size for /tmp/test1.tif is 551
[None, None]

Oh but it sounds like you needed to close the file for it to flush everything to disk, maybe?

Right. Example without any pytroll components:

from dask import delayed
import dask.array as da
import os.path
import os
import h5py

ds = da.arange(100).reshape(10, 10)
out = "/tmp/myfile.h5"
if os.path.exists(out):
    os.unlink(out)
f = h5py.File(out, mode='a')

dset = f.create_dataset("data", ds.shape, dtype='f8')

def on_done(result, fn):
    print("Size for", fn, "is now", os.path.getsize(fn))
    return result

x = da.store(ds, dset, compute=False)

da.compute(delayed(on_done)(x, out))
print("Size after computing for", out, "is", os.path.getsize(out))
f.close()
print("Final size for", out, "is", os.path.getsize(out))

which results in

Size for /tmp/myfile.h5 is now 96
Size after computing for /tmp/myfile.h5 is 96
Final size for /tmp/myfile.h5 is 2848

So the issue here appears to be that computing does not result in the file being flushed/closed. I guess for the other writers, the file gets closed before computation is finished.

We might need to put a .flush() somewhere. Not sure where, and not sure what the impacts of doing so might be.
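
As a hedged illustration, here is a variant of the h5py snippet above where the callback flushes the file before measuring (the open File object has to be passed along for this to work):

import os
import os.path
import h5py
import dask.array as da
from dask import delayed

out = "/tmp/myfile_flushed.h5"
if os.path.exists(out):
    os.unlink(out)
f = h5py.File(out, mode="a")
dset = f.create_dataset("data", (10, 10), dtype="f8")

def on_done_flush(result, fil, fn):
    # flush the HDF5 file so everything written so far actually reaches the disk
    fil.flush()
    print("Size for", fn, "after flushing is", os.path.getsize(fn))
    return result

x = da.store(da.arange(100).reshape(10, 10), dset, compute=False)
da.compute(delayed(on_done_flush)(x, f, out))
f.close()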

gerritholl commented 2 years ago

I've added a callback that does a close (there's no flush exposed) and it seems to work for GeoTIFF now, at least the unit tests pass. Will do a live test later today.

gerritholl commented 2 years ago

Passes unit tests, but fails the field test with

Traceback (most recent call last):
  File "/opt/pytroll/pytroll_inst/mambaforge/envs/pytroll-py310/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "/opt/pytroll/pytroll_inst/mambaforge/envs/pytroll-py310/lib/python3.10/threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/pytroll/pytroll_inst/mambaforge/envs/pytroll-py310/lib/python3.10/site-packages/trollflow2/launcher.py", line 377, in process
    cwrk.pop('fun')(job, **cwrk)
  File "/opt/pytroll/pytroll_inst/mambaforge/envs/pytroll-py310/lib/python3.10/site-packages/trollflow2/plugins/__init__.py", line 334, in save_datasets
    compute_writer_results(objs)
  File "/opt/pytroll/pytroll_inst/mambaforge/envs/pytroll-py310/lib/python3.10/site-packages/satpy/writers/__init__.py", line 536, in compute_writer_results
    da.compute(delayeds)
  File "/opt/pytroll/pytroll_inst/mambaforge/envs/pytroll-py310/lib/python3.10/site-packages/dask/base.py", line 602, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/opt/pytroll/pytroll_inst/mambaforge/envs/pytroll-py310/lib/python3.10/site-packages/dask/threaded.py", line 89, in get
    results = get_async(
  File "/opt/pytroll/pytroll_inst/mambaforge/envs/pytroll-py310/lib/python3.10/site-packages/dask/local.py", line 511, in get_async
    raise_exception(exc, tb)
  File "/opt/pytroll/pytroll_inst/mambaforge/envs/pytroll-py310/lib/python3.10/site-packages/dask/local.py", line 319, in reraise
    raise exc
  File "/opt/pytroll/pytroll_inst/mambaforge/envs/pytroll-py310/lib/python3.10/site-packages/dask/local.py", line 224, in execute_task
    result = _execute_task(task, data)
  File "/opt/pytroll/pytroll_inst/mambaforge/envs/pytroll-py310/lib/python3.10/site-packages/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/opt/pytroll/pytroll_inst/mambaforge/envs/pytroll-py310/lib/python3.10/site-packages/trollflow2/plugins/__init__.py", line 974, in callback_close
    obj[1].close()
AttributeError: 'tuple' object has no attribute 'close'

Inspection reveals that obj here is:

[(array([[[ 27,  27,  20, ...,  29,  29,  29],
        [ 27,  27,  20, ...,  28,  28,  28],
        [ 27,  27,  20, ...,  28,  28,  28],
        ...,
        [ 24,  23,  23, ..., 103, 103, 100],
        [ 23,  23,  23, ..., 103, 103, 100],
        [ 23,  23,  23, ..., 100,  97,  97]]], dtype=uint8), 6, 119), (<trollimage.xrimage.RIODataset object at 0x7effbea3d480>, <trollimage.xrimage.RIOTag object at 0x7effbea3eef0>, <trollimage.xrimage.RIOTag object at 0x7eff7109c910>)]

meaning that rather than target being a single RIODataset, it is actually a tuple of (RIODataset, RIOTag, RIOTag) objects… now how/why does that happen?

gerritholl commented 2 years ago

A quick test shows that for this case, everything seems to work as expected if I loop through obj[1] and close all members, but why is obj[1] sometimes a RIODataset and sometimes a tuple?
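
A hedged sketch of a close callback that copes with both shapes (the actual callback_close in this branch may look different):

def callback_close(obj, *args):
    # hedged sketch: obj is the (sources, targets) pair prepared for da.store;
    # targets is sometimes a single RIODataset and sometimes a tuple such as
    # (RIODataset, RIOTag, RIOTag), so close every member that can be closed
    targets = obj[1]
    if not isinstance(targets, (list, tuple)):
        targets = [targets]
    for targ in targets:
        close = getattr(targ, "close", None)
        if close is not None:
            close()
    return obj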

djhoese commented 2 years ago

I mentioned the reasoning for the multi-element target on slack (tiff tags using dask arrays) and that we should maybe add a wrapper object in XRImage. For the on_done callback, should it maybe be doing the closing? Or maybe be chained after another function that does the closing?

gerritholl commented 2 years ago

It seems to pass a system test now, with one caveat: when we use the callback_move plugin, the launcher complains about missing files.

gerritholl commented 2 years ago

The mover callback could use staging_zone, so that we have a 1:1 mapping between temporary path and final path. Then, in save_datasets, instead of the line job['produced_files'].put(fmat_config['filename']), we could already put the post-moving filename, so that this is where check_datasets will look later. But then save_datasets would somehow have to know that early moving is going to happen.
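
A hypothetical sketch of what such a mover callback could look like, using a callback signature like the noop example in the profiling MCVE further down this thread; the job layout and the config keys used here are assumptions for illustration, not the actual trollflow2 configuration:

import os.path
import shutil

def callback_move(obj, src, targ, job, fmat_config):
    # hypothetical sketch: as soon as this file's store task is done, move it from
    # the staging zone to its final destination
    staged = fmat_config["filename"]
    output_dir = job["product_list"]["product_list"]["output_dir"]
    shutil.move(staged, os.path.join(output_dir, os.path.basename(staged)))
    return obj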

gerritholl commented 2 years ago

Converting back to draft because I'm still getting empty/black/tiny files being produced, despite passing unit tests… I thought it was working when I tested it earlier…

gerritholl commented 2 years ago

If I pass

delayed(testlog)(da.store(src, targ, compute=False), ...)

then it seems to work, but so far I am passing

delayed(testlog)(scn.save_dataset(..., compute=False), ...)

and then it doesn't work; in fact, the file remains full of zeroes even at the end, even if the result of delayed(testlog) is passed to compute_writer_results.

gerritholl commented 2 years ago

That means that we have to make changes to plugins.save_datasets in trollflow2 and/or compute_writer_results in Satpy.

gerritholl commented 2 years ago

But, if we wrap da.store, the wrapper still only gets called when all files are done, rather than after each file? That would defeat the whole point of the exercise.

gerritholl commented 2 years ago

I wonder what the performance difference would be between:

da.compute(da.store(sources, targets, compute=False))

and

da.compute([da.store(src, targ, compute=False) for (src, targ) in zip(sources, targets)])

We may have to do the latter for early moving to work for GeoTIFF.
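
A hedged sketch of the second variant with the callback attached per file (the helper name is invented):

import dask.array as da
from dask import delayed

def store_each_with_callback(sources, targets, filenames, callback):
    # hedged sketch: one da.store per file, so each callback fires as soon as its
    # own file has been stored, instead of once after the combined store of all files
    return [delayed(callback)(da.store(src, targ, compute=False), fn)
            for src, targ, fn in zip(sources, targets, filenames)]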

gerritholl commented 2 years ago

I mentioned the reasoning for the multi-element target on slack (tiff tags using dask arrays) and that we should maybe add a wrapper object in XRImage.

It's not only tiff tags using dask arrays. It's also multiple files. We get

[<trollimage._xrimage_rasterio.RIODataset object at 0x7fc43ea23bb0>, <trollimage._xrimage_rasterio.RIODataset object at 0x7fc43ebab010>, <trollimage._xrimage_rasterio.RIODataset object at 0x7fc43ea47d90>]

Although I could close all three of them, the whole point of the exercise is that I would move each file as soon as it's finished. A callback_move would have to be called three times, once for each file. That's not how Satpy's compute_writer_results calls da.store.

djhoese commented 2 years ago

Ah ok, so the writer returns multiple files, but XRImage only deals with one file at a time.

gerritholl commented 2 years ago

Needs https://github.com/pytroll/satpy/pull/2281

gerritholl commented 2 years ago

It seems to work now with https://github.com/pytroll/satpy/pull/2281, but there appears to be a problem with resource usage: there is a large increase in RAM usage, and full disc processing that previously used 80 GB now runs out of RAM on a 128 GB machine.

gerritholl commented 2 years ago

Calculation for FCI over Europe without callbacks:

[dask performance profile: tf2-dask-without-callbacks]

With callbacks:

[dask performance profile: tf2-dask-with-callbacks]

gerritholl commented 2 years ago

In a Satpy context, I don't find much of a performance impact, see https://github.com/pytroll/satpy/pull/2281#issuecomment-1319003086. But when I make a comparison through trollflow2, I see major differences. MCVE:

import os
import copy
from trollflow2.plugins import save_datasets
from unittest import mock
from dask.diagnostics import Profiler, ResourceProfiler, visualize
from sattools.io import plotdir
from glob import glob
from satpy import Scene

input_mda = {}

job = {}
job['input_mda'] = input_mda

seviri_files = glob("/media/nas/x21308/scratch/SEVIRI/202103300900/H-000*")
sc = Scene(filenames={"seviri_l1b_hrit": seviri_files})
names = sc.available_dataset_names()
sc.load(names)

def noop(obj, src, targ, job, fmat_config):
    """Do nothing."""
    return obj

form = [
    {"writer": "geotiff", "fill_value": 0},
]

#call_on_done = None
call_on_done = [noop]
product_list = {
    "fname_pattern": "{productname}-{writer}.tif",
    "output_dir": os.fspath(plotdir()),
    "areas": {
        "sargasso": {
            "products": {
                nm: {
                    "productname": nm,
                    "formats": copy.deepcopy(form)}
                for nm in names
                },
            }
        }
    }
if call_on_done:
    product_list["call_on_done"] = call_on_done
job["product_list"] = {"product_list": product_list}
job['resampled_scenes'] = {"sargasso": sc}
job['produced_files'] = mock.MagicMock()

if call_on_done:
    d = "with"
else:
    d = "without"
with Profiler() as prof, ResourceProfiler(dt=0.05) as rprof:
    save_datasets(job)
visualize([prof, rprof], show=False, save=True, filename=os.fspath(plotdir() /
    f"dask-profile-{d:s}-call-on-done.html"))

Without the wrapper, it takes 15.1 seconds and uses 1.4 GB RAM.

With the wrapper, it takes 18.5 seconds and uses 2.5 GB RAM.

The dask graphs look strange: the profiler shows 8 tasks running in parallel at a time when the resource profiler shows 100% CPU, and the profiler shows nothing happening after 3 or 6 seconds even though the resource profiler shows there is still ongoing work...

djhoese commented 2 years ago

It looks like you are chaining the delayed objects being computed. Is this necessary? Or can you do a delayeds.extend(...) call and have the delayed handling return a list of delayed objects ([callback(delayed, ...) for callback in callbacks])? I'm wondering if the chaining is making it so the result of one Delayed function is being held on to because it is "needed" by the next one, and so on.
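
A minimal sketch of the two patterns being discussed (all names invented): chaining makes each callback depend on the previous one, whereas the flat list makes every callback depend only on the original delayed object.

import dask
from dask import delayed

def fake_save():
    # stand-in for save_dataset(..., compute=False)
    return "written"

def log_cb(result):
    print("log:", result)
    return result

def move_cb(result):
    print("move:", result)
    return result

callbacks = [log_cb, move_cb]

# chained: move_cb only runs after log_cb, which only runs after the save
chained = delayed(fake_save)()
for cb in callbacks:
    chained = delayed(cb)(chained)

# flat list: both callbacks depend only on the save and are independent of each other
saved = delayed(fake_save)()
flat = [delayed(cb)(saved) for cb in callbacks]

dask.compute(chained, flat)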

gerritholl commented 2 years ago

The dask graph looks very different with and without the callback, and the SVG increases in size from 6.9M to 8.3M. Profiling also shows that _compute_quantile (in xarray) gets called 8 times without the callback and 16 times with it, even though it appears only 8 times in the dask graph.

gerritholl commented 2 years ago

Is this necessary?

I'm wondering if the chaining is making it so the result of one Delayed function is being held on to because it is "needed" by the next one and so on.

I think this is necessary. The first wrapper does the closing. The closing must be completed before the moving, because closing flushes data to disk, so only after closing do I know that the writing is complete.

gerritholl commented 2 years ago

But the chaining cannot explain the problem I observe here, because in this test I am only applying a single callback. And the callback functions themselves do very little, so one waiting for the other should not be an issue.

gerritholl commented 2 years ago

Dask graph with noop-wrapper:

[dask graph image: dask-delayed-tf2-test]

Dask graph without noop-wrapper:

[dask graph image: dask-delayed-tf2-test-without-wrapper]

I cannot (yet) reproduce this big difference in the dask graph outside of my trollflow2 changes, so it may be due to something I've done wrong.

gerritholl commented 2 years ago

Not passing src only partly solves the problem. I no longer get big increases in RAM for large productions. However, I get an increase in RAM for small productions and an increase in run-time for large productions.

Taking the full FCI test dataset, I produce 16 channels and 7 RGBs, all resampled to either a 1 km or a 2 km grid covering Germany, Europe (roughly LAC4), or the full disc. Peak RAM and wall clock time usage without and with callbacks that close the file, move it, and write a log message:

Area        RAM without   Time without   RAM with   Time with
Germany     2.2 GB        0:24.2         3.0 GB     0:24.7
Europe      20.0 GB       02:06.7        20.4 GB    02:40.9
Full disc   84.0 GB       08:39.4        65.9 GB    11:32.3

gerritholl commented 2 years ago

When I use the synchronous scheduler for debugging purposes, producing FCI over Europe (LAC4) without enhancing or resampling is faster with callbacks than without. It's slightly faster when the callback does nothing, but it's much faster when the callback is closing files.

Scheduler               RAM, baseline   Time, baseline   RAM, close/log   Time, close/log   RAM, noop   Time, noop
default scheduler       3.4 GB          0:48             3.7 GB           1:02              3.3 GB      1:13
synchronous scheduler   2.9 GB          2:11             2.8 GB           1:22              2.8 GB      2:00

gerritholl commented 1 year ago

Tests are failing because this depends on merged but unreleased Satpy capabilities.

pnuu commented 1 year ago

I guess it makes sense that for a large area the RAM usage is lower when data that has been fully used can be released and cleaned up.

mraspaud commented 1 year ago

@gerritholl I made an attempt at fixing the merge conflicts

pnuu commented 1 year ago

I fixed the two logger names that were not present in my logging update already merged in main.

pnuu commented 1 year ago

I fixed the couple of merge conflicts. At least I hope I did.