SciTools / iris

A powerful, format-agnostic, and community-driven Python package for analysing and visualising Earth science data
https://scitools-iris.readthedocs.io/en/stable/
BSD 3-Clause "New" or "Revised" License

NetCDF load & save vs dask multiprocessing #6048

Open bblay opened 3 months ago

bblay commented 3 months ago

🐛 Bug Report

The dask multiprocessing scheduler seemingly doesn't work with iris. It raises TypeError: cannot pickle '_thread.lock' object.
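For context, this error comes from pickle itself: the processes scheduler must serialise the task graph to ship it to worker processes, and thread locks are not picklable. A minimal stdlib demonstration of the same error (not Iris code):

```python
import pickle
import threading

# The dask 'processes' scheduler pickles the task graph to send it to
# worker processes. Any thread lock captured in that graph makes it fail:
lock = threading.Lock()
try:
    pickle.dumps(lock)
except TypeError as exc:
    print(exc)  # cannot pickle '_thread.lock' object
```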

How To Reproduce

import warnings
from pathlib import Path

import dask
import iris

def load_save(in_fpath, out_folder, scheduler):
    print(scheduler)
    out_folder.mkdir(parents=True, exist_ok=True)

    cubes = iris.load(in_fpath)

    # Collect delayed saves (compute=False), then compute them all at once
    # with the requested scheduler.
    results = []
    for cube in cubes:
        results.append(
            iris.save(cube, out_folder / f'{scheduler}_{len(results)}.nc', compute=False))

    dask.compute(results, scheduler=scheduler)

if __name__ == '__main__':
    warnings.filterwarnings(action='ignore', message='Ignoring a datum in netCDF load')

    in_fpath = Path('omitted.nc')
    out_folder = Path('out')

    # This works.
    load_save(in_fpath, out_folder, scheduler='threads')

    # This doesn't work: cannot pickle '_thread.lock' object
    # Forcing the cube to load its data before saving seems to make it work for smaller files.
    load_save(in_fpath, out_folder, scheduler='processes')

Environment

pp-mo commented 3 months ago

Well, it's great to see someone using lazy saving!

But I think this never worked with a process-based scheduler, and that is a known problem.

I'm a bit puzzled by the error, though: this is supposed to raise an internally managed error, "The configured dask array scheduler type is ... not supported by the Iris netcdf saver": here. So maybe this code is now hitting another problem, which perhaps didn't even occur when we originally wrote it. (It's not immediately clear from your report where the problem actually occurs, since you didn't include a traceback.)

Since we introduced the lazy saving mechanism (which also added distributed support), we found that we cannot do saves with the old-style process scheduler. In the end we decided not to pursue this, and excluded it from our testing.

The decision not to support this seemed appropriate, largely because dask themselves now recommend that it is often better to use distributed instead -- and indeed I have seen this suggested even more strongly in less official communications! In my experience there is a bit of a startup cost to using distributed, but it seems more actively developed now, and it is certainly where all the rich debugging tools are.

So, for the reasons just given, we are reluctant to revisit this and make 'processes' work. But if there is a good reason, it presumably could be fixed. I believe there is an awkward obstacle, though: we need a mutex which multiple processes can share, to ensure that only one at a time may open the output file. dask.distributed provides this in the distributed.Lock class (presumably using some form of IPC). Otherwise, however, there is no obvious solution available, except maybe "filelock".

Also, unfortunately, as far as I can tell we have failed to add a note about this to our Iris dask documentation, which has now replaced our on-prem "Dask Best Practices" webpages. I have just made an issue to improve on this.

So... would distributed be a feasible solution in your case, and if not, why not?