mars-project / mars

Mars is a tensor-based unified framework for large-scale data computation which scales numpy, pandas, scikit-learn and Python functions.
Apache License 2.0
2.68k stars 325 forks source link

[BUG] Ray executor raises KeyError when gen_subtask_graph #3269

Closed fyrestone closed 1 year ago

fyrestone commented 1 year ago

Describe the bug A clear and concise description of what the bug is.

________________ test_bagging_classifier[True-10-1.0-True-SVC] _________________

setup = <mars.deploy.oscar.session.SyncSession object at 0x33ac7eb50>
use_dataframe = True, max_samples = 10, max_features = 1.0, with_weights = True
base_estimator_cls = <class 'sklearn.svm._classes.SVC'>

        "use_dataframe, max_samples, max_features, with_weights, base_estimator_cls",
            (False, 10, 0.5, False, LogisticRegression),
            (True, 10, 1.0, True, SVC),
    def test_bagging_classifier(
        setup, use_dataframe, max_samples, max_features, with_weights, base_estimator_cls
        rs = np.random.RandomState(0)

        raw_x, raw_y = make_classification(

        if not use_dataframe:
            t_x = mt.tensor(raw_x, chunk_size=20)
            raw_x = pd.DataFrame(raw_x)
            t_x = md.DataFrame(raw_x, chunk_size=20)

        raw_weights = rs.random(100)
        t_y = mt.tensor(raw_y, chunk_size=20)
        t_weights = mt.tensor(raw_weights, chunk_size=20) if with_weights else None

        clf = BaggingClassifier(
        ), t_y, sample_weight=t_weights)

        for _tiled, _chunk, chunk_data in _get_tileable_chunk_data(setup, clf.estimators_):
            assert len(chunk_data) == 2
            assert all(isinstance(c, base_estimator_cls) for c in chunk_data)

        if max_features < 1.0:
            assert clf.estimator_features_ is not None

        with pytest.warns(Warning):
  , t_y, sample_weight=t_weights)
        with pytest.raises(ValueError):
            clf.n_estimators = 5
  , t_y, sample_weight=t_weights)

        clf.n_estimators = 20, t_y, sample_weight=t_weights)
        assert clf.estimators_.shape[0] == 20

        proba = clf.predict_proba(t_x)
        proba_array = proba.fetch()
        assert np.all((proba_array >= 0) & (proba_array <= 1))
        assert np.allclose(np.sum(proba_array, axis=1), 1.0)

        log_proba = clf.predict_log_proba(t_x)
        exp_log_proba_array = np.exp(log_proba.fetch())
        assert np.all((exp_log_proba_array >= 0) & (exp_log_proba_array <= 1))
        assert np.allclose(np.sum(exp_log_proba_array, axis=1), 1.0)

>       y = clf.predict(t_x)

_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
mars/learn/ensemble/ in predict
    return execute(y, session=session, **(run_kwargs or dict()))
