lincc-frameworks / tape

[Deprecated] Package for working with LSST time series data
MIT License

Potential dask-expr compatibility errors #434

Open wilsonbb opened 4 months ago

wilsonbb commented 4 months ago

We're seeing some consistent unit test failures with dask-expr 1.0.12. See the following example:

_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/ in __repr__
    data = self._repr_data().to_string(max_rows=5)
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/ in _repr_data
    index = self._repr_divisions
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/ in _repr_divisions
    name = f"npartitions={self.npartitions}"
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/ in npartitions
    return self.expr.npartitions
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/ in npartitions
    return len(self.divisions) - 1
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/ in __get__
    val = self.func(instance)
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/ in divisions
    return tuple(self._divisions())
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/ in _divisions
    return _get_divisions_map_partitions(
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask/dataframe/ in _get_divisions_map_partitions
    divisions = max((d.divisions for d in dfs), key=len)
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask/dataframe/ in <genexpr>
    divisions = max((d.divisions for d in dfs), key=len)
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/ in __get__
    val = self.func(instance)
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/ in divisions
    return tuple(self._divisions())
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/ in _divisions
    if not self._broadcast_dep(arg):
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/ in _broadcast_dep
    return dep.npartitions == 1 and dep.ndim < self.ndim
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/ in npartitions
    return len(self.divisions) - 1
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/ in __get__
    val = self.func(instance)
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/ in divisions
    return tuple(self._divisions())
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/ in _divisions
    if self.need_to_shuffle:
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/ in __get__
    val = self.func(instance)
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/ in need_to_shuffle
    if any(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

.0 = <set_iterator object at 0x7f43fc3c36c0>

    if any(
>       set(self._by_columns) >= set(cols)
        for cols in self.frame.unique_partition_mapping_columns_from_shuffle
E   TypeError: 'NoneType' object is not iterable
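To make the failure mode concrete: `set(None)` is what raises `TypeError: 'NoneType' object is not iterable`, so either `self._by_columns` or one of the entries yielded by `unique_partition_mapping_columns_from_shuffle` must have been `None` at that point. The traceback alone doesn't say which. The sketch below is a hypothetical, pure-Python stand-in that only mimics the shape of the failing expression (it is not dask-expr's code), plus a hypothetical guarded variant that skips `None` entries instead of calling `set()` on them.

```python
# Hypothetical stand-in for the check in the traceback above.
# This is NOT dask-expr's source; it only mimics the shape of the expression.
def need_to_shuffle(by_columns, partition_mapping_columns):
    return any(
        set(by_columns) >= set(cols)  # set(None) raises TypeError here
        for cols in partition_mapping_columns
    )


def need_to_shuffle_guarded(by_columns, partition_mapping_columns):
    # Hypothetical guard: treat a None entry as "no match" instead of calling set(None).
    return any(
        cols is not None and set(by_columns) >= set(cols)
        for cols in partition_mapping_columns
    )


if __name__ == "__main__":
    try:
        need_to_shuffle(["ps1_objid"], {None})
    except TypeError as err:
        print(err)  # 'NoneType' object is not iterable
    print(need_to_shuffle_guarded(["ps1_objid"], {None}))  # False
```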

We're also seeing more intermittent failures on earlier versions, but it is less clear whether these are tied to specific versions of dask-expr, or whether dask-expr is involved at all. Example:

________________________ test_batch_by_band[bounds-on1] ________________________

parquet_ensemble = <tape.ensemble.Ensemble object at 0x7fbac85f8250>
func_label = 'bounds', on = ['ps1_objid', 'filterName']

    @pytest.mark.parametrize("on", [None, ["ps1_objid", "filterName"], ["filterName", "ps1_objid"]])
    @pytest.mark.parametrize("func_label", ["mean", "bounds"])
    def test_batch_by_band(parquet_ensemble, func_label, on):
        """Test that ensemble.batch(by_band=True) works as intended."""

        if func_label == "mean":

            def my_mean(flux):
                """returns a single value"""
                return np.mean(flux)

            res = parquet_ensemble.batch(my_mean, parquet_ensemble._flux_col, on=on, by_band=True)

            filter_res = parquet_ensemble.batch(my_mean, parquet_ensemble._flux_col, on=on, by_band=False)

            # An EnsembleFrame should be returned
            assert isinstance(res, EnsembleFrame)

            # Make sure we get all the expected columns
            assert all([col in res.columns for col in ["result_g", "result_r"]])

            # These should be equivalent
            # [expr] need this TODO: investigate typing issue
            filter_res.index = filter_res.index.astype("int")
            assert (

        elif func_label == "bounds":

            def my_bounds(flux):
                """returns a series"""
                return pd.Series({"min": np.min(flux), "max": np.max(flux)})

            res = parquet_ensemble.batch(
                my_bounds, "psFlux", on=on, by_band=True, meta={"min": float, "max": float}
            )

            filter_res = parquet_ensemble.batch(
                my_bounds, "psFlux", on=on, by_band=False, meta={"min": float, "max": float}
            )

            # An EnsembleFrame should be returned
            assert isinstance(res, EnsembleFrame)

            # Make sure we get all the expected columns
            assert all([col in res.columns for col in ["max_g", "max_r", "min_g", "min_r"]])

            # These should be equivalent

            # [expr] need this TODO: investigate typing issue
            filter_res.index = filter_res.index.astype("int")

>           assert (
E           AssertionError: assert False
E            +  where False = <bound method NDFrame.equals of ps1_objid\n88472935274829959    0.0\nName: max_g, dtype: float32>(ps1_objid\n88472935274829959   NaN\nName: max, dtype: float32)
E            +    where <bound method NDFrame.equals of ps1_objid\n88472935274829959    0.0\nName: max_g, dtype: float32> = ps1_objid\n88472935274829959    0.0\nName: max_g, dtype: float32.equals
E            +      where ps1_objid\n88472935274829959    0.0\nName: max_g, dtype: float32 = <bound method FrameBase.compute of Dask Series Structure:\nnpartitions=1\n    object\n       ...\nDask Name: getitem, 11 e...Expr=(LocUnknown(frame=MapPartitions(TapeFrame), iindexer=slice(88472935274829959, 88472935274829959, None)))['max_g']>()
E            +        where <bound method FrameBase.compute of Dask Series Structure:\nnpartitions=1\n    object\n       ...\nDask Name: getitem, 11 e...Expr=(LocUnknown(frame=MapPartitions(TapeFrame), iindexer=slice(88472935274829959, 88472935274829959, None)))['max_g']> = Dask Series Structure:\nnpartitions=1\n    object\n       ...\nDask Name: getitem, 11 expressions\nExpr=(LocUnknown(frame=MapPartitions(TapeFrame), iindexer=slice(88472935274829959, 88472935274829959, None)))['max_g'].compute
E            +    and   ps1_objid\n88472935274829959   NaN\nName: max, dtype: float32 = <bound method FrameBase.compute of Dask Series Structure:\nnpartitions=1\n    object\n       ...\nDask Name: getitem, 11 e...s(TapeFrame), other='ps1_objid')), dtypes='int')), iindexer=slice(88472935274829959, 88472935274829959, None)))['max']>()
E            +      where <bound method FrameBase.compute of Dask Series Structure:\nnpartitions=1\n    object\n       ...\nDask Name: getitem, 11 e...s(TapeFrame), other='ps1_objid')), dtypes='int')), iindexer=slice(88472935274829959, 88472935274829959, None)))['max']> = Dask Series Structure:\nnpartitions=1\n    object\n       ...\nDask Name: getitem, 11 expressions\nExpr=(LocUnknown(frame=A...ns(TapeFrame), other='ps1_objid')), dtypes='int')), iindexer=slice(88472935274829959, 88472935274829959, None)))['max'].compute
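The assertion message shows the comparison goes through `NDFrame.equals`, with the `by_band=True` result holding `0.0` under `max_g` and the `by_band=False` result holding `NaN` under `max`. A small pandas sketch that rebuilds those two one-row Series from the printed values suggests the mismatch is in the values rather than the names: `Series.equals` ignores the Series name and treats NaNs in the same position as equal. This assumes the elided assert compares these two selections directly.

```python
import numpy as np
import pandas as pd

# One-row Series rebuilt from the values pytest printed above.
idx = pd.Index([88472935274829959], name="ps1_objid")
by_band = pd.Series([0.0], index=idx, name="max_g", dtype="float32")
by_filter = pd.Series([np.nan], index=idx, name="max", dtype="float32")

# Series.equals ignores the Series name, so "max_g" vs "max" alone would pass...
names_only_differ = by_band.equals(by_band.rename("max"))
# ...and NaNs in matching positions compare equal...
nan_matches_nan = by_filter.equals(by_filter.copy())
# ...so the failure comes from 0.0 vs NaN in the values themselves.
values_differ = not by_band.equals(by_filter)

print(names_only_differ, nan_matches_nan, values_differ)  # True True True
```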
wilsonbb commented 4 months ago

So prevents the first error from showing up in our project, but the latter error seems to have been present since the initial PR that added dask-expr to TAPE. Some useful examples:

  1. is on an old branch that only goes up to the commit where we merged dask-expr into TAPE, but with some changes to pin dask and dask-expr to the versions we used at the time (as seen from the listed installed dependencies here) and to pin python<=3.11.8. Here we are still seeing the above failures for test_batch_by_band.
  2. makes the same change of pinning to python<=3.11.8 and is built on the final PR merged into TAPE before we added dask-expr. This seems to run without the above failures.
  3. On the current build, if I remove all tests except the failing test, test_batch_by_band, we don't typically see failures. An example of this is
     Whatever flakiness is happening may be related to test order or to tests running concurrently.
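When comparing runs like the ones listed above, it may help to print the exact interpreter and dask/dask-expr versions each CI job ends up with, and whether the interpreter satisfies the python<=3.11.8 pin. A minimal stdlib sketch, assuming `importlib.metadata` can see the installed distributions; `installed_versions` and `within_python_pin` are hypothetical helper names, not part of TAPE.

```python
import sys
from importlib.metadata import PackageNotFoundError, version


def installed_versions(packages=("dask", "dask-expr")):
    """Return the Python version plus installed versions of `packages` (None if absent).

    Hypothetical helper for logging environments; not part of TAPE.
    """
    found = {"python": "{}.{}.{}".format(*sys.version_info[:3])}
    for name in packages:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found


def within_python_pin(limit=(3, 11, 8)):
    """True if the running interpreter satisfies a python<=limit pin (hypothetical helper)."""
    return tuple(sys.version_info[:3]) <= limit


if __name__ == "__main__":
    print(installed_versions())
    print(within_python_pin())
```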