vaexio / vaex

Out-of-Core hybrid Apache Arrow/NumPy DataFrame for Python, ML, visualization and exploration of big tabular data at a billion rows per second 🚀
https://vaex.io
MIT License
8.28k stars 589 forks source link

[FEATURE-REQUEST] Joining on more than one columns. #1948

Closed ashsharma96 closed 1 year ago

ashsharma96 commented 2 years ago

Hey Vaex Team, Any idea how we can join on more than one column in vaex? I've tried using that merging column hack but it gives error in my case. So how we join on more than one column.

JovanVeljanoski commented 2 years ago

Officially not supported. Unofficially - create a single key column on which to join by concatenating the multiple columns into one.

ashsharma96 commented 2 years ago

Hey @JovanVeljanoski I tried but its giving abnormal behavior. I'm using this code:

df['merging_column']= df.tm_mid.astype(str) +  df.tm_sku.astype(str)
product['merging_column']= product.tm_mid.astype(str) + product.tm_sku.astype(str)
mergedStuff = df.join(product, on ='merging_column', how='left', rsuffix='_').drop(['merging_column','tm_mid_', 'tm_sku_', 
'tm_sid_', 'merging_column_'])

and its throwing this error:

[02/24/22 16:49:59] ERROR    error evaluating: brand at rows                [dataframe.py]
(file:///opt/conda/lib/python3.7/site-packages/vaex/dataframe.py):[4001](file:///opt/conda/lib/python3.7/site- 
packages/vaex/dataframe.py#4001)
                         14679100-14679105                                               
                         Traceback (most recent call last):                              
                           File "/opt/conda/lib/python3.7/site-packages                  
                         /vaex/dataframe.py", line 3993, in table_part                   
                             values = dict(zip(column_names,                             
                         df.evaluate(column_names)))                                     
                           File "/opt/conda/lib/python3.7/site-packages                  
                         /vaex/dataframe.py", line 2998, in evaluate                     
                             return                                                      
                         self._evaluate_implementation(expression,                       
                         i1=i1, i2=i2, out=out, selection=selection,                     
                         filtered=filtered, array_type=array_type,                       
                         parallel=parallel, chunk_size=chunk_size,                       
                         progress=progress)                                              
                           File "/opt/conda/lib/python3.7/site-packages                  
                         /vaex/dataframe.py", line 6353, in                              
                         _evaluate_implementation                                        
                             df.map_reduce(assign, lambda *_: None,                      
                         expression_to_evaluate, progress=progress,                      
                         ignore_filter=False, selection=selection,                       
                         pre_filter=use_filter, info=True,                               
                         to_numpy=False, name="evaluate")                                
                           File "/opt/conda/lib/python3.7/site-packages                  
                         /vaex/dataframe.py", line 429, in map_reduce                    
                             return self._delay(delay, task)                             
                           File "/opt/conda/lib/python3.7/site-packages                  
                         /vaex/dataframe.py", line 1688, in _delay                       
                             self.execute()                                              
                           File "/opt/conda/lib/python3.7/site-packages                  
                         /vaex/dataframe.py", line 412, in execute                       
                             self.executor.execute()                                     
                           File "/opt/conda/lib/python3.7/site-packages                  
                         /vaex/execution.py", line 308, in execute                       
                             for _ in self.execute_generator():                          
                           File "/opt/conda/lib/python3.7/site-packages                  
                         /vaex/execution.py", line 435, in                               
                         execute_generator                                               
                             cancel=lambda: self._cancel(run),                           
                         unpack=True, run=run, use_async=use_async)                      
                           File "/opt/conda/lib/python3.7/site-packages                  
                         /vaex/multithreading.py", line 106, in map                      
                             iterator = super(ThreadPoolIndex,                           
                         self).map(wrapped, cancellable_iter())                          
                           File "/opt/conda/lib/python3.7/concurrent/fu                  
                         tures/_base.py", line 575, in map                               
                             fs = [self.submit(fn, *args) for args in                    
                         zip(*iterables)]                                                
                           File "/opt/conda/lib/python3.7/concurrent/fu                  
                         tures/_base.py", line 575, in <listcomp>                        
                             fs = [self.submit(fn, *args) for args in                    
                         zip(*iterables)]                                                
                           File "/opt/conda/lib/python3.7/site-packages                  
                         /vaex/multithreading.py", line 92, in                           
                         cancellable_iter                                                
                             for value in chunk_iterator:                                
                           File "/opt/conda/lib/python3.7/site-packages                  
                         /vaex/dataset.py", line 1167, in                                
                         chunk_iterator                                                  
                             yield from                                                  
                         self.original.chunk_iterator(columns,                           
                         chunk_size=chunk_size, reverse=reverse)                         
                           File "/opt/conda/lib/python3.7/site-packages                  
                         /vaex/join.py", line 64, in chunk_iterator                      
                             yield from                                                  
                         self.original.chunk_iterator(*args, **kwargs)                   
                           File "/opt/conda/lib/python3.7/site-packages                  
                         /vaex/dataset.py", line 1237, in                                
                         chunk_iterator                                                  
                             self.right.chunk_iterator(columns_right,                    
                         chunk_size, reverse=reverse)):                                  
                           File "/opt/conda/lib/python3.7/site-packages                  
                         /vaex/dataset.py", line 1114, in                                
                         chunk_iterator                                                  
                             yield from                                                  
                         self._default_chunk_iterator(self._columns,                     
                         columns, chunk_size, reverse=reverse)                           
                           File "/opt/conda/lib/python3.7/site-packages                  
                         /vaex/dataset.py", line 508, in                                 
                         _default_chunk_iterator                                         
                             yield i1, i2, reader()                                      
                           File "/opt/conda/lib/python3.7/site-packages                  
                         /vaex/dataset.py", line 499, in reader                          
                             chunks = {k: array_map[k][i1:i2] for k in                   
                         columns}                                                        
                           File "/opt/conda/lib/python3.7/site-packages                  
                         /vaex/dataset.py", line 499, in <dictcomp>                      
                             chunks = {k: array_map[k][i1:i2] for k in                   
                         columns}                                                        
                           File "/opt/conda/lib/python3.7/site-packages                  
                         /vaex/column.py", line 366, in __getitem__                      
                             ar_unfiltered = ar_unfiltered[i1:i2+1]                      
                           File "/opt/conda/lib/python3.7/site-packages                  
                         /vaex/dataset.py", line 581, in __getitem__                     
                             ds = self.ds.__getitem__(item)                              
                           File "/opt/conda/lib/python3.7/site-packages                  
                         /vaex/dataset.py", line 448, in __getitem__                     
                             return self.slice(item.start or 0,                          
                         item.stop or self.row_count)                                    
                           File "/opt/conda/lib/python3.7/site-packages                  
                         /vaex/dataset.py", line 1015, in slice                          
                             assert filter.sum() == expected_length                      
                         AssertionError                                                  

                         During handling of the above exception,                         
                         another exception occurred:                                     

                         Traceback (most recent call last):                              
                           File "/opt/conda/lib/python3.7/site-packages                  
                         /vaex/dataframe.py", line 3998, in table_part                   
                             values[name] = df.evaluate(name)                            
                           File "/opt/conda/lib/python3.7/site-packages                  
                         /vaex/dataframe.py", line 2998, in evaluate                     
                             return                                                      
                         self._evaluate_implementation(expression,                       
                         i1=i1, i2=i2, out=out, selection=selection,                     
                         filtered=filtered, array_type=array_type,                       
                         parallel=parallel, chunk_size=chunk_size,                       
                         progress=progress)                                              
                           File "/opt/conda/lib/python3.7/site-packages                  
                         /vaex/dataframe.py", line 6353, in                              
                         _evaluate_implementation                                        
                             df.map_reduce(assign, lambda *_: None,                      
                         expression_to_evaluate, progress=progress,                      
                         ignore_filter=False, selection=selection,                       
                         pre_filter=use_filter, info=True,                               
                         to_numpy=False, name="evaluate")                                
                           File "/opt/conda/lib/python3.7/site-packages                  
                         /vaex/dataframe.py", line 429, in map_reduce                    
                             return self._delay(delay, task)                             
                           File "/opt/conda/lib/python3.7/site-packages                  
                         /vaex/dataframe.py", line 1688, in _delay                       
                             self.execute()                                              
                           File "/opt/conda/lib/python3.7/site-packages                  
                         /vaex/dataframe.py", line 412, in execute                       
                             self.executor.execute()                                     
                           File "/opt/conda/lib/python3.7/site-packages                  
                         /vaex/execution.py", line 308, in execute                       
                             for _ in self.execute_generator():                          
                           File "/opt/conda/lib/python3.7/site-packages                  
                         /vaex/execution.py", line 435, in                               
                         execute_generator                                               
                             cancel=lambda: self._cancel(run),                           
                         unpack=True, run=run, use_async=use_async)                      
                           File "/opt/conda/lib/python3.7/site-packages                  
                         /vaex/multithreading.py", line 106, in map                      
                             iterator = super(ThreadPoolIndex,                           
                         self).map(wrapped, cancellable_iter())                          
                           File "/opt/conda/lib/python3.7/concurrent/fu                  
                         tures/_base.py", line 575, in map                               
                             fs = [self.submit(fn, *args) for args in                    
                         zip(*iterables)]                                                
                           File "/opt/conda/lib/python3.7/concurrent/fu                  
                         tures/_base.py", line 575, in <listcomp>                        
                             fs = [self.submit(fn, *args) for args in                    
                         zip(*iterables)]                                                
                           File "/opt/conda/lib/python3.7/site-packages                  
                         /vaex/multithreading.py", line 92, in                           
                         cancellable_iter                                                
                             for value in chunk_iterator:                                
                           File "/opt/conda/lib/python3.7/site-packages                  
                         /vaex/dataset.py", line 1167, in                                
                         chunk_iterator                                                  
                             yield from                                                  
                         self.original.chunk_iterator(columns,                           
                         chunk_size=chunk_size, reverse=reverse)                         
                           File "/opt/conda/lib/python3.7/site-packages                  
                         /vaex/join.py", line 64, in chunk_iterator                      
                             yield from                                                  
                         self.original.chunk_iterator(*args, **kwargs)                   
                           File "/opt/conda/lib/python3.7/site-packages                  
                         /vaex/dataset.py", line 1237, in                                
                         chunk_iterator                                                  
                             self.right.chunk_iterator(columns_right,                    
                         chunk_size, reverse=reverse)):                                  
                           File "/opt/conda/lib/python3.7/site-packages                  
                         /vaex/dataset.py", line 1114, in                                
                         chunk_iterator                                                  
                             yield from                                                  
                         self._default_chunk_iterator(self._columns,                     
                         columns, chunk_size, reverse=reverse)                           
                           File "/opt/conda/lib/python3.7/site-packages                  
                         /vaex/dataset.py", line 508, in                                 
                         _default_chunk_iterator                                         
                             yield i1, i2, reader()                                      
                           File "/opt/conda/lib/python3.7/site-packages                  
                         /vaex/dataset.py", line 499, in reader                          
                             chunks = {k: array_map[k][i1:i2] for k in                   
                         columns}                                                        
                           File "/opt/conda/lib/python3.7/site-packages                  
                         /vaex/dataset.py", line 499, in <dictcomp>                      
                             chunks = {k: array_map[k][i1:i2] for k in                   
                         columns}                                                        
                           File "/opt/conda/lib/python3.7/site-packages                  
                         /vaex/column.py", line 366, in __getitem__                      
                             ar_unfiltered = ar_unfiltered[i1:i2+1]                      
                           File "/opt/conda/lib/python3.7/site-packages                  
                         /vaex/dataset.py", line 581, in __getitem__                     
                             ds = self.ds.__getitem__(item)                              
                           File "/opt/conda/lib/python3.7/site-packages                  
                         /vaex/dataset.py", line 448, in __getitem__                     
                             return self.slice(item.start or 0,                          
                         item.stop or self.row_count)                                    
                           File "/opt/conda/lib/python3.7/site-packages                  
                         /vaex/dataset.py", line 1015, in slice                          
                             assert filter.sum() == expected_length                      
                         AssertionError

