mars-project / mars

Mars is a tensor-based unified framework for large-scale data computation which scales numpy, pandas, scikit-learn and Python functions.
https://mars-project.readthedocs.io
Apache License 2.0
2.69k stars 325 forks source link

[BUG] read_csv().apply(func, axis=1) failed #1107

Open qinxuye opened 4 years ago

qinxuye commented 4 years ago

Describe the bug

read_csv().apply(func, axis=1) failed.

To Reproduce: to help us reproduce this bug, please provide the information below:

  1. Your Python version
  2. The version of Mars you use
  3. Versions of crucial packages, such as numpy, scipy and protobuf
  4. Full stack of the error.
  5. Minimized code to reproduce the error.
In [44]: %%time  
    ...: df = md.read_csv('Downloads/world-cities-database/worldcitiespop.csv', chunk_bytes='16M') 
    ...: df.apply(lambda r: haversine(r['Latitude'], r['Longitude'], 30.25, 120.17), result_type='reduce', axis=1).execute
    ...: () 
    ...:  
    ...:                                                                                                                  
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<timed exec> in <module>

~/Workspace/mars/mars/core.py in execute(self, session, **kw)
    426         if session is None:
    427             session = Session.default_or_local()
--> 428         return session.run(self, **kw)
    429 
    430     def fetch(self, session=None, **kw):

~/Workspace/mars/mars/session.py in run(self, *tileables, **kw)
    181         tileables = tuple(mt.tensor(t) if not isinstance(t, (Entity, Base)) else t
    182                           for t in tileables)
--> 183         result = self._sess.run(*tileables, **kw)
    184 
    185         for t in tileables:

~/Workspace/mars/mars/session.py in run(self, *tileables, **kw)
     88             # set number of running cores
     89             self.context.set_ncores(kw['n_parallel'])
---> 90             res = self._executor.execute_tileables(tileables, **kw)
     91             return res
     92 

~/Workspace/mars/mars/utils.py in _wrapped(*args, **kwargs)
    380                 _kernel_mode.eager = False
    381             _kernel_mode.eager_count = enter_eager_count + 1
--> 382             return func(*args, **kwargs)
    383         finally:
    384             _kernel_mode.eager_count -= 1

~/Workspace/mars/mars/utils.py in inner(*args, **kwargs)
    468     def inner(*args, **kwargs):
    469         with build_mode():
--> 470             return func(*args, **kwargs)
    471     return inner
    472 

~/Workspace/mars/mars/executor.py in execute_tileables(self, tileables, fetch, n_parallel, n_thread, print_progress, mock, compose)
    810                 # build chunk graph, tile will be done during building
    811                 chunk_graph = chunk_graph_builder.build(
--> 812                     tileables, tileable_graph=tileable_graph)
    813                 tileable_graph = chunk_graph_builder.prev_tileable_graph
    814                 temp_result_keys = set(result_keys)

~/Workspace/mars/mars/utils.py in _wrapped(*args, **kwargs)
    380                 _kernel_mode.eager = False
    381             _kernel_mode.eager_count = enter_eager_count + 1
--> 382             return func(*args, **kwargs)
    383         finally:
    384             _kernel_mode.eager_count -= 1

~/Workspace/mars/mars/utils.py in inner(*args, **kwargs)
    468     def inner(*args, **kwargs):
    469         with build_mode():
--> 470             return func(*args, **kwargs)
    471     return inner
    472 

~/Workspace/mars/mars/tiles.py in build(self, tileables, tileable_graph)
    338 
    339         chunk_graph = super().build(
--> 340             tileables, tileable_graph=tileable_graph)
    341         self._iterative_chunk_graphs.append(chunk_graph)
    342         if len(self._interrupted_ops) == 0:

~/Workspace/mars/mars/utils.py in _wrapped(*args, **kwargs)
    380                 _kernel_mode.eager = False
    381             _kernel_mode.eager_count = enter_eager_count + 1
--> 382             return func(*args, **kwargs)
    383         finally:
    384             _kernel_mode.eager_count -= 1

~/Workspace/mars/mars/utils.py in inner(*args, **kwargs)
    468     def inner(*args, **kwargs):
    469         with build_mode():
--> 470             return func(*args, **kwargs)
    471     return inner
    472 

~/Workspace/mars/mars/tiles.py in build(self, tileables, tileable_graph)
    252                     # for further execution
    253                     partial_tiled_chunks = \
--> 254                         self._on_tile_failure(tileable_data.op, exc_info)
    255                     if partial_tiled_chunks is not None and \
    256                             len(partial_tiled_chunks) > 0:

~/Workspace/mars/mars/tiles.py in inner(op, exc_info)
    290                     on_tile_failure(op, exc_info)
    291                 else:
--> 292                     raise exc_info[1].with_traceback(exc_info[2]) from None
    293         return inner
    294 

~/Workspace/mars/mars/tiles.py in build(self, tileables, tileable_graph)
    232                 continue
    233             try:
--> 234                 tiled = self._tile(tileable_data, tileable_graph)
    235                 tiled_op.add(tileable_data.op)
    236                 for t, td in zip(tileable_data.op.outputs, tiled):

~/Workspace/mars/mars/tiles.py in _tile(self, tileable_data, tileable_graph)
    326         if any(inp.op in self._interrupted_ops for inp in tileable_data.inputs):
    327             raise TilesError('Tile fail due to failure of inputs')
--> 328         return super()._tile(tileable_data, tileable_graph)
    329 
    330     @kernel_mode

~/Workspace/mars/mars/tiles.py in _tile(self, tileable_data, tileable_graph)
    190                 t._nsplits = o.nsplits
    191         elif on_tile is None:
--> 192             tds[0]._inplace_tile()
    193         else:
    194             tds = on_tile(tileable_data.op.outputs, tds)

~/Workspace/mars/mars/core.py in _inplace_tile(self)
    160 
    161     def _inplace_tile(self):
--> 162         return handler.inplace_tile(self)
    163 
    164     def __getattr__(self, attr):

~/Workspace/mars/mars/tiles.py in inplace_tile(self, to_tile)
    125         if not to_tile.is_coarse():
    126             return to_tile
--> 127         dispatched = self.dispatch(to_tile.op)
    128         self._assign_to([d.data for d in dispatched], to_tile.op.outputs)
    129         return to_tile

~/Workspace/mars/mars/utils.py in _wrapped(*args, **kwargs)
    380                 _kernel_mode.eager = False
    381             _kernel_mode.eager_count = enter_eager_count + 1
--> 382             return func(*args, **kwargs)
    383         finally:
    384             _kernel_mode.eager_count -= 1

~/Workspace/mars/mars/tiles.py in dispatch(self, op)
    113             return self._handlers[op_cls](op)
    114         try:
--> 115             return op_cls.tile(op)
    116         except NotImplementedError:
    117             for registered_op_cls in self._handlers:

~/Workspace/mars/mars/dataframe/base/apply.py in tile(cls, op)
    161     def tile(cls, op):
    162         if op.inputs[0].op.object_type == ObjectType.dataframe:
--> 163             return cls._tile_df(op)
    164         else:
    165             return cls._tile_series(op)

~/Workspace/mars/mars/dataframe/base/apply.py in _tile_df(cls, op)
    101             if axis == 1:
    102                 chunk_size = chunk_size[::-1]
--> 103             in_df = in_df.rechunk(chunk_size).tiles()
    104 
    105         chunks = []

~/Workspace/mars/mars/dataframe/base/rechunk.py in rechunk(a, chunk_size, threshold, chunk_size_limit)
     88     else:
     89         itemsize = a.dtype.itemsize
---> 90     chunk_size = get_nsplits(a, chunk_size, itemsize)
     91     if chunk_size == a.nsplits:
     92         return a

~/Workspace/mars/mars/tensor/rechunk/core.py in get_nsplits(tileable, new_chunk_size, itemsize)
     36         chunk_size = new_chunk_size
     37 
---> 38     return decide_chunk_sizes(tileable.shape, chunk_size, itemsize)
     39 
     40 

~/Workspace/mars/mars/tensor/utils.py in decide_chunk_sizes(shape, chunk_size, itemsize)
    560         raise ValueError("chunks have more dimensions than input tensor")
    561     if nleft == 0:
--> 562         return normalize_chunk_sizes(shape, tuple(chunk_size[j] for j in range(len(shape))))
    563 
    564     max_chunk_size = options.chunk_store_limit

~/Workspace/mars/mars/tensor/utils.py in normalize_chunk_sizes(shape, chunk_size)
     69             assert isinstance(chunk, int)
     70 
---> 71             sizes = tuple(chunk for _ in range(int(size / chunk))) + \
     72                 (tuple() if size % chunk == 0 else (size % chunk,))
     73             chunk_sizes.append(sizes)

ValueError: cannot convert float NaN to integer
qinxuye commented 4 years ago

Definition of haversine.

import numpy as np
def haversine(lat1, lon1, lat2, lon2, radius=6371):
    """Return the great-circle distance between two points using the haversine formula.

    Coordinates are given in degrees. The result is expressed in the same unit
    as ``radius``; the default of 6371 is the Earth's mean radius in km. Only
    NumPy ufuncs are used, so the inputs may be scalars or array-likes.

    :param lat1: latitude of the first point, in degrees.
    :param lon1: longitude of the first point, in degrees.
    :param lat2: latitude of the second point, in degrees.
    :param lon2: longitude of the second point, in degrees.
    :param radius: radius of the sphere (default 6371).
    :return: distance between the two points along the sphere's surface.
    """
    dlon = np.radians(lon2 - lon1)
    dlat = np.radians(lat2 - lat1)
    a = (np.sin(dlat / 2) ** 2
         + np.cos(np.radians(lat1)) * np.cos(np.radians(lat2))
         * np.sin(dlon / 2) ** 2)
    # Floating-point rounding can push ``a`` marginally outside [0, 1] for
    # (near-)antipodal points, which would make arcsin(sqrt(a)) yield NaN.
    a = np.clip(a, 0.0, 1.0)
    c = 2 * np.arcsin(np.sqrt(a))
    return c * radius

Data is available at: https://www.kaggle.com/max-mind/world-cities-database/data

For the local run, I executed read_csv first and then the apply succeeded, but there was no speed-up at all.

In [40]: df = pd.read_csv('Downloads/world-cities-database/worldcitiespop.csv')                                           
/Users/xuyeqin/miniconda3/lib/python3.7/site-packages/IPython/core/interactiveshell.py:3058: DtypeWarning: Columns (3) have mixed types.Specify dtype option on import or set low_memory=False.
  interactivity=interactivity, compiler=compiler, result=result)

In [41]: df.head()                                                                                                        
Out[41]: 
  Country        City  AccentCity Region  Population   Latitude  Longitude
0      ad       aixas       Aixàs      6         NaN  42.483333   1.466667
1      ad  aixirivali  Aixirivali      6         NaN  42.466667   1.500000
2      ad  aixirivall  Aixirivall      6         NaN  42.466667   1.500000
3      ad   aixirvall   Aixirvall      6         NaN  42.466667   1.500000
4      ad    aixovall    Aixovall      6         NaN  42.466667   1.483333

In [42]: %%time  
    ...: df.apply(lambda r: haversine(r['Latitude'], r['Longitude'], 30.25, 120.17), result_type='reduce', axis=1) 
    ...:  
    ...:                                                                                                                  
CPU times: user 1min 49s, sys: 410 ms, total: 1min 50s
Wall time: 1min 50s
Out[42]: 
0           9789.135208
1           9788.270528
2           9788.270528
3           9788.270528
4           9789.307210
               ...     
3173953    10899.720735
3173954    11220.703197
3173955    10912.645753
3173956    11318.038981
3173957    11141.080171
Length: 3173958, dtype: float64

In [55]: df = md.read_csv('Downloads/world-cities-database/worldcitiespop.csv', chunk_bytes='16M', dtype={'Region': object
    ...: })                                                                                                               

In [56]: df.execute(fetch=False)                                                                                          

In [57]: %%time  
    ...: df.apply(lambda r: haversine(r['Latitude'], r['Longitude'], 30.25, 120.17), result_type='reduce', axis=1).execute
    ...: () 
    ...:  
    ...:                                                                                                                  
CPU times: user 1min 50s, sys: 724 ms, total: 1min 51s
Wall time: 1min 48s
Out[57]: 
0          9789.135208
1          9788.270528
2          9788.270528
3          9788.270528
4          9789.307210
              ...     
248061    10899.720735
248062    11220.703197
248063    10912.645753
248064    11318.038981
248065    11141.080171
Length: 3173958, dtype: float64

So I tried a local cluster: again I executed read_csv first and then ran the apply, but I still saw no speed-up.

In [4]: cluster = new_cluster(web=True)                                                                                   
/opt/concourse/worker/volumes/live/b066e71c-c221-49d1-533f-e4cc9fc71f42/volume/arrow-cpp_1540956703648/work/cpp/src/plasma/store.cc:971: Allowing the Plasma store to use up to 6.87195GB of memory.
/opt/concourse/worker/volumes/live/b066e71c-c221-49d1-533f-e4cc9fc71f42/volume/arrow-cpp_1540956703648/work/cpp/src/plasma/store.cc:1001: Starting object store with directory /tmp and huge page support disabled
WARNING:bokeh.server.util:Host wildcard '*' will allow connections originating from multiple (or possibly all) hostnames or IPs. Use non-wildcard values to restrict access explicitly
Web endpoint started at http://0.0.0.0:61352

In [5]: WARNING:tornado.access:404 GET /favicon.ico (127.0.0.1) 0.35ms                                                    

In [7]: import mars.dataframe as md                                                                                       

In [8]: df = md.read_csv('Downloads/world-cities-database/worldcitiespop.csv', chunk_bytes='16M', dtype={'Region': object}
   ...: )                                                                                                                 

In [9]: df.execute(fetch=False)                                                                                           
/Users/xuyeqin/miniconda3/lib/python3.7/site-packages/pyarrow/serialization.py:104: FutureWarning: The SparseSeries class is removed from pandas. Accessing it from the top-level namespace will also be removed in the next version
  if isinstance(obj, pd.SparseSeries):
/Users/xuyeqin/miniconda3/lib/python3.7/site-packages/pyarrow/serialization.py:93: FutureWarning: The SparseDataFrame class is removed from pandas. Accessing it from the top-level namespace will also be removed in the next version
  if isinstance(obj, pd.SparseDataFrame):
/Users/xuyeqin/miniconda3/lib/python3.7/site-packages/pyarrow/serialization.py:104: FutureWarning: The SparseSeries class is removed from pandas. Accessing it from the top-level namespace will also be removed in the next version
  if isinstance(obj, pd.SparseSeries):
/Users/xuyeqin/miniconda3/lib/python3.7/site-packages/pyarrow/serialization.py:93: FutureWarning: The SparseDataFrame class is removed from pandas. Accessing it from the top-level namespace will also be removed in the next version
  if isinstance(obj, pd.SparseDataFrame):
/Users/xuyeqin/miniconda3/lib/python3.7/site-packages/pyarrow/serialization.py:93: FutureWarning: The SparseDataFrame class is removed from pandas. Accessing it from the top-level namespace will also be removed in the next version
  if isinstance(obj, pd.SparseDataFrame):
/Users/xuyeqin/miniconda3/lib/python3.7/site-packages/pyarrow/serialization.py:93: FutureWarning: The SparseDataFrame class is removed from pandas. Accessing it from the top-level namespace will also be removed in the next version
  if isinstance(obj, pd.SparseDataFrame):
