i4Ds / Karabo-Pipeline

The Karabo Pipeline can be used as Digital Twin for SKA
https://i4ds.github.io/Karabo-Pipeline/
MIT License
11 stars 4 forks source link

Add MALS sky #562

Closed Lukas113 closed 5 months ago

Lukas113 commented 6 months ago

Yesterday I got an e-mail that a new MALS-survey from MeerKAT got released. Since MeerKAT is a precursor of SKA. I was wondering whether we should provide the survey through Karabo. It has a catalogue available for more than 700k unique sources, created using PyBDSF. @lmachadopolettivalle and @sfiruch what do you think about this?

Website: https://mals.iucaa.in/

CSV-col-desc: https://iopscience.iop.org/article/10.3847/1538-4365/acf7b9#apjsacf7b9t3

There are some inconveniences to provide the survey as a Karabo-Sky. One of them is that the full catalogue is just available as .csv and is larger than 3.6GB, which currently takes more than an hour to download from their server. However, I was able to download the .csv and convert it to a .fits file, where the remaining data is saved more efficiently and filtered with the relevant SkyModel.sources data. The code looks roughly as follows, where some parts obviously need some adaptions:

from __future__ import annotations

import os
import shutil
import tempfile
import warnings
from collections.abc import Iterable, Iterator

import astropy.units as u
import numpy as np
import pandas as pd
import requests
from astropy.coordinates import Angle
from astropy.io import ascii, fits
from astropy.table.table import Table
from astropy.units import Quantity, UnitBase
from karabo.data.external_data import DownloadObject
from karabo.util._types import DirPathType, FilePathType

def create_MALS_survey_as_fits(
    directory: DirPathType,
    version: int = 3,
    check_for_updates: bool = False,
    verbose: bool = True,
) -> str:
    directory = str(directory)
    try:
        version = int(version)  # run-time check
    except ValueError as e:
        raise TypeError(f"{version=} must be an integer!") from e
    base_url = "https://mals.iucaa.in"
    remote_path = "/catalogue/"
    fname_csv_template = "catalogue_malsdr1v{0}_all.csv"
    fname_csv = fname_csv_template.format(version)
    url = base_url + remote_path + fname_csv
    if check_for_updates:
        update_address = base_url + remote_path + fname_csv_template.format(version + 1)
        if DownloadObject.is_url_available(update_address):
            print(
                f"An updated version of {url} is available! Please have a look at {base_url}"
            )
    local_csv_file = os.path.join(directory, fname_csv)
    if not local_csv_file.endswith(
        ".csv"
    ):  # should be impossible, but just for double-checking
        raise ValueError(f"{local_csv_file=} is not a valid csv file!")
    local_fits_file = local_csv_file[:-4] + ".fits.gz"
    if os.path.exists(local_fits_file):
        if verbose:
            print(f"{local_fits_file} already exists.")
        return local_fits_file
    if not os.path.exists(local_csv_file):
        _ = DownloadObject.download(
            url=url, local_file_path=local_csv_file, verify=False, verbose=verbose
        )  # about 3.6GB, may take VERY long

    if verbose:
        "Read csv from disk ..."
    df = pd.read_csv(local_csv_file)  # takes about 40-45 s
    cols_of_interest = [
        "ra_max",
        "dec_max",
        "peak_flux",
        "ref_freq",
        "maj",
        "min",
        "pa",
        "source_name",
        "pointing_name",  # uniqueness is only ensured for a combination of pointing-id and spw-id
        "spw_id",
    ]
    units: dict[str, UnitBase] = {
        "ra_max": u.deg,
        "dec_max": u.deg,
        "peak_flux": u.mJy / u.beam,
        "ref_freq": u.MHz,
        "maj": u.arcsec,
        "min": u.arcsec,
        "pa": u.deg,
    }
    table = Table.from_pandas(df[cols_of_interest], units=units)
    if verbose:
        print(f"Creating {local_fits_file} ...")
    with tempfile.TemporaryDirectory() as tmpdir:
        tmp_fits = os.path.join(tmpdir, "mals.fits")
        table.write(tmp_fits, format="fits")  # about 368MB
        with fits.open(tmp_fits) as f:
            f.writeto(local_fits_file)

    return local_fits_file

