vaexio / vaex

Out-of-Core hybrid Apache Arrow/NumPy DataFrame for Python, ML, visualization and exploration of big tabular data at a billion rows per second 🚀
https://vaex.io
MIT License
8.22k stars 590 forks source link

[BUG-REPORT] struct.get() fails on pyarrow chunked arrays #2313

Open NickCrews opened 1 year ago

NickCrews commented 1 year ago
import vaex
import pyarrow as pa

# Struct type with two int64 fields, "a" and "b".
struct_type = pa.struct([("a", pa.int64()), ("b", pa.int64())])

# The same three rows, held once as a plain array and once wrapped in a
# single-chunk ChunkedArray.
rows = [(1, 2), (3, 4), (5, 6)]
plain = pa.array(rows, type=struct_type)
as_chunked = pa.chunked_array([plain])

# One column per representation so both code paths can be compared.
df = vaex.from_arrays(normal=plain, chunked=as_chunked)
print(df.to_pandas_df().to_markdown())
|    | normal           | chunked          |
|---:|:-----------------|:-----------------|
|  0 | {'a': 1, 'b': 2} | {'a': 1, 'b': 2} |
|  1 | {'a': 3, 'b': 4} | {'a': 3, 'b': 4} |
|  2 | {'a': 5, 'b': 6} | {'a': 5, 'b': 6} |

Then:

# Works: the "normal" column was created from a plain pyarrow array.
df["normal"].struct.get("a")
# Fails: the "chunked" column was created from a pyarrow ChunkedArray, which
# has no `.field` attribute (see the AttributeError in the traceback below).
df["chunked"].struct.get("a")
KeyError                                  Traceback (most recent call last)
File ~/Library/Application Support/hatch/env/virtual/alatna-U4QVXPcf/alatna/lib/python3.9/site-packages/vaex/scopes.py:113, in _BlockScope.evaluate(self, expression, out)
    111 try:
    112     # logger.debug("try avoid evaluating: %s", expression)
--> 113     result = self[expression]
    114 except KeyError:
    115     # logger.debug("no luck, eval: %s", expression)
    116     # result = ne.evaluate(expression, local_dict=self, out=out)
    117     # logger.debug("in eval")
    118     # eval("def f(")

File ~/Library/Application Support/hatch/env/virtual/alatna-U4QVXPcf/alatna/lib/python3.9/site-packages/vaex/scopes.py:198, in _BlockScope.__getitem__(self, variable)
    197 if variable not in self.values:
--> 198     raise KeyError("Unknown variables or column: %r" % (variable,))
    200 return self.values[variable]

KeyError: 'Unknown variables or column: "struct_get(chunked, \'a\')"'

During handling of the above exception, another exception occurred:

AttributeError                            Traceback (most recent call last)
File ~/Library/Application Support/hatch/env/virtual/alatna-U4QVXPcf/alatna/lib/python3.9/site-packages/vaex/dataframe.py:2273, in DataFrame.data_type(self, expression, array_type, internal, axis)
   2272 try:
-> 2273     data = self.evaluate(expression, 0, 1, filtered=False, array_type=array_type, parallel=False)
   2274 except:

File ~/Library/Application Support/hatch/env/virtual/alatna-U4QVXPcf/alatna/lib/python3.9/site-packages/vaex/dataframe.py:3095, in DataFrame.evaluate(self, expression, i1, i2, out, selection, filtered, array_type, parallel, chunk_size, progress)
   3094 else:
-> 3095     return self._evaluate_implementation(expression, i1=i1, i2=i2, out=out, selection=selection, filtered=filtered, array_type=array_type, parallel=parallel, chunk_size=chunk_size, progress=progress)

File ~/Library/Application Support/hatch/env/virtual/alatna-U4QVXPcf/alatna/lib/python3.9/site-packages/vaex/dataframe.py:6562, in DataFrameLocal._evaluate_implementation(self, expression, i1, i2, out, selection, filtered, array_type, parallel, chunk_size, raw, progress)
   6561 for expression in expressions:
-> 6562     value = block_scope.evaluate(expression)
   6563     value = array_types.convert(value, array_type)

File ~/Library/Application Support/hatch/env/virtual/alatna-U4QVXPcf/alatna/lib/python3.9/site-packages/vaex/scopes.py:119, in _BlockScope.evaluate(self, expression, out)
    114 except KeyError:
    115     # logger.debug("no luck, eval: %s", expression)
    116     # result = ne.evaluate(expression, local_dict=self, out=out)
    117     # logger.debug("in eval")
    118     # eval("def f(")
--> 119     result = eval(expression, expression_namespace, self)
    120     result = auto_encode(self.df, expression, result)

File <string>:1, in <module>

File ~/Library/Application Support/hatch/env/virtual/alatna-U4QVXPcf/alatna/lib/python3.9/site-packages/vaex/arrow/numpy_dispatch.py:136, in autowrapper.<locals>.wrapper(*args, **kwargs)
    135 kwargs = {k: unwrap(v) for k, v, in kwargs.items()}
