vaexio / vaex

Out-of-Core hybrid Apache Arrow/NumPy DataFrame for Python, ML, visualization and exploration of big tabular data at a billion rows per second 🚀
MIT License
8.28k stars 590 forks

[BUG-REPORT] `AssertionError` when evaluating any expression #2140

Open karotchykau opened 2 years ago

karotchykau commented 2 years ago

Description

I have a 22 GB .parquet file. After opening the file, I get the following information (please note that column B is the only column that can contain NaN values):

rows: 14,169,839

column  type  unit  description  expression
A       str
B       str
C       str
D       str

However, as soon as I evaluate any expression, I get an error. For instance, df["A"].isna().sum() or (df["A"].str.count("\n") > 1).sum() (and the same for all other columns) raises the following error:

AssertionError                            Traceback (most recent call last)
Input In [52], in <cell line: 1>()
----> 1 df["A"].isna().sum()

File ~/mambaforge/envs/temp_env/lib/python3.9/site-packages/vaex/, in Expression.sum(self, axis, binby, limits, shape, selection, delay, progress)
    908     del kwargs['dtype']
    909     kwargs['expression'] = expression.expression
--> 910     return self.ds.sum(**kwargs)
    911 else:
    912     return expression

File ~/mambaforge/envs/temp_env/lib/python3.9/site-packages/vaex/, in DataFrame.sum(self, expression, binby, limits, shape, selection, delay, progress, edges, array_type)
   1102 @docsubst
   1103 @stat_1d
   1104 def sum(self, expression, binby=[], limits=None, shape=default_shape, selection=False, delay=False, progress=None, edges=False, array_type=None):
   1105     """Calculate the sum for the given expression, possible on a grid defined by binby
   1107     Example:
   1123     :return: {return_stat_scalar}
   1124     """
-> 1125     return self._compute_agg('sum', expression, binby, limits, shape, selection, delay, edges, progress, array_type=array_type)
   1126     @delayed
   1127     def finish(*sums):
   1128         return vaex.utils.unlistify(waslist, sums)

File ~/mambaforge/envs/temp_env/lib/python3.9/site-packages/vaex/, in DataFrame._compute_agg(self, name, expression, binby, limits, shape, selection, delay, edges, progress, extra_expressions, array_type)
    934 stats = [compute(expression, binners, selection=selection, edges=edges) for expression in expressions]
    935 var = finish(binners, *stats)
--> 936 return self._delay(delay, progressbar.exit_on(var))

File ~/mambaforge/envs/temp_env/lib/python3.9/site-packages/vaex/, in DataFrame._delay(self, delay, task, progressbar)
   1773     return task
   1774 else:
-> 1775     self.execute()
   1776     return task.get()

File ~/mambaforge/envs/temp_env/lib/python3.9/site-packages/vaex/, in DataFrame.execute(self)
    415         print(repr(task))
    416 if self.executor.tasks:
--> 417     self.executor.execute()

File ~/mambaforge/envs/temp_env/lib/python3.9/site-packages/vaex/, in ExecutorLocal.execute(self)
    307 def execute(self):
--> 308     for _ in self.execute_generator():
    309         pass

File ~/mambaforge/envs/temp_env/lib/python3.9/site-packages/vaex/, in ExecutorLocal.execute_generator(self, use_async)
    430         logger.debug("Pass cancelled because of the global progress event: %r", self.signal_progress.callbacks)
    431     return ok_tasks and ok_executor and not all_stopped
--> 432 yield from, dataset.chunk_iterator(run.dataset_deps, chunk_size),
    433                                     dataset.row_count,
    434                                     progress=progress,
    435                                     cancel=lambda: self._cancel(run), unpack=True, run=run, use_async=use_async)
    436 duration_wallclock = time.time() - t0
    437 logger.debug("executing took %r seconds", duration_wallclock)

File ~/mambaforge/envs/temp_env/lib/python3.9/site-packages/vaex/, in, callable, iterator, count, on_error, progress, cancel, unpack, use_async, **kwargs_extra)
    104         iterator = (loop.run_in_executor(self, lambda value=value: wrapped(value)) for value in cancellable_iter())
    105     else:
--> 106         iterator = super(ThreadPoolIndex, self).map(wrapped, cancellable_iter())
    107 total = 0
    108 iterator = iter(buffer(iterator, self._max_workers + 3))

File ~/mambaforge/envs/temp_env/lib/python3.9/concurrent/futures/, in, fn, timeout, chunksize, *iterables)
    595 if timeout is not None:
    596     end_time = timeout + time.monotonic()
--> 598 fs = [self.submit(fn, *args) for args in zip(*iterables)]
    600 # Yield must be hidden in closure so that the futures are submitted
    601 # before the first iterator value is required.
    602 def result_iterator():

File ~/mambaforge/envs/temp_env/lib/python3.9/concurrent/futures/, in <listcomp>(.0)
    595 if timeout is not None:
    596     end_time = timeout + time.monotonic()
--> 598 fs = [self.submit(fn, *args) for args in zip(*iterables)]
    600 # Yield must be hidden in closure so that the futures are submitted
    601 # before the first iterator value is required.
    602 def result_iterator():

File ~/mambaforge/envs/temp_env/lib/python3.9/site-packages/vaex/, in<locals>.cancellable_iter()
     91 def cancellable_iter():
---> 92     for value in chunk_iterator:
     93         yield value
     94         if cancelled:

File ~/mambaforge/envs/temp_env/lib/python3.9/site-packages/vaex/arrow/, in DatasetArrowBase.chunk_iterator(self, columns, chunk_size, reverse, start, end)
    168     return
    170 workers = get_main_io_pool()._max_workers
--> 171 for chunks_future in buffer(self._chunk_producer(columns, chunk_size, start=start, end=end or self._row_count), workers+3):
    172     chunks = chunks_future.result()
    173     chunks_ready_list.append(chunks)

File ~/mambaforge/envs/temp_env/lib/python3.9/site-packages/vaex/, in buffer(i, n)
      5         values.append(next(i))
      6     while True:
----> 7         values.append(next(i))
      8         yield values.pop(0)
      9 except StopIteration:

File ~/mambaforge/envs/temp_env/lib/python3.9/site-packages/vaex/arrow/, in DatasetArrowBase._chunk_producer(self, columns, chunk_size, reverse, start, end)
    112     chunks = {name: chunks_physical.get(name, chunks_partition.get(name)) for name in columns}
    113     return chunks
--> 114 assert length > 0
    115 if start > chunk_start:
    116     # this means we have to cut off a piece of the beginning
    117     if end < chunk_end:
    118         # AND the end


After that, I started digging deeper and ran into some weird behavior. If I do df[2_087_185:2_087_185 + 2] or df[2_087_185 + 2:2_087_185 + 4], it outputs 2 rows as usual. However, once I pick the two rows in between, df[2_087_185 + 1:2_087_185 + 3], I get the following error (quite similar to the error above):

[07/28/22 17:01:04] ERROR    error evaluating: A at rows 0-2
                             Traceback (most recent call last):                              
                               File "/Users/Mikita_Karotchykau/mambaforge/e                  
                   ", line 4089, in table_part                              
                                 values = dict(zip(column_names,                             
                               File "/Users/Mikita_Karotchykau/mambaforge/e                  
                   ", line 3090, in evaluate                                
                             i1=i1, i2=i2, out=out, selection=selection,                     
                             filtered=filtered, array_type=array_type,                       
                             parallel=parallel, chunk_size=chunk_size,                       
                               File "/Users/Mikita_Karotchykau/mambaforge/e                  
                   ", line 6428, in                                         
                                 arrays[expression] =                                        
                             arrays[expression][0:end-start]  # materialize                  
                             fancy columns (lazy, indexed)                                   
                               File "/Users/Mikita_Karotchykau/mambaforge/e                  
                   ", line 582, in __getitem__                                
                                 for chunk_start, chunk_end, chunks in                       
                               File "/Users/Mikita_Karotchykau/mambaforge/e                  
                   ", line 1055, in chunk_iterator                            
                                 yield from                                                  
                             chunk_size=chunk_size, reverse=reverse,                         
                             start=self.start, end=self.end)                                 
                               File "/Users/Mikita_Karotchykau/mambaforge/e                  
                             /", line 171, in chunk_iterator                       
                                 for chunks_future in                                        
                             chunk_size, start=start, end=end or                             
                             self._row_count), workers+3):                                   
                               File "/Users/Mikita_Karotchykau/mambaforge/e                  
                   ", line 5, in buffer                                     
                               File "/Users/Mikita_Karotchykau/mambaforge/e                  
                             /", line 114, in _chunk_producer                      
                                 assert length > 0                                           

                             During handling of the above exception,                         
                             another exception occurred:                                     

                             Traceback (most recent call last):                              
                               File "/Users/Mikita_Karotchykau/mambaforge/e                  
                   ", line 4094, in table_part                              
                                 values[name] = df.evaluate(name)                            
                               File "/Users/Mikita_Karotchykau/mambaforge/e                  
                   ", line 3090, in evaluate                                
                             i1=i1, i2=i2, out=out, selection=selection,                     
                             filtered=filtered, array_type=array_type,                       
                             parallel=parallel, chunk_size=chunk_size,                       
                               File "/Users/Mikita_Karotchykau/mambaforge/e                  
                   ", line 6428, in                                         
                                 arrays[expression] =                                        
                             arrays[expression][0:end-start]  # materialize                  
                             fancy columns (lazy, indexed)                                   
                               File "/Users/Mikita_Karotchykau/mambaforge/e                  
                   ", line 582, in __getitem__                                
                                 for chunk_start, chunk_end, chunks in                       
                               File "/Users/Mikita_Karotchykau/mambaforge/e                  
                   ", line 1055, in chunk_iterator                            
                                 yield from                                                  
                             chunk_size=chunk_size, reverse=reverse,                         
                             start=self.start, end=self.end)                                 
                               File "/Users/Mikita_Karotchykau/mambaforge/e                  
                             /", line 171, in chunk_iterator                       
                                 for chunks_future in                                        
                             chunk_size, start=start, end=end or                             
                             self._row_count), workers+3):                                   
                               File "/Users/Mikita_Karotchykau/mambaforge/e                  
                   ", line 5, in buffer                                     
                               File "/Users/Mikita_Karotchykau/mambaforge/e                  
                             /", line 114, in _chunk_producer                      
                                 assert length > 0                                           

This error is repeated 8 times, 2 times for each column (because there are 2 rows). It also outputs the following table:

#  A      B      C      D
0  error  error  error  error
1  error  error  error  error

This is really weird because all the rows in the selections above are ordinary rows: there are no NaN values, just regular strings.
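The manual slice probing described above can be automated by sliding a fixed-width window over a range of offsets and recording where evaluation fails. This is only a sketch: `failing_windows` and the `probe` callable are hypothetical names, and the commented usage assumes that evaluating a sliced column raises an exception rather than only logging it, which has not been verified against vaex.

```python
def failing_windows(probe, start, stop, width=2):
    """Call probe(offset, offset + width) for each offset in [start, stop).

    Returns a list of (offset, exception type name) for every window
    on which the probe raised.
    """
    failures = []
    for offset in range(start, stop):
        try:
            probe(offset, offset + width)
        except Exception as exc:  # broad on purpose: we only record the failure
            failures.append((offset, type(exc).__name__))
    return failures


# Hypothetical usage with the dataframe from this report (untested assumption:
# evaluating a sliced column raises instead of only logging the error):
# failing_windows(lambda a, b: df[a:b]["A"].tolist(), 2_087_180, 2_087_190)
```

If the failing offsets cluster around a few positions, that would suggest the problem depends on where a slice falls in the file rather than on the content of individual rows.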

I hope I've provided enough information.

Software information

Additional information

Please note that there is a similar error log in issue #1948. However, I don't think RAM is the problem here, since I tried to output only 2 rows (see above).

JovanVeljanoski commented 2 years ago


Can you share your version of pyarrow please?

Is there something special about the contents of the file or the parquet files?

Can you use df.dtypes or df.A.dtype to see the dtype of a column? (The summary above might render it a bit differently.)

I can't reproduce it with a similar parquet file (dataset) that we use for testing:

karotchykau commented 2 years ago

Can you share your version of pyarrow please?

# Name                    Version                   Build  Channel
pyarrow                   8.0.0           py39hc4c82e3_1_cpu    conda-forge
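For reports like this one, the relevant versions can be collected in one go with the standard library. A minimal sketch follows; the `environment_report` name and the default package list are illustrative assumptions, not part of vaex.

```python
import platform
import sys
from importlib import metadata


def environment_report(packages=("vaex", "vaex-core", "pyarrow", "numpy")):
    """Return a short plain-text summary of interpreter, platform and package versions."""
    lines = [
        f"python     {sys.version.split()[0]}",
        f"platform   {platform.platform()}",
    ]
    for name in packages:
        try:
            version = metadata.version(name)
        except metadata.PackageNotFoundError:
            version = "not installed"
        lines.append(f"{name:<10} {version}")
    return "\n".join(lines)


# Example: print(environment_report())
```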

Is there something special about the contents of the file or the parquet files?

There is a single 22 GB parquet file. This file was created incrementally from dictionaries. Here is what I mean:

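import pyarrow as pa
import pyarrow.parquet as pq

schema = pa.schema([(x, pa.string()) for x in ["A", "B", "C", "D"]])
pqwriter = None
for data_dict in data_dicts:
    table = pa.Table.from_pydict(data_dict, schema=schema)
    if pqwriter is None:
        pqwriter = pq.ParquetWriter("dataset.parquet", schema)
    pqwriter.write_table(table)  # append this batch to the file

if pqwriter:
    pqwriter.close()  # finalize the file (writes the footer)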

Can you use df.dtypes or df.A.dtype to see the dtype of a column? (The summary above might render it a bit differently.)

df.dtypes outputs

A    string
B    string
C    string
D    string
dtype: object

df.A.dtype outputs

karotchykau commented 2 years ago

I've tried to put every data_dict in a different parquet file and it seems to work - the error has disappeared (vaex.open_many is used). However, I don't get why the approach above (when you use ParquetWriter to write to a single file) doesn't work.
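Since separate files work while the single incrementally written file does not, one thing worth checking is how the single file is laid out in row groups. The sketch below is only a diagnostic aid: the thread does not establish that the row-group layout is the cause, and `row_group_sizes` and `summarize_row_groups` are hypothetical helper names. It reads only the file metadata, so it should be cheap even for a 22 GB file.

```python
def row_group_sizes(path):
    """Return the number of rows in each row group of a Parquet file."""
    # Imported lazily so the pure helper below can be used without pyarrow.
    import pyarrow.parquet as pq

    metadata = pq.ParquetFile(path).metadata
    return [metadata.row_group(i).num_rows for i in range(metadata.num_row_groups)]


def summarize_row_groups(sizes):
    """Summarize row-group sizes and list the indices of zero-row groups."""
    return {
        "num_row_groups": len(sizes),
        "total_rows": sum(sizes),
        "min_rows": min(sizes) if sizes else 0,
        "max_rows": max(sizes) if sizes else 0,
        "empty_groups": [i for i, n in enumerate(sizes) if n == 0],
    }


# Example, using the file name from the snippet above:
# print(summarize_row_groups(row_group_sizes("dataset.parquet")))
```

Comparing `total_rows` with the 14,169,839 rows reported by vaex, and looking at any unusually small or empty row groups, may help narrow down where the slices start to fail.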

JovanVeljanoski commented 2 years ago

Hmm, that is really strange, since the way you write the parquet file is similar to how vaex exports it. So I tried to reproduce your example like this:

import vaex
import pyarrow as pa
import pyarrow.parquet as pq

n = 10_000
d = {'A': ['tom', 'sally', 'john'] * n,
     'B': ['Bostom', 'Toronto', 'Miami'] * n,
     'C': ['1234', '23423', '2134'] * n,
     'D': ['Cake', None, 'Coffee'] * n,
     }

data_dicts = [d for i in range(1_000)]

schema = pa.schema([(x, pa.string()) for x in ["A", "B", "C", "D"]])
pqwriter = None
for data_dict in data_dicts:
    table = pa.Table.from_pydict(data_dict, schema=schema)
    if pqwriter is None:
        pqwriter = pq.ParquetWriter("tmp.parquet", schema)
    pqwriter.write_table(table)  # append this batch to the file

if pqwriter:
    pqwriter.close()  # finalize the file (writes the footer)

And then I run

df = vaex.open('tmp.parquet')
print(f'number of rows: {df.shape[0]:,}')

df.D.isna().sum()  # this works.

I see no problems.

Can you help me reproduce the issue?

Regarding your workaround - that is valid. You can also export the dataframe you get via open_many to a single parquet file, or for (much!) better performance to arrow or hdf5.
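The workaround can be sketched roughly as follows. The file naming scheme and the `part_paths` and `consolidate` helpers are hypothetical; only `vaex.open_many` and the `export_*` methods are vaex calls, and the sketch has not been run against the dataset from this report.

```python
def part_paths(directory, count):
    """Hypothetical naming scheme: one zero-padded Parquet file per batch."""
    return [f"{directory}/part-{i:05d}.parquet" for i in range(count)]


def consolidate(paths, target):
    """Open many Parquet parts as one dataframe and export it to a single file.

    The output format is chosen from the target's extension.
    """
    import vaex  # imported lazily so part_paths works without vaex installed

    df = vaex.open_many(paths)
    if target.endswith(".hdf5"):
        df.export_hdf5(target)
    elif target.endswith(".arrow"):
        df.export_arrow(target)
    else:
        df.export_parquet(target)
    return target


# Example: consolidate(part_paths("parts", 1_000), "dataset.hdf5")
```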

karotchykau commented 2 years ago

Regarding your workaround - that is valid. You can also export the dataframe you get via open_many to a single parquet file, or for (much!) better performance to arrow or hdf5.

Thank you. It works for now.

So I tried to reproduce your example like this

This code works perfectly fine for me as well. I've tried many ways to reproduce the issue; however, I've run into something else instead. Maybe the issue below is somehow related to the main one. I tried to run the following code:

import vaex
import pyarrow as pa
import pyarrow.parquet as pq

n = 10_000
d = {"A": ["abc"] * n}

data_dicts = [d for i in range(1_000)]

schema = pa.schema([(x, pa.string()) for x in ["A"]])
pqwriter = None
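for data_dict in data_dicts:
    table = pa.Table.from_pydict(data_dict, schema=schema)
    if pqwriter is None:
        pqwriter = pq.ParquetWriter("tmp.parquet", schema)
    pqwriter.write_table(table)  # append this batch to the file

if pqwriter:
    pqwriter.close()  # finalize the file (writes the footer)

df = vaex.open("tmp.parquet")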
print(f"number of rows: {df.shape[0]:,}")
print(df["A"].isna().sum())  # Fails here.

My kernel dies in Jupyter, but this is not related to out-of-memory issues. In PyCharm I get the following message:

Process finished with exit code 138 (interrupted by signal 10: SIGBUS)

Perhaps all these things are somehow related to macOS? Please note that I have a standard Intel processor here.
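The exit code above can be decoded by hand: by shell convention, a process killed by signal n is reported with exit code 128 + n, so 138 corresponds to signal 10. Signal numbers are platform-specific (10 is SIGBUS on macOS, while on Linux SIGBUS is 7), so the name lookup below depends on where it runs. The helper names are illustrative.

```python
import signal


def decode_exit_code(code):
    """Split a shell-style exit code into ("signal", n) or ("exit", code)."""
    if code > 128:
        return ("signal", code - 128)
    return ("exit", code)


def signal_name(number):
    """Return this platform's name for a signal number, or "unknown"."""
    try:
        return signal.Signals(number).name
    except ValueError:
        return "unknown"


kind, number = decode_exit_code(138)
print(kind, number, signal_name(number))
```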

Please let me know if you can reproduce.

JovanVeljanoski commented 2 years ago


Your code above runs perfectly well for me.

If you are running it through PyCharm, can you try it from an IPython terminal or similar, just to rule out issues related to PyCharm? We've had issues in the past where, for whatever reason, PyCharm was the cause.
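One way to take the IDE out of the picture is to run the reproduction script in a fresh interpreter with `faulthandler` enabled, which makes Python dump a traceback on fatal signals such as SIGBUS or SIGSEGV. A minimal sketch, where `run_isolated` and the script name in the usage comment are hypothetical:

```python
import subprocess
import sys


def run_isolated(script_path, timeout=600):
    """Run a script in a fresh interpreter with faulthandler enabled.

    Returns (returncode, stdout, stderr). On POSIX, a negative return
    code -n means the child process was killed by signal n.
    """
    result = subprocess.run(
        [sys.executable, "-X", "faulthandler", script_path],
        capture_output=True,
        text=True,
        timeout=timeout,
    )
    return result.returncode, result.stdout, result.stderr


# Example: code, out, err = run_isolated("repro.py"); print(code); print(err)
```

If the crash also happens here, the faulthandler output in `stderr` shows which Python frame was active when the signal arrived.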