xorbitsai / xorbits

Scalable Python DS & ML, in an API compatible & lightning fast way.
https://xorbits.io
Apache License 2.0
1.06k stars 67 forks

BUG: `merge` performance issue caused by `DataFrameAutoMergeMixin` #740

Closed. ChengjieLi28 closed this 8 months ago.

ChengjieLi28 commented 8 months ago

This PR fixes a performance issue caused by `DataFrameAutoMergeMixin`. Without it, running TPC-H at scale factor 1 (sf1) fails with the following error:

AssertionError
 51%|███████████████████████████████████████████████████▍                                                 |  50.88/100 [00:01<00:00, 50.65it/s]
2023-10-13 16:13:55,293 xorbits._mars.services.task.execution.mars.stage 88633 ERROR    Subtask 1bmsxjlcmLYPTmtAN6LlM4X0 errored
Traceback (most recent call last):
  File "/Users/lichengjie/Projects/workspace/xorbits/python/xorbits/_mars/services/subtask/worker/processor.py", line 212, in _execute_operand
    return execute(ctx, op)
  File "/Users/lichengjie/Projects/workspace/xorbits/python/xorbits/_mars/core/operand/core.py", line 492, in execute
    result = executor(results, op)
  File "/Users/lichengjie/Projects/workspace/xorbits/python/xorbits/_mars/dataframe/merge/concat.py", line 349, in execute
    ctx[chunk.key] = _base_concat(chunk, inputs)
  File "/Users/lichengjie/Projects/workspace/xorbits/python/xorbits/_mars/dataframe/merge/concat.py", line 252, in _base_concat
    return _auto_concat_dataframe_chunks(chunk, inputs)
  File "/Users/lichengjie/Projects/workspace/xorbits/python/xorbits/_mars/dataframe/merge/concat.py", line 281, in _auto_concat_dataframe_chunks
    assert n_rows * n_cols == len(inputs)
AssertionError
100%|█████████████████████████████████████████████████████████████████████████████████████████████████████| 100.00/100 [00:01<00:00, 96.81it/s]
Traceback (most recent call last):
  File "/Users/lichengjie/Projects/workspace/xorbits/benchmarks/tpch/run_queries.py", line 1149, in <module>
    main()
  File "/Users/lichengjie/Projects/workspace/xorbits/benchmarks/tpch/run_queries.py", line 1136, in main
    run_queries(
  File "/Users/lichengjie/Projects/workspace/xorbits/benchmarks/tpch/run_queries.py", line 1053, in run_queries
    globals()[f"q{query:02}"](*queries_to_args[query])
  File "/Users/lichengjie/Projects/workspace/xorbits/benchmarks/tpch/run_queries.py", line 146, in wrapped
    q(*args, **kwargs)
  File "/Users/lichengjie/Projects/workspace/xorbits/benchmarks/tpch/run_queries.py", line 586, in q09
    print(total)
  File "/Users/lichengjie/Projects/workspace/xorbits/python/xorbits/utils.py", line 38, in inn
    return f(self, *args, **kwargs)
  File "/Users/lichengjie/Projects/workspace/xorbits/python/xorbits/core/data.py", line 299, in __str__
    run(self)
  File "/Users/lichengjie/Projects/workspace/xorbits/python/xorbits/core/execution.py", line 55, in run
    mars_execute(mars_tileables, **kwargs)
  File "/Users/lichengjie/Projects/workspace/xorbits/python/xorbits/_mars/deploy/oscar/session.py", line 1789, in execute
    return session.execute(
  File "/Users/lichengjie/Projects/workspace/xorbits/python/xorbits/_mars/deploy/oscar/session.py", line 1600, in execute
    execution_info: ExecutionInfo = fut.result(
  File "/Users/lichengjie/miniconda3/lib/python3.9/concurrent/futures/_base.py", line 446, in result
    return self.__get_result()
  File "/Users/lichengjie/miniconda3/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/Users/lichengjie/Projects/workspace/xorbits/python/xorbits/_mars/deploy/oscar/session.py", line 1769, in _execute
    await execution_info
  File "/Users/lichengjie/Projects/workspace/xorbits/python/xorbits/_mars/deploy/oscar/session.py", line 124, in wait
    return await self._aio_task
  File "/Users/lichengjie/Projects/workspace/xorbits/python/xorbits/_mars/deploy/oscar/session.py", line 884, in _run_in_background
    raise task_result.error.with_traceback(task_result.traceback)
  File "/Users/lichengjie/Projects/workspace/xorbits/python/xorbits/_mars/services/task/supervisor/processor.py", line 388, in run
    await self._process_stage_chunk_graph(*stage_args)
  File "/Users/lichengjie/Projects/workspace/xorbits/python/xorbits/_mars/services/task/supervisor/processor.py", line 265, in _process_stage_chunk_graph
    chunk_to_result = await self._executor.execute_subtask_graph(
  File "/Users/lichengjie/Projects/workspace/xorbits/python/xorbits/_mars/services/task/execution/mars/executor.py", line 203, in execute_subtask_graph
    return await stage_processor.run()
  File "/Users/lichengjie/Projects/workspace/xorbits/python/xorbits/_mars/services/task/execution/mars/stage.py", line 233, in run
    return await self._run()
  File "/Users/lichengjie/Projects/workspace/xorbits/python/xorbits/_mars/services/task/execution/mars/stage.py", line 253, in _run
    raise self.result.error.with_traceback(self.result.traceback)
  File "/Users/lichengjie/Projects/workspace/xorbits/python/xorbits/_mars/services/subtask/worker/processor.py", line 212, in _execute_operand
    return execute(ctx, op)
  File "/Users/lichengjie/Projects/workspace/xorbits/python/xorbits/_mars/core/operand/core.py", line 492, in execute
    result = executor(results, op)
  File "/Users/lichengjie/Projects/workspace/xorbits/python/xorbits/_mars/dataframe/merge/concat.py", line 349, in execute
    ctx[chunk.key] = _base_concat(chunk, inputs)
  File "/Users/lichengjie/Projects/workspace/xorbits/python/xorbits/_mars/dataframe/merge/concat.py", line 252, in _base_concat
    return _auto_concat_dataframe_chunks(chunk, inputs)
  File "/Users/lichengjie/Projects/workspace/xorbits/python/xorbits/_mars/dataframe/merge/concat.py", line 281, in _auto_concat_dataframe_chunks
    assert n_rows * n_cols == len(inputs)
AssertionError

On TPC-H sf1000, the `merge` operation also hangs and performs very poorly.
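The failing check `assert n_rows * n_cols == len(inputs)` requires the inputs of an auto-generated concat to tile a complete rows × columns grid of chunks. The sketch below illustrates that invariant with pandas. `concat_chunk_grid` and its `(row, col)`-keyed input are hypothetical stand-ins, not the Mars `_auto_concat_dataframe_chunks` implementation, which may derive `n_rows` and `n_cols` differently.

```python
import pandas as pd


def concat_chunk_grid(chunks):
    """Hypothetical helper: reassemble DataFrame chunks laid out on a 2-D grid.

    `chunks` maps (row_idx, col_idx) -> pd.DataFrame. This only mirrors the
    invariant seen in the traceback; it is not the actual Mars code.
    """
    row_ids = sorted({r for r, _ in chunks})
    col_ids = sorted({c for _, c in chunks})
    n_rows, n_cols = len(row_ids), len(col_ids)
    # Every grid cell must be present exactly once; a ragged layout
    # cannot be reassembled by concatenating rows and columns.
    assert n_rows * n_cols == len(chunks), (
        f"expected {n_rows * n_cols} chunks for a {n_rows}x{n_cols} grid, "
        f"got {len(chunks)}"
    )
    rows = []
    for r in row_ids:
        # Join the chunks of one grid row side by side (along columns).
        rows.append(pd.concat([chunks[(r, c)] for c in col_ids], axis=1))
    # Stack the assembled rows vertically.
    return pd.concat(rows, axis=0)


a = pd.DataFrame({"x": [1, 2]}, index=[0, 1])
b = pd.DataFrame({"y": [3, 4]}, index=[0, 1])
c = pd.DataFrame({"x": [5]}, index=[2])
d = pd.DataFrame({"y": [6]}, index=[2])

full = concat_chunk_grid({(0, 0): a, (0, 1): b, (1, 0): c, (1, 1): d})
print(full)
```

If one cell is missing, for example `{(0, 0): a, (0, 1): b, (1, 0): c}`, the grid is 2×2 but only three chunks arrive, so the helper raises `AssertionError` in the same way as the traceback.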

Related issue number

Related PR #699


codecov[bot] commented 8 months ago

Codecov Report

Merging #740 (e888727) into main (7ea4316) will decrease coverage by 7.29%. The diff coverage is 100.00%.

@@            Coverage Diff             @@
##             main     #740      +/-   ##
==========================================
- Coverage   93.55%   86.26%   -7.29%     
==========================================
  Files        1026     1026              
  Lines       79586    79589       +3     
  Branches    16497    16497              
==========================================
- Hits        74453    68654    -5799     
- Misses       3453     9366    +5913     
+ Partials     1680     1569     -111     
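The totals above are internally consistent: in each column, Hits + Misses + Partials equals Lines, and the coverage percentage is hits divided by all tracked lines, with partially covered lines counted as uncovered. That formula is an assumption about how Codecov computes the ratio, but it reproduces the 93.55%, 86.26% and -7.29% figures exactly. `CoverageTotals` is a hypothetical helper for the check.

```python
from dataclasses import dataclass


@dataclass
class CoverageTotals:
    """Hypothetical container for one column of the Codecov diff table."""

    hits: int
    misses: int
    partials: int

    @property
    def lines(self) -> int:
        # Every tracked line is exactly one of: hit, miss, or partial.
        return self.hits + self.misses + self.partials

    @property
    def percent(self) -> float:
        # Assumed formula: partials count as not covered.
        return 100 * self.hits / self.lines


main = CoverageTotals(hits=74453, misses=3453, partials=1680)
pr = CoverageTotals(hits=68654, misses=9366, partials=1569)

print(main.lines, pr.lines)                        # 79586 79589
print(f"{main.percent:.2f} {pr.percent:.2f}")      # 93.55 86.26
print(f"{pr.percent - main.percent:+.2f}")         # -7.29
```

The per-row deltas also add up (-5799 + 5913 - 111 = +3 lines), so the coverage drop does not come from the 3 changed lines, whose diff coverage is 100%.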
Flag        Coverage Δ
unittests   86.18% <100.00%> (-7.27%) ↓

Flags with carried forward coverage won't be shown.

Files                                                 Coverage Δ
...on/xorbits/_mars/dataframe/base/cartesian_chunk.py 94.65% <100.00%> (+0.04%) ↑
python/xorbits/_mars/dataframe/base/core.py           100.00% <100.00%> (ø)
python/xorbits/_mars/dataframe/merge/merge.py         98.32% <100.00%> (+<0.01%) ↑

... and 129 files with indirect coverage changes