/Users/xuyeqin/miniconda3/lib/python3.7/site-packages/pyarrow/serialization.py:93: FutureWarning: The SparseDataFrame class is removed from pandas. Accessing it from the top-level namespace will also be removed in the next version
  if isinstance(obj, pd.SparseDataFrame):
/Users/xuyeqin/miniconda3/lib/python3.7/site-packages/pyarrow/serialization.py:93: FutureWarning: The SparseDataFrame class is removed from pandas. Accessing it from the top-level namespace will also be removed in the next version
  if isinstance(obj, pd.SparseDataFrame):
/Users/xuyeqin/miniconda3/lib/python3.7/site-packages/pyarrow/serialization.py:93: FutureWarning: The SparseDataFrame class is removed from pandas. Accessing it from the top-level namespace will also be removed in the next version
  if isinstance(obj, pd.SparseDataFrame):
/Users/xuyeqin/miniconda3/lib/python3.7/site-packages/pyarrow/serialization.py:93: FutureWarning: The SparseDataFrame class is removed from pandas. Accessing it from the top-level namespace will also be removed in the next version
  if isinstance(obj, pd.SparseDataFrame):
/Users/xuyeqin/miniconda3/lib/python3.7/site-packages/pyarrow/serialization.py:93: FutureWarning: The SparseDataFrame class is removed from pandas. Accessing it from the top-level namespace will also be removed in the next version
  if isinstance(obj, pd.SparseDataFrame):
/Users/xuyeqin/miniconda3/lib/python3.7/site-packages/pyarrow/serialization.py:93: FutureWarning: The SparseDataFrame class is removed from pandas. Accessing it from the top-level namespace will also be removed in the next version
  if isinstance(obj, pd.SparseDataFrame):
/Users/xuyeqin/miniconda3/lib/python3.7/site-packages/pyarrow/serialization.py:93: FutureWarning: The SparseDataFrame class is removed from pandas. Accessing it from the top-level namespace will also be removed in the next version
  if isinstance(obj, pd.SparseDataFrame):
/Users/xuyeqin/miniconda3/lib/python3.7/site-packages/pyarrow/serialization.py:93: FutureWarning: The SparseDataFrame class is removed from pandas. Accessing it from the top-level namespace will also be removed in the next version
  if isinstance(obj, pd.SparseDataFrame):

In [10]: %%time 
    ...: df.apply(lambda r: haversine(r['Latitude'], r['Longitude'], 30.25, 120.17), result_type='reduce', axis=1).execute
    ...: () 
    ...:  
    ...:                                                                                                                  
/Users/xuyeqin/miniconda3/lib/python3.7/site-packages/pyarrow/serialization.py:104: FutureWarning: The SparseSeries class is removed from pandas. Accessing it from the top-level namespace will also be removed in the next version
  if isinstance(obj, pd.SparseSeries):
/Users/xuyeqin/miniconda3/lib/python3.7/site-packages/pyarrow/serialization.py:93: FutureWarning: The SparseDataFrame class is removed from pandas. Accessing it from the top-level namespace will also be removed in the next version
  if isinstance(obj, pd.SparseDataFrame):
/Users/xuyeqin/miniconda3/lib/python3.7/site-packages/pyarrow/serialization.py:104: FutureWarning: The SparseSeries class is removed from pandas. Accessing it from the top-level namespace will also be removed in the next version
  if isinstance(obj, pd.SparseSeries):
CPU times: user 23.6 s, sys: 226 ms, total: 23.9 s
Wall time: 2min 20s
Out[10]: 
0          9789.135208
1          9788.270528
2          9788.270528
3          9788.270528
4          9789.307210
              ...     
248061    10899.720735
248062    11220.703197
248063    10912.645753
248064    11318.038981
248065    11141.080171
Length: 3173958, dtype: float64