ratt-ru / dask-ms

Implementation of a dask/xarray dataset backed by a CASA MS
https://dask-ms.readthedocs.io
Other
19 stars 7 forks source link

Chunking size mismatch introduced in dataset writing in version 0.2.12 #254

Closed bennahugo closed 1 year ago

bennahugo commented 1 year ago

Description

Upstream breakage caught by xova test cases

xova/apps/xova/app.py:107: in execute
    main_writes = xds_to_table(output_ds, args.output, "ALL",
../venvxova/lib/python3.8/site-packages/daskms/dask_ms.py:96: in xds_to_table
    out_ds = write_datasets(table_name, xds, columns,
../venvxova/lib/python3.8/site-packages/daskms/writes.py:725: in write_datasets
    write_datasets = _write_datasets(table, tp, datasets, columns,
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

table = '/tmp/tmpud07fd4u_averaged.ms'
table_proxy = TableProxy[/tmp/tmpud07fd4u_averaged.ms](table, /tmp/tmpud07fd4u_averaged.ms, ack=False,readonly=False,lockoptions=user,__executor_key__=/tmp/tmpud07fd4u_averaged.ms)
datasets = []
columns = ['ANTENNA1', 'ANTENNA2', 'ARRAY_ID', 'DATA', 'DATA_DESC_ID', 'EXPOSURE', ...]
descriptor = 'ms(False)', table_keywords = None, column_keywords = None

    def _write_datasets(table, table_proxy, datasets, columns, descriptor,
                        table_keywords, column_keywords):
        _, table_name, subtable = table_path_split(table)
        table_name = '::'.join((table_name, subtable)) if subtable else table_name
        row_orders = []

        # Put table and column keywords
        table_proxy.submit(_put_keywords, WRITELOCK,
                           table_keywords, column_keywords).result()

        # Sort datasets on (not has "ROWID", index) such that
        # datasets with ROWID's are handled first, while
        # those without (which imply appends to the MS)
        # are handled last
        sorted_datasets = sorted(enumerate(datasets),
                                 key=lambda t: ("ROWID" not in t[1].data_vars,
                                                t[0]))

        # Establish row orders for each dataset
        for di, ds in sorted_datasets:
            try:
                rowid = ds.ROWID.data
            except AttributeError:
                # Add operation
                # No ROWID's, assume they're missing from the table
                # and remaining datasets. Generate addrows
                # NOTE(sjperkins)
                # This could be somewhat brittle, but exists to
                # update MS empty subtables once they've been
                # created along with the main MS by a call to default_ms.
                # Users could also it to append rows to an existing table.
                # An xds_append_to_table may be a better solution...
                last_datasets = datasets[di:]
                last_row_orders = add_row_order_factory(table_proxy, last_datasets)

                # We don't inline the row ordering if it is derived
                # from the row sizes of provided arrays.
                # The range of possible dependencies are far too large to inline
                row_orders.extend([(False, lro) for lro in last_row_orders])
                # We have established row orders for all datasets
                # at this point, quit the loop
                break
            else:
                # Update operation
                # Generate row orderings from existing row IDs
                row_order = cached_row_order(rowid)

                # Inline the row ordering in the graph
                row_orders.append((True, row_order))

        assert len(row_orders) == len(datasets)

        datasets = []

        for (di, ds), (inline, row_order) in zip(sorted_datasets, row_orders):
            # Hold the variables representing array writes
            write_vars = {}

            # Generate a dask array for each column
            for column in columns:
                try:
                    variable = ds.data_vars[column]
                except KeyError:
                    log.warning("Ignoring '%s' not present "
                                "on dataset %d" % (column, di))
                    continue
                else:
                    full_dims = variable.dims
                    array = variable.data

                if not isinstance(array, da.Array):
                    raise TypeError("%s on dataset %d is not a dask Array "
                                    "but a %s" % (column, di, type(array)))

                args = [row_order, ("row",)]

                # We only need to pass in dimension extent arrays if
                # there is more than one chunk in any of the non-row columns.
                # In that case, we can putcol, otherwise putcolslice is required

                inlinable_arrays = [row_order]

                if (row_order.shape[0] != array.shape[0] or
                        row_order.chunks[0] != array.chunks[0]):
>                   raise ValueError(f"ROWID shape and/or chunking does "
                                     f"not match that of {column}")
E                   ValueError: ROWID shape and/or chunking does not match that of ANTENNA1

What I Did

Just force upgrade xova branch prepare-0.1.2 to use the latest dask-ms 0.2.12 and run the test suite

bennahugo commented 1 year ago

refer to discussion https://github.com/ratt-ru/xova/pull/28

JSKenyon commented 1 year ago

I am not 100% sure that this is a bug in dask-ms - it may just be that a previously silent, non-fatal error is now noisy. I suspect that ROWID is no longer consistent with the averaged data.

JSKenyon commented 1 year ago

I will try reproduce and let you know if it is something more malign.

bennahugo commented 1 year ago

the xova PR is now passing tests on fake and real data again. You can go from that branch directly to reproduce. Thanks @JSKenyon. It is not critically urgent we find a fix for this. The keywords exposure takes higher priority

JSKenyon commented 1 year ago

This is relatively simple, and it should probably be fixed in dask-ms. The cause of the problem is the fact that np.nan == np.nan returns False i.e. the comparison fails in the case of unknown chunk sizes.

bennahugo commented 1 year ago

Great thanks for finding it, you're much more familiar with the dask codebase than I am. We can probably punch a release and make xova do != 0.2.12 in the dependencies?

JSKenyon commented 1 year ago

Hmm, I cannot remember everything that has gone in since the last release. @sjperkins may have a better idea. This bug is actually a little fiddlier than I thought. Turns out that the nan on the dask.array is not actually a np.nan, it is a float('nan'). Just need to figure out how resilient the logic is when comparing these things.

bennahugo commented 1 year ago

lol I love how each library defines its own types instead of just relying on well defined defacto standard types in cpython....

bennahugo commented 1 year ago

Am I missing something here?

n [1]: float('nan')
Out[1]: nan

In [2]: type(float('nan'))
Out[2]: float

In [3]: import numpy as np

In [4]: np.nan
Out[4]: nan

In [5]: type(np.nan)
Out[5]: float

In [6]: float('nan') == np.nan
Out[6]: False

In [9]: from ctypes import *

In [12]: bin(cast(pointer(c_float(float('nan'))), POINTER(c_int64)).contents.value)
Out[12]: '0b1111111110000000000000000000000'

In [13]: bin(cast(pointer(c_float(np.nan)), POINTER(c_int64)).contents.value)
Out[13]: '0b1111111110000000000000000000000'

In [23]: bin(cast(pointer(c_float(np.nan)), POINTER(c_int64)).contents.value == cast(pointer(c_float(float('nan'))), POINTER(c_int64)).contents.value)
Out[23]: '0b1'

the two are bitwise equal IEEE754 representations?!

...
result. QNaN is represented by 0 or 1 as the sign bit, all 1s as
exponent, and a 0 as the left-most bit of the significand and at least
one 1 in the rest of the significand. SNaN is represented by 0 or
1 as the sign bit, all 1s as exponent, and a 1 as the left-most bit of
the significand and any string of bits for the remaining 22 bits. We
give below the representations of QNaN and SNaN.
bennahugo commented 1 year ago

Wait I'm being silly. nan cannot accept comparisons. You need something that catches the nan signal

In [24]: np.nan == np.nan
Out[24]: False

In [25]: np.isnan(np.nan)
Out[25]: True

In [26]: np.isnan(float('nan'))
Out[26]: True
JSKenyon commented 1 year ago

Yeah, math.isnan works too.

sjperkins commented 1 year ago

Thanks for the investigation and report. Have a fix incoming in https://github.com/ratt-ru/dask-ms/pull/256, but the CI is a bit broken at the moment.

sjperkins commented 1 year ago

Closed by #255