`Catalog.to_hipscat()` is much slower than `Catalog._ddf.to_parquet()` for large jobs.
For example, for a smaller job, `to_hipscat()` took 100 s, while `to_parquet()` took only 50 s. For a larger job, `to_hipscat()` took 63 minutes, while `to_parquet()` took only 12 minutes. These timings are from Bridges2 (128 cores / 256 GB, 16 Dask workers).
For the larger job, most of the time I see no activity in the Dask Dashboard, while `top` shows a single CPU core at 100% usage. This probably means that some planning step is taking all the time, rather than actual computation and I/O.
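One way to gather evidence for this would be Dask's built-in `performance_report`, which records scheduler and worker activity to an HTML file; a long stretch with no tasks running would support the single-threaded-planning hypothesis. A minimal sketch, using the `catalog` object built in the script below (the report filename is arbitrary):

```python
from dask.distributed import Client, performance_report

# Record cluster activity while to_hipscat() runs; if the workers sit idle
# while a single client core is busy, the report should show the gap.
with Client(n_workers=16):
    with performance_report(filename='to_hipscat-report.html'):
        catalog.to_hipscat(
            base_catalog_path='hipscat/catalog_name',
            catalog_name='catalog_name',
        )
```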
Code I run
```python
from pathlib import Path

import dask.distributed
import lsdb

HIPSCAT_PATH = Path('/ocean/projects/phy210048p/shared/hipscat/catalogs/')
PS1_BANDS = 'riz'

# The smaller version adds .cone_search(0, 45, 25*3600) to each catalog
ztf_dr17_coord = lsdb.read_hipscat(
    'hipscat/ztf_dr17_coord',
    margin_cache='hipscat/ztf_dr17_coord_2arcsec',
).query('filter == 2')  # r-band only

gaia = lsdb.read_hipscat(
    HIPSCAT_PATH / 'gaia_dr3' / 'gaia',
    margin_cache=str(HIPSCAT_PATH / 'gaia_dr3' / 'gaia_10arcs'),
    columns=[
        'ra', 'dec',
        # 'parallax',
        'parallax_over_error',
        'teff_gspphot', 'teff_gspphot_lower', 'teff_gspphot_upper',
        'logg_gspphot', 'logg_gspphot_lower', 'logg_gspphot_upper',
        # 'ag_gspphot', 'ag_gspphot_lower', 'ag_gspphot_upper',
    ],
).query(
    "parallax_over_error > 10.0"
    " and teff_gspphot_upper < 3800"
    " and (teff_gspphot_upper - teff_gspphot_lower) < 400"
    " and logg_gspphot_lower > 4.5"
    " and (logg_gspphot_upper - logg_gspphot_lower) < 0.2"
)

panstarrs = lsdb.read_hipscat(
    HIPSCAT_PATH / 'ps1' / 'ps1_otmo',
    margin_cache=str(HIPSCAT_PATH / 'ps1' / 'ps1_otmo_10arcs'),
    columns=(
        ['raMean', 'decMean']
        + [f'{b}MeanPSFMag' for b in PS1_BANDS]
        + [f'{b}MeanPSFMagErr' for b in PS1_BANDS]
    ),
).query(
    "((rMeanPSFMag - iMeanPSFMag) + (rMeanPSFMagErr + iMeanPSFMagErr)) > 0.42"
    " and ((iMeanPSFMag - zMeanPSFMag) + (iMeanPSFMagErr + zMeanPSFMagErr)) > 0.23"
    " and rMeanPSFMagErr < 0.1 and iMeanPSFMagErr < 0.1 and zMeanPSFMagErr < 0.1"
)

catalog = ztf_dr17_coord.crossmatch(
    gaia,
    radius_arcsec=1,
    n_neighbors=1,
    suffixes=['', '_gaia'],
).crossmatch(
    panstarrs,
    radius_arcsec=1,
    n_neighbors=1,
    suffixes=['', ''],
)

# Slow path: 63 minutes for the larger job
with dask.distributed.Client(n_workers=16):
    catalog.to_hipscat(base_catalog_path='hipscat/catalog_name', catalog_name='catalog_name')

# Fast path: 12 minutes for the larger job
with dask.distributed.Client(n_workers=16):
    catalog._ddf.to_parquet('delete-me')
```
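If the extra time is spent building or optimizing the task graph on the client, comparing graph sizes may be informative. For `to_parquet()` the graph can be built without executing via `compute=False` (standard `dask.dataframe` behavior); I don't know of an equivalent option for `to_hipscat()`, so this sketch only covers the fast path:

```python
# Build the to_parquet() task graph without executing it and count its tasks.
# If to_hipscat() builds a much larger graph, single-core graph construction
# could explain the idle dashboard.
out = catalog._ddf.to_parquet('delete-me', compute=False)
print(len(dict(out.__dask_graph__())))
```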
Before submitting
Please check the following:
[x] I have described the situation in which the bug arose, including what code was executed, information about my environment, and any applicable data others will need to reproduce the problem.
[x] I have included available evidence of the unexpected behavior (including error messages, screenshots, and/or plots) as well as a description of what I expected instead.
[ ] If I have a solution in mind, I have provided an explanation and/or pseudocode and/or task list.