desihub / desispec

DESI spectral pipeline
BSD 3-Clause "New" or "Revised" License
35 stars 24 forks source link

Read spectra parallel #2169

Closed sbailey closed 6 months ago

sbailey commented 7 months ago

This PR that adds read_spectra_parallel(targets_table, ...) to simplify the process of reading N>>1 targets from M>>1 different files, taking subsets as needed and then stacking them back together into a single Spectra result. It supports multiprocessing or MPI for parallelism, and auto-derives from the columns of the targets_table whether to read tiles/cumulative spectra or healpix spectra.

The intended use case would be to read a zcatalog/ or equivalent redshift catalog, filter down to the targets you want, then call read_spectra_parallel to efficiently read them. Beware: you'll probably blow memory if you try to read 1M quasars in a single call, but this is still useful to read in batches by nside=16 healpix or groups of 10 tiles, etc.

This includes a suite of unit tests for tiles and healpix spectra, and multiprocessing and serial reading.

@dylanagreen @moustakas, this PR admittedly would have been more useful a few weeks ago, but could one of you take a look at this and kick the tires and let me know if it would make your life easier for the next time you need to read N>>1 targets, and/or broken pieces or feature requests (within reason; I reserve the right to punt future requests to other PRs if too complicated to implement).

moustakas commented 7 months ago

Thanks, this is a terrific contribution! I am happy to test this as part of my template work. My one quick thought about the top-level wrapper is that it's not obvious how to specify the specprod, e.g., iron vs fuji, etc. I think it's derived in findfile via an environment variable, but that's a little annoying. :) Could we add a specprod optional input at the top level?

sbailey commented 6 months ago

@moustakas good idea about more flexible specprod options. I added a read_spectra_parallel(..., specprod=SPECPROD) option which can take either the name of a specprod (e.g. "fuji") or the full path to a specprod (e.g. /global/cfs/cdirs/desi/spectro/redux/fuji), and those will override $DESI_SPECTRO_REDUX/$SPECPROD.

Under the hood, I implemented this in a more general way than just read_spectra_parallel, so now desispec.io.findfile also has a specprod option (instead of previously only supporting the full path though specprod_dir). Two non-backwards compatible implications:

sbailey commented 6 months ago

Thanks for the testing and the crash cases! I really thought I had the tests covering all the cases, alas. Will fix.

For sorting, the intended behavior is that the output spectra match the sort order of the input targets list, even if that is different from the order that the targets appear in the files on disk. I think matching input order is better than doing something like sorting on TARGETID. The unit tests cover some cases of input order = output order != disk order, but if you find a reproducer case where that isn't true, please post that as well.

sbailey commented 6 months ago

I've fixed the nproc=None (default) case, with some embarrassment that none of my unit tests had actually checked the default behavior (they do now)

Regarding your nproc=1, specprod=None (default) case:

The issue, of course, is that the code falls back to the $SPECPROD environment variable

I think the code is doing the right thing, sort of — if you don't specify specprod, it should use the $SPECPROD environment variable because the zz Table you provided has no knowledge in memory that it came from the iron production. Note that the Table(fitsio(..., rows=...)) wrapper stripped out the header info.

Although it wouldn't have helped in this case, I could see defining that if the input targets table is as astropy Table and has either SPECPROD or DEPNAMnn='SPECPROD', DEPVERnn=something combo in targets.meta, then that should should be used instead of $SPECPROD if the specprod input isn't set.

If we're going into checking for targets.meta, I might also support allowing targets.meta['SURVEY'] and targets.meta['PROGRAM'] to override missing SURVEY/PROGRAM columns.

Feature creep! While we're at it, we could augment desispec.io.read_table (that already uses Table(fitsio.read(...)) while re-adding the header to result.meta) to also support other fitsio-optimizations like columns and rows and then you could have your cake and eat it too as long as you like the desispec.io.read_table flavor of cake. UPDATE: done for read_table(..., rows, columns), but not for the other header parsing stuff in earlier paragraphs.

sbailey commented 6 months ago

@moustakas this now works, auto-deriving the SPECPROD, PROGRAM, and SURVEY from the catalog header:

