vaexio / vaex

Out-of-Core hybrid Apache Arrow/NumPy DataFrame for Python, ML, visualization and exploration of big tabular data at a billion rows per second 🚀
https://vaex.io
MIT License

[BUG-REPORT] How to use vaex.open partitioning argument? #2281

Open sungreong opened 2 years ago

sungreong commented 2 years ago

Description

Hello, I want to load only part of a partitioned parquet dataset with vaex. I could not find anything about this in the documentation, so I am posting it as a bug report. In pyarrow, you can select rows from partitioned data with the filters argument. I couldn't find an equivalent in vaex, so I'm asking here.

In other words, is it possible to use the filters supported by pyarrow in vaex, and if so, could you give an example?

Software information

Additional information

generate data (partitioning)

import vaex
import numpy as np
import pandas as pd
from sklearn.datasets import make_classification
import pyarrow as pa
import pyarrow.parquet as pq

# make_classification takes random_state (it has no seed argument)
X, y = make_classification(n_samples=100000, n_features=5, n_classes=2, random_state=1234)
X_pd = pd.DataFrame(X, columns=[f"feature_{i}" for i in range(X.shape[1])])
X_pd['class'] = y
X_pd['class1'] = np.random.randint(0, 5, len(y))
# writes one directory per (class, class1) pair, e.g. class=0/class1=1/
X_pd.to_parquet(
    path="./test_vaex",
    engine='pyarrow',
    compression='snappy',
    partition_cols=['class', 'class1'],
)

filter

filter data using pyarrow parquet

filters = [("class","=",0),("class1","in",{1,2})]
df_pq_filtered = pq.read_table("./test_vaex",filters=filters )
df_pq_filtered.shape

output : (20106, 7)
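
For readers unfamiliar with how such a filter can skip data: the partition values live in the directory names (class=0/class1=1/...), so a reader can discard whole files before opening them. The following is only a rough, standard-library sketch of that idea, not pyarrow's actual implementation. The file names and the helpers partition_values and keep_file are made up for illustration, and only the "=" and "in" operators are handled.

```python
from pathlib import PurePosixPath


def partition_values(path):
    """Parse hive-style 'key=value' directory names out of a file path."""
    values = {}
    for part in PurePosixPath(path).parts:
        key, sep, value = part.partition("=")
        if sep:  # only path components that contain '=' are partition keys
            values[key] = value
    return values


def keep_file(path, filters):
    """Return True if the file's partition values satisfy every filter."""
    values = partition_values(path)
    for column, op, target in filters:
        value = int(values[column])  # assumption: integer partition keys
        if op == "=" and value != target:
            return False
        if op == "in" and value not in target:
            return False
    return True


# Hypothetical file names; the real ones are generated by the parquet writer.
files = [
    "test_vaex/class=0/class1=1/part-0.parquet",
    "test_vaex/class=0/class1=3/part-0.parquet",
    "test_vaex/class=1/class1=2/part-0.parquet",
]
filters = [("class", "=", 0), ("class1", "in", {1, 2})]
selected = [f for f in files if keep_file(f, filters)]
print(selected)
```

Only the first file survives the filter here, because it is the only one whose directory names match both conditions.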

filter data using vaex (I am not sure this is the right way)

df_vaex_filtered = vaex.open("./test_vaex",filters=filters) ## NOT WORKING
df_vaex_filtered.shape

output : (100000, 7)

filter data using vaex (just an attempt)

df_vaex_partition = vaex.open('./test_vaex/', partitioning=['class1'])
df_vaex_partition[df_vaex_partition['class1']=='class=0']  # raises an error

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python3.9/dist-packages/vaex/dataframe.py", line 4223, in __repr__
    return self._head_and_tail_table(format='plain')
  File "/usr/local/lib/python3.9/dist-packages/vaex/dataframe.py", line 3962, in _head_and_tail_table
    N = _len(self)
  File "/usr/local/lib/python3.9/dist-packages/vaex/dataframe.py", line 72, in _len
    return o.__len__()
  File "/usr/local/lib/python3.9/dist-packages/vaex/dataframe.py", line 4311, in __len__
    self._cached_filtered_length = int(self.count())
  File "/usr/local/lib/python3.9/dist-packages/vaex/dataframe.py", line 965, in count
    return self._compute_agg('count', expression, binby, limits, shape, selection, delay, edges, progress, array_type=array_type)
  File "/usr/local/lib/python3.9/dist-packages/vaex/dataframe.py", line 939, in _compute_agg
    return self._delay(delay, progressbar.exit_on(var))
  File "/usr/local/lib/python3.9/dist-packages/vaex/dataframe.py", line 1778, in _delay
    self.execute()
  File "/usr/local/lib/python3.9/dist-packages/vaex/dataframe.py", line 420, in execute
    self.executor.execute()
  File "/usr/local/lib/python3.9/dist-packages/vaex/execution.py", line 308, in execute
    for _ in self.execute_generator():
  File "/usr/local/lib/python3.9/dist-packages/vaex/execution.py", line 432, in execute_generator
    yield from self.thread_pool.map(self.process_part, dataset.chunk_iterator(run.dataset_deps, chunk_size),
  File "/usr/local/lib/python3.9/dist-packages/vaex/multithreading.py", line 110, in map
    for value in iterator:
  File "/usr/local/lib/python3.9/dist-packages/vaex/itertools.py", line 5, in buffer
    values.append(next(i))
  File "/usr/lib/python3.9/concurrent/futures/_base.py", line 609, in result_iterator
    yield fs.pop().result()
  File "/usr/lib/python3.9/concurrent/futures/_base.py", line 439, in result
    return self.__get_result()
  File "/usr/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/usr/lib/python3.9/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/usr/local/lib/python3.9/dist-packages/vaex/multithreading.py", line 86, in wrapped
    return callable(self.local.index, *args, **kwargs, **kwargs_extra)
  File "/usr/local/lib/python3.9/dist-packages/vaex/execution.py", line 500, in process_part
    self.process_tasks(thread_index, i1, i2, chunks, run, df, tasks)
  File "/usr/local/lib/python3.9/dist-packages/vaex/execution.py", line 520, in process_tasks
    filter_mask = filter_scope.evaluate(vaex.dataframe.FILTER_SELECTION_NAME)
  File "/usr/local/lib/python3.9/dist-packages/vaex/scopes.py", line 113, in evaluate
    result = self[expression]
  File "/usr/local/lib/python3.9/dist-packages/vaex/scopes.py", line 156, in __getitem__
    mask_values = selection.evaluate(self.df, variable, self.i1, self.i2, self)#, self.filter_mask)
  File "/usr/local/lib/python3.9/dist-packages/vaex/selections.py", line 132, in evaluate
    result = scope.evaluate(self.boolean_expression)
  File "/usr/local/lib/python3.9/dist-packages/vaex/scopes.py", line 119, in evaluate
    result = eval(expression, expression_namespace, self)
  File "<string>", line 1, in <module>
  File "/usr/local/lib/python3.9/dist-packages/vaex/arrow/numpy_dispatch.py", line 136, in wrapper
    result = f(*args, **kwargs)
  File "/usr/local/lib/python3.9/dist-packages/vaex/functions.py", line 48, in decorated
    return f(x, *args, **kwargs)
  File "/usr/local/lib/python3.9/dist-packages/vaex/functions.py", line 1006, in str_equals
    x = _to_string_sequence(x)
  File "/usr/local/lib/python3.9/dist-packages/vaex/column.py", line 607, in _to_string_sequence
    return convert.column_from_arrow_array(x).string_sequence
  File "/usr/local/lib/python3.9/dist-packages/vaex/arrow/convert.py", line 85, in column_from_arrow_array
    return numpy_array_from_arrow_array(arrow_array)
  File "/usr/local/lib/python3.9/dist-packages/vaex/arrow/convert.py", line 125, in numpy_array_from_arrow_array
    dtype = vaex.array_types.to_numpy_type(arrow_array.type)
  File "/usr/local/lib/python3.9/dist-packages/vaex/array_types.py", line 315, in to_numpy_type
    return numpy_dtype_from_arrow_type(data_type, strict=strict)
  File "/usr/local/lib/python3.9/dist-packages/vaex/array_types.py", line 332, in numpy_dtype_from_arrow_type
    raise NotImplementedError(f'Cannot convert {arrow_type}')
NotImplementedError: Cannot convert dictionary<values=string, indices=int32, ordered=0>

Ben-Epstein commented 2 years ago

@sungreong @maartenbreddels I'm not sure how/why this is happening but I have a quick workaround for you...

For some reason your int classes (class and class1) are having their types destroyed. Vaex somehow thinks they are dictionaries, which explains the error above and why you cannot filter.

image

The float columns seem fine, and you can filter on them

image

So you can cast to an int and it'll work fine

image

I don't get why it's happening however, but I'm sure Maarten will :)

Ben-Epstein commented 2 years ago

It has something to do with the partition cols @maartenbreddels because if you remove those then the filter and the dtypes work as expected

image

sungreong commented 2 years ago

Thanks. But I do not understand the partitioning argument of vaex.open. Can you write an example that uses partitioning in vaex.open?

JovanVeljanoski commented 2 years ago

@sungreong If you don't mind having the data in memory, just open it via arrow as in your example, then use vaex.from_arrow_table to pass it to vaex.

Otherwise the example from @Ben-Epstein is the way I would approach it also.