lincc-frameworks / tape

[Deprecated] Package for working with LSST time series data
https://tape.readthedocs.io
MIT License

Bug with calc_nobs implicit repartitioning #405

Closed dougbrn closed 5 months ago

dougbrn commented 5 months ago

Bug report. Blocks #404. There's an issue with the repartitioning done within calc_nobs(by_band=True) to align the new source table's partitions with the object table's partitions. The following code block produces an error complaining about a division mismatch:

# Load some ZTF Data
from tape import Ensemble, ColumnMapper

colmap = ColumnMapper().use_known_map("ztf")
ens = Ensemble().from_hipscat("https://epyc.astro.washington.edu/~lincc-frameworks/half_degree_surveys/ztf/ztf_source",
                              "https://epyc.astro.washington.edu/~lincc-frameworks/half_degree_surveys/ztf/ztf_object",
                              column_mapper=colmap, source_index="ps1_objid", object_index="ps1_objid")

# Do some filtering
ens.calc_nobs(by_band=True)
ens.object.query("nobs_g > 50").update_ensemble()
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
Cell In[21], line 10
      5 ens= Ensemble().from_hipscat("https://epyc.astro.washington.edu/~lincc-frameworks/half_degree_surveys/ztf/ztf_source",
      6                 "https://epyc.astro.washington.edu/~lincc-frameworks/half_degree_surveys/ztf/ztf_object",
      7                 column_mapper=colmap, source_index="ps1_objid", object_index="ps1_objid")
      9 # Do some filtering
---> 10 ens.calc_nobs(by_band=True)
     11 ens.object.query("nobs_g > 50").update_ensemble()

File ~/lincc/tape/src/tape/ensemble.py:765, in Ensemble.calc_nobs(self, by_band, label, temporary)
    762     meta = {band: float for band in unq_bands}
    764     # Map the groupby to each partition
--> 765     band_counts = self.source.map_partitions(
    766         lambda x: x.groupby(id_col)[[band_col]]
    767         .value_counts()
    768         .to_frame()
    769         .reset_index()
    770         .pivot_table(values=band_col, index=id_col, columns=band_col, aggfunc="sum"),
    771         meta=meta,
    772     ).repartition(divisions=self.object.divisions)
    773 else:
    774     band_counts = (
    775         self.source.groupby([self._id_col])[self._band_col]  # group by each object
    776         .value_counts()  # count occurence of each band
   (...)
    783         )
    784     )  # the pivot_table call makes each band_count a column of the id_col row

File ~/miniforge3/envs/ray310/lib/python3.10/site-packages/dask/dataframe/core.py:1805, in _Frame.repartition(self, divisions, npartitions, partition_size, freq, force)
   1803     return repartition_npartitions(self, npartitions)
   1804 elif divisions is not None:
-> 1805     return repartition(self, divisions, force=force)
   1806 elif freq is not None:
   1807     return repartition_freq(self, freq=freq)

File ~/miniforge3/envs/ray310/lib/python3.10/site-packages/dask/dataframe/core.py:8278, in repartition(df, divisions, force)
   8276 tmp = "repartition-split-" + token
   8277 out = "repartition-merge-" + token
-> 8278 dsk = repartition_divisions(
   8279     df.divisions, divisions, df._name, tmp, out, force=force
   8280 )
   8281 graph = HighLevelGraph.from_collections(out, dsk, dependencies=[df])
   8282 return new_dd_object(graph, out, df._meta, divisions)

File ~/miniforge3/envs/ray310/lib/python3.10/site-packages/dask/dataframe/core.py:7935, in repartition_divisions(a, b, name, out1, out2, force)
   7933 if a[0] != b[0]:
   7934     msg = "left side of old and new divisions are different"
-> 7935     raise ValueError(msg)
   7936 if a[-1] != b[-1]:
   7937     msg = "right side of old and new divisions are different"

ValueError: left side of old and new divisions are different

My guess is that, when writing calc_nobs, I assumed the object table's divisions could be used directly to repartition the source-derived counts, but it appears that is not valid in all cases. This may need to be changed to repartition to the number of object partitions rather than using the object divisions directly.
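
As a standalone sketch of that idea (toy data, not TAPE code; the frames, column names, and index ranges below are invented), this is the kind of divisions mismatch Dask complains about, and what repartitioning by partition count instead of by divisions looks like:

# Toy illustration of the failure mode and the npartitions workaround
import pandas as pd
import dask.dataframe as dd

# Two frames whose sorted-index divisions do not share the same outer bounds
source = dd.from_pandas(pd.DataFrame({"x": range(10)}, index=range(10)), npartitions=4)
obj = dd.from_pandas(pd.DataFrame({"y": range(5)}, index=range(2, 7)), npartitions=2)

# This reproduces the error above:
# source.repartition(divisions=obj.divisions)
# ValueError: left side of old and new divisions are different

# Matching only the partition *count* avoids the divisions check entirely
source = source.repartition(npartitions=obj.npartitions)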


dougbrn commented 5 months ago

https://docs.dask.org/en/stable/generated/dask_expr._collection.DataFrame.repartition.html#dask_expr._collection.DataFrame.repartition

This might be as simple as adding the force=True flag to the repartition call.
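
A quick standalone check of what that flag does (toy data, not TAPE code): per the Dask docs, force=True allows the new divisions to expand beyond the frame's existing divisions rather than requiring the outer bounds to match exactly.

# Toy illustration of repartition(..., force=True)
import pandas as pd
import dask.dataframe as dd

source = dd.from_pandas(pd.DataFrame({"x": range(10)}, index=range(10)), npartitions=4)

# Target divisions whose outer bounds differ from source.divisions
target_divisions = (-5, 4, 20)

# Without force this raises the same ValueError as above
# source.repartition(divisions=target_divisions)

# With force=True the mismatched outer bounds are accepted
source = source.repartition(divisions=target_divisions, force=True)

Note that even with force=True, Dask still expects the new divisions to cover the frame's existing index range, so whether the flag alone is enough for the object/source case here would need checking.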