I'm attaching the part of the error but it was showing error for all the columns. Abnormal behavior is like sometime this code runs perfectly and most of the time it throws these errors. Also first time it throws error but if I re run the code at the same time then it works fine. Any idea about this?

JovanVeljanoski commented 2 years ago

Without a reproducible example, i really can't offer much advice. I would refer you to this response, it is perfectly applicable here also: https://github.com/vaexio/vaex/issues/1729#issuecomment-1039054654

Some things to consider:

ashsharma96 commented 2 years ago

@JovanVeljanoski Thanks for the immediate reply. Replying to what you have asked:

  1. I may have duplicate values on left but not on right.
  2. No, I don't have any na/nan values in the join key on left or right.
  3. All type of joins shows errors.
  4. materialize didn't helped me. Same error coming.
root-11 commented 2 years ago

Overview:

image

The LEFT dataset is 25M rows, the RIGHT is 10.5M.

Worst case would be nested for loop with 25M*10.5M= 262.5M rows. This would never fit into ram.

The typical workload is a LEFT join with 3-5% duplication and 20%-100% match on RIGHT.

In attempts to find a method that works for worst case and is fast enough for the common case I've settled with:

1/5. Create LEFT COMPOSITE KEY INDEX using pool.apply(index task, LEFT dataframe) as HDF5 in chunks of 500_000 rows (50 tasks)

2/5. Create RIGHT COMPOSITE KEY INDEX using pool.apply(index task, RIGHT dataframe, limited to entries in LEFT COMPOSITE KEY INDEX) as HDF5 in chunks of 500_000 rows (21 tasks).

3/5. Create a task queue with the 153 join tasks

4/5. Do the join in chunks (with 500_000 + 500_000 rows) for each processor and store each result as HDF5 in the working directory. As the left and right composite key index are stored, all procs can access them in read-only mode. There is no inter-process communication. The tasks can probably be vectorised. The coordination mechanism is the task queue that only declares the search index limits.

What about join skewness? Let's argue there are no matches in the RIGHT in index % 3 == 0. Whenever a processor seeks to perform join, it will read the left comp. key index and find no matches in the right comp. key index. The size of the output chunk is now zero. As IO probably will be the bottleneck, the processor can quickly move onto the next task, this means that the queue of tasks will be consumed by the frequency of matches. Thereby the processor load will balance itself.

5/5. Create a virtual dataset (HDF5) from all the chunks. This should take milliseconds.

Preview shortcut for vaex: Compute first 5 hits for grid (0,0) and first five hits for grid (-1,-1) in as single task CPU.

composite key reference implementations: