mars-project / mars

Mars is a tensor-based unified framework for large-scale data computation which scales numpy, pandas, scikit-learn and Python functions.
https://mars-project.readthedocs.io
Apache License 2.0
2.69k stars 325 forks source link

[BUG]The index of the merge result is not as expected #2184

Open zebivy opened 3 years ago

zebivy commented 3 years ago

Describe the bug The index of the merge result is not as expected.

To Reproduce To help us reproducing this bug, please provide information below:

  1. Your Python version: 3.9.0
  2. The version of Mars you use: latest master
  3. The test data is generated according to https://github.com/h2oai/db-benchmark
  4. Minimized code to reproduce the error:
    md_x = md.read_csv("J1_1e7_NA_0_0.csv",
                       dtype={'id1': 'Int32', 'id2': 'Int32', 'id3': 'Int32', 'id4': 'category', 'id5': 'category',
                              'id6': 'category', 'v1': 'float64'}).execute()
    md_small = md.read_csv("J1_1e7_1e1_0_0.csv",
                       dtype={'id1': 'Int32', 'id4': 'category', 'v2': 'float64'}).execute()
    md_merge_small = md_x.merge(md_small, on='id1', sort=True).execute()
    print(md_merge_small.shape)
    print(md_merge_small.tail(3).execute())
    print(md_merge_small.loc[0].execute()) # error

Wrong result

(8998860, 9)
         id1   id2      id3 id4_x     id5        id6         v1 id4_y        v2
1147824   11  4777  7403727  id11  id4777  id7403727  59.203235  id11  3.118364
1147825   11  2811  6143417  id11  id2811  id6143417   7.507115  id11  3.118364
1147826   11  9868  2265587  id11  id9868  id2265587  69.899032  id11  3.118364

The output of shape is correct, but the index obtained by tail() is not as expected. When try to get the value by loc[0] , "Failed to run subtask ..." is reported.

Process finished with exit code 1

**Expected result**

(8998860, 9) id1 id2 id3 id4_x id5 id6 v1 id4_y v2 8998857 11 4777 7403727 id11 id4777 id7403727 59.203235 id11 3.118364 8998858 11 2811 6143417 id11 id2811 id6143417 7.507115 id11 3.118364 8998859 11 9868 2265587 id11 id9868 id2265587 69.899032 id11 3.118364 id1 3 id2 6139 id3 7725507 id4_x id3 id5 id6139 id6 id7725507 v1 56.095139 id4_y id3 v2 78.482651 Name: 0, dtype: object

wjsi commented 3 years ago

Mars does not hold the scenario when index key n have multiple rows and df.loc[n] produces a DataFrame. This will be fixed later.

hekaisheng commented 3 years ago

Minimal reproduction:

In [4]: import mars.dataframe as md                                                                 

In [5]: import pandas as pd                                                                         

In [6]: a = pd.DataFrame([1,2,3], index=[1,1,2])                                                    

In [7]: a                                                                                           
Out[7]: 
   0
1  1
1  2
2  3

In [8]: md.DataFrame(a).loc[1].execute()                                                            
/Users/hekaisheng/Documents/mars_dev/mars/mars/core/session.py:577: UserWarning: No session found, local session will be created in the background, it may take a while before execution. If you want to new a local session by yourself, run code below:

  warnings.warn(warning_msg)
WARNING:bokeh.server.util:Host wildcard '*' will allow connections originating from multiple (or possibly all) hostnames or IPs. Use non-wildcard values to restrict access explicitly
  0%|                                                                       | 0/100 [00:00<?, ?it/s]Failed to run subtask UuWckalRy6lVJTfr9Uv1vPFF on band numa-0
