xorbitsai / xorbits

Scalable Python DS & ML, in an API compatible & lightning fast way.
https://xorbits.readthedocs.io
Apache License 2.0
1.11k stars 67 forks source link

FEAT: Raise clearer errors when encountering OOM #562

Closed aresnow1 closed 1 year ago

aresnow1 commented 1 year ago

Is your feature request related to a problem? Please describe.

Raise a clearer error message that tells users what happened when an out-of-memory (OOM) error occurs.

Here is what Xorbits raises now:

In [1]: import xorbits.remote as xr

In [2]: import numpy as np

In [3]: print(xr.spawn(lambda *_: np.random.rand(10**6, 10**5)))
/Users/hekaisheng/Documents/projects/xorbits/python/xorbits/_mars/deploy/oscar/session.py:1873: UserWarning: No existing session found, creating a new local session now.
  warnings.warn(warning_msg)
2023-07-03 14:49:14,591 xorbits._mars.deploy.oscar.local 44440 WARNING  Web service started at http://0.0.0.0:50554
  0%|                                                                                                                                                 |   0.00/100 [00:58<?, ?it/s]2023-07-03 14:50:14,032 xorbits._mars.services.scheduling.worker.execution 44440 ERROR    Run subtask failed due to Remote server unixsocket:///11649679360 closed, the subtask YyV4lvUSjJN14vo5kIJ6hQ7I is not retryable, it contains unretryable op:
[RemoteFunction <key=e2a7ed72aad3af929b833fceba76facd>]
2023-07-03 14:50:14,033 xorbits._mars.services.scheduling.worker.execution 44440 ERROR    Failed to run subtask YyV4lvUSjJN14vo5kIJ6hQ7I on band numa-0
Traceback (most recent call last):
  File "/Users/hekaisheng/Documents/projects/xorbits/python/xorbits/_mars/services/scheduling/worker/execution.py", line 402, in internal_run_subtask
    subtask_info.result = await self._retry_run_subtask(
  File "/Users/hekaisheng/Documents/projects/xorbits/python/xorbits/_mars/services/scheduling/worker/execution.py", line 520, in _retry_run_subtask
    raise wrap_exception(
  File "/Users/hekaisheng/Documents/projects/xorbits/python/xorbits/_mars/services/scheduling/worker/execution.py", line 506, in _retry_run_subtask
    return await _run_subtask_once()
  File "/Users/hekaisheng/Documents/projects/xorbits/python/xorbits/_mars/services/scheduling/worker/execution.py", line 481, in _run_subtask_once
    raise ex
  File "/Users/hekaisheng/Documents/projects/xorbits/python/xorbits/_mars/services/scheduling/worker/execution.py", line 445, in _run_subtask_once
    return await asyncio.shield(aiotask)
  File "/Users/hekaisheng/Documents/projects/xorbits/python/xorbits/_mars/services/subtask/api.py", line 70, in run_subtask_in_slot
    return await ref.run_subtask.options(profiling_context=profiling_context).send(
  File "/Users/hekaisheng/Documents/projects/xoscar/python/xoscar/backends/context.py", line 224, in send
    result = await self._wait(future, actor_ref.address, send_message)  # type: ignore
  File "/Users/hekaisheng/Documents/projects/xoscar/python/xoscar/backends/context.py", line 113, in _wait
    return await future
  File "/Users/hekaisheng/Documents/projects/xoscar/python/xoscar/backends/context.py", line 104, in _wait
    await asyncio.shield(future)
  File "/Users/hekaisheng/Documents/projects/xoscar/python/xoscar/backends/core.py", line 84, in _listen
    raise ServerClosed(
xoscar.errors.ServerClosed: Run subtask failed due to Remote server unixsocket:///11649679360 closed, the subtask YyV4lvUSjJN14vo5kIJ6hQ7I is not retryable, it contains unretryable op:
[RemoteFunction <key=e2a7ed72aad3af929b833fceba76facd>]
100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 100.00/100 [00:59<00:00,  1.68it/s]
---------------------------------------------------------------------------
ServerClosed                              Traceback (most recent call last)
Cell In[3], line 1
----> 1 print(xr.spawn(lambda *_: np.random.rand(10**6, 10**5)))

File ~/Documents/projects/xorbits/python/xorbits/utils.py:37, in safe_repr_str.<locals>.inn(self, *args, **kwargs)
     35     return getattr(object, f.__name__)(self)
     36 else:
---> 37     return f(self, *args, **kwargs)

File ~/Documents/projects/xorbits/python/xorbits/core/data.py:278, in DataRef.__str__(self)
    276     return self.data._mars_entity.op.data.__str__()
    277 else:
--> 278     run(self)
    279     return self.data.__str__()

File ~/Documents/projects/xorbits/python/xorbits/core/execution.py:55, in run(obj, **kwargs)
     53 mars_tileables = [_get_mars_entity(ref) for ref in refs_to_execute.values()]
     54 if mars_tileables:
---> 55     mars_execute(mars_tileables, **kwargs)

File ~/Documents/projects/xorbits/python/xorbits/_mars/deploy/oscar/session.py:1709, in execute(tileable, session, wait, new_session_kwargs, show_progress, progress_update_interval, *tileables, **kwargs)
   1707     session = get_default_or_create(**(new_session_kwargs or dict()))
   1708 session = _ensure_sync(session)
-> 1709 return session.execute(
   1710     tileable,
   1711     *tileables,
   1712     wait=wait,
   1713     show_progress=show_progress,
   1714     progress_update_interval=progress_update_interval,
   1715     **kwargs,
   1716 )

File ~/Documents/projects/xorbits/python/xorbits/_mars/deploy/oscar/session.py:1526, in SyncSession.execute(self, tileable, show_progress, warn_duplicated_execution, *tileables, **kwargs)
   1524 fut = asyncio.run_coroutine_threadsafe(coro, self._loop)
   1525 try:
-> 1526     execution_info: ExecutionInfo = fut.result(
   1527         timeout=self._isolated_session.timeout
   1528     )
   1529 except KeyboardInterrupt:  # pragma: no cover
   1530     logger.warning("Cancelling running task")

File ~/miniconda3/lib/python3.9/concurrent/futures/_base.py:446, in Future.result(self, timeout)
    444     raise CancelledError()
    445 elif self._state == FINISHED:
--> 446     return self.__get_result()
    447 else:
    448     raise TimeoutError()

File ~/miniconda3/lib/python3.9/concurrent/futures/_base.py:391, in Future.__get_result(self)
    389 if self._exception:
    390     try:
--> 391         raise self._exception
    392     finally:
    393         # Break a reference cycle with the exception in self._exception
    394         self = None

File ~/Documents/projects/xorbits/python/xorbits/_mars/deploy/oscar/session.py:1689, in _execute(session, wait, show_progress, progress_update_interval, cancelled, *tileables, **kwargs)
   1686     else:
   1687         # set cancelled to avoid wait task leak
   1688         cancelled.set()
-> 1689     await execution_info
   1690 else:
   1691     return execution_info

File ~/Documents/projects/xorbits/python/xorbits/_mars/deploy/oscar/session.py:102, in ExecutionInfo._ensure_future.<locals>.wait()
    101 async def wait():
--> 102     return await self._aio_task

File ~/Documents/projects/xorbits/python/xorbits/_mars/deploy/oscar/session.py:848, in _IsolatedSession._run_in_background(self, tileables, task_id, progress, profiling)
    842         logger.warning(
    843             "Profile task %s execution result:\n%s",
    844             task_id,
    845             json.dumps(task_result.profiling, indent=4),
    846         )
    847     if task_result.error:
--> 848         raise task_result.error.with_traceback(task_result.traceback)
    849 if cancelled:
    850     return

File ~/Documents/projects/xorbits/python/xorbits/_mars/services/task/supervisor/processor.py:388, in TaskProcessor.run(self)
    386 async with self._executor:
    387     async for stage_args in self._iter_stage_chunk_graph():
--> 388         await self._process_stage_chunk_graph(*stage_args)
    389 await self._task_info_collector.collect_result_nodes(
    390     self._task, self._subtask_graphs
    391 )
    392 await self._task_info_collector.collect_tileable_structure(
    393     self._task, self.get_tileable_to_subtasks()
    394 )

File ~/Documents/projects/xorbits/python/xorbits/_mars/services/task/supervisor/processor.py:265, in TaskProcessor._process_stage_chunk_graph(self, stage_id, stage_profiler, chunk_graph)
    259 tile_context = await asyncio.to_thread(
    260     self._get_stage_tile_context,
    261     {c for c in chunk_graph.result_chunks if not isinstance(c.op, Fetch)},
    262 )
    264 with Timer() as timer:
--> 265     chunk_to_result = await self._executor.execute_subtask_graph(
    266         stage_id, subtask_graph, chunk_graph, tile_context
    267     )
    268 stage_profiler.set("run", timer.duration)
    270 self._preprocessor.post_chunk_graph_execution()

File ~/Documents/projects/xorbits/python/xorbits/_mars/services/task/execution/mars/executor.py:203, in MarsTaskExecutor.execute_subtask_graph(self, stage_id, subtask_graph, chunk_graph, tile_context, context)
    201 curr_tile_progress = self._tile_context.get_all_progress() - prev_progress
    202 self._stage_tile_progresses.append(curr_tile_progress)
--> 203 return await stage_processor.run()

File ~/Documents/projects/xorbits/python/xorbits/_mars/services/task/execution/mars/stage.py:233, in TaskStageProcessor.run(self)
    229     if self.subtask_graph.num_shuffles() > 0:
    230         # disable scale-in when shuffle is executing so that we can skip
    231         # store shuffle meta in supervisor.
    232         await self._scheduling_api.disable_autoscale_in()
--> 233     return await self._run()
    234 finally:
    235     if self.subtask_graph.num_shuffles() > 0:

File ~/Documents/projects/xorbits/python/xorbits/_mars/services/task/execution/mars/stage.py:253, in TaskStageProcessor._run(self)
    251 if self.error_or_cancelled():
    252     if self.result.error is not None:
--> 253         raise self.result.error.with_traceback(self.result.traceback)
    254     else:
    255         raise asyncio.CancelledError()

File ~/Documents/projects/xorbits/python/xorbits/_mars/services/scheduling/worker/execution.py:402, in SubtaskExecutionActor.internal_run_subtask(self, subtask, band_name)
    400     batch_quota_req = {(subtask.session_id, subtask.subtask_id): calc_size}
    401     logger.debug("Start actual running of subtask %s", subtask.subtask_id)
--> 402     subtask_info.result = await self._retry_run_subtask(
    403         subtask, band_name, subtask_api, batch_quota_req
    404     )
    405 except:  # noqa: E722  # pylint: disable=bare-except
    406     _fill_subtask_result_with_exception(subtask, subtask_info)

File ~/Documents/projects/xorbits/python/xorbits/_mars/services/scheduling/worker/execution.py:520, in SubtaskExecutionActor._retry_run_subtask(self, subtask, band_name, subtask_api, batch_quota_req)
    513 message = (
    514     f"Run subtask failed due to {e}, the subtask {subtask.subtask_id} is "
    515     f"not retryable, it contains unretryable op: \n"
    516     f"{pprint.pformat(unretryable_op)}"
    517 )
    518 logger.error(message)
--> 520 raise wrap_exception(
    521     e, wrap_name="_UnretryableException", message=message
    522 )

File ~/Documents/projects/xorbits/python/xorbits/_mars/services/scheduling/worker/execution.py:506, in SubtaskExecutionActor._retry_run_subtask(self, subtask, band_name, subtask_api, batch_quota_req)
    504 else:
    505     try:
--> 506         return await _run_subtask_once()
    507     except Exception as e:
    508         unretryable_op = [
    509             chunk.op
    510             for chunk in subtask.chunk_graph
    511             if not getattr(chunk.op, "retryable", True)
    512         ]

File ~/Documents/projects/xorbits/python/xorbits/_mars/services/scheduling/worker/execution.py:481, in SubtaskExecutionActor._retry_run_subtask.<locals>._run_subtask_once()
    479         sub_pool_address = await slot_manager_ref.get_slot_address(slot_id)
    480         await mo.wait_actor_pool_recovered(sub_pool_address, self.address)
--> 481     raise ex
    482 finally:
    483     # make sure allocated slots are traced
    484     if slot_id is None:  # pragma: no cover

File ~/Documents/projects/xorbits/python/xorbits/_mars/services/scheduling/worker/execution.py:445, in SubtaskExecutionActor._retry_run_subtask.<locals>._run_subtask_once()
    441     subtask_info.result.status = SubtaskStatus.running
    442     aiotask = asyncio.create_task(
    443         subtask_api.run_subtask_in_slot(band_name, slot_id, subtask)
    444     )
--> 445     return await asyncio.shield(aiotask)
    446 except asyncio.CancelledError as ex:
    447     try:

File ~/Documents/projects/xorbits/python/xorbits/_mars/services/subtask/api.py:70, in SubtaskAPI.run_subtask_in_slot(self, band_name, slot_id, subtask)
     64 enable_profiling = XOSCAR_ENABLE_PROFILING or (
     65     extra_config and extra_config.get("enable_profiling")
     66 )
     67 profiling_context = (
     68     ProfilingContext(task_id=subtask.task_id) if enable_profiling else None
     69 )
---> 70 return await ref.run_subtask.options(profiling_context=profiling_context).send(
     71     subtask
     72 )

File ~/Documents/projects/xoscar/python/xoscar/backends/context.py:224, in IndigenActorContext.send(self, actor_ref, message, wait_response, profiling_context)
    222 future = await self._call(actor_ref.address, send_message, wait=False)
    223 if wait_response:
--> 224     result = await self._wait(future, actor_ref.address, send_message)  # type: ignore
    225     return self._process_result_message(result)
    226 else:

File ~/Documents/projects/xoscar/python/xoscar/backends/context.py:113, in IndigenActorContext._wait(self, future, address, message)
    111 except:  # noqa: E722  # nosec  # pylint: disable=bare-except
    112     pass
--> 113 return await future

File ~/Documents/projects/xoscar/python/xoscar/backends/context.py:104, in IndigenActorContext._wait(self, future, address, message)
    102 async def _wait(self, future: asyncio.Future, address: str, message: _MessageBase):
    103     try:
--> 104         await asyncio.shield(future)
    105     except asyncio.CancelledError:
    106         try:

File ~/Documents/projects/xoscar/python/xoscar/backends/core.py:84, in ActorCaller._listen(self, client)
     81     except (ConnectionError, BrokenPipeError):
     82         # close failed, ignore it
     83         pass
---> 84     raise ServerClosed(
     85         f"Remote server {client.dest_address} closed"
     86     ) from None
     87 future = self._client_to_message_futures[client].pop(message.message_id)
     88 future.set_result(message)

ServerClosed: Run subtask failed due to Remote server unixsocket:///11649679360 closed, the subtask YyV4lvUSjJN14vo5kIJ6hQ7I is not retryable, it contains unretryable op:
[RemoteFunction <key=e2a7ed72aad3af929b833fceba76facd>]

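ServerClosed is confusing for users. It does not tell them that the worker process running the subtask was most likely killed because it ran out of memory.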