xorbitsai / xorbits

Scalable Python DS & ML, in an API compatible & lightning fast way.
https://xorbits.io
Apache License 2.0
1.06k stars 67 forks source link

BUG: groupby().agg() with nunique failed #721

Closed qinxuye closed 9 months ago

qinxuye commented 9 months ago

Describe the bug

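`groupby().agg()` with a named aggregation that uses `nunique` (e.g. `d=('a', 'nunique')`) fails with `KeyError`, where the key is the output column name.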

To Reproduce

To help us reproduce this bug, please provide the information below:

  1. Your Python version
  2. The version of Xorbits you use
  3. Versions of crucial packages, such as numpy, scipy and pandas
  4. Full stack of the error.
  5. Minimized code to reproduce the error.
In [1]: import xorbits.pandas as pd

In [2]: df = pd.DataFrame({'a': [1, 2, 3], 'b': ['a', 'b', 'a'], 'c': [0.1, 0.2, 0.1]})

In [3]: df.groupby('b').agg(d=('a', 'nunique'))
Out[3]: /Users/xuyeqin/Workspace/xorbits/python/xorbits/_mars/deploy/oscar/session.py:1953: UserWarning: No existing session found, creating a new local session now.
  warnings.warn(warning_msg)
2023-09-25 19:47:37,368 xorbits._mars.deploy.oscar.local 68540 WARNING  Web service started at http://0.0.0.0:26219
  0%|                                            |   0.00/100 [00:00<?, ?it/s]2023-09-25 19:47:38,199 xorbits._mars.services.scheduling.worker.execution 68540 ERROR    Failed to run subtask Q6W3IZNCzdfb9s9y6FeeFbqO on band numa-0
Traceback (most recent call last):
  File "/Users/xuyeqin/Workspace/xorbits/python/xorbits/_mars/services/subtask/worker/processor.py", line 212, in _execute_operand
    return execute(ctx, op)
  File "/Users/xuyeqin/Workspace/xorbits/python/xorbits/_mars/core/operand/core.py", line 492, in execute
    result = executor(results, op)
  File "/Users/xuyeqin/Workspace/xorbits/python/xorbits/_mars/core/custom_log.py", line 95, in wrap
    return func(cls, ctx, op)
  File "/Users/xuyeqin/Workspace/xorbits/python/xorbits/_mars/utils.py", line 1234, in wrapped
    result = func(cls, ctx, op)
  File "/Users/xuyeqin/Workspace/xorbits/python/xorbits/_mars/dataframe/groupby/aggregation.py", line 1288, in execute
    cls._execute_map(ctx, op)
  File "/Users/xuyeqin/Workspace/xorbits/python/xorbits/_mars/dataframe/groupby/aggregation.py", line 1108, in _execute_map
    agg_dfs.append(cls._do_custom_agg(raw_func_name, op, in_data))
  File "/Users/xuyeqin/Workspace/xorbits/python/xorbits/_mars/dataframe/groupby/aggregation.py", line 992, in _do_custom_agg
    return custom_agg_functions[func_name].execute_map(op, in_data)
KeyError: 'd'
100%|███████████████████████████████████| 100.00/100 [00:00<00:00, 121.12it/s]
---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
File ~/miniconda3/lib/python3.9/site-packages/IPython/core/formatters.py:706, in PlainTextFormatter.__call__(self, obj)
    699 stream = StringIO()
    700 printer = pretty.RepresentationPrinter(stream, self.verbose,
    701     self.max_width, self.newline,
    702     max_seq_length=self.max_seq_length,
    703     singleton_pprinters=self.singleton_printers,
    704     type_pprinters=self.type_printers,
    705     deferred_pprinters=self.deferred_printers)
--> 706 printer.pretty(obj)
    707 printer.flush()
    708 return stream.getvalue()

File ~/miniconda3/lib/python3.9/site-packages/IPython/lib/pretty.py:410, in RepresentationPrinter.pretty(self, obj)
    407                         return meth(obj, self, cycle)
    408                 if cls is not object \
    409                         and callable(cls.__dict__.get('__repr__')):
--> 410                     return _repr_pprint(obj, self, cycle)
    412     return _default_pprint(obj, self, cycle)
    413 finally:

File ~/miniconda3/lib/python3.9/site-packages/IPython/lib/pretty.py:778, in _repr_pprint(obj, p, cycle)
    776 """A pprint that just redirects to the normal repr function."""
    777 # Find newlines and replace them with p.break_()
--> 778 output = repr(obj)
    779 lines = output.splitlines()
    780 with p.group():

File ~/Workspace/xorbits/python/xorbits/utils.py:38, in safe_repr_str.<locals>.inn(self, *args, **kwargs)
     36     return getattr(object, f.__name__)(self)
     37 else:
---> 38     return f(self, *args, **kwargs)

File ~/Workspace/xorbits/python/xorbits/core/data.py:310, in DataRef.__repr__(self)
    308     return self.data._mars_entity.op.data.__repr__()
    309 else:
--> 310     run(self)
    311     return self.data.__repr__()

File ~/Workspace/xorbits/python/xorbits/core/execution.py:55, in run(obj, **kwargs)
     53 mars_tileables = [_get_mars_entity(ref) for ref in refs_to_execute.values()]
     54 if mars_tileables:
---> 55     mars_execute(mars_tileables, **kwargs)

File ~/Workspace/xorbits/python/xorbits/_mars/deploy/oscar/session.py:1789, in execute(tileable, session, wait, new_session_kwargs, show_progress, progress_update_interval, *tileables, **kwargs)
   1787     session = get_default_or_create(**(new_session_kwargs or dict()))
   1788 session = _ensure_sync(session)
-> 1789 return session.execute(
   1790     tileable,
   1791     *tileables,
   1792     wait=wait,
   1793     show_progress=show_progress,
   1794     progress_update_interval=progress_update_interval,
   1795     **kwargs,
   1796 )

File ~/Workspace/xorbits/python/xorbits/_mars/deploy/oscar/session.py:1600, in SyncSession.execute(self, tileable, show_progress, warn_duplicated_execution, *tileables, **kwargs)
   1598 fut = asyncio.run_coroutine_threadsafe(coro, self._loop)
   1599 try:
-> 1600     execution_info: ExecutionInfo = fut.result(
   1601         timeout=self._isolated_session.timeout
   1602     )
   1603 except KeyboardInterrupt:  # pragma: no cover
   1604     logger.warning("Cancelling running task")

File ~/miniconda3/lib/python3.9/concurrent/futures/_base.py:446, in Future.result(self, timeout)
    444     raise CancelledError()
    445 elif self._state == FINISHED:
--> 446     return self.__get_result()
    447 else:
    448     raise TimeoutError()

File ~/miniconda3/lib/python3.9/concurrent/futures/_base.py:391, in Future.__get_result(self)
    389 if self._exception:
    390     try:
--> 391         raise self._exception
    392     finally:
    393         # Break a reference cycle with the exception in self._exception
    394         self = None

File ~/Workspace/xorbits/python/xorbits/_mars/deploy/oscar/session.py:1769, in _execute(session, wait, show_progress, progress_update_interval, cancelled, *tileables, **kwargs)
   1766     else:
   1767         # set cancelled to avoid wait task leak
   1768         cancelled.set()
-> 1769     await execution_info
   1770 else:
   1771     return execution_info

File ~/Workspace/xorbits/python/xorbits/_mars/deploy/oscar/session.py:124, in ExecutionInfo._ensure_future.<locals>.wait()
    123 async def wait():
--> 124     return await self._aio_task

File ~/Workspace/xorbits/python/xorbits/_mars/deploy/oscar/session.py:884, in _IsolatedSession._run_in_background(self, tileables, task_id, progress, profiling)
    878         logger.warning(
    879             "Profile task %s execution result:\n%s",
    880             task_id,
    881             json.dumps(task_result.profiling, indent=4),
    882         )
    883     if task_result.error:
--> 884         raise task_result.error.with_traceback(task_result.traceback)
    885 if cancelled:
    886     return

File ~/Workspace/xorbits/python/xorbits/_mars/services/task/supervisor/processor.py:388, in TaskProcessor.run(self)
    386 async with self._executor:
    387     async for stage_args in self._iter_stage_chunk_graph():
--> 388         await self._process_stage_chunk_graph(*stage_args)
    389 await self._task_info_collector.collect_result_nodes(
    390     self._task, self._subtask_graphs
    391 )
    392 await self._task_info_collector.collect_tileable_structure(
    393     self._task, self.get_tileable_to_subtasks()
    394 )

File ~/Workspace/xorbits/python/xorbits/_mars/services/task/supervisor/processor.py:265, in TaskProcessor._process_stage_chunk_graph(self, stage_id, stage_profiler, chunk_graph)
    259 tile_context = await asyncio.to_thread(
    260     self._get_stage_tile_context,
    261     {c for c in chunk_graph.result_chunks if not isinstance(c.op, Fetch)},
    262 )
    264 with Timer() as timer:
--> 265     chunk_to_result = await self._executor.execute_subtask_graph(
    266         stage_id, subtask_graph, chunk_graph, tile_context
    267     )
    268 stage_profiler.set("run", timer.duration)
    270 self._preprocessor.post_chunk_graph_execution()

File ~/Workspace/xorbits/python/xorbits/_mars/services/task/execution/mars/executor.py:203, in MarsTaskExecutor.execute_subtask_graph(self, stage_id, subtask_graph, chunk_graph, tile_context, context)
    201 curr_tile_progress = self._tile_context.get_all_progress() - prev_progress
    202 self._stage_tile_progresses.append(curr_tile_progress)
--> 203 return await stage_processor.run()

File ~/Workspace/xorbits/python/xorbits/_mars/services/task/execution/mars/stage.py:233, in TaskStageProcessor.run(self)
    229     if self.subtask_graph.num_shuffles() > 0:
    230         # disable scale-in when shuffle is executing so that we can skip
    231         # store shuffle meta in supervisor.
    232         await self._scheduling_api.disable_autoscale_in()
--> 233     return await self._run()
    234 finally:
    235     if self.subtask_graph.num_shuffles() > 0:

File ~/Workspace/xorbits/python/xorbits/_mars/services/task/execution/mars/stage.py:253, in TaskStageProcessor._run(self)
    251 if self.error_or_cancelled():
    252     if self.result.error is not None:
--> 253         raise self.result.error.with_traceback(self.result.traceback)
    254     else:
    255         raise asyncio.CancelledError()

File ~/Workspace/xorbits/python/xorbits/_mars/services/subtask/worker/processor.py:212, in _execute_operand()
    207 @enter_mode(build=False, kernel=True)
    208 def _execute_operand(
    209     self, ctx: Dict[str, Any], op: OperandType
    210 ):  # noqa: R0201  # pylint: disable=no-self-use
    211     try:
--> 212         return execute(ctx, op)
    213     except BaseException as ex:
    214         # wrap exception in execution to avoid side effects
    215         raise ExecutionError(ex).with_traceback(ex.__traceback__) from None

File ~/Workspace/xorbits/python/xorbits/_mars/core/operand/core.py:492, in execute()
    488 else:
    489     # Cast `UFuncTypeError` to `TypeError` since subclasses of the former is unpickleable.
    490     # The `UFuncTypeError` was introduced by numpy#12593 since v1.17.0.
    491     try:
--> 492         result = executor(results, op)
    493         succeeded = True
    494         if op.stage is not None:

File ~/Workspace/xorbits/python/xorbits/_mars/core/custom_log.py:95, in wrap()
     92 custom_log_dir = ctx.new_custom_log_dir()
     94 if custom_log_dir is None:
---> 95     return func(cls, ctx, op)
     97 log_path = os.path.join(custom_log_dir, op.key)
     99 with _LogWrapper(ctx, op, log_path):

File ~/Workspace/xorbits/python/xorbits/_mars/utils.py:1234, in wrapped()
   1231     _enter_counter += 1
   1233 try:
-> 1234     result = func(cls, ctx, op)
   1235 finally:
   1236     with AbstractSession._lock:

File ~/Workspace/xorbits/python/xorbits/_mars/dataframe/groupby/aggregation.py:1288, in execute()
   1283 @classmethod
   1284 @redirect_custom_log
   1285 @enter_current_session
   1286 def execute(cls, ctx, op: "DataFrameGroupByAgg"):
   1287     if op.stage == OperandStage.map:
-> 1288         cls._execute_map(ctx, op)
   1289     elif op.stage == OperandStage.combine:
   1290         cls._execute_combine(ctx, op)

File ~/Workspace/xorbits/python/xorbits/_mars/dataframe/groupby/aggregation.py:1108, in _execute_map()
   1106 input_obj = ret_map_groupbys[input_key]
   1107 if map_func_name == "custom_reduction":
-> 1108     agg_dfs.append(cls._do_custom_agg(raw_func_name, op, in_data))
   1109 else:
   1110     single_func = map_func_name == op.raw_func

File ~/Workspace/xorbits/python/xorbits/_mars/dataframe/groupby/aggregation.py:992, in _do_custom_agg()
    987 @staticmethod
    988 def _do_custom_agg(
    989     func_name: str, op: "DataFrameGroupByAgg", in_data: pd.DataFrame
    990 ) -> Union[pd.Series, pd.DataFrame]:
    991     if op.stage == OperandStage.map:
--> 992         return custom_agg_functions[func_name].execute_map(op, in_data)
    993     elif op.stage == OperandStage.combine:
    994         return custom_agg_functions[func_name].execute_combine(op, in_data)

KeyError: 'd'