--> 136 result = f(*args, **kwargs)
    137 return wrap(result)

File ~/Library/Application Support/hatch/env/virtual/alatna-U4QVXPcf/alatna/lib/python3.9/site-packages/vaex/struct.py:136, in assert_struct_dtype_argument.<locals>.wrapper(struct, *args, **kwargs)
    135 assert_struct_dtype(struct)
--> 136 return func(struct, *args, **kwargs)

File ~/Library/Application Support/hatch/env/virtual/alatna-U4QVXPcf/alatna/lib/python3.9/site-packages/vaex/struct.py:218, in struct_get(x, field)
    217 _check_valid_struct_fields(x, [field])
--> 218 return x.field(field)

AttributeError: 'pyarrow.lib.ChunkedArray' object has no attribute 'field'

During handling of the above exception, another exception occurred:

KeyError                                  Traceback (most recent call last)
File ~/Library/Application Support/hatch/env/virtual/alatna-U4QVXPcf/alatna/lib/python3.9/site-packages/vaex/scopes.py:113, in _BlockScope.evaluate(self, expression, out)
    111 try:
    112     # logger.debug("try avoid evaluating: %s", expression)
--> 113     result = self[expression]
    114 except KeyError:
    115     # logger.debug("no luck, eval: %s", expression)
    116     # result = ne.evaluate(expression, local_dict=self, out=out)
    117     # logger.debug("in eval")
    118     # eval("def f(")

File ~/Library/Application Support/hatch/env/virtual/alatna-U4QVXPcf/alatna/lib/python3.9/site-packages/vaex/scopes.py:198, in _BlockScope.__getitem__(self, variable)
    197 if variable not in self.values:
--> 198     raise KeyError("Unknown variables or column: %r" % (variable,))
    200 return self.values[variable]

KeyError: 'Unknown variables or column: "struct_get(chunked, \'a\')"'

During handling of the above exception, another exception occurred:

AttributeError                            Traceback (most recent call last)
File ~/Library/Application Support/hatch/env/virtual/alatna-U4QVXPcf/alatna/lib/python3.9/site-packages/IPython/core/formatters.py:707, in PlainTextFormatter.__call__(self, obj)
    700 stream = StringIO()
    701 printer = pretty.RepresentationPrinter(stream, self.verbose,
    702     self.max_width, self.newline,
    703     max_seq_length=self.max_seq_length,
    704     singleton_pprinters=self.singleton_printers,
    705     type_pprinters=self.type_printers,
    706     deferred_pprinters=self.deferred_printers)
--> 707 printer.pretty(obj)
    708 printer.flush()
    709 return stream.getvalue()

File ~/Library/Application Support/hatch/env/virtual/alatna-U4QVXPcf/alatna/lib/python3.9/site-packages/IPython/lib/pretty.py:410, in RepresentationPrinter.pretty(self, obj)
    407                         return meth(obj, self, cycle)
    408                 if cls is not object \
    409                         and callable(cls.__dict__.get('__repr__')):
--> 410                     return _repr_pprint(obj, self, cycle)
    412     return _default_pprint(obj, self, cycle)
    413 finally:

File ~/Library/Application Support/hatch/env/virtual/alatna-U4QVXPcf/alatna/lib/python3.9/site-packages/IPython/lib/pretty.py:778, in _repr_pprint(obj, p, cycle)
    776 """A pprint that just redirects to the normal repr function."""
    777 # Find newlines and replace them with p.break_()
--> 778 output = repr(obj)
    779 lines = output.splitlines()
    780 with p.group():

File ~/Library/Application Support/hatch/env/virtual/alatna-U4QVXPcf/alatna/lib/python3.9/site-packages/vaex/expression.py:815, in Expression.__repr__(self)
    814 def __repr__(self):
--> 815     return self._repr_plain_()

File ~/Library/Application Support/hatch/env/virtual/alatna-U4QVXPcf/alatna/lib/python3.9/site-packages/vaex/expression.py:845, in Expression._repr_plain_(self)
    843     expression = expression[:57] + '...'
    844 info = 'Expression = ' + expression + '\n'
--> 845 dtype = self.dtype
    846 if self.expression in self.ds.get_column_names(hidden=True):
    847     state = "column"

File ~/Library/Application Support/hatch/env/virtual/alatna-U4QVXPcf/alatna/lib/python3.9/site-packages/vaex/expression.py:506, in Expression.dtype(self)
    504 @property
    505 def dtype(self):
--> 506     return self.df.data_type(self)

File ~/Library/Application Support/hatch/env/virtual/alatna-U4QVXPcf/alatna/lib/python3.9/site-packages/vaex/dataframe.py:2275, in DataFrame.data_type(self, expression, array_type, internal, axis)
   2273         data = self.evaluate(expression, 0, 1, filtered=False, array_type=array_type, parallel=False)
   2274     except:
-> 2275         data = self.evaluate(expression, 0, 1, filtered=True, array_type=array_type, parallel=False)
   2276 if data_type is None:
   2277     # means we have to determine it from the data
   2278     if isinstance(data, np.ndarray):

