ratt-ru / dask-ms

Implementation of a dask/xarray dataset backed by a CASA MS
https://dask-ms.readthedocs.io

Add `rechunk_by_size` functionality #284

Closed (JSKenyon closed 1 year ago)

JSKenyon commented 1 year ago

This PR aims to address #283 by adding a utility function that rechunks an `xarray.Dataset` so that every `dask.Array` it contains has chunks no larger than a specified size. This is handy when reading from one format and writing to another with different limitations. The motivating case is reading from a measurement set and writing to zarr, which may fail when chunks exceed 2GB.
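
To illustrate the idea, here is a minimal sketch of how a chunk shape that respects a byte limit might be computed. It is not the implementation in this PR: the function name `chunk_shape_within`, the halving strategy, and the use of `2**31` bytes as the limit are all assumptions made for the example.

```python
import math


def chunk_shape_within(shape, itemsize, max_bytes):
    """Return a chunk shape whose size in bytes does not exceed max_bytes.

    Hypothetical helper: starts from the full array shape and repeatedly
    halves the longest chunk axis until a single chunk fits the limit.
    """
    if itemsize > max_bytes:
        raise ValueError("a single element already exceeds max_bytes")

    chunk = list(shape)
    while math.prod(chunk) * itemsize > max_bytes:
        # Pick the axis that currently has the longest chunk extent.
        axis = max(range(len(chunk)), key=chunk.__getitem__)
        # Halve it, rounding up so the extent never drops below 1.
        chunk[axis] = (chunk[axis] + 1) // 2
    return tuple(chunk)


# Assumed example: 10 million rows of 64 channels x 4 correlations,
# stored as 8-byte complex values, limited to 2**31 bytes per chunk.
limit = 2**31
chunks = chunk_shape_within((10_000_000, 64, 4), itemsize=8, max_bytes=limit)
```

A shape produced this way could then be handed to dask's `rechunk` (or mapped onto dimension names for `xarray.Dataset.chunk`). Halving the longest axis keeps the sketch short; an actual implementation may prefer to split only along particular dimensions, such as rows.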

I believe that this first effort works but some additional functionality may be required including:

JSKenyon commented 1 year ago

Currently failing test appears to be minio related. Traceback as follows:

_____________________ ERROR at setup of test_minio_server ______________________

minio_client = PosixPath('/home/runner/.local/bin/mc')
minio_alias = 'testcloud', minio_user_key = 'abcdef1234567890'

    @pytest.fixture
    def minio_admin(minio_client, minio_alias, minio_user_key):
        minio = pytest.importorskip("minio")
        minio_admin = minio.MinioAdmin(minio_alias, binary_path=str(minio_client))
        # Add a user and give it readwrite access
        minio_admin.user_add(minio_user_key, minio_user_key)
>       minio_admin.policy_set("readwrite", user=minio_user_key)

daskms/conftest.py:343: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../../.cache/pypoetry/virtualenvs/dask-ms-v9AZFZiS-py3.9/lib/python3.9/site-packages/minio/minioadmin.py:148: in policy_set
    return self._run(
../../../.cache/pypoetry/virtualenvs/dask-ms-v9AZFZiS-py3.9/lib/python3.9/site-packages/minio/minioadmin.py:47: in _run
    proc = subprocess.run(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
input = None, capture_output = True, timeout = None, check = True
popenargs = (['/home/runner/.local/bin/mc', '--json', 'admin', 'policy', 'attach', 'testcloud', ...],)
kwargs = {'env': None, 'stderr': -1, 'stdout': -1, 'text': True}
process = <Popen: returncode: 1 args: ['/home/runner/.local/bin/mc', '--json', 'admin'...>
stdout = '{"status":"error","error":{"message":"`attach` is not a recognized command. Get help using `--help` flag.","cause":{"message":"","error":{}},"type":"fatal"}}\n'
stderr = '', retcode = 1

    def run(*popenargs,
            input=None, capture_output=False, timeout=None, check=False, **kwargs):
        """Run command with arguments and return a CompletedProcess instance.

        The returned instance will have attributes args, returncode, stdout and
        stderr. By default, stdout and stderr are not captured, and those attributes
        will be None. Pass stdout=PIPE and/or stderr=PIPE in order to capture them.

        If check is True and the exit code was non-zero, it raises a
        CalledProcessError. The CalledProcessError object will have the return code
        in the returncode attribute, and output & stderr attributes if those streams
        were captured.

        If timeout is given, and the process takes too long, a TimeoutExpired
        exception will be raised.

        There is an optional argument "input", allowing you to
        pass bytes or a string to the subprocess's stdin.  If you use this argument
        you may not also use the Popen constructor's "stdin" argument, as
        it will be used internally.

        By default, all communication is in bytes, and therefore any "input" should
        be bytes, and the stdout and stderr will be bytes. If in text mode, any
        "input" should be a string, and stdout and stderr will be strings decoded
        according to locale encoding, or by "encoding" if set. Text mode is
        triggered by setting any of text, encoding, errors or universal_newlines.

        The other arguments are the same as for the Popen constructor.
        """
        if input is not None:
            if kwargs.get('stdin') is not None:
                raise ValueError('stdin and input arguments may not both be used.')
            kwargs['stdin'] = PIPE

        if capture_output:
            if kwargs.get('stdout') is not None or kwargs.get('stderr') is not None:
                raise ValueError('stdout and stderr arguments may not be used '
                                 'with capture_output.')
            kwargs['stdout'] = PIPE
            kwargs['stderr'] = PIPE

        with Popen(*popenargs, **kwargs) as process:
            try:
                stdout, stderr = process.communicate(input, timeout=timeout)
            except TimeoutExpired as exc:
                process.kill()
                if _mswindows:
                    # Windows accumulates the output in a single blocking
                    # read() call run on child threads, with the timeout
                    # being done in a join() on those threads.  communicate()
                    # _after_ kill() is required to collect that and add it
                    # to the exception.
                    exc.stdout, exc.stderr = process.communicate()
                else:
                    # POSIX _communicate already populated the output so
                    # far into the TimeoutExpired exception.
                    process.wait()
                raise
            except:  # Including KeyboardInterrupt, communicate handled that.
                process.kill()
                # We don't call process.wait() as .__exit__ does that for us.
                raise
            retcode = process.poll()
            if check and retcode:
>               raise CalledProcessError(retcode, process.args,
                                         output=stdout, stderr=stderr)
E               subprocess.CalledProcessError: Command '['/home/runner/.local/bin/mc', '--json', 'admin', 'policy', 'attach', 'testcloud', 'readwrite', '--user', 'abcdef1234567890']' returned non-zero exit status 1.
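
The cause of the failure above sits in the captured `stdout` rather than in the final exception message: `mc` reports that `attach` is not a recognized command. As a rough sketch, a helper along these lines could surface that message from a `CalledProcessError`. The name `describe_cli_failure` is hypothetical, and the JSON layout is assumed from the single error payload shown in this traceback.

```python
import json
import subprocess


def _as_text(stream):
    """Normalise a captured stream (None, bytes or str) to str."""
    if stream is None:
        return ""
    if isinstance(stream, bytes):
        return stream.decode(errors="replace")
    return stream


def describe_cli_failure(exc: subprocess.CalledProcessError) -> str:
    """Summarise a failed command, preferring a JSON error message.

    Assumes the tool may print {"error": {"message": ...}} on stdout,
    as `mc --json` does in the traceback above. Falls back to the raw
    captured output when that structure is absent.
    """
    stdout = _as_text(exc.stdout)
    message = ""
    try:
        message = json.loads(stdout).get("error", {}).get("message", "")
    except (json.JSONDecodeError, AttributeError, TypeError):
        # Not JSON, or not shaped like the payload we assumed.
        message = ""
    if not message:
        message = (stdout or _as_text(exc.stderr)).strip()

    cmd = exc.cmd if isinstance(exc.cmd, str) else " ".join(map(str, exc.cmd))
    return f"{cmd} exited with status {exc.returncode}: {message}"
```

Wrapping a `subprocess.run(..., check=True, capture_output=True)` call in `try`/`except subprocess.CalledProcessError` and logging the returned summary would put the `mc` error text directly in the test output.
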

JSKenyon commented 1 year ago

OK, I can confirm that something in minio==17.1.16 breaks the test suite. Reverting for now.

sjperkins commented 1 year ago

> OK, I can confirm that something in minio==17.1.16 breaks the test suite. Reverting for now.

This has been fixed on master. Could you merge or rebase the PR?

JSKenyon commented 1 year ago

I have merged in master.

sjperkins commented 1 year ago

Could you please add a HISTORY.rst entry?