mars-project / mars

Mars is a tensor-based unified framework for large-scale data computation which scales numpy, pandas, scikit-learn and Python functions.
https://mars-project.readthedocs.io
Apache License 2.0
2.7k stars 326 forks source link

[BUG] Ray shuffle reduce call iter_mapper_data_with_index returns incorrect values #3186

Closed fyrestone closed 2 years ago

fyrestone commented 2 years ago

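Describe the bug: `test_reset_index_execution` fails when run on the Ray task executor with `TypeError: 'int' object is not subscriptable`, raised from `execute_reduce` in `mars/dataframe/align.py`. The full pytest output follows.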

mars/dataframe/indexing/tests/test_indexing_execution.py:727 (test_reset_index_execution)
setup = <mars.deploy.oscar.session.SyncSession object at 0x14c308f40>

    def test_reset_index_execution(setup):
        data = pd.DataFrame(
            [("bird", 389.0), ("bird", 24.0), ("mammal", 80.5), ("mammal", np.nan)],
            index=["falcon", "parrot", "lion", "monkey"],
            columns=("class", "max_speed"),
        )
        df = md.DataFrame(data)
        df2 = df.reset_index()
        result = df2.execute().fetch()
        expected = data.reset_index()
        pd.testing.assert_frame_equal(result, expected)

        df = md.DataFrame(data, chunk_size=2)
        df2 = df.reset_index()
        result = df2.execute().fetch()
        expected = data.reset_index()
        pd.testing.assert_frame_equal(result, expected)

        df = md.DataFrame(data, chunk_size=1)
        df2 = df.reset_index(drop=True)
        result = df2.execute().fetch()
        expected = data.reset_index(drop=True)
        pd.testing.assert_frame_equal(result, expected)

        index = pd.MultiIndex.from_tuples(
            [
                ("bird", "falcon"),
                ("bird", "parrot"),
                ("mammal", "lion"),
                ("mammal", "monkey"),
            ],
            names=["class", "name"],
        )
        data = pd.DataFrame(
            [("bird", 389.0), ("bird", 24.0), ("mammal", 80.5), ("mammal", np.nan)],
            index=index,
            columns=("type", "max_speed"),
        )
        df = md.DataFrame(data, chunk_size=1)
        df2 = df.reset_index(level="class")
        result = df2.execute().fetch()
        expected = data.reset_index(level="class")
        pd.testing.assert_frame_equal(result, expected)

        columns = pd.MultiIndex.from_tuples([("speed", "max"), ("species", "type")])
        data.columns = columns
        df = md.DataFrame(data, chunk_size=2)
        df2 = df.reset_index(level="class", col_level=1, col_fill="species")
        result = df2.execute().fetch()
        expected = data.reset_index(level="class", col_level=1, col_fill="species")
        pd.testing.assert_frame_equal(result, expected)

        df = md.DataFrame(data, chunk_size=3)
        df.reset_index(level="class", col_level=1, col_fill="species", inplace=True)
        result = df.execute().fetch()
        expected = data.reset_index(level="class", col_level=1, col_fill="species")
        pd.testing.assert_frame_equal(result, expected)

        # Test Series

        s = pd.Series(
            [1, 2, 3, 4], name="foo", index=pd.Index(["a", "b", "c", "d"], name="idx")
        )

        series = md.Series(s)
        s2 = series.reset_index(name="bar")
        result = s2.execute().fetch()
        expected = s.reset_index(name="bar")
        pd.testing.assert_frame_equal(result, expected)

        series = md.Series(s, chunk_size=2)
        s2 = series.reset_index(drop=True)
        result = s2.execute().fetch()
        expected = s.reset_index(drop=True)
        pd.testing.assert_series_equal(result, expected)

        # Test Unknown shape
        data1 = pd.DataFrame(np.random.rand(10, 3), index=[0, 10, 2, 3, 4, 5, 6, 7, 8, 9])
        df1 = md.DataFrame(data1, chunk_size=5)
        data2 = pd.DataFrame(np.random.rand(10, 3), index=[11, 1, 2, 5, 7, 6, 8, 9, 10, 3])
        df2 = md.DataFrame(data2, chunk_size=6)
        df = (df1 + df2).reset_index(incremental_index=True)
>       result = df.execute().fetch()

mars/dataframe/indexing/tests/test_indexing_execution.py:810: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
mars/core/entity/tileables.py:462: in execute
    result = self.data.execute(session=session, **kw)
mars/core/entity/executable.py:144: in execute
    return execute(self, session=session, **kw)
mars/deploy/oscar/session.py:1890: in execute
    return session.execute(
mars/deploy/oscar/session.py:1684: in execute
    execution_info: ExecutionInfo = fut.result(
../../.pyenv/versions/3.8.7/lib/python3.8/concurrent/futures/_base.py:439: in result
    return self.__get_result()
../../.pyenv/versions/3.8.7/lib/python3.8/concurrent/futures/_base.py:388: in __get_result
    raise self._exception
mars/deploy/oscar/session.py:1870: in _execute
    await execution_info
../../.pyenv/versions/3.8.7/lib/python3.8/asyncio/tasks.py:695: in _wrap_awaitable
    return (yield from awaitable.__await__())
mars/deploy/oscar/session.py:105: in wait
    return await self._aio_task
mars/deploy/oscar/session.py:953: in _run_in_background
    raise task_result.error.with_traceback(task_result.traceback)
mars/services/task/supervisor/processor.py:369: in run
    await self._process_stage_chunk_graph(*stage_args)
mars/services/task/supervisor/processor.py:247: in _process_stage_chunk_graph
    chunk_to_result = await self._executor.execute_subtask_graph(
mars/services/task/execution/ray/executor.py:538: in execute_subtask_graph
    meta_list = await asyncio.gather(*output_meta_object_refs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

awaitable = ObjectRef(8915caf77059302bffffffffffffffffffffffff0100000001000000)

    @types.coroutine
    def _wrap_awaitable(awaitable):
        """Helper for asyncio.ensure_future().

        Wraps awaitable (an object with __await__) into a coroutine
        that will later be wrapped in a Task by ensure_future().
        """
>       return (yield from awaitable.__await__())
E       ray.exceptions.RayTaskError(TypeError): ray::execute_subtask() (pid=32948, ip=127.0.0.1)
E         At least one of the input arguments for this task could not be computed:
E       ray.exceptions.RayTaskError: ray::execute_subtask() (pid=32945, ip=127.0.0.1)
E         File "/Users/po/Work/mars/mars/services/task/execution/ray/executor.py", line 180, in execute_subtask
E           execute(context, chunk.op)
E         File "/Users/po/Work/mars/mars/core/operand/core.py", line 491, in execute
E           result = executor(results, op)
E         File "/Users/po/Work/mars/mars/dataframe/align.py", line 321, in execute
E           cls.execute_reduce(ctx, op)
E         File "/Users/po/Work/mars/mars/dataframe/align.py", line 298, in execute_reduce
E           row_idxes = sorted({idx[0] for idx in input_idx_to_df})
E         File "/Users/po/Work/mars/mars/dataframe/align.py", line 298, in <setcomp>
E           row_idxes = sorted({idx[0] for idx in input_idx_to_df})
E       TypeError: 'int' object is not subscriptable

../../.pyenv/versions/3.8.7/lib/python3.8/asyncio/tasks.py:695: RayTaskError(TypeError)

To Reproduce: To help us reproduce this bug, please provide the information below:

  1. Your Python version
  2. The version of Mars you use
  3. Versions of crucial packages, such as numpy, scipy and pandas
  4. Full stack of the error.
  5. Minimized code to reproduce the error.

Expected behavior A clear and concise description of what you expected to happen.

Additional context Add any other context about the problem here.

chaokunyang commented 2 years ago

This is pretty tricky: the index of the DataFrameIndexAlign reduce chunk is a tuple for the following graph: graphviz (3)

The tuple chunk index breaks the assumption in the Ray shuffle, which mocks a chunk index using an int: image image