mars/deploy/oscar/ in execute
    return session.execute(
mars/deploy/oscar/ in execute
    execution_info: ExecutionInfo = fut.result(
../../.pyenv/versions/3.8.13/lib/python3.8/concurrent/futures/ in result
    return self.__get_result()
../../.pyenv/versions/3.8.13/lib/python3.8/concurrent/futures/ in __get_result
    raise self._exception
mars/deploy/oscar/ in _execute
    await execution_info
../../.pyenv/versions/3.8.13/lib/python3.8/asyncio/ in _wrap_awaitable
    return (yield from awaitable.__await__())
mars/deploy/oscar/ in wait
    return await self._aio_task
mars/deploy/oscar/ in _run_in_background
    raise task_result.error.with_traceback(task_result.traceback)
mars/services/task/supervisor/ in run
    await self._process_stage_chunk_graph(*stage_args)
mars/services/task/supervisor/ in _process_stage_chunk_graph
    subtask_graph = await asyncio.to_thread(
mars/lib/aio/ in to_thread
    return await loop.run_in_executor(None, func_call)
../../.pyenv/versions/3.8.13/lib/python3.8/concurrent/futures/ in run
    result = self.fn(*self.args, **self.kwargs)
mars/core/ in _inner
    return func(*args, **kwargs)
mars/services/task/supervisor/tests/ in analyze
    subtask_graph = analyzer.gen_subtask_graph()
mars/core/ in _inner
    return func(*args, **kwargs)
mars/services/task/analyzer/ in gen_subtask_graph
    subtask, inp_subtasks, is_shuffle_proxy = self._gen_subtask_info(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = < object at 0x33d7a9850>
chunks = [TensorChunkData <op=TensorConcatenate, key=c5bb11bc0edb3df973226e5e38faff2b_0>, TensorChunkData <op=TensorSum, stage=...agg, key=ce125e5107002a50457df480baf02bcf_0>, TensorChunkData <op=TensorSlice, key=dabeb289402d19739cc99fa887734617_0>]
chunk_to_subtask = {DataFrameChunkData <op=DataFrameDataSource, key=c7b091c709abec93f88cb961255bdd31_0>: <Subtask id=xdHHx4jnnJ0sw8K5Li6f...9462e_0>: <Subtask id=j7A8BuHPGXVw5D8A4QD25aEl results=[DataFrameDataSource(d2030277d86500b90f91ebb48da9462e_0)]>, ...}
chunk_to_bands = {TensorChunkData <op=FancyIndexingConcat, stage=reduce, key=fe84472d67b1c15aac3b0ce8e6ccab3c_0>: ('ray_virtual_address...FancyIndexingConcat, stage=reduce, key=e18dcabc2ba9703bff01dde3f2789b95_0>: ('ray_virtual_address_0:0', 'numa-0'), ...}
chunk_to_fetch_chunk = {DataFrameChunkData <op=DataFrameDataSource, key=c6d3ec8b2ef904b4b9a84d6d55bb56b4_0>: DataFrameChunkData <op=DataFrame...b091c709abec93f88cb961255bdd31_0>: DataFrameChunkData <op=DataFrameFetch, key=c7b091c709abec93f88cb961255bdd31_0>, ...}

    def _gen_subtask_info(
        chunks: List[ChunkType],
        chunk_to_subtask: Dict[ChunkType, Subtask],
        chunk_to_bands: Dict[ChunkType, BandType],
        chunk_to_fetch_chunk: Dict[ChunkType, ChunkType],
    ) -> Tuple[Subtask, List[Subtask], bool]:
        # gen subtask and its input subtasks
        chunks_set = set(chunks)
        result_chunks = []
        result_chunks_set = set()
        chunk_graph = ChunkGraph(result_chunks)
        out_of_scope_chunks = []
        chunk_to_copied = self._chunk_to_copied
        update_meta_chunks = []
        # subtask properties
        band = None
        is_virtual = None
        retryable = True
        chunk_priority = None
        expect_worker = None
        bands_specified = None
        processed = set()
        for chunk in chunks:
            if chunk in processed:
            if expect_worker is None:
                expect_worker = chunk.op.expect_worker
                bands_specified = expect_worker is not None
            else:  # pragma: no cover
                assert (
                    chunk.op.expect_worker is None
                    or expect_worker == chunk.op.expect_worker
                ), (
                    f"expect_worker {chunk.op.expect_worker} conflicts with chunks that have same color: "
            # process band
            chunk_band = chunk_to_bands.get(chunk)
            if chunk_band is not None:
                assert (
                    band is None or band == chunk_band
                ), "band conflicts with chunks that have same color"
                band = chunk_band
            # process is_virtual
            if isinstance(chunk.op, VirtualOperand):
                assert is_virtual is None, "only 1 virtual operand can exist"
                is_virtual = True
                is_virtual = False
            # process retryable
            if not chunk.op.retryable:
                retryable = False
            # process priority
            if chunk.op.priority is not None:
                assert (
                    chunk_priority is None or chunk_priority == chunk.op.priority
                ), "priority conflicts with chunks that have same color"
                chunk_priority = chunk.op.priority
            # process input chunks
            inp_chunks = []
            build_fetch_index_to_chunks = dict()
            for i, inp_chunk in enumerate(chunk.inputs):
                if inp_chunk in chunks_set:
                    build_fetch_index_to_chunks[i] = inp_chunk
                    if not isinstance(inp_chunk.op, Fetch):
            fetch_chunks = self._gen_input_chunks(
                list(build_fetch_index_to_chunks.values()), chunk_to_fetch_chunk
            for i, fetch_chunk in zip(build_fetch_index_to_chunks, fetch_chunks):
                inp_chunks[i] = fetch_chunk
            copied_op = chunk.op.copy()
            copied_op._key = chunk.op.key
            out_chunks = [
                for c in copied_op.new_chunks(
                    inp_chunks, kws=[c.params.copy() for c in chunk.op.outputs]
            for src_chunk, out_chunk in zip(chunk.op.outputs, out_chunks):
                out_chunk._key = src_chunk.key
                # cannot be copied twice
                assert src_chunk not in chunk_to_copied
                chunk_to_copied[src_chunk] = out_chunk
                if src_chunk in self._final_result_chunks_set:
                    if out_chunk not in result_chunks_set:
                        # add to result chunks
                        # chunk is in the result chunks of full chunk graph
                        # meta need to be updated
                if not is_virtual:
                    # skip adding fetch chunk to chunk graph when op is virtual operand
                    for c in inp_chunks:
                        if c not in chunk_graph:
                        chunk_graph.add_edge(c, out_chunk)
        # add chunks with no successors into result chunks
            for c in chunk_graph.iter_indep(reverse=True)
            if c not in result_chunks_set
        expect_bands = (
            if bands_specified
            else ([band] if band is not None else None)
        # calculate priority
        if out_of_scope_chunks:
            inp_subtasks = []
            for out_of_scope_chunk in out_of_scope_chunks:
>               copied_out_of_scope_chunk = chunk_to_copied[out_of_scope_chunk]
E               KeyError: TensorChunkData <op=TensorArgmax, stage=map, key=eab3440ebb09b9b78d27ad27130f6501_0>

mars/services/task/analyzer/ KeyError

The problem is that there are two mapper operands in one colour (i.e. one subtask). Ray DAG shuffle requires exactly one mapper per subtask, so TensorArgmax:map:ed36c and FancyIndexingDistribute:map:f6251 are each assigned a new colour (subtask). However, TensorArgmax:agg:ea33c, which is the output of TensorArgmax:map:ed36c, is kept in the original colour (subtask).

The solution may be:

  1. Assign a new colour to the mapper and all its successors. (Assigning a new colour only to the mapper itself is incorrect, as this case shows.)
  2. If the mapper's successor is not a shuffle proxy, we don't need to assign a new colour to it.

I think the second solution is better.


To Reproduce To help us reproducing this bug, please provide information below:

  1. Your Python version
  2. The version of Mars you use
  3. Versions of crucial packages, such as numpy, scipy and pandas
  4. Full stack of the error.
  5. Minimized code to reproduce the error.

Expected behavior A clear and concise description of what you expected to happen.

Additional context Add any other context about the problem here.