casangi / xradio

Xarray Radio Astronomy Data IO
https://xradio.readthedocs.io/en/latest/
Other
9 stars 5 forks source link

Create Weather Dataset #68

Closed Jan-Willem closed 7 months ago

Jan-Willem commented 10 months ago

Create a function that will convert a MS v2 weather table to a MS v4 weather dataset (https://docs.google.com/spreadsheets/d/14a6qMap9M5r_vjpLnaBKxsR9TF4azN5LVdOxLacOX-s/edit?usp=sharing).

The function should appear in xradio/vis/_vis_utils/_ms/msv4_sub_xdss.py (see create_ant_xds as an example) and be called in the convert_and_write_partition function (xradio/vis/_vis_utils/_ms/conversion.py) after ant_xds = create_ant_xds(in_file) and then saved to zarr under if storage_backend == "zarr":. The function read_processing_set (src/xradio/vis/read_processing_set.py) should also be updated so to that the datatset can be read.

FedeMPouzols commented 7 months ago

Hi @fdulwich , I took a look at this create_weather_xds() function. I think that it is very close to a working version in its current shape.

The main issue that I found is with the order of the dimensions. If you try this branch on example MSs you'll probably get errors like:

`ValueError: conflicting sizes for dimension 'antenna_id': length 1 on 'antenna_id' and length 276 on {'time': 'PRESSURE', 'antenna_id': 'PRESSURE'}

That is because in this weather_xds we are now using (time, antenna_id,...) ordering but in the earlier MS/casacore read functions we were using (antenna_id, time...) ordering. This can be fixed by applying a change like this to create_weather_xds()

         "wind_speed": "WIND_SPEED",
     }
     data_variable_dims = {
-        "h2o": ["time", "antenna_id"],
-        "ionos_electron": ["time", "antenna_id"],
-        "pressure": ["time", "antenna_id"],
-        "rel_humidity": ["time", "antenna_id"],
-        "temperature": ["time", "antenna_id"],
-        "dew_point": ["time", "antenna_id"],
-        "wind_direction": ["time", "antenna_id"],
-        "wind_speed": ["time", "antenna_id"],
+        "h2o": ["antenna_id", "time", ],
+        "ionos_electron": ["antenna_id", "time"],
+        "pressure": ["antenna_id", "time"],
+        "rel_humidity": ["antenna_id", "time"],
+        "temperature": ["antenna_id", "time"],
+        "dew_point": ["antenna_id", "time"],
+        "wind_direction": ["antenna_id", "time"],
+        "wind_speed": ["antenna_id", "time"],
     }

Or alternatively we can change the order of the dimensions in read.py:redimension_ms_subtable().

Now that this came up I'm not sure if (time, antenna_id,...) is necessarily the only or best order for this table. In the pointing_xds we are using the same order but I'm equally not sure if that's the most natural order. In any case it is just a matter of order of dimensions when creating the data arrays / giving the list of dimensions of the data vars.

Then if we want to move from "antenna_id" to "station_id" we'd need several changes I think. There are some comments in the spreadsheet/WEATHER and I probably missed the discussion so I'm not sure if the new station_id is just a rename or if we want to re-generate indices. For a start, something along the lines of this diff would apply the rename to "station_id" and produce the coords and dims that I think gets us closer to the specs:

diff --git a/src/xradio/vis/_vis_utils/_ms/msv4_sub_xdss.py b/src/xradio/vis/_vis_utils/_ms/msv4_sub_xdss.py
index f17dfd3..fd06bbe 100644
--- a/src/xradio/vis/_vis_utils/_ms/msv4_sub_xdss.py
+++ b/src/xradio/vis/_vis_utils/_ms/msv4_sub_xdss.py
@@ -114,22 +114,23 @@ def create_weather_xds(in_file: str):
         "wind_speed": "WIND_SPEED",
     }
     data_variable_dims = {
-        "h2o": ["time", "antenna_id"],
-        "ionos_electron": ["time", "antenna_id"],
-        "pressure": ["time", "antenna_id"],
-        "rel_humidity": ["time", "antenna_id"],
-        "temperature": ["time", "antenna_id"],
-        "dew_point": ["time", "antenna_id"],
-        "wind_direction": ["time", "antenna_id"],
-        "wind_speed": ["time", "antenna_id"],
+        "h2o": ["station_id", "time", ],
+        "ionos_electron": ["station_id", "time"],
+        "pressure": ["station_id", "time"],
+        "rel_humidity": ["station_id", "time"],
+        "temperature": ["station_id", "time"],
+        "dew_point": ["station_id", "time"],
+        "wind_direction": ["station_id", "time"],
+        "wind_speed": ["station_id", "time"],
     }
     to_new_coord_names = {
-        "time": "time",
-        "antenna_id": "antenna_id",
+        # No MS data cols are turned into xds coords
     }
     coord_dims = {
-        "time": ["time"],
-        "antenna_id": ["antenna_id"],
+        # No MS data cols are turned into xds coords
+    }
+    to_new_dim_names = {
+        "antenna_id": "station_id",
     }

     # Read WEATHER table into a Xarray Dataset.
@@ -138,6 +139,7 @@ def create_weather_xds(in_file: str):
         "WEATHER",
         rename_ids=subt_rename_ids["WEATHER"],
     )
+    generic_weather_xds = generic_weather_xds.rename_dims(to_new_dim_names)

     weather_column_description = generic_weather_xds.attrs["other"]["msv2"]["ctds_attrs"][
         "column_descriptions"
@@ -148,8 +150,8 @@ def create_weather_xds(in_file: str):
     weather_xds = xr.Dataset()

     coords = {
-        "antenna_id": np.arange(generic_weather_xds.dims["antenna_id"]),
-        "xyz_label": ["x", "y", "z"],
+        "station_id": np.arange(generic_weather_xds.dims["station_id"]),
+        "time": generic_weather_xds["time"],
     }
     for key in generic_weather_xds:
         msv4_measure = column_description_casacore_to_msv4_measure(

When looking into this I saw more potential tricky details that I'm not sure we have discussed. Form what I see in a few test MSs, a relatively common practice in the WEATHER subtable is:

  1. to leave the "ANTENNA_ID" to -1!
  2. (sometimes, not always) to use a "non-standard extension" column, NS_WX_STATION_ID (+ another non-standard NS_WX_STATION_POSITION

From what I see in the CASA "ASDM=>MS filler" this is always done when importing ASDM into MS. That NS_WX_STATION_ID is taken directly from the stationId key of the ASDM WEATHER table. If we wanted to use it we'd need to take its values to define the "station_id" coord (the coords = { ... dict above.

fdulwich commented 7 months ago

Thanks for taking a look at this, much appreciated. Indeed, I had noticed the ValueError: conflicting sizes for dimension 'antenna_id' message, but wasn't sure how best to go about fixing that, given the issue with potentially having to re-map the antenna and weather station IDs (and I was wondering about the -1 for the antenna IDs, too!).

I'll add your diffs to the branch, and we can think about how to accommodate the optional non-standard NS_WX_STATION_ID. I agree there are still quite a few tricky details to figure out.

FedeMPouzols commented 7 months ago

Hi @fdulwich , @Jan-Willem , I think that the following diff would bring this branch up to date with the spreadsheet and has the code to deal with the antenna_id=-1 issue:

diff --git a/src/xradio/vis/_vis_utils/_ms/_tables/read.py b/src/xradio/vis/_vis_utils/_ms/_tables/read.py
index 84ff733..2211ab3 100644
--- a/src/xradio/vis/_vis_utils/_ms/_tables/read.py
+++ b/src/xradio/vis/_vis_utils/_ms/_tables/read.py
@@ -483,6 +483,14 @@ def read_generic_cols(
         if col.endswith("_ID") or (
             infile.endswith(subts_with_time_key) and col == "TIME"
         ):
+            # weather table: importasdm produces very wrong "-1" ANTENNA_ID
+            if (
+                infile.endswith("WEATHER")
+                and col == "ANTENNA_ID"
+                and "NS_WX_STATION_ID" in colnames
+            ):
+                data = np.stack([row["NS_WX_STATION_ID"] for row in trows])
+
             mcoords[col.lower()] = xr.DataArray(
                 data,
                 dims=[
diff --git a/src/xradio/vis/_vis_utils/_ms/conversion.py b/src/xradio/vis/_vis_utils/_ms/conversion.py
index bd5f80a..31eaf50 100644
--- a/src/xradio/vis/_vis_utils/_ms/conversion.py
+++ b/src/xradio/vis/_vis_utils/_ms/conversion.py
@@ -381,7 +381,8 @@ def convert_and_write_partition(
             if storage_backend == "zarr":
                 xds.to_zarr(store=file_name + "/MAIN", mode=mode)
                 ant_xds.to_zarr(store=file_name + "/ANTENNA", mode=mode)
-                weather_xds.to_zarr(store=file_name + "/WEATHER", mode=mode)
+                if weather_xds:
+                    weather_xds.to_zarr(store=file_name + "/WEATHER", mode=mode)
             elif storage_backend == "netcdf":
                 # xds.to_netcdf(path=file_name+"/MAIN", mode=mode) #Does not work
                 raise
diff --git a/src/xradio/vis/_vis_utils/_ms/msv4_sub_xdss.py b/src/xradio/vis/_vis_utils/_ms/msv4_sub_xdss.py
index e248494..f16f9fa 100644
--- a/src/xradio/vis/_vis_utils/_ms/msv4_sub_xdss.py
+++ b/src/xradio/vis/_vis_utils/_ms/msv4_sub_xdss.py
@@ -134,23 +134,27 @@ def create_weather_xds(in_file: str):
     }

     # Read WEATHER table into a Xarray Dataset.
-    generic_weather_xds = read_generic_table(
-        in_file,
-        "WEATHER",
-        rename_ids=subt_rename_ids["WEATHER"],
-    )
+    try:
+        generic_weather_xds = read_generic_table(
+            in_file,
+            "WEATHER",
+            rename_ids=subt_rename_ids["WEATHER"],
+        )
+    except ValueError as _exc:
+        return None
+
     generic_weather_xds = generic_weather_xds.rename_dims(to_new_dim_names)

-    weather_column_description = generic_weather_xds.attrs["other"]["msv2"]["ctds_attrs"][
-        "column_descriptions"
-    ]
+    weather_column_description = generic_weather_xds.attrs["other"]["msv2"][
+        "ctds_attrs"
+    ]["column_descriptions"]
     # ['ANTENNA_ID', 'TIME', 'INTERVAL', 'H2O', 'IONOS_ELECTRON',
     #  'PRESSURE', 'REL_HUMIDITY', 'TEMPERATURE', 'DEW_POINT',
     #  'WIND_DIRECTION', 'WIND_SPEED']
     weather_xds = xr.Dataset()

     coords = {
-        "station_id": np.arange(generic_weather_xds.dims["station_id"]),
+        "station_id": generic_weather_xds["station_id"],
         "time": generic_weather_xds["time"],
     }
     for key in generic_weather_xds:
@@ -167,9 +171,7 @@ def create_weather_xds(in_file: str):
                 weather_xds[var_name].attrs.update(msv4_measure)

             if key in ["interval"]:
-                weather_xds[var_name].attrs.update(
-                    {"units": ["s"], "type": "quantity"}
-                )
+                weather_xds[var_name].attrs.update({"units": ["s"], "type": "quantity"})
             elif key in ["h2o"]:
                 weather_xds[var_name].attrs.update(
                     {"units": ["/m^2"], "type": "quantity"}
@@ -183,17 +185,11 @@ def create_weather_xds(in_file: str):
                     {"units": ["Pa"], "type": "quantity"}
                 )
             elif key in ["rel_humidity"]:
-                weather_xds[var_name].attrs.update(
-                    {"units": ["%"], "type": "quantity"}
-                )
+                weather_xds[var_name].attrs.update({"units": ["%"], "type": "quantity"})
             elif key in ["temperature"]:
-                weather_xds[var_name].attrs.update(
-                    {"units": ["K"], "type": "quantity"}
-                )
+                weather_xds[var_name].attrs.update({"units": ["K"], "type": "quantity"})
             elif key in ["dew_point"]:
-                weather_xds[var_name].attrs.update(
-                    {"units": ["K"], "type": "quantity"}
-                )
+                weather_xds[var_name].attrs.update({"units": ["K"], "type": "quantity"})
             elif key in ["wind_direction"]:
                 weather_xds[var_name].attrs.update(
                     {"units": ["rad"], "type": "quantity"}
diff --git a/src/xradio/vis/load_processing_set.py b/src/xradio/vis/load_processing_set.py
index bae88f0..a9490f4 100644
--- a/src/xradio/vis/load_processing_set.py
+++ b/src/xradio/vis/load_processing_set.py
@@ -70,12 +70,21 @@ def _load_ms_xds_core(ms_xds_name, slice_dict):
     )
     sub_xds = {
         "antenna_xds": "ANTENNA",
-        "weather_xds": "WEATHER",
     }
     for sub_xds_key, sub_xds_name in sub_xds.items():
         ms_xds.attrs[sub_xds_key] = _load_no_dask_zarr(
             zarr_name=os.path.join(ms_xds_name, sub_xds_name)
         )
+    optional_sub_xds = {
+        "weather_xds": "WEATHER",
+    }
+    for sub_xds_key, sub_xds_name in sub_xds.items():
+        sub_xds_path = os.path.join(ms_xds_name, sub_xds_name)
+        if os.path.isdir(sub_xds_path):
+            ms_xds.attrs[sub_xds_key] = _load_no_dask_zarr(
+                zarr_name=os.path.join(ms_xds_name, sub_xds_name)
+            )
+
     return ms_xds

@@ -112,8 +121,8 @@ def _load_no_dask_zarr(zarr_name, slice_dict={}):
             coord = var[
                 slice_dict_complete[var_attrs[DIMENSION_KEY][0]]
             ]  # Dimension coordinates.
-            del var_attrs['_ARRAY_DIMENSIONS']
-            xds = xds.assign_coords({var_name:coord})
+            del var_attrs["_ARRAY_DIMENSIONS"]
+            xds = xds.assign_coords({var_name: coord})
             xds[var_name].attrs = var_attrs
         else:
             # Construct slicing
@@ -124,12 +133,11 @@ def _load_no_dask_zarr(zarr_name, slice_dict={}):
             xds[var_name] = xr.DataArray(
                 var[slicing_tuple], dims=var_attrs[DIMENSION_KEY]
             )
-            
-            if 'coordinates' in var_attrs:
-                del var_attrs['coordinates']
-            del var_attrs['_ARRAY_DIMENSIONS']
-            xds[var_name].attrs = var_attrs

+            if "coordinates" in var_attrs:
+                del var_attrs["coordinates"]
+            del var_attrs["_ARRAY_DIMENSIONS"]
+            xds[var_name].attrs = var_attrs

     xds.attrs = group_attrs

diff --git a/src/xradio/vis/read_processing_set.py b/src/xradio/vis/read_processing_set.py
index 7337df5..85bab5b 100644
--- a/src/xradio/vis/read_processing_set.py
+++ b/src/xradio/vis/read_processing_set.py
@@ -16,11 +16,18 @@ def read_processing_set(ps_name, intents=None, fields=None):
                     ps[i] = xds
                     sub_xds = {
                         "antenna_xds": "ANTENNA",
-                        "weather_xds": "WEATHER",
                     }
                     for sub_xds_key, sub_xds_name in sub_xds.items():
                         ps[i].attrs[sub_xds_key] = xr.open_zarr(
                             ps_name + "/" + i + "/" + sub_xds_name
                         )

+                    optional_sub_xds = {
+                        "weather_xds": "WEATHER",
+                    }
+                    for sub_xds_key, sub_xds_name in optional_sub_xds.items():
+                        sub_xds_path = ps_name + "/" + i + "/" + sub_xds_name
+                        if os.path.isdir(sub_xds_path):
+                            ps[i].attrs[sub_xds_key] = xr.open_zarr(sub_xds_path)
+
     return ps

The diff also includes some spaces/quotes fixes from black, just to simplify merging with follow-up branches. This should apply without conflicts on this branch. I'm trying to attach the diff as a file but there must be something github say they don't support.