ratt-ru / dask-ms

Implementation of a dask/xarray dataset backed by a CASA MS
https://dask-ms.readthedocs.io

read, join, and write MS #272

Open caseyjlaw opened 1 year ago

caseyjlaw commented 1 year ago

Description

I'd like to read multiple MS files and write one that joins along spectral window. Each MS file represents a single integration with a different spectral window, and the data have identical structure in each MS.

What I Did

I tried to follow the suggestion in the docs, but I may have an issue with the MS.

> ds = xds_from_ms('20221129_034113_15MHz.ms:SPECTRAL_WINDOW', group_cols="__row__")
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-75-b6c8fc404ff8> in <module>
----> 1 ds = xds_from_ms('20221129_034113_15MHz.ms:SPECTRAL_WINDOW', group_cols="__row__")

~/.conda/envs/development/lib/python3.6/site-packages/daskms/dask_ms.py in xds_from_ms(ms, columns, index_cols, group_cols, **kwargs)
    397                           index_cols=index_cols,
    398                           group_cols=group_cols,
--> 399                           **kwargs)
    400 
    401 

~/.conda/envs/development/lib/python3.6/site-packages/daskms/dask_ms.py in xds_from_table(table_name, columns, index_cols, group_cols, **kwargs)
    324     dask_datasets = DatasetFactory(table_name, columns,
    325                                    group_cols, index_cols,
--> 326                                    **kwargs).datasets()
    327 
    328     # Return dask datasets if xarray is not available

~/.conda/envs/development/lib/python3.6/site-packages/daskms/reads.py in __init__(self, table, select_cols, group_cols, index_cols, **kwargs)
    279     def __init__(self, table, select_cols, group_cols, index_cols, **kwargs):
    280         if not table_exists(table):
--> 281             raise ValueError("'%s' does not appear to be a CASA Table" % table)
    282 
    283         chunks = kwargs.pop('chunks', [{'row': _DEFAULT_ROW_CHUNKS}])

ValueError: '20221129_034113_15MHz.ms:SPECTRAL_WINDOW' does not appear to be a CASA Table

However, the file can be read at some level:

> ds = xds_from_ms('20221129_034113_15MHz.ms', group_cols=["FIELD_ID", "DATA_DESC_ID"])
> ds
[<xarray.Dataset>
 Dimensions:         (chan: 192, corr: 4, flagcat: 1, row: 62128, uvw: 3)
 Coordinates:
     ROWID           (row) int32 dask.array<chunksize=(10000,), meta=np.ndarray>
 Dimensions without coordinates: chan, corr, flagcat, row, uvw
 Data variables:
     ANTENNA1        (row) int32 dask.array<chunksize=(10000,), meta=np.ndarray>
     ANTENNA2        (row) int32 dask.array<chunksize=(10000,), meta=np.ndarray>
     ARRAY_ID        (row) int32 dask.array<chunksize=(10000,), meta=np.ndarray>
     DATA            (row, chan, corr) complex64 dask.array<chunksize=(10000, 192, 4), meta=np.ndarray>
     EXPOSURE        (row) float64 dask.array<chunksize=(10000,), meta=np.ndarray>
     FEED1           (row) int32 dask.array<chunksize=(10000,), meta=np.ndarray>
     FEED2           (row) int32 dask.array<chunksize=(10000,), meta=np.ndarray>
     FLAG            (row, chan, corr) bool dask.array<chunksize=(10000, 192, 4), meta=np.ndarray>
     FLAG_CATEGORY   (row, flagcat, chan, corr) bool dask.array<chunksize=(10000, 1, 192, 4), meta=np.ndarray>
     FLAG_ROW        (row) bool dask.array<chunksize=(10000,), meta=np.ndarray>
     INTERVAL        (row) float64 dask.array<chunksize=(10000,), meta=np.ndarray>
     OBSERVATION_ID  (row) int32 dask.array<chunksize=(10000,), meta=np.ndarray>
     PROCESSOR_ID    (row) int32 dask.array<chunksize=(10000,), meta=np.ndarray>
     SCAN_NUMBER     (row) int32 dask.array<chunksize=(10000,), meta=np.ndarray>
     SIGMA           (row, corr) float32 dask.array<chunksize=(10000, 4), meta=np.ndarray>
     STATE_ID        (row) int32 dask.array<chunksize=(10000,), meta=np.ndarray>
     TIME            (row) float64 dask.array<chunksize=(10000,), meta=np.ndarray>
     TIME_CENTROID   (row) float64 dask.array<chunksize=(10000,), meta=np.ndarray>
     UVW             (row, uvw) float64 dask.array<chunksize=(10000, 3), meta=np.ndarray>
     WEIGHT          (row, corr) float32 dask.array<chunksize=(10000, 4), meta=np.ndarray>
 Attributes:
     FIELD_ID:      0
     DATA_DESC_ID:  0]
landmanbester commented 1 year ago

I think you may just need a double colon for the subtable i.e. try with

ds = xds_from_ms('20221129_034113_15MHz.ms::SPECTRAL_WINDOW', group_cols="__row__")
caseyjlaw commented 1 year ago

Thanks for the quick reply. Sorry, but I actually pasted the wrong error. Below is what I get when using the proper syntax. Perhaps our MS is not valid somehow. Do you have a way to verify that the MS file meets the specification?

In [2]: ds = xds_from_ms('20221129_034113_15MHz.ms::SPECTRAL_WINDOW', group_cols="__row__")
---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
~/.conda/envs/development/lib/python3.6/site-packages/daskms/table_proxy.py in __call__(cls, *args, **kwargs)
    158             try:
--> 159                 return _table_cache[key]
    160             except KeyError:

~/.conda/envs/development/lib/python3.6/weakref.py in __getitem__(self, key)
    136             self._commit_removals()
--> 137         o = self.data[key]()
    138         if o is None:

KeyError: -316058696854364970

During handling of the above exception, another exception occurred:

RuntimeError                              Traceback (most recent call last)
<ipython-input-2-ed052d620bc7> in <module>
----> 1 ds = xds_from_ms('20221129_034113_15MHz.ms::SPECTRAL_WINDOW', group_cols="__row__")

~/.conda/envs/development/lib/python3.6/site-packages/daskms/dask_ms.py in xds_from_ms(ms, columns, index_cols, group_cols, **kwargs)
    397                           index_cols=index_cols,
    398                           group_cols=group_cols,
--> 399                           **kwargs)
    400 
    401 

~/.conda/envs/development/lib/python3.6/site-packages/daskms/dask_ms.py in xds_from_table(table_name, columns, index_cols, group_cols, **kwargs)
    324     dask_datasets = DatasetFactory(table_name, columns,
    325                                    group_cols, index_cols,
--> 326                                    **kwargs).datasets()
    327 
    328     # Return dask datasets if xarray is not available

~/.conda/envs/development/lib/python3.6/site-packages/daskms/reads.py in datasets(self)
    397         elif len(self.group_cols) == 1 and self.group_cols[0] == "__row__":
    398             order_taql = ordering_taql(table_proxy, self.index_cols,
--> 399                                        self.taql_where)
    400             sorted_rows, row_runs = row_ordering(order_taql,
    401                                                  self.index_cols,

~/.conda/envs/development/lib/python3.6/site-packages/daskms/ordering.py in ordering_taql(table_proxy, index_cols, taql_where)
     74 
     75     return TableProxy(taql_factory, query, tables=[table_proxy],
---> 76                       __executor_key__=table_proxy.executor_key)
     77 
     78 

~/.conda/envs/development/lib/python3.6/site-packages/daskms/table_proxy.py in __call__(cls, *args, **kwargs)
    159                 return _table_cache[key]
    160             except KeyError:
--> 161                 instance = type.__call__(cls, *args, **kwargs)
    162                 _table_cache[key] = instance
    163                 return instance

~/.conda/envs/development/lib/python3.6/site-packages/daskms/table_proxy.py in __init__(self, factory, *args, **kwargs)
    341         # Private, should be inaccessible
    342         self._write = False
--> 343         self._writeable = ex.impl.submit(_iswriteable, table).result()
    344 
    345         should_be_writeable = not kwargs.get('readonly', True)

~/.conda/envs/development/lib/python3.6/concurrent/futures/_base.py in result(self, timeout)
    430                 raise CancelledError()
    431             elif self._state == FINISHED:
--> 432                 return self.__get_result()
    433             else:
    434                 raise TimeoutError()

~/.conda/envs/development/lib/python3.6/concurrent/futures/_base.py in __get_result(self)
    382     def __get_result(self):
    383         if self._exception:
--> 384             raise self._exception
    385         else:
    386             return self._result

~/.conda/envs/development/lib/python3.6/concurrent/futures/thread.py in run(self)
     54 
     55         try:
---> 56             result = self.fn(*self.args, **self.kwargs)
     57         except BaseException as exc:
     58             self.future.set_exception(exc)

~/.conda/envs/development/lib/python3.6/site-packages/daskms/table_proxy.py in _iswriteable(table_future)
    236 
    237 def _iswriteable(table_future):
--> 238     return table_future.result().iswritable()
    239 
    240 

~/.conda/envs/development/lib/python3.6/concurrent/futures/_base.py in result(self, timeout)
    423                 raise CancelledError()
    424             elif self._state == FINISHED:
--> 425                 return self.__get_result()
    426 
    427             self._condition.wait(timeout)

~/.conda/envs/development/lib/python3.6/concurrent/futures/_base.py in __get_result(self)
    382     def __get_result(self):
    383         if self._exception:
--> 384             raise self._exception
    385         else:
    386             return self._result

~/.conda/envs/development/lib/python3.6/concurrent/futures/thread.py in run(self)
     54 
     55         try:
---> 56             result = self.fn(*self.args, **self.kwargs)
     57         except BaseException as exc:
     58             self.future.set_exception(exc)

~/.conda/envs/development/lib/python3.6/site-packages/daskms/table_proxy.py in taql_factory(query, style, tables, readonly)
    189 
    190     try:
--> 191         return pt.taql(query, style=style, tables=tables)
    192     finally:
    193         for t in tables:

~/.conda/envs/development/lib/python3.6/site-packages/casacore/tables/table.py in taql(command, style, tables, globals, locals)
    152     if style:
    153         cmd = 'using style ' + style + ' ' + cmd
--> 154     tab = table(cmd, tabs, _oper=2)
    155     result = tab._getcalcresult()
    156     # If result is empty, it was a normal TaQL command resulting in a table.

~/.conda/envs/development/lib/python3.6/site-packages/casacore/tables/table.py in __init__(self, tablename, tabledesc, nrow, readonly, lockoptions, ack, dminfo, endian, memorytable, concatsubtables, _columnnames, _datatypes, _oper, _delete)
    327         elif _oper == 2:
    328             # This is the query or calc constructor.
--> 329             Table.__init__(self, tablename, tabledesc)
    330             if len(self._getcalcresult()) != 0:
    331                 # Do not make row object for a calc result

RuntimeError: Error in TaQL command: 'using style Python SELECT
    ROWID() as __tablerow__
FROM
    $1
ORDERBY
    TIME'
  Error in select expression: TIME is an unknown column (or keyword) in table /fast/claw/20221129_034113_15MHz.ms/SPECTRAL_WINDOW
sjperkins commented 1 year ago

Try using xds_from_table instead of xds_from_ms?

ds = xds_from_table('20221129_034113_15MHz.ms::SPECTRAL_WINDOW', group_cols="__row__")
caseyjlaw commented 1 year ago

Ok, that is working for me. I am running into other issues when I try to reproduce the tutorials from the documentation. I think my difficulty is that the docs assume a "TEST.MS" with a specific structure (especially that it has two datasets). Each of my MS files has one integration and one spectral window. I'd like an easy way to merge them into a single file with multiple spectral windows. Is that something that dask-ms can do easily?

JSKenyon commented 1 year ago

dask-ms can certainly do it as a virtual operation, i.e. by concatenating the xarray.Dataset objects in memory. If your aim is to write a new MS that contains multiple spectral windows, that is probably doable but not necessarily simple. I think @sjperkins will be able to advise; I have limited experience with using dask-ms to author new measurement sets. The path of least resistance may be for @sjperkins or myself to add this functionality to dask-ms convert, as that should be guaranteed to have all the necessary metadata/subtables for writing a new measurement set; adding it as part of the xds_to_* functionality will likely be a bit fraught.
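The virtual merge described above can be sketched schematically. This is a plain-Python stand-in for the real datasets (the actual code would build each per-SPW dataset with xds_from_ms and combine them with xarray; the dict structure and the `virtual_merge` helper below are purely illustrative, not dask-ms API). Since each input MS holds a single spectral window, all of its rows carry DATA_DESC_ID 0; in the merged view, each input is assigned its own DATA_DESC_ID:

```python
# Hypothetical sketch: merging single-SPW "datasets" (plain dicts standing
# in for xarray.Dataset objects) by stacking their main-table rows and
# renumbering DATA_DESC_ID so each input becomes its own spectral window.

def virtual_merge(per_ms_datasets):
    """Stack single-SPW datasets row-wise, renumbering DATA_DESC_ID."""
    merged = {"DATA_DESC_ID": [], "TIME": [], "DATA": []}
    for new_ddid, ds in enumerate(per_ms_datasets):
        nrow = len(ds["TIME"])
        # Every row of input `new_ddid` gets that input's new DATA_DESC_ID
        merged["DATA_DESC_ID"].extend([new_ddid] * nrow)
        merged["TIME"].extend(ds["TIME"])
        merged["DATA"].extend(ds["DATA"])
    return merged

# Two toy single-integration, single-SPW inputs (2 rows, 2 channels each).
ms0 = {"TIME": [0.0, 0.0], "DATA": [[1, 2], [3, 4]]}
ms1 = {"TIME": [0.0, 0.0], "DATA": [[5, 6], [7, 8]]}

merged = virtual_merge([ms0, ms1])
```

Note that this only covers the main table; the subtable book-keeping discussed further down is still required before such a merge could be written out as a valid MS.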

sjperkins commented 1 year ago

> Ok, that is working for me. I am running into other issues when I try to reproduce the tutorials from the documentation. I think my difficulty is that the docs assume a "TEST.MS" with a specific structure (esp that it has two datasets).

There are three sources of information here that could be useful

> Each of my MS files has one integration and one spectral window. I'd like an easy way to merge them together into a file with multiple spectral windows. Is that something that dask-ms can do easily?

It's doable, but you need to do some book-keeping, particularly with respect to the SPECTRAL_WINDOW and DATA_DESCRIPTION subtables and the main table's DATA_DESC_ID column.

An example of how this can be accomplished in an actual application is available in our averaging application, xova, which handles the SPW case as generally as possible.
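The book-keeping mentioned above can be illustrated with a minimal sketch (assumed structure, not the dask-ms API): when N single-SPW measurement sets are merged, the output SPECTRAL_WINDOW subtable gains one row per input, the DATA_DESCRIPTION subtable must point each DATA_DESC_ID at the right SPW row, and the main table's DATA_DESC_ID values must be remapped accordingly. The `build_subtable_maps` helper below is hypothetical:

```python
# Hypothetical sketch of the subtable book-keeping when merging N
# single-SPW measurement sets into one multi-SPW MS. Each input MS is
# assumed to use SPW row 0 and DATA_DESC_ID 0 internally.

def build_subtable_maps(n_inputs):
    """Return (SPW rows, DATA_DESCRIPTION rows, DATA_DESC_ID remapping)."""
    # One SPECTRAL_WINDOW row per input MS in the merged output
    spectral_window_rows = list(range(n_inputs))
    # One DATA_DESCRIPTION row per SPW, pointing at the matching SPW row
    data_description = [{"SPECTRAL_WINDOW_ID": i} for i in range(n_inputs)]
    # Map (input index, old DATA_DESC_ID) -> new DATA_DESC_ID for the
    # main table's DATA_DESC_ID column
    ddid_map = {(i, 0): i for i in range(n_inputs)}
    return spectral_window_rows, data_description, ddid_map

spw_rows, ddesc, ddid_map = build_subtable_maps(3)
```

In a real merge the SPECTRAL_WINDOW rows would also carry the per-SPW CHAN_FREQ, CHAN_WIDTH etc. copied from each input's subtable; the sketch only shows the index plumbing.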

If the structure of your input MSs is well-defined, you may want to exploit that instead of going with the above approach.