vaexio / vaex

Out-of-Core hybrid Apache Arrow/NumPy DataFrame for Python, ML, visualization and exploration of big tabular data at a billion rows per second 🚀
https://vaex.io
MIT License
8.23k stars 590 forks source link

[BUG-REPORT] DataFrame.to_pandas_df with selection and chunk_size applied #2307

Open slvnwhrl opened 1 year ago

slvnwhrl commented 1 year ago

Description: When converting a vaex.DataFrame to a pandas.DataFrame with selection=True and a positive chunk_size, the following error occurs: RuntimeError: Oops, requesting column default from dataset, but it does not exist.

Software information

Additional information: To reproduce the error, I am providing the following example:

df = vaex.example()
df.select(df.x > 5)
pandas_generator = df.to_pandas_df(selection=True, chunk_size=20)
for i in pandas_generator:
    print(i)

Producing this traceback:

RuntimeError                              Traceback (most recent call last)
Cell In [27], line 4
      2 df.select(df.x > 5)
      3 pandas_generator = df.to_pandas_df(selection=True, chunk_size=20)
----> 4 for i in pandas_generator:
      5     print(i)

File ~\.conda\envs\nlp\lib\site-packages\vaex\dataframe.py:3337, in DataFrame.to_pandas_df.<locals>.iterator()
   3336 def iterator():
-> 3337     for i1, i2, chunks in self.evaluate_iterator(column_names, selection=selection, parallel=parallel, chunk_size=chunk_size, array_type=array_type):
   3338         yield i1, i2, create_pdf(dict(zip(column_names, chunks)))

File ~\.conda\envs\nlp\lib\site-packages\vaex\dataframe.py:3143, in DataFrame.evaluate_iterator(self, expression, s1, s2, out, selection, filtered, array_type, parallel, chunk_size, prefetch, progress)
   3139 previous = executor.submit(f, previous_i1, previous_i2)
   3140 for l1, l2, i1, i2 in iter:
   3141     # and we submit the next job before returning the previous, so they run in parallel
   3142     # but make sure the previous is done
-> 3143     previous_chunk = previous.result()
   3144     current = executor.submit(f, i1, i2)
   3145     yield previous_l1, previous_l2, previous_chunk

File ~\.conda\envs\nlp\lib\concurrent\futures\_base.py:458, in Future.result(self, timeout)
    456     raise CancelledError()
    457 elif self._state == FINISHED:
--> 458     return self.__get_result()
    459 else:
    460     raise TimeoutError()

File ~\.conda\envs\nlp\lib\concurrent\futures\_base.py:403, in Future.__get_result(self)
    401 if self._exception:
    402     try:
--> 403         raise self._exception
    404     finally:
    405         # Break a reference cycle with the exception in self._exception
    406         self = None

File ~\.conda\envs\nlp\lib\concurrent\futures\thread.py:58, in _WorkItem.run(self)
     55     return
     57 try:
---> 58     result = self.fn(*self.args, **self.kwargs)
     59 except BaseException as exc:
     60     self.future.set_exception(exc)

File ~\.conda\envs\nlp\lib\site-packages\vaex\dataframe.py:3132, in DataFrame.evaluate_iterator.<locals>.f(i1, i2)
   3131 def f(i1, i2):
-> 3132     return self._evaluate_implementation(expression, i1=i1, i2=i2, out=out, selection=selection, filtered=filtered, array_type=array_type, parallel=parallel, raw=True)

File ~\.conda\envs\nlp\lib\site-packages\vaex\dataframe.py:6494, in DataFrameLocal._evaluate_implementation(self, expression, i1, i2, out, selection, filtered, array_type, parallel, chunk_size, raw, progress)
   6492             arrays[expression][i1:i2] = blocks[i]
   6493 if expression_to_evaluate:
-> 6494     df.map_reduce(assign, lambda *_: None, expression_to_evaluate, progress=progress, ignore_filter=False, selection=selection, pre_filter=use_filter, info=True, to_numpy=False, name="evaluate")
   6495 def finalize_result(expression):
   6496     expression_obj = expression

File ~\.conda\envs\nlp\lib\site-packages\vaex\dataframe.py:438, in DataFrame.map_reduce(self, map, reduce, arguments, progress, delay, info, to_numpy, ignore_filter, pre_filter, name, selection)
    436 progressbar.add_task(task, f'map reduce: {name}')
    437 task = self.executor.schedule(task)
--> 438 return self._delay(delay, task)

File ~\.conda\envs\nlp\lib\site-packages\vaex\dataframe.py:1780, in DataFrame._delay(self, delay, task, progressbar)
   1778     return task
   1779 else:
-> 1780     self.execute()
   1781     return task.get()

File ~\.conda\envs\nlp\lib\site-packages\vaex\dataframe.py:421, in DataFrame.execute(self)
    419         print(repr(task))
    420 if self.executor.tasks:
--> 421     self.executor.execute()

File ~\.conda\envs\nlp\lib\site-packages\vaex\execution.py:308, in ExecutorLocal.execute(self)
    307 def execute(self):
--> 308     for _ in self.execute_generator():
    309         pass

File ~\.conda\envs\nlp\lib\site-packages\vaex\execution.py:346, in ExecutorLocal.execute_generator(self, use_async)
    344     break
    345 tasks = _merge(tasks)
--> 346 run = Run(tasks)
    347 self.passes += 1
    348 dataset = run.dataset

File ~\.conda\envs\nlp\lib\site-packages\vaex\execution.py:120, in Run.__init__(self, tasks)
    118 if var not in self.dataset:
    119     if var not in others:
--> 120         raise RuntimeError(f'Oops, requesting column {var} from dataset, but it does not exist')
    121     else:
    122         pass  # ok, not a column, just a var or virtual column

RuntimeError: Oops, requesting column default from dataset, but it does not exist

Edit: After some more experimenting, I found that it works as expected if parallel is set to False. However, I don't think it should only work that way, as this restriction is not mentioned in the documentation.

JovanVeljanoski commented 1 year ago

Hi, thank you for the report. This appears to be a bug indeed. I've opened a PR with a test, and I hope we can fix this soon.

In the meantime, a simple workaround would be to pass the boolean expression directly to the selection argument:

df = vaex.example()
pandas_generator = df.to_pandas_df(selection=df.x > 5, chunk_size=20)
for i in pandas_generator:
    print(i)

Thanks!