fits_file = create_MALS_survey_as_fits(
    directory="./",
)
from karabo.simulation.sky_model import SkySourcesUnits, SkyModel, SkyPrefixMapping

fits_file = "catalogue_malsdr1v3_all.fits.gz"
unit_mapping: dict[str, UnitBase] = {
    "deg": u.deg,
    "beam-1 mJy": u.mJy / u.beam,
    "MHz": u.MHz,
    "arcsec": u.arcsec,
}
prefix_mapping = SkyPrefixMapping(
    ra="ra_max",
    dec="dec_max",
    stokes_i="peak_flux",
    ref_freq="ref_freq",
    major="maj",
    minor="min",
    pa="pa",
    id="source_name",
)
unit_sources = SkySourcesUnits(
    stokes_i=u.Jy / u.beam,
)
sky = SkyModel.get_sky_model_from_fits(
    fits_file=fits_file,
    prefix_mapping=prefix_mapping,
    unit_mapping=unit_mapping,
    units_sources=unit_sources,
    min_freq=976.4e6,
    max_freq=1036.5e6,
)
sfiruch commented 6 months ago

Yeah, totally, this is awesome. And it looks like an great catalog, covering more! Might be useful for MMODAS?

If the license allows it (couldn't find it with a quick glance), redistributing it from CSCS object store would be nice. .fits.gz is probably even smaller, and supported by most FITS libraries.

Can you make sure to reference the authors/source of the dataset in the docstring?

Lukas113 commented 5 months ago

Creating a .fits.gz does compress the .fits file from about 368MB to ~200MB (gzip=206MB, bzip2=196MB).

Yeah referencing it in the docstring is what also what came to my mind. Similar as GLEAM and MIGHTEE docstring references. The only sentences I've found for referencing MALS are:

Lukas113 commented 5 months ago

I've looked into the survey a little more. It is important to know that this is not a wide-field survey like GLEAM which was created by some drift-scans. The MALS survey is created by source-tracking of 391 different sources with a specified radius of the sky. The coverage is looking as follows:

image image

Therefore, I think we should provide an additonal function which is able to get these positions as RA/DEC (MALS format of pointing_name is: JHHMMSS.ss ± DDMMSS.s, which is the only info to get the phase-center)

Therefore, I've created an additional function to easily get the RA/DEC in degrees from these strings. However, they have to get extracted from the .fits file accordingly.

def get_pos_ids_to_ra_dec(
    pos_ids: Iterable[str] | str,
) -> dict[str, tuple[float, float]]:
    """Converts position-id(s) from str to ra-dec in degrees.

    The supported format(s) of `pos_ids` is 'JHHMMSS(.s+)±DDMMSS(.s+)' (J2000) ,
    where each pos-id has to follow the same format. This means e.g. if the first
    pos-id has the format 'JHHMMSS.ss±DDMMSS.s', then having other pos-ids
    whth another format like 'JHHMMSS.s±DDMMSS' is not allowed nor checked
    in this function.

    A valid example of `pos_ids` (here just str instead of an iterable) is
    `pos_ids`="J130508.50-285042.0", which results in RA≈196.285, DEC≈-28.845

    Args:
        pos_ids: Position-id(s), which all must follow the same supported format
        for positional-parsing and unit-encoding.

    Returns:
        Dict from pos-id to ra-dec-tuple.
    """
    UnitMatchType = dict[str, list[int]]
    if isinstance(pos_ids, str):
        pos_ids = [pos_ids]

    def get_unit_matches_idxs(formatted_pos: str) -> UnitMatchType:
        """Gets indices of `formatted_pos` units.

        Supports the following formats:
            - ra-format='JHHMMSS(.s+)'
            - dec-format='±DDMMSS(.s+)'

        Args:
            formatted_pos: Splitted position-id for positional

        Returns:
            Unit-char to positions as dict[str, list[int]].
        """
        unit_idxs: UnitMatchType = dict()
        if (first_char := formatted_pos[0]) == "J":
            ra = True
            format_ = "JHHMMSS"  # positional encoded units
            units = ("h", "m", "s")  # used units in `format_`
        elif first_char == "+" or first_char == "-":
            ra = False
            format_ = "±DDMMSS"
            units = ("d", "m", "s")  # order according to `format_` and `Angle`
        else:
            raise ValueError(
                f"First value of {formatted_pos} is expected to be 'J', '+' or '-'"
            )
        formatted_pos_splitted = formatted_pos.split(".")
        if len(formatted_pos_splitted) > 2:
            raise ValueError(
                f"{formatted_pos=} contains multiple dot-separators "
                + "which is not supported"
            )
        if len(formatted_pos_splitted[0]) != len(format_):
            raise ValueError(
                f"{formatted_pos=} doesn't follow format: {format_ + '(.s+)'}"
            )
        format_list = np.array(list(format_.lower()))

        for unit_char in units:
            match_idxs = tuple(np.where(format_list == unit_char)[0])
            if (
                unit_char == format_list[-1] and len(formatted_pos_splitted) == 2
            ):  # handle (.s+)
                point_idx = int(np.where(np.array(list(formatted_pos)) == ".")[0][0])
                extension_idxs = tuple(range(point_idx, len(formatted_pos)))
                match_idxs = match_idxs + extension_idxs
            if len(match_idxs) > 0:
                unit_idxs[unit_char] = match_idxs
        return unit_idxs

    def get_sign(pos_id: str) -> str:
        """Extracts the pos-id separator.

        Allowed separators are '+' and '-'.

        Args:
            pos_id: Position-id to extract sign-separator from.

        Returns:
            Sign separator.
        """
        if "+" in pos_id:
            sign = "+"
        elif "-" in pos_id:
            sign = "-"
        else:
            raise ValueError(
                f"{pos_id} doesn't contain a valid sign-separator '+' or '-'"
            )
        return sign

    # just check first pos-id to not have to call `get_unit_matches_idxs` for each id
    sample_pos_id = pos_ids[0]
    sample_sign = get_sign(sample_pos_id)
    sample_ra_str, sample_dec_str = sample_pos_id.split(sep=sample_sign, maxsplit=1)
    sample_dec_str = f"{sample_sign}{sample_dec_str}"
    ra_unit_matches = get_unit_matches_idxs(formatted_pos=sample_ra_str)
    dec_unit_matches = get_unit_matches_idxs(formatted_pos=sample_dec_str)

    def create_angle_str(formatted_pos: str, unit_matches: UnitMatchType) -> str:
        """Creates `astropy.coordinates.Angle` compatible string.

        Args:
            formatted_pos: RA or DEC string from a single pos-id.
            unit_matches: Unit positional indices. order must follow
                (h, m, s) or (d, m, s).

        Returns:
            Angle string.
        """
        angle_str = ""
        if (first_char := formatted_pos[0]) == "+" or first_char == "-":
            angle_str = first_char
        for unit_char, unit_idxs in unit_matches.items():
            angle_str += (
                f"{formatted_pos[unit_idxs[0]:unit_idxs[-1]+1]}{unit_char}"
            )
        return angle_str

    def convert_pos_id_to_ra_dec(pos_id: str) -> tuple[float, float]:
        """Converts a single pos-id to ra-dec tuple in degrees.

        Assumes that unit-matches for each unit are all consecutive.

        Args:
            pos_id: Positional-id to convert.

        Returns:
            RA-DEC tuple in deg.
        """
        sign = get_sign(pos_id=pos_id)
        ra_pos, dec_pos = pos_id.split(sign, maxsplit=1)
        dec_pos = f"{sign}{dec_pos}"  # keeps sign in dec-str
        ra_angle_str = create_angle_str(
            formatted_pos=ra_pos, unit_matches=ra_unit_matches
        )
        dec_angle_str = create_angle_str(
            formatted_pos=dec_pos, unit_matches=dec_unit_matches
        )
        ra = Angle(ra_angle_str).to(u.deg).value
        dec = Angle(dec_angle_str).to(u.deg).value
        return ra, dec

    ra_dec: dict[str, tuple[float, float]] = dict()
    for pos_id in pos_ids:
        ra_dec[pos_id] = convert_pos_id_to_ra_dec(
            pos_id=pos_id,
        )
    return ra_dec

df = pd.read_csv("catalogue_malsdr1v3_all.csv", usecols=["pointing_name"])
pos_ids = df["pointing_name"].unique()  # should come from .fits, not from .csv file
sky_coords = get_pos_ids_to_ra_dec(pos_ids=pos_ids)