Traceback (most recent call last):
  File "/Users/hekaisheng/Documents/mars_dev/mars/mars/services/scheduling/worker/execution.py", line 223, in internal_run_subtask
    yield asyncio.shield(run_aiotask)
  File "mars/oscar/core.pyx", line 273, in mars.oscar.core._Actor._run_actor_async_generator
  File "mars/oscar/core.pyx", line 208, in _handle_actor_result
  File "/Users/hekaisheng/Documents/mars_dev/mars/mars/services/subtask/api.py", line 59, in run_subtask_in_slot
    return await ref.run_subtask(subtask)
  File "/Users/hekaisheng/Documents/mars_dev/mars/mars/oscar/backends/context.py", line 154, in send
    return self._process_result_message(result)
  File "/Users/hekaisheng/Documents/mars_dev/mars/mars/oscar/backends/context.py", line 59, in _process_result_message
    raise message.error.with_traceback(message.traceback)
  File "/Users/hekaisheng/Documents/mars_dev/mars/mars/oscar/backends/pool.py", line 492, in send
    result = await future
  File "/Users/hekaisheng/Documents/mars_dev/mars/mars/oscar/api.py", line 111, in __on_receive__
    return await super().__on_receive__(message)
  File "mars/oscar/core.pyx", line 333, in __on_receive__
  File "mars/oscar/core.pyx", line 327, in mars.oscar.core._Actor.__on_receive__
  File "mars/oscar/core.pyx", line 239, in _handle_actor_result
  File "mars/oscar/core.pyx", line 264, in _run_actor_async_generator
  File "mars/oscar/core.pyx", line 268, in mars.oscar.core._Actor._run_actor_async_generator
  File "/Users/hekaisheng/Documents/mars_dev/mars/mars/services/subtask/worker/runner.py", line 90, in run_subtask
    result = yield self._running_processor.run(subtask)
  File "mars/oscar/core.pyx", line 273, in mars.oscar.core._Actor._run_actor_async_generator
  File "mars/oscar/core.pyx", line 208, in _handle_actor_result
  File "/Users/hekaisheng/Documents/mars_dev/mars/mars/oscar/backends/context.py", line 154, in send
    return self._process_result_message(result)
  File "/Users/hekaisheng/Documents/mars_dev/mars/mars/oscar/backends/context.py", line 59, in _process_result_message
    raise message.error.with_traceback(message.traceback)
  File "/Users/hekaisheng/Documents/mars_dev/mars/mars/oscar/backends/pool.py", line 492, in send
    result = await future
  File "/Users/hekaisheng/Documents/mars_dev/mars/mars/oscar/api.py", line 111, in __on_receive__
    return await super().__on_receive__(message)
  File "mars/oscar/core.pyx", line 333, in __on_receive__
  File "mars/oscar/core.pyx", line 327, in mars.oscar.core._Actor.__on_receive__
  File "mars/oscar/core.pyx", line 239, in _handle_actor_result
  File "mars/oscar/core.pyx", line 264, in _run_actor_async_generator
  File "mars/oscar/core.pyx", line 268, in mars.oscar.core._Actor._run_actor_async_generator
  File "/Users/hekaisheng/Documents/mars_dev/mars/mars/services/subtask/worker/processor.py", line 465, in run
    result = yield self._running_aio_task
  File "mars/oscar/core.pyx", line 273, in mars.oscar.core._Actor._run_actor_async_generator
  File "mars/oscar/core.pyx", line 208, in _handle_actor_result
  File "/Users/hekaisheng/Documents/mars_dev/mars/mars/services/subtask/worker/processor.py", line 360, in run
    stored_keys, store_sizes, memory_sizes = await self._store_data(chunk_graph)
  File "/Users/hekaisheng/Documents/mars_dev/mars/mars/services/subtask/worker/processor.py", line 249, in _store_data
    result_chunk.params = result_chunk.get_params_from_data(result_data)
  File "/Users/hekaisheng/Documents/mars_dev/mars/mars/dataframe/core.py", line 881, in get_params_from_data
    'dtype': data.dtype,
  File "/Users/hekaisheng/miniconda3/lib/python3.7/site-packages/pandas/core/generic.py", line 5460, in __getattr__
    return object.__getattribute__(self, name)
AttributeError: 'DataFrame' object has no attribute 'dtype'
Exception in callback _execute.<locals>._attach_session(<Task finishe...ute 'dtype'")>) at /Users/hekaisheng/Documents/mars_dev/mars/mars/core/session.py:611
handle: <Handle _execute.<locals>._attach_session(<Task finishe...ute 'dtype'")>) at /Users/hekaisheng/Documents/mars_dev/mars/mars/core/session.py:611>
Traceback (most recent call last):
  File "/Users/hekaisheng/miniconda3/lib/python3.7/asyncio/events.py", line 88, in _run
    self._context.run(self._callback, *self._args)
  File "/Users/hekaisheng/Documents/mars_dev/mars/mars/core/session.py", line 612, in _attach_session
    fut.result()
  File "/Users/hekaisheng/Documents/mars_dev/mars/mars/deploy/oscar/session.py", line 160, in _run_in_background
    raise task_result.error.with_traceback(task_result.traceback)
  File "/Users/hekaisheng/Documents/mars_dev/mars/mars/services/scheduling/worker/execution.py", line 223, in internal_run_subtask
    yield asyncio.shield(run_aiotask)
  File "mars/oscar/core.pyx", line 273, in mars.oscar.core._Actor._run_actor_async_generator
  File "mars/oscar/core.pyx", line 208, in _handle_actor_result
  File "/Users/hekaisheng/Documents/mars_dev/mars/mars/services/subtask/api.py", line 59, in run_subtask_in_slot
    return await ref.run_subtask(subtask)
  File "/Users/hekaisheng/Documents/mars_dev/mars/mars/oscar/backends/context.py", line 154, in send
    return self._process_result_message(result)
  File "/Users/hekaisheng/Documents/mars_dev/mars/mars/oscar/backends/context.py", line 59, in _process_result_message
    raise message.error.with_traceback(message.traceback)
  File "/Users/hekaisheng/Documents/mars_dev/mars/mars/oscar/backends/pool.py", line 492, in send
    result = await future
  File "/Users/hekaisheng/Documents/mars_dev/mars/mars/oscar/api.py", line 111, in __on_receive__
    return await super().__on_receive__(message)
  File "mars/oscar/core.pyx", line 333, in __on_receive__
  File "mars/oscar/core.pyx", line 327, in mars.oscar.core._Actor.__on_receive__
  File "mars/oscar/core.pyx", line 239, in _handle_actor_result
  File "mars/oscar/core.pyx", line 264, in _run_actor_async_generator
  File "mars/oscar/core.pyx", line 268, in mars.oscar.core._Actor._run_actor_async_generator
  File "/Users/hekaisheng/Documents/mars_dev/mars/mars/services/subtask/worker/runner.py", line 90, in run_subtask
    result = yield self._running_processor.run(subtask)
  File "mars/oscar/core.pyx", line 273, in mars.oscar.core._Actor._run_actor_async_generator
  File "mars/oscar/core.pyx", line 208, in _handle_actor_result
  File "/Users/hekaisheng/Documents/mars_dev/mars/mars/oscar/backends/context.py", line 154, in send
    return self._process_result_message(result)
  File "/Users/hekaisheng/Documents/mars_dev/mars/mars/oscar/backends/context.py", line 59, in _process_result_message
    raise message.error.with_traceback(message.traceback)
  File "/Users/hekaisheng/Documents/mars_dev/mars/mars/oscar/backends/pool.py", line 492, in send
    result = await future
  File "/Users/hekaisheng/Documents/mars_dev/mars/mars/oscar/api.py", line 111, in __on_receive__
    return await super().__on_receive__(message)
  File "mars/oscar/core.pyx", line 333, in __on_receive__
  File "mars/oscar/core.pyx", line 327, in mars.oscar.core._Actor.__on_receive__
  File "mars/oscar/core.pyx", line 239, in _handle_actor_result
  File "mars/oscar/core.pyx", line 264, in _run_actor_async_generator
  File "mars/oscar/core.pyx", line 268, in mars.oscar.core._Actor._run_actor_async_generator
  File "/Users/hekaisheng/Documents/mars_dev/mars/mars/services/subtask/worker/processor.py", line 465, in run
    result = yield self._running_aio_task
  File "mars/oscar/core.pyx", line 273, in mars.oscar.core._Actor._run_actor_async_generator
  File "mars/oscar/core.pyx", line 208, in _handle_actor_result
  File "/Users/hekaisheng/Documents/mars_dev/mars/mars/services/subtask/worker/processor.py", line 360, in run
    stored_keys, store_sizes, memory_sizes = await self._store_data(chunk_graph)
  File "/Users/hekaisheng/Documents/mars_dev/mars/mars/services/subtask/worker/processor.py", line 249, in _store_data
    result_chunk.params = result_chunk.get_params_from_data(result_data)
  File "/Users/hekaisheng/Documents/mars_dev/mars/mars/dataframe/core.py", line 881, in get_params_from_data
    'dtype': data.dtype,
  File "/Users/hekaisheng/miniconda3/lib/python3.7/site-packages/pandas/core/generic.py", line 5460, in __getattr__
    return object.__getattribute__(self, name)
AttributeError: 'DataFrame' object has no attribute 'dtype'
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-8-7a505571b1b6> in <module>
----> 1 md.DataFrame(a).loc[1].execute()

~/Documents/mars_dev/mars/mars/core/entity/tileables.py in execute(self, session, **kw)
    430 
    431     def execute(self, session=None, **kw):
--> 432         result = self.data.execute(session=session, **kw)
    433         if isinstance(result, TILEABLE_TYPE):
    434             return self

~/Documents/mars_dev/mars/mars/core/entity/executable.py in execute(self, session, **kw)
     78 
     79         session = _get_session(self, session)
---> 80         return execute(self, session=session, **kw)
     81 
     82     def _check_session(self,

~/Documents/mars_dev/mars/mars/core/session.py in execute(tileable, session, wait, backend, new_session_kwargs, show_progress, progress_update_interval, *tileables, **kwargs)
    679                                 show_progress=show_progress,
    680                                 progress_update_interval=progress_update_interval,
--> 681                                 **kwargs)
    682 
    683 

~/Documents/mars_dev/mars/mars/core/session.py in _inner(*args, **kwargs)
    742         fut = _pool.submit(run_in_thread)
    743         try:
--> 744             result, default_session_in_thread = fut.result()
    745         except KeyboardInterrupt:  # pragma: no cover
    746             logger.warning('Cancelling running task')

~/miniconda3/lib/python3.7/concurrent/futures/_base.py in result(self, timeout)
    433                 raise CancelledError()
    434             elif self._state == FINISHED:
--> 435                 return self.__get_result()
    436             else:
    437                 raise TimeoutError()

~/miniconda3/lib/python3.7/concurrent/futures/_base.py in __get_result(self)
    382     def __get_result(self):
    383         if self._exception:
--> 384             raise self._exception
    385         else:
    386             return self._result

~/miniconda3/lib/python3.7/concurrent/futures/thread.py in run(self)
     55 
     56         try:
---> 57             result = self.fn(*self.args, **self.kwargs)
     58         except BaseException as exc:
     59             self.future.set_exception(exc)

~/Documents/mars_dev/mars/mars/core/session.py in run_in_thread()
    738                 # set default session in this thread
    739                 _sync_default_session(default_session)
--> 740                 return func(*args, **kwargs), get_default_session()
    741 
    742         fut = _pool.submit(run_in_thread)

~/Documents/mars_dev/mars/mars/core/session.py in execute(self, tileable, show_progress, *tileables, **kwargs)
    776         execution_info = _loop.run_until_complete(_execute(
    777             *set(to_execute_tileables), session=self,
--> 778             show_progress=show_progress, **kwargs))
    779         if wait:
    780             return tileable if len(tileables) == 0 else \

~/miniconda3/lib/python3.7/asyncio/base_events.py in run_until_complete(self, future)
    581             raise RuntimeError('Event loop stopped before Future completed.')
    582 
--> 583         return future.result()
    584 
    585     def stop(self):

~/Documents/mars_dev/mars/mars/core/session.py in _execute(session, wait, show_progress, progress_update_interval, cancelled, *tileables, **kwargs)
    635                 try:
    636                     await asyncio.wait_for(asyncio.shield(execution_info),
--> 637                                            progress_update_interval)
    638                     # done
    639                     if not cancelled.is_set():

~/miniconda3/lib/python3.7/asyncio/tasks.py in wait_for(fut, timeout, loop)
    440 
    441         if fut.done():
--> 442             return fut.result()
    443         else:
    444             fut.remove_done_callback(cb)

~/miniconda3/lib/python3.7/asyncio/tasks.py in _wrap_awaitable(awaitable)
    628     that will later be wrapped in a Task by ensure_future().
    629     """
--> 630     return (yield from awaitable.__await__())
    631 
    632 

~/miniconda3/lib/python3.7/asyncio/events.py in _run(self)
     86     def _run(self):
     87         try:
---> 88             self._context.run(self._callback, *self._args)
     89         except Exception as exc:
     90             cb = format_helpers._format_callback_source(

~/Documents/mars_dev/mars/mars/core/session.py in _attach_session(fut)
    610 
    611     def _attach_session(fut: asyncio.Future):
--> 612         fut.result()
    613         for t in tileables:
    614             t._attach_session(session)

~/Documents/mars_dev/mars/mars/deploy/oscar/session.py in _run_in_background(self, tileables, task_id, progress)
    158                     await self._task_api.cancel_task(task_id)
    159             if task_result.error:
--> 160                 raise task_result.error.with_traceback(task_result.traceback)
    161             if cancelled:
    162                 return

~/Documents/mars_dev/mars/mars/services/scheduling/worker/execution.py in internal_run_subtask(self, subtask, band_name)
    221             run_aiotask = asyncio.create_task(
    222                 subtask_api.run_subtask_in_slot(band_name, slot_id, subtask))
--> 223             yield asyncio.shield(run_aiotask)
    224             if subtask_info.cancelling:
    225                 raise asyncio.CancelledError

~/Documents/mars_dev/mars/mars/oscar/core.pyx in mars.oscar.core._Actor._run_actor_async_generator()

~/Documents/mars_dev/mars/mars/oscar/core.pyx in _handle_actor_result()

~/Documents/mars_dev/mars/mars/services/subtask/api.py in run_subtask_in_slot(self, band_name, slot_id, subtask)
     57         """
     58         ref = await self._get_runner_ref(band_name, slot_id)
---> 59         return await ref.run_subtask(subtask)
     60 
     61     async def cancel_subtask_in_slot(self, band_name: str, slot_id: int):

~/Documents/mars_dev/mars/mars/oscar/backends/context.py in send(self, actor_ref, message, wait_response)
    152             if wait_response:
    153                 result = await self._wait(future, actor_ref.address, message)
--> 154                 return self._process_result_message(result)
    155             else:
    156                 return future

~/Documents/mars_dev/mars/mars/oscar/backends/context.py in _process_result_message(message)
     57             return message.result
     58         else:
---> 59             raise message.error.with_traceback(message.traceback)
     60 
     61     async def _wait(self,

~/Documents/mars_dev/mars/mars/oscar/backends/pool.py in send()
    490             coro = self._actors[actor_id].__on_receive__(message.content)
    491             with self._run_coro(message.message_id, coro) as future:
--> 492                 result = await future
    493             processor.result = ResultMessage(message.message_id, result,
    494                                              protocol=message.protocol)

~/Documents/mars_dev/mars/mars/oscar/api.py in __on_receive__()
    109             Message shall be (method_name,) + args + (kwargs,)
    110         """
--> 111         return await super().__on_receive__(message)
    112 
    113 

~/Documents/mars_dev/mars/mars/oscar/core.pyx in __on_receive__()

~/Documents/mars_dev/mars/mars/oscar/core.pyx in mars.oscar.core._Actor.__on_receive__()

~/Documents/mars_dev/mars/mars/oscar/core.pyx in _handle_actor_result()

~/Documents/mars_dev/mars/mars/oscar/core.pyx in _run_actor_async_generator()

~/Documents/mars_dev/mars/mars/oscar/core.pyx in mars.oscar.core._Actor._run_actor_async_generator()

~/Documents/mars_dev/mars/mars/services/subtask/worker/runner.py in run_subtask()
     88         self._running_processor = self._last_processor = processor
     89         try:
---> 90             result = yield self._running_processor.run(subtask)
     91         finally:
     92             self._running_processor = None

~/Documents/mars_dev/mars/mars/oscar/core.pyx in mars.oscar.core._Actor._run_actor_async_generator()

~/Documents/mars_dev/mars/mars/oscar/core.pyx in _handle_actor_result()

~/Documents/mars_dev/mars/mars/oscar/backends/context.py in send()
    152             if wait_response:
    153                 result = await self._wait(future, actor_ref.address, message)
--> 154                 return self._process_result_message(result)
    155             else:
    156                 return future

~/Documents/mars_dev/mars/mars/oscar/backends/context.py in _process_result_message()
     57             return message.result
     58         else:
---> 59             raise message.error.with_traceback(message.traceback)
     60 
     61     async def _wait(self,

~/Documents/mars_dev/mars/mars/oscar/backends/pool.py in send()
    490             coro = self._actors[actor_id].__on_receive__(message.content)
    491             with self._run_coro(message.message_id, coro) as future:
--> 492                 result = await future
    493             processor.result = ResultMessage(message.message_id, result,
    494                                              protocol=message.protocol)

~/Documents/mars_dev/mars/mars/oscar/api.py in __on_receive__()
    109             Message shall be (method_name,) + args + (kwargs,)
    110         """
--> 111         return await super().__on_receive__(message)
    112 
    113 

~/Documents/mars_dev/mars/mars/oscar/core.pyx in __on_receive__()

~/Documents/mars_dev/mars/mars/oscar/core.pyx in mars.oscar.core._Actor.__on_receive__()

~/Documents/mars_dev/mars/mars/oscar/core.pyx in _handle_actor_result()

~/Documents/mars_dev/mars/mars/oscar/core.pyx in _run_actor_async_generator()

~/Documents/mars_dev/mars/mars/oscar/core.pyx in mars.oscar.core._Actor._run_actor_async_generator()

~/Documents/mars_dev/mars/mars/services/subtask/worker/processor.py in run()
    463         self._running_aio_task = asyncio.create_task(processor.run())
    464         try:
--> 465             result = yield self._running_aio_task
    466             raise mo.Return(result)
    467         finally:

~/Documents/mars_dev/mars/mars/oscar/core.pyx in mars.oscar.core._Actor._run_actor_async_generator()

~/Documents/mars_dev/mars/mars/oscar/core.pyx in _handle_actor_result()

~/Documents/mars_dev/mars/mars/services/subtask/worker/processor.py in run()
    358                 await self._unpin_data(input_keys)
    359             # store results data
--> 360             stored_keys, store_sizes, memory_sizes = await self._store_data(chunk_graph)
    361             # store meta
    362             await self._store_meta(chunk_graph, stored_keys, store_sizes, memory_sizes)

~/Documents/mars_dev/mars/mars/services/subtask/worker/processor.py in _store_data()
    247                 # update meta
    248                 if not isinstance(result_data, tuple):
--> 249                     result_chunk.params = result_chunk.get_params_from_data(result_data)
    250 
    251                 put = self._storage_api.put.delay(data_key, result_data)

~/Documents/mars_dev/mars/mars/dataframe/core.py in get_params_from_data()
    879         return {
    880             'shape': data.shape,
--> 881             'dtype': data.dtype,
    882             'index_value': parse_index(data.index, store_data=False),
    883             'name': data.name

~/miniconda3/lib/python3.7/site-packages/pandas/core/generic.py in __getattr__()
   5458             if self._info_axis._can_hold_identifiers_and_holds_name(name):
   5459                 return self[name]
-> 5460             return object.__getattribute__(self, name)
   5461 
   5462     def __setattr__(self, name: str, value) -> None:

AttributeError: 'DataFrame' object has no attribute 'dtype'