radio-astro-tools / spectral-cube

Library for reading and analyzing astrophysical spectral data cubes
http://spectral-cube.rtfd.org
BSD 3-Clause "New" or "Revised" License

dask write issue: dask doesn't like writing when using `multiprocessing` #792

keflavich closed this issue 2 years ago

keflavich commented 2 years ago

When trying to write what I think is a simple cube (a FITS cube read with dask, written back out to FITS), I get:

Traceback (most recent call last):
  File "/blue/adamginsburg/adamginsburg/repos/spectral-cube/spectral_cube/io/fits.py", line 271, in write_fits_cube
    hdulist.writeto(filename, overwrite=overwrite)
  File "/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/astropy/utils/decorators.py", line 536, in wrapper
    return function(*args, **kwargs)
  File "/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/astropy/io/fits/hdu/hdulist.py", line 953, in writeto
    hdu._writeto(hdulist._file)
  File "/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/astropy/io/fits/hdu/base.py", line 686, in _writeto
    self._writeto_internal(fileobj, inplace, copy)
  File "/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/astropy/io/fits/hdu/base.py", line 692, in _writeto_internal
    data_offset, data_size = self._writedata(fileobj)
  File "/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/astropy/io/fits/hdu/base.py", line 624, in _writedata
    size += self._writedata_internal(fileobj)
  File "/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/astropy/io/fits/hdu/image.py", line 626, in _writedata_internal
    return self._writeinternal_dask(fileobj)
  File "/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/astropy/io/fits/hdu/image.py", line 712, in _writeinternal_dask
    output.store(outarr, lock=True, compute=True)
  File "/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/dask/array/core.py", line 1597, in store
    r = store([self], [target], **kwargs)
  File "/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/dask/array/core.py", line 1076, in store
    compute_as_if_collection(Array, store_dsk, map_keys, **kwargs)
  File "/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/dask/base.py", line 315, in compute_as_if_collection
    return schedule(dsk2, keys, **kwargs)
  File "/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/dask/multiprocessing.py", line 219, in get
    result = get_async(
  File "/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/dask/local.py", line 495, in get_async
    fire_tasks(chunksize)
  File "/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/dask/local.py", line 477, in fire_tasks
    dumps((dsk[key], data)),
  File "/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 602, in dump
    return Pickler.dump(self, obj)
TypeError: cannot pickle '_thread.lock' object
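
The traceback points at astropy's dask-aware FITS writer: `_writeinternal_dask` calls `output.store(outarr, lock=True, compute=True)`, and the multiprocessing scheduler then has to pickle each task, lock and all, before handing it to a worker process. A stripped-down, hypothetical sketch of that code path (placeholder array and filename, untested; whether it raises the exact same error will depend on the dask and astropy versions):

import dask
import dask.array as da
from astropy.io import fits

if __name__ == "__main__":
    # placeholder dask array standing in for the cube's data
    arr = da.zeros((4, 64, 64), chunks=(1, 64, 64), dtype='float32')
    hdu = fits.PrimaryHDU(data=arr)

    # force the multiprocessing scheduler, as the script below does via
    # cube.use_dask_scheduler('multiprocessing')
    with dask.config.set(scheduler='processes'):
        # astropy writes the dask array with output.store(..., lock=True);
        # the resulting task graph may not be picklable for worker processes
        hdu.writeto('repro.fits', overwrite=True)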

The full code is below, but it's not a MWE (minimal working example):

import time
import warnings
from astropy.table import Table
from spectral_cube import SpectralCube
from astropy.io import fits
import dask

from statcont.cont_finding import c_sigmaclip_scube

import glob

import tempfile

import os

# for zarr storage
os.environ['TMPDIR'] = '/blue/adamginsburg/adamginsburg/tmp'

if __name__ == "__main__":
    # need to be in main block for dask to work
    #from dask.distributed import Client
    #if os.getenv('SLURM_MEM_PER_NODE'):
    #    memlim_total = int(os.getenv('SLURM_MEM_PER_NODE')) / 1024 # GB
    #    ntasks = int(os.getenv('SLURM_NTASKS'))
    #    memlim = memlim_total / ntasks
    #    print(f"Memory limit is {memlim} GB")
    #else:
    #    memlim = 1
    #    ntasks = 8
    #client = Client(memory_limit=f'{memlim}GB', n_workers=ntasks)
    #nworkers = len(client.scheduler_info()['workers'])
    #print(f"Client scheduler info: {client.scheduler_info()['services']}")
    #print(f"Number of workers: {nworkers}  (should be equal to ntasks={ntasks})")
    #print(f"Client scheduler info: {client.scheduler_info()}")
    #print(f"Client vers: {client.get_versions(check=True)}")
    if os.getenv('ENVIRONMENT') == 'BATCH':
        pass
    else:
        from dask.diagnostics import ProgressBar
        pbar = ProgressBar()
        pbar.register()

    nthreads = os.getenv('SLURM_NTASKS')
    if nthreads:
        nthreads = int(nthreads)
        dask.config.set(scheduler='threads')
    else:
        dask.config.set(scheduler='synchronous')

    scheduler = dask.config.get('scheduler')
    print(f"Using {nthreads} threads with the {scheduler} scheduler")

    assert tempfile.gettempdir() == '/blue/adamginsburg/adamginsburg/tmp'

    redo = False

    basepath = '/orange/adamginsburg/ALMA_IMF/2017.1.01355.L/imaging_results'
    basepath = '/blue/adamginsburg/adamginsburg/almaimf/workdir'

    tbl = Table.read('/orange/adamginsburg/web/secure/ALMA-IMF/tables/cube_stats.ecsv')

    def get_size(start_path='.'):
        total_size = 0
        for dirpath, dirnames, filenames in os.walk(start_path):
            for f in filenames:
                fp = os.path.join(dirpath, f)
                # skip if it is symbolic link
                if not os.path.islink(fp):
                    total_size += os.path.getsize(fp)

        return total_size

    # simpler approach
    #sizes = {fn: get_size(fn) for fn in glob.glob(f"{basepath}/*_12M_spw[0-9].image")}
    filenames = list(glob.glob(f"{basepath}/*_12M_sio.JvM.image.pbcor.fits"))

    # use tbl, ignore 7m12m
    sizes = {ii: get_size(fn)
             for ii, fn in enumerate(filenames)
             if '_12M_' in fn and os.path.exists(fn)
            } # ignore 7m12m

    for ii in sorted(sizes, key=lambda x: sizes[x]):

        fn = filenames[ii]

        outfn = fn[:-5]+'.statcont.cont.fits'
        outcube = fn[:-5]+'.statcont.contsub.fits'

        if (not os.path.exists(outfn) and not os.path.exists(outcube)) or redo:
            t0 = time.time()

            # touch the file to allow parallel runs
            with open(outfn, 'w') as fh:
                fh.write("")

            print(f"{fn}->{outfn}, size={sizes[ii]/1024**3} GB")

            target_chunk_size = int(1e5)
            print(f"Target chunk size is {target_chunk_size}")
            cube = SpectralCube.read(fn, target_chunk_size=target_chunk_size, use_dask=True)
            # JvM is already minimized I hope
            # print(f"Minimizing {cube}")
            # with warnings.catch_warnings():
            #     warnings.simplefilter("ignore")
            #     cube = cube.minimal_subcube()
            print(cube)

            with warnings.catch_warnings():
                warnings.simplefilter("ignore")
                with cube.use_dask_scheduler('multiprocessing'):
                    print("Calculating noise")
                    noise = cube.std()

                    print("Sigma clipping")
                    result = c_sigmaclip_scube(cube, noise,
                                               verbose=True,
                                               save_to_tmp_dir=True)
                    print("Running the compute step")
                    data_to_write = result[1].compute()

                    print(f"Writing to FITS {outfn}")
                    fits.PrimaryHDU(data=data_to_write.value,
                                    header=cube[0].header).writeto(outfn,
                                                                   overwrite=True)

                    print(f"Writing contsub cube to {outcube}")
                    cube.allow_huge_operations=True
                    scube = cube - data_to_write
                    scube.write(outcube, overwrite=True)
            print(f"{fn} -> {outfn} in {time.time()-t0}s")
        else:
            print(f"Skipped {fn}")
keflavich commented 2 years ago

Right, this could well just be a problem with the multiprocessing scheduler. If that's it, it's good to be aware of; we can list it among the 'known issues'.
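
The limitation itself is easy to demonstrate outside of spectral-cube and astropy: the multiprocessing scheduler serializes every task with cloudpickle, so any unpicklable object in the graph (a `threading.Lock`, an open file handle, etc.) fails with exactly this TypeError, while the threaded scheduler shares memory and never needs to pickle anything. A small illustration (the function and variable names here are made up for the example):

import threading
import dask

def increment_under_lock(lock, value):
    # trivial task whose arguments include an unpicklable threading.Lock
    with lock:
        return value + 1

if __name__ == "__main__":
    lock = threading.Lock()
    task = dask.delayed(increment_under_lock)(lock, 1)

    # threaded scheduler: workers share memory, nothing is pickled, this works
    print(task.compute(scheduler='threads'))

    # multiprocessing scheduler: the task (including the lock) must be pickled
    # for the worker process, which raises
    # TypeError: cannot pickle '_thread.lock' object
    try:
        task.compute(scheduler='processes')
    except TypeError as exc:
        print(exc)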

keflavich commented 2 years ago

I solved this by changing the scheduler from multiprocessing to threads.
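
A hedged sketch of that workaround, using the same spectral-cube calls as the script above (SpectralCube.read with use_dask=True and the use_dask_scheduler context manager); the filenames are placeholders:

from spectral_cube import SpectralCube

cube = SpectralCube.read('input_cube.fits', use_dask=True)  # placeholder filename

# threads instead of multiprocessing: the workers share memory, so the lock
# in astropy's dask write path never has to be pickled
with cube.use_dask_scheduler('threads'):
    cube.write('output_cube.fits', overwrite=True)  # placeholder filename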