from desispec.io import read_table, read_spectra_parallel
zz = read_table('/global/cfs/cdirs/desi/spectro/redux/iron/zcatalog/zpix-sv1-bright.fits', rows=[0, 300, 900])
ss = read_spectra_parallel(zz)
moustakas commented 6 months ago

Thanks @sbailey. Here's an example where the sorting isn't preserved (inspired by something that's actually happening to me in a different piece of code).

from desispec.io import read_table, read_spectra_parallel
zz = read_table('/global/cfs/cdirs/desi/spectro/redux/iron/zcatalog/zpix-sv1-bright.fits', rows=[0, 300, 900])
ss = read_spectra_parallel(zz[[2, 0, 1]])
ss.target_ids() == zz[[2, 0, 1]]['TARGETID']
  array([False, False, False])
sbailey commented 6 months ago

@moustakas got it, thanks for example. Underlying problem fixed, with tests to confirm. New arg match_order (default True) will ensure that the output order matches the input order. Setting match_order=False will reduce a temporary memory spike, but the output order may or may not match the input in that case.

moustakas commented 6 months ago

Thanks @sbailey. I apologize for not mentioning this earlier, but, I don't think it's sufficient to sort on targetid. I think for healpix coadds we need to sort on a "key" of survey-program-healpix-targetid and for cumulative coadds we need tileid-targetid. (I don't think we should support other types of coadds to avoid mission creep.)

Here's an example failure in the case that someone is trying to coadd spectra across surveys or programs (which is not a crazy use case)--

import fitsio, pdb
from astropy.table import Table
from desispec.io import read_spectra_parallel

zall = Table(fitsio.read('/global/cfs/cdirs/desi/spectro/redux/iron/zcatalog/zall-pix-iron.fits',
                       columns=['TARGETID', 'SURVEY', 'PROGRAM', 'HEALPIX', 'MAIN_NSPEC', 'MAIN_PRIMARY', 'TSNR2_LRG']))
# this is the first example where MAIN_PRIMARY == True and MAIN_NSPEC>1
zz = zall[zall['TARGETID'] == 616089273883427662]
zz = zz[[1, 0]]

ss = read_spectra_parallel(zz, specprod='iron')
print(ss.scores['TSNR2_LRG'] == zz['TSNR2_LRG'])
  [False False]

Also, oddly, target_ids() returns a scalar. Is this expected behavior?

ss.fibermap
<Table length=2>
     TARGETID      COADD_FIBERSTATUS     TARGET_RA          TARGET_DEC       PMRA   PMDEC  REF_EPOCH ... STD_FIBER_RA   MEAN_FIBER_DEC   STD_FIBER_DEC MEAN_PSF_TO_FIBER_SPECFLUX HEALPIX SURVEY PROGRAM
      int64              int32            float64            float64       float32 float32  float32  ...   float32         float64          float32             float32            int64   str4    str6
------------------ ----------------- ------------------ ------------------ ------- ------- --------- ... ------------ ------------------ ------------- -------------------------- ------- ------ -------
616089273883427662                 0 134.65160350096173 32.053057584507805     0.0     0.0       0.0 ...  0.010053255 32.053078586707976  0.0028417162                 0.79246867    5059   main  backup
616089273883427662               512 134.65160350096173 32.053057584507805     0.0     0.0       0.0 ...          0.0  32.05351351608743           0.0                   1.484397    5059   main  bright

ss.target_ids()
<Column name='TARGETID' dtype='int64' length=1>
616089273883427662
sbailey commented 6 months ago

@moustakas good point about TARGETIDs that span SURVEYs and PROGRAMs. Fixed, with tests. Anything else that I've missed?

Also, oddly, target_ids() returns a scalar. Is this expected behavior?

spectra.target_ids() returns the unique TARGETIDs that appear in the object, not the spectra.fibermap['TARGETID'] column which could have repeats. In your above example, it isn't returning a scalar, but it is returning an array/column of length 1 since there is only one unique TARGETID in your ss object. Also note that ss.num_spectra() == 2 and ss.num_targets() == 1