File ~/Library/Application Support/hatch/env/virtual/alatna-U4QVXPcf/alatna/lib/python3.9/site-packages/vaex/dataframe.py:3095, in DataFrame.evaluate(self, expression, i1, i2, out, selection, filtered, array_type, parallel, chunk_size, progress)
   3093     return self.evaluate_iterator(expression, s1=i1, s2=i2, out=out, selection=selection, filtered=filtered, array_type=array_type, parallel=parallel, chunk_size=chunk_size, progress=progress)
   3094 else:
-> 3095     return self._evaluate_implementation(expression, i1=i1, i2=i2, out=out, selection=selection, filtered=filtered, array_type=array_type, parallel=parallel, chunk_size=chunk_size, progress=progress)

File ~/Library/Application Support/hatch/env/virtual/alatna-U4QVXPcf/alatna/lib/python3.9/site-packages/vaex/dataframe.py:6562, in DataFrameLocal._evaluate_implementation(self, expression, i1, i2, out, selection, filtered, array_type, parallel, chunk_size, raw, progress)
   6559 block_scope.mask = filter_mask
   6561 for expression in expressions:
-> 6562     value = block_scope.evaluate(expression)
   6563     value = array_types.convert(value, array_type)
   6564     values.append(value)

File ~/Library/Application Support/hatch/env/virtual/alatna-U4QVXPcf/alatna/lib/python3.9/site-packages/vaex/scopes.py:119, in _BlockScope.evaluate(self, expression, out)
    113     result = self[expression]
    114 except KeyError:
    115     # logger.debug("no luck, eval: %s", expression)
    116     # result = ne.evaluate(expression, local_dict=self, out=out)
    117     # logger.debug("in eval")
    118     # eval("def f(")
--> 119     result = eval(expression, expression_namespace, self)
    120     result = auto_encode(self.df, expression, result)
    121     self.values[expression] = wrap(result)

File <string>:1, in <module>

File ~/Library/Application Support/hatch/env/virtual/alatna-U4QVXPcf/alatna/lib/python3.9/site-packages/vaex/arrow/numpy_dispatch.py:136, in autowrapper.<locals>.wrapper(*args, **kwargs)
    134 args = list(map(unwrap, args))
    135 kwargs = {k: unwrap(v) for k, v, in kwargs.items()}
--> 136 result = f(*args, **kwargs)
    137 return wrap(result)

File ~/Library/Application Support/hatch/env/virtual/alatna-U4QVXPcf/alatna/lib/python3.9/site-packages/vaex/struct.py:136, in assert_struct_dtype_argument.<locals>.wrapper(struct, *args, **kwargs)
    133 @functools.wraps(func)
    134 def wrapper(struct, *args, **kwargs):
    135     assert_struct_dtype(struct)
--> 136     return func(struct, *args, **kwargs)

File ~/Library/Application Support/hatch/env/virtual/alatna-U4QVXPcf/alatna/lib/python3.9/site-packages/vaex/struct.py:218, in struct_get(x, field)
    174 """Return a single field from a struct array. You may also use the shorthand notation `df.name[:, 'field']`.
    175 
    176 Please note, in case of duplicated field labels, a field can't be uniquely identified. Please
   (...)
    214 
    215 """
    217 _check_valid_struct_fields(x, [field])
--> 218 return x.field(field)

AttributeError: 'pyarrow.lib.ChunkedArray' object has no attribute 'field'

My current workaround is to register a replacement `struct_get` that handles both plain and chunked arrays:

@vaex.register_function(on_expression=True)
def struct_get(
    column: "pa.Array | pa.ChunkedArray",
    field: "str | int",
) -> "pa.Array | pa.ChunkedArray":
    """Workaround for a bug where Expression.struct.get() breaks for chunked arrays.

    Extract a single field from a struct column, whether the column is a
    plain pyarrow array or a ``pa.ChunkedArray``.

    :param column: struct-typed pyarrow array or chunked array.
    :param field: name or positional index of the struct field to extract.
    :return: the field's values; a ``pa.Array`` for a plain array input and a
        ``pa.ChunkedArray`` (chunk layout preserved) for a chunked input.
    :raises ValueError: if ``column`` is neither an Array nor a ChunkedArray.
    """
    # Annotations are written as strings so that `X | Y` is never evaluated at
    # definition time; evaluating it raises TypeError on Python < 3.10.
    if isinstance(column, pa.Array):
        return column.field(field)
    if isinstance(column, pa.ChunkedArray):
        # ChunkedArray has no `.field`, so extract the field chunk by chunk.
        field_chunks = [chunk.field(field) for chunk in column.chunks]
        if field_chunks:
            return pa.chunked_array(field_chunks)
        # With zero chunks pyarrow cannot infer the result type, so take it
        # from the struct type itself.
        return pa.chunked_array([], type=column.type.field(field).type)
    raise ValueError("Can only get struct from Array or ChunkedArray")