Open pedroerp opened 1 month ago
Hi @pedroerp , thanks for the proposal!
Since dictionaries cannot wrap around more than one vector, at times merge join may return fewer than outputBatchSize_ rows.
Do you have a plan to extend DictionaryVector to wrap more than one vector, so that in some cases it can avoid producing overly small batches? For example, after filter, partition, etc.
Do you have a plan to extend DictionaryVector to wrap more than one vector, so that in some cases it can avoid producing overly small batches?
That's a great question. I've been debating that myself, and I don't remember seeing any discussion about this in the past.
@mbasmanova @oerling @Yuhta @xiaoxmeng thoughts?
Do you have a plan to extend DictionaryVector to wrap more than one vector, so that in some cases it can avoid producing overly small batches?
That's a great question. I've been debating that myself, and I don't remember seeing any discussion about this in the past.
@mbasmanova @oerling @Yuhta @xiaoxmeng thoughts?
I've been thinking about this for a while. If the batch size is too small, it will not benefit from the vectorized execution model. Some systems, such as DuckDB, cache the upstream operator's output when the batch size is very small; see https://github.com/duckdb/duckdb/blob/d9efdd14270245c4369096e909acecea174d86cc/src/include/duckdb/execution/physical_operator.hpp#L221.
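To make the caching idea concrete, here is a minimal sketch of such an output cache, with hypothetical names and plain `std::vector<int>` standing in for batches (this is not DuckDB's or Velox's actual API): small upstream batches accumulate until a minimum size is reached, and only then is a combined batch released downstream.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical sketch: accumulate small upstream batches and only emit
// once a minimum batch size has been reached.
class BatchCache {
 public:
  explicit BatchCache(size_t minBatchSize) : minBatchSize_(minBatchSize) {}

  // Appends a (possibly small) upstream batch. Returns a full batch once
  // enough rows have accumulated, or an empty vector otherwise.
  std::vector<int> add(const std::vector<int>& batch) {
    cache_.insert(cache_.end(), batch.begin(), batch.end());
    if (cache_.size() < minBatchSize_) {
      return {};
    }
    std::vector<int> out;
    out.swap(cache_);
    return out;
  }

  // Flushes whatever is left once the upstream is exhausted.
  std::vector<int> finish() {
    std::vector<int> out;
    out.swap(cache_);
    return out;
  }

 private:
  const size_t minBatchSize_;
  std::vector<int> cache_;
};
```

The trade-off is extra copying into the cache, which is why approaches that avoid producing small batches in the first place are attractive.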
Here is a related discussion: https://github.com/facebookincubator/velox/issues/7801
That is slightly different though; I guess there is a point in potentially making dictionaries able to wrap more than one inner vector.
Do you have a plan to extend DictionaryVector to wrap more than one vector, so that in some cases it can avoid producing overly small batches?
That's a great question. I've been debating that myself, and I don't remember seeing any discussion about this in the past.
@mbasmanova @oerling @Yuhta @xiaoxmeng thoughts?
This is possible but would be a very big change, and we would need to fix the UDFs to handle it. A more practical approach would be to flatten the vector if there is no repetition in the indices, or to concatenate the dictionary values if the indices repeat.
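The two alternatives above can be sketched as follows. This is an illustrative toy (struct names and types are hypothetical, not Velox's vector classes), using plain `std::vector`s for the indices and base values:

```cpp
#include <cassert>
#include <cstdint>
#include <unordered_set>
#include <vector>

// Toy stand-in for a dictionary-encoded vector.
struct Dict {
  std::vector<int32_t> indices; // positions into `base`
  std::vector<int64_t> base;    // dictionary values
};

inline bool hasRepeatedIndices(const std::vector<int32_t>& indices) {
  std::unordered_set<int32_t> seen(indices.begin(), indices.end());
  return seen.size() != indices.size();
}

// Option 1 -- flatten: materialize the referenced values directly.
// Cheap when indices do not repeat (each base row is copied at most once).
inline std::vector<int64_t> flatten(const Dict& d) {
  std::vector<int64_t> out;
  out.reserve(d.indices.size());
  for (auto i : d.indices) {
    out.push_back(d.base[i]);
  }
  return out;
}

// Option 2 -- concatenate: merge two dictionaries over different bases
// into one dictionary over a single concatenated base, shifting the
// second dictionary's indices. Preserves the dictionary encoding when
// indices repeat.
inline Dict concatenate(const Dict& a, const Dict& b) {
  Dict out = a;
  const int32_t offset = static_cast<int32_t>(a.base.size());
  out.base.insert(out.base.end(), b.base.begin(), b.base.end());
  for (auto i : b.indices) {
    out.indices.push_back(i + offset);
  }
  return out;
}
```

The heuristic would pick option 1 when `hasRepeatedIndices` is false (flattening copies no value twice) and option 2 otherwise (concatenation keeps the repeated values shared).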
This is possible but would be a very big change, and we would need to fix the UDFs to handle it.
I guess this should be mostly covered if we assume DecodedVector does the right thing. Ideally we shouldn't have too many vectorized functions relying on dictionaries directly, though I'm sure we have at least a few today. But agreed, this would be pretty involved and touch many parts of the codebase.
@pedroerp Even the interface of DecodedVector would need to change. index, indices, data, and base in DecodedVector would no longer make sense; we would need to remove them and fix all the use sites as well. Worse than that, we would end up with a slower DecodedVector that cannot handle complex types, and might need to go back to handling encodings manually for complex types and in performance-critical code.
Description
Merge joins are useful to join two sorted streams of data. For queries that require joining two large tables, it is a common pattern to store them as bucketed and sorted tables (sorting at write time), so that merge joins can be used to join them at read time. If the streams are already sorted on the join key, a merge join often (a) has lower memory requirements, since it does not need to materialize a hash table (the build side), and (b) is commonly faster, since its memory access patterns are more predictable (it avoids hash table lookups). Merge joins are, however, harder to parallelize, so their implementation is commonly single-threaded for a given stream.
Current Velox Implementation
Velox's current merge-join algorithm works in the following way. Starting from the left, it picks the first key and finds all consecutive occurrences of it. Then it looks for matches on the right side and finds all consecutive occurrences there. Once these two ranges of matches are found, it produces the Cartesian product of them as output, appending each row and the corresponding left and right projections to the output batch. It then moves to the next key value on the left, and the same process is repeated. Keys without matches are ignored (or added, depending on left or right outer join semantics).
Since input streams are commonly large, only a window of batches from the left and right is kept in memory; batches are discarded once all matches for the keys in that buffer have been produced, and more are loaded when the current buffer is exhausted but matches are still possible.
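The matching logic described above can be sketched as follows, using plain `std::vector<int>` inputs instead of Velox vectors (a simplified inner-join sketch, not the actual operator code): for each run of equal keys on the left, find the matching run on the right and emit their Cartesian product as (left row, right row) index pairs.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Simplified sorted-merge join over two sorted key columns; emits the
// matched (leftIndex, rightIndex) pairs, inner-join semantics.
inline std::vector<std::pair<size_t, size_t>> mergeJoin(
    const std::vector<int>& left,
    const std::vector<int>& right) {
  std::vector<std::pair<size_t, size_t>> out;
  size_t l = 0;
  size_t r = 0;
  while (l < left.size() && r < right.size()) {
    if (left[l] < right[r]) {
      ++l; // no match for this left key; skip it.
    } else if (right[r] < left[l]) {
      ++r; // no match for this right key; skip it.
    } else {
      // Find the run of consecutive equal keys on each side.
      size_t lEnd = l;
      while (lEnd < left.size() && left[lEnd] == left[l]) ++lEnd;
      size_t rEnd = r;
      while (rEnd < right.size() && right[rEnd] == right[r]) ++rEnd;
      // Emit the Cartesian product of the two runs.
      for (size_t i = l; i < lEnd; ++i) {
        for (size_t j = r; j < rEnd; ++j) {
          out.emplace_back(i, j);
        }
      }
      l = lEnd;
      r = rEnd;
    }
  }
  return out;
}
```

The current Velox implementation copies the projected row data at the "emit" step; the vectorized proposal below changes only that step.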
Opportunities
There are three main opportunities to make this process more efficient:
A Vectorized Merge-Join Algorithm
The proposed vectorized merge join will always produce dictionaries, so it will first allocate one buffer to store indices for left projections and another for right projections. Once key matches are identified, instead of copying projections from each row to the output, only the left and right dictionary indices are written.
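A minimal sketch of this output step (illustrative names and plain `std::vector`s, not Velox's actual operator or vector API): for each pair of matching key runs, only dictionary indices are appended; no row data is copied, and the output is later formed by wrapping the base vectors in these indices.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical index buffers for the dictionary output.
struct OutputIndices {
  std::vector<int32_t> leftIndices;
  std::vector<int32_t> rightIndices;
};

// Appends the Cartesian product of a left key run [lBegin, lEnd) and a
// right key run [rBegin, rEnd) as dictionary indices only.
inline void addMatch(
    OutputIndices& out,
    int32_t lBegin,
    int32_t lEnd,
    int32_t rBegin,
    int32_t rEnd) {
  for (int32_t l = lBegin; l < lEnd; ++l) {
    for (int32_t r = rBegin; r < rEnd; ++r) {
      out.leftIndices.push_back(l);
      out.rightIndices.push_back(r);
    }
  }
}
```

Compared with the current approach, the per-match cost becomes two 32-bit index writes per output row regardless of how many (or how wide) the projected columns are.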
Output and Buffer Alignment
Output is always produced outputBatchSize rows at a time. The vectorized merge join operator will generally return dictionaries wrapped around input (left) vectors. Since dictionaries cannot wrap around more than one vector, at times merge join may return fewer than outputBatchSize rows. Dictionaries for right projections are created optimistically: the operator starts by wrapping the current right vector, but if the output happens to span more than one right vector, the right projections are copied and flattened.
In internal experiments, we have seen the vectorized implementation be ~10x more efficient, yielding a ~4x speedup in some real-world pipelines.
Cc: @mbasmanova @Yuhta @xiaoxmeng @arhimondr @bikramSingh91 @kagamiori @majetideepak @aditi-pandit @FelixYBW