sdss / astra

Analysis framework for SDSS-V/Milky Way Mapper
BSD 3-Clause "New" or "Revised" License

How tasks decide which spectra they will analyse #23

Open andycasey opened 1 month ago

andycasey commented 1 month ago

Some tasks run on all spectra, and some should only run on some kinds of spectra. In the narrowest case, CORV should only run on BOSS spectra assigned to the white dwarf carton, where SnowWhite has already classified that spectrum as a DA-type. In the broadest case, BOSSNet runs on any BOSS spectrum.

In the past I would specify this by setting defaults for the spectra argument in a task. For example:

spectra: Optional[Iterable[BossVisitSpectrum]] = (
        BossVisitSpectrum
        .select()
        .join(SnowWhite, on=(BossVisitSpectrum.spectrum_pk == SnowWhite.spectrum_pk))
        .switch(BossVisitSpectrum)
        .join(
            Corv,
            JOIN.LEFT_OUTER, 
            on=(
                (BossVisitSpectrum.spectrum_pk == Corv.spectrum_pk)
            &   (Corv.v_astra == __version__)
            )
        )
        .where(
            Corv.spectrum_pk.is_null()
        &   (SnowWhite.classification == "DA")
        )
    ),

This is problematic for a few reasons:

  1. It specifies that the default should be the BossVisitSpectrum type, but in reality the task could take co-added spectra or visit spectra. Specifying the spectrum type in the default argument means that if we want to use a non-default type, we need to supply the classification constraints ourselves.
  2. When we call a task from the astra command line tool, we can easily scale a task across many nodes and processes. If we know which task we want to run, and which spectrum type to run it on (e.g., BossVisitSpectrum), then a zeroth-order balancing is easy: we count the number of spectra that need to be analysed, then paginate the SQL query across each node or processor. But that means the astra CLI needs to know whatever constraints the task has about which spectra it will, or will not, accept; otherwise it can't construct the query and paginate it efficiently across nodes and processes.
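The pagination idea in point 2 can be sketched in plain Python. The `page_bounds` helper below is purely illustrative (it is not part of astra): it splits a known row count into per-worker `(limit, offset)` pages that could be applied to the SQL query.

```python
# Hypothetical sketch of zeroth-order load balancing: paginate a spectrum
# query across workers (nodes * processes). `page_bounds` is an illustrative
# name, not an astra function.

def page_bounds(n_spectra: int, n_workers: int) -> list[tuple[int, int]]:
    """Split n_spectra rows into (limit, offset) pages, one per worker."""
    base, remainder = divmod(n_spectra, n_workers)
    pages, offset = [], 0
    for worker in range(n_workers):
        # Spread any remainder over the first few workers.
        limit = base + (1 if worker < remainder else 0)
        pages.append((limit, offset))
        offset += limit
    return pages

# Each worker would then run something like:
#   query.limit(limit).offset(offset)
```

This only works if the CLI can construct the full query itself, which is exactly why the task's spectrum constraints must be visible outside the task body.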

The requirements are:

The new task interface might look something like:

@task(
  spectrum_models=(
    BossVisitSpectrum, 
    BossRestFrameVisitSpectrum, 
    BossCombinedSpectrum
  ),
  inner_join=MappingProxyType({"snow_white": SnowWhite})
)
def corv(spectra: Iterable[Spectrum]):

  for spectrum in spectra:
    # We could check for spectrum.source.assigned_to_program("mwm_wd") but SnowWhite would have done that
    if (spectrum.snow_white.classification == "DA"):
      ...
    else:
      yield Corv.from_spectrum(spectrum, flag_not_processed=True)
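One way such a decorator could work (a sketch under assumptions, not the real implementation) is to simply attach the declared spectrum models and joins as attributes on the task function, so the CLI can introspect them and build the query itself. Model classes are replaced with string placeholders here to keep the example self-contained:

```python
# Hypothetical sketch of the proposed @task decorator: record the accepted
# spectrum models and required inner joins as function attributes for later
# introspection by the CLI.
from types import MappingProxyType

def task(spectrum_models=(), inner_join=MappingProxyType({})):
    def decorator(fn):
        fn.spectrum_models = tuple(spectrum_models)
        fn.inner_join = inner_join
        return fn
    return decorator

@task(
    spectrum_models=("BossVisitSpectrum",),  # placeholder for the model class
    inner_join=MappingProxyType({"snow_white": "SnowWhite"}),
)
def corv(spectra):
    for spectrum in spectra:
        yield spectrum

# The CLI can now read corv.spectrum_models and corv.inner_join without
# ever calling the task.
```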

The benefits are:

  1. No limit or page handling within the task.
  2. No ModelSelect logic within the task.
  3. We keep a 1-to-1 record of spectrum rows and pipeline rows; even though it adds table bloat, it prevents duplicates.
  4. If we supply no spectrum type to the astra CLI, it can figure out which spectra it should run, and distribute across all nodes/procs.

The downsides are:

  1. If you just want to run things interactively, you had better make sure you give the spectra with the appropriate attributes (e.g., snow_white). This is already a problem in the current set-up. We could mitigate it by having something like:
    generate_task_runs(corv, overwrite=False, limit=10)

    which would look at the task definition, see which spectra it accepts, create the queries with the necessary joins, only analyse new spectra (overwrite=False) or analyse everything (overwrite=True), and yield a query you can provide to corv.
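A minimal sketch of what `generate_task_runs` could do, assuming the task metadata is introspectable (this helper does not exist in astra; peewee query construction is replaced by plain-Python filtering to keep it self-contained):

```python
# Hypothetical generate_task_runs: skip spectra that already have a pipeline
# row unless overwrite=True, and respect the limit. In the real case this
# would build a peewee query with the task's declared joins instead.
from itertools import islice

def generate_task_runs(task_fn, spectra, processed_pks, overwrite=False, limit=None):
    """Yield the spectra the task still needs to analyse."""
    candidates = (
        s for s in spectra
        if overwrite or s["spectrum_pk"] not in processed_pks
    )
    yield from islice(candidates, limit)
```

Interactively, one could then do `corv(generate_task_runs(corv, ...))` without writing any join logic by hand.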

andycasey commented 1 month ago

Any details about how the task should be executed should live in the local Astra config file, and not be hard-coded in the task. This includes things like: