Open — qinxuye opened this issue 10 months ago
Rechunking a Series with an Arrow-backed string dtype fails.
To help us reproduce this bug, please provide the information below:
```
In [1]: import xorbits.pandas as pd

In [2]: s = pd.Series(['a', 'b', 'd'], dtype='string[pyarrow]')

In [3]: s
Out[3]:
0    a
1    b
2    d
dtype: string

In [4]: s.rechunk(2)
Out[4]: /Users/xuyeqin/Workspace/xorbits/python/xorbits/_mars/deploy/oscar/session.py:1924: UserWarning: No existing session found, creating a new local session now.
  warnings.warn(warning_msg)
2023-09-18 15:19:13,907 xorbits._mars.deploy.oscar.local 24904 WARNING Web service started at http://0.0.0.0:18711
100%|██████████████████████████████████████████| 100.00/100 [00:00<00:00, 35311.53it/s]
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
File ~/miniconda3/lib/python3.9/site-packages/IPython/core/formatters.py:706, in PlainTextFormatter.__call__(self, obj)
    699 stream = StringIO()
    700 printer = pretty.RepresentationPrinter(stream, self.verbose,
    701 self.max_width, self.newline,
    702 max_seq_length=self.max_seq_length,
    703 singleton_pprinters=self.singleton_printers,
    704 type_pprinters=self.type_printers,
    705 deferred_pprinters=self.deferred_printers)
--> 706 printer.pretty(obj)
    707 printer.flush()
    708 return stream.getvalue()

File ~/miniconda3/lib/python3.9/site-packages/IPython/lib/pretty.py:410, in RepresentationPrinter.pretty(self, obj)
    407 return meth(obj, self, cycle)
    408 if cls is not object \
    409 and callable(cls.__dict__.get('__repr__')):
--> 410 return _repr_pprint(obj, self, cycle)
    412 return _default_pprint(obj, self, cycle)
    413 finally:

File ~/miniconda3/lib/python3.9/site-packages/IPython/lib/pretty.py:778, in _repr_pprint(obj, p, cycle)
    776 """A pprint that just redirects to the normal repr function."""
    777 # Find newlines and replace them with p.break_()
--> 778 output = repr(obj)
    779 lines = output.splitlines()
    780 with p.group():

File ~/Workspace/xorbits/python/xorbits/utils.py:38, in safe_repr_str.<locals>.inn(self, *args, **kwargs)
     36 return getattr(object, f.__name__)(self)
     37 else:
---> 38 return f(self, *args, **kwargs)

File ~/Workspace/xorbits/python/xorbits/core/data.py:310, in DataRef.__repr__(self)
    308 return self.data._mars_entity.op.data.__repr__()
    309 else:
--> 310 run(self)
    311 return self.data.__repr__()

File ~/Workspace/xorbits/python/xorbits/core/execution.py:55, in run(obj, **kwargs)
     53 mars_tileables = [_get_mars_entity(ref) for ref in refs_to_execute.values()]
     54 if mars_tileables:
---> 55 mars_execute(mars_tileables, **kwargs)

File ~/Workspace/xorbits/python/xorbits/_mars/deploy/oscar/session.py:1760, in execute(tileable, session, wait, new_session_kwargs, show_progress, progress_update_interval, *tileables, **kwargs)
   1758 session = get_default_or_create(**(new_session_kwargs or dict()))
   1759 session = _ensure_sync(session)
-> 1760 return session.execute(
   1761 tileable,
   1762 *tileables,
   1763 wait=wait,
   1764 show_progress=show_progress,
   1765 progress_update_interval=progress_update_interval,
   1766 **kwargs,
   1767 )

File ~/Workspace/xorbits/python/xorbits/_mars/deploy/oscar/session.py:1576, in SyncSession.execute(self, tileable, show_progress, warn_duplicated_execution, *tileables, **kwargs)
   1574 fut = asyncio.run_coroutine_threadsafe(coro, self._loop)
   1575 try:
-> 1576 execution_info: ExecutionInfo = fut.result(
   1577 timeout=self._isolated_session.timeout
   1578 )
   1579 except KeyboardInterrupt: # pragma: no cover
   1580 logger.warning("Cancelling running task")

File ~/miniconda3/lib/python3.9/concurrent/futures/_base.py:446, in Future.result(self, timeout)
    444 raise CancelledError()
    445 elif self._state == FINISHED:
--> 446 return self.__get_result()
    447 else:
    448 raise TimeoutError()

File ~/miniconda3/lib/python3.9/concurrent/futures/_base.py:391, in Future.__get_result(self)
    389 if self._exception:
    390 try:
--> 391 raise self._exception
    392 finally:
    393 # Break a reference cycle with the exception in self._exception
    394 self = None

File ~/Workspace/xorbits/python/xorbits/_mars/deploy/oscar/session.py:1740, in _execute(session, wait, show_progress, progress_update_interval, cancelled, *tileables, **kwargs)
   1737 else:
   1738 # set cancelled to avoid wait task leak
   1739 cancelled.set()
-> 1740 await execution_info
   1741 else:
   1742 return execution_info

File ~/Workspace/xorbits/python/xorbits/_mars/deploy/oscar/session.py:124, in ExecutionInfo._ensure_future.<locals>.wait()
    123 async def wait():
--> 124 return await self._aio_task

File ~/Workspace/xorbits/python/xorbits/_mars/deploy/oscar/session.py:873, in _IsolatedSession._run_in_background(self, tileables, task_id, progress, profiling)
    867 logger.warning(
    868 "Profile task %s execution result:\n%s",
    869 task_id,
    870 json.dumps(task_result.profiling, indent=4),
    871 )
    872 if task_result.error:
--> 873 raise task_result.error.with_traceback(task_result.traceback)
    874 if cancelled:
    875 return

File ~/Workspace/xorbits/python/xorbits/_mars/services/task/supervisor/processor.py:387, in TaskProcessor.run(self)
    382 self._tileable_id_to_tileable = await asyncio.to_thread(
    383 self._get_tileable_id_to_tileable, self._preprocessor.tileable_graph
    384 )
    386 async with self._executor:
--> 387 async for stage_args in self._iter_stage_chunk_graph():
    388 await self._process_stage_chunk_graph(*stage_args)
    389 await self._task_info_collector.collect_result_nodes(
    390 self._task, self._subtask_graphs
    391 )

File ~/Workspace/xorbits/python/xorbits/_mars/services/task/supervisor/processor.py:171, in TaskProcessor._iter_stage_chunk_graph(self)
    169 with Timer() as stage_timer:
    170 with Timer() as timer:
--> 171 chunk_graph = await self._get_next_chunk_graph(chunk_graph_iter)
    172 if chunk_graph is None:
    173 # tile finished
    174 self._preprocessor.done = True

File ~/Workspace/xorbits/python/xorbits/_mars/services/task/supervisor/processor.py:162, in TaskProcessor._get_next_chunk_graph(chunk_graph_iter)
    159 return
    161 fut = asyncio.to_thread(next_chunk_graph)
--> 162 chunk_graph = await fut
    163 return chunk_graph

File ~/miniconda3/lib/python3.9/asyncio/threads.py:25, in to_thread(func, *args, **kwargs)
     23 ctx = contextvars.copy_context()
     24 func_call = functools.partial(ctx.run, func, *args, **kwargs)
---> 25 return await loop.run_in_executor(None, func_call)

File ~/miniconda3/lib/python3.9/concurrent/futures/thread.py:58, in _WorkItem.run(self)
     55 return
     57 try:
---> 58 result = self.fn(*self.args, **self.kwargs)
     59 except BaseException as exc:
     60 self.future.set_exception(exc)

File ~/Workspace/xorbits/python/xorbits/_mars/services/task/supervisor/processor.py:157, in TaskProcessor._get_next_chunk_graph.<locals>.next_chunk_graph()
    155 def next_chunk_graph():
    156 try:
--> 157 return next(chunk_graph_iter)
    158 except StopIteration:
    159 return

File ~/Workspace/xorbits/python/xorbits/_mars/services/task/supervisor/preprocessor.py:201, in TaskPreprocessor.tile(self, tileable_graph)
    199 if hasattr(t.op, "logic_key") and t.op.logic_key is None:
    200 t.op.logic_key = t.op.get_logic_key()
--> 201 for chunk_graph in chunk_graph_builder.build():
    202 if len(chunk_graph) == 0:
    203 continue

File ~/Workspace/xorbits/python/xorbits/_mars/core/graph/builder/chunk.py:431, in ChunkGraphBuilder.build(self)
    430 def build(self) -> Generator[Union[TileableGraph, ChunkGraph], None, None]:
--> 431 yield from self._build()

File ~/Workspace/xorbits/python/xorbits/_mars/core/graph/builder/chunk.py:425, in ChunkGraphBuilder._build(self)
    423 try:
    424 with enter_mode(build=True, kernel=True):
--> 425 graph = next(tile_iterator)
    426 yield graph
    427 except StopIteration:

File ~/Workspace/xorbits/python/xorbits/_mars/services/task/supervisor/preprocessor.py:75, in CancellableTiler._iter_without_check(self)
     73 def _iter_without_check(self):
     74 while self._tileable_handlers:
---> 75 to_update_tileables = self._iter()
     76 if not self.cancelled:
     77 yield self._cur_chunk_graph

File ~/Workspace/xorbits/python/xorbits/_mars/core/graph/builder/chunk.py:308, in Tiler._iter(self)
    306 # tile
    307 for tile_handler in self._gen_tileable_handlers(next_tileable_handlers):
--> 308 self._tile(
    309 chunk_graph,
    310 tile_handler.tileable,
    311 tile_handler.handler,
    312 next_tileable_handlers,
    313 to_update_tileables,
    314 visited,
    315 )
    316 self._tileable_handlers = next_tileable_handlers
    317 # gen result chunks

File ~/Workspace/xorbits/python/xorbits/_mars/core/graph/builder/chunk.py:201, in Tiler._tile(self, chunk_graph, tileable, tile_handler, next_tileable_handlers, to_update_tileables, visited)
    191 def _tile(
    192 self,
    193 chunk_graph: ChunkGraph,
   (...)
    198 visited: Set[EntityType],
    199 ):
    200 try:
--> 201 need_process = next(tile_handler)
    203 if isinstance(need_process, TileStatus):
    204 # process tile that returns progress
    205 self._tile_context.set_progress(tileable, need_process.progress)

File ~/Workspace/xorbits/python/xorbits/_mars/core/graph/builder/chunk.py:173, in Tiler._tile_handler(self, tileable)
    171 tiled_tileables = [self._get_data(t) for t in tiled_tileables]
    172 # start to tile
--> 173 tiled_tileables = yield from handler.tile(tiled_tileables)
    174 return tiled_tileables

File ~/Workspace/xorbits/python/xorbits/_mars/core/entity/tileables.py:80, in OperandTilesHandler.tile(cls, tileables)
     74 tile_handler = cls.get_handler(op)
     75 if inspect.isgeneratorfunction(tile_handler):
     76 # op.tile can be a generator function,
     77 # each time an operand yield some chunks,
     78 # they will be put into ChunkGraph and executed first.
     79 # After execution, resume from the yield place.
---> 80 tiled_result = yield from tile_handler(op)
     81 else:
     82 # without iterative tiling
     83 tiled_result = tile_handler(op)

File ~/Workspace/xorbits/python/xorbits/_mars/dataframe/base/rechunk.py:83, in DataFrameRechunk.tile(cls, op)
     81 else:
     82 inp = asindex(inp)
---> 83 chunk_size = _get_chunk_size(inp, op.chunk_size)
     84 if chunk_size == inp.nsplits:
     85 return [inp]

File ~/Workspace/xorbits/python/xorbits/_mars/dataframe/base/rechunk.py:190, in _get_chunk_size(a, chunk_size)
    188 itemsize = max(getattr(dt, "itemsize", 8) for dt in a.dtypes)
    189 else:
--> 190 itemsize = a.dtype.itemsize
    191 return get_nsplits(a, chunk_size, itemsize)

AttributeError: 'StringDtype' object has no attribute 'itemsize'
```
Describe the bug
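Calling `rechunk` on a Series with the Arrow-backed string dtype (`string[pyarrow]`) fails with `AttributeError: 'StringDtype' object has no attribute 'itemsize'`. The error is raised in `_get_chunk_size` (`xorbits/_mars/dataframe/base/rechunk.py`). For a Series, that function reads `a.dtype.itemsize` directly, whereas the DataFrame branch falls back to `getattr(dt, "itemsize", 8)`.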
To Reproduce
To help us reproduce this bug, please provide the information below: