risingwavelabs / risingwave

Best-in-class stream processing, analytics, and management. Perform continuous analytics, or build event-driven applications, real-time ETL pipelines, and feature stores in minutes. Unified streaming and batch. PostgreSQL compatible.

(performance) Iterating over hash loop join should be done concurrently (single-threaded async) with `futures::join` #2428

Open · jon-chuang opened this issue 2 years ago

jon-chuang commented 2 years ago

Is your feature request related to a problem? Please describe.

Currently, we are doing the following:

lookup cache -> await lookup s3 -> process hash entry -> lookup cache -> await lookup s3 -> process hash entry

Describe the solution you'd like

We should do the following:

lookup cache -> await lookup s3                                              ->  yield: process hash entry 
           yield: lookup cache -> await lookup s3                                       ->  yield: process hash entry 
                               yield: lookup cache -> await lookup s3-> yield: process hash entry 

This takes better advantage of async: the S3 round trips from different iterations overlap instead of being serialized. A minimal sketch of this schedule is shown below.
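For illustration, a minimal single-threaded sketch of this schedule. The `lookup` and `process_hash_entry` names are hypothetical stand-ins, and `buffer_unordered` is used here instead of a literal `futures::join!` so that a bounded number of lookups stay in flight while entries are processed:

```rust
use futures::stream::{self, StreamExt};

// Hypothetical stand-in: models the cache lookup with an S3 fallback.
async fn lookup(key: u64) -> Vec<i64> {
    vec![key as i64] // real code would hit the cache, then S3 on a miss
}

// Hypothetical stand-in: CPU-bound processing of one fetched entry.
fn process_hash_entry(entry: Vec<i64>) {
    let _ = entry;
}

// Overlap the S3 round trips with entry processing, as in the schedule above.
async fn probe_pipelined(keys: Vec<u64>) {
    let mut entries = stream::iter(keys)
        .map(lookup)           // lazily create one lookup future per key...
        .buffer_unordered(16); // ...and keep up to 16 of them in flight
    while let Some(entry) = entries.next().await {
        // While this entry is processed, the other lookups keep progressing
        // on the same thread whenever this loop yields.
        process_hash_entry(entry);
    }
}
```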

Issues:

  1. Currently, the outer loop body probably holds mutable references. It may be challenging to share such resources among loop-body iterations that run concurrently. The solution may be mutexes, taking care not to hold a lock across an await point.
  2. Since we can guarantee that the hash keys are mutually exclusive across iterations, one option is a sharded hashmap, e.g. https://docs.rs/dashmap/latest/dashmap/ (but we need an async, in particular tokio-compatible, RwLock); see the sketch after this list.
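For illustration only, a toy sharded map in the spirit of dashmap but built on tokio's async `RwLock`. This is a hypothetical sketch, not the executor's actual data structure; the point is that disjoint keys mostly land on different shards, so concurrent iterations rarely contend, and the lock can safely be held across an `.await` if it ever has to be:

```rust
use std::collections::HashMap;
use tokio::sync::RwLock;

// Toy sharded map: each shard is independently locked with an async RwLock.
struct ShardedMap<V> {
    shards: Vec<RwLock<HashMap<u64, V>>>,
}

impl<V> ShardedMap<V> {
    fn new(num_shards: usize) -> Self {
        Self {
            shards: (0..num_shards)
                .map(|_| RwLock::new(HashMap::new()))
                .collect(),
        }
    }

    // Pick the shard for a key; mutually exclusive keys spread across shards.
    fn shard(&self, key: u64) -> &RwLock<HashMap<u64, V>> {
        &self.shards[(key as usize) % self.shards.len()]
    }

    async fn insert(&self, key: u64, value: V) {
        self.shard(key).write().await.insert(key, value);
    }

    async fn get_cloned(&self, key: u64) -> Option<V>
    where
        V: Clone,
    {
        self.shard(key).read().await.get(&key).cloned()
    }
}
```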

This might be over-optimization if batch sizes are generally small and we don't have a large number of iterations. Otherwise, it is potentially very costly when there is a large number of cache misses.

With this optimization, we should generally process a batch in roughly 1 RTT for an arbitrary number of cache misses, since the misses' round trips overlap rather than accumulate.

Describe alternatives you've considered

NIL

Additional context

Related: https://github.com/singularity-data/risingwave/issues/1870
Related: https://github.com/singularity-data/risingwave/issues/1405

BugenZhao commented 2 years ago

Good catch! FYI, we've previously also applied this idea in #568.

jon-chuang commented 2 years ago

Yes, I agree with the approach of fetching all unique keys in advance first. This helps us avoid complicated concurrent async logic in the loop body.

Pushing the key updates could also be deferred until after the loop. A sketch of this two-phase structure follows.
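For illustration, a minimal two-phase sketch under assumed names (`fetch_from_storage`, `prefetch_then_probe`, and the `Key`/`Entry` aliases are all hypothetical): dedup the chunk's keys, fetch all misses concurrently up front, then run the probe loop with no awaits inside:

```rust
use std::collections::{HashMap, HashSet};
use futures::future::join_all;

// Hypothetical aliases standing in for the join executor's key/entry types.
type Key = u64;
type Entry = Vec<i64>;

async fn fetch_from_storage(key: Key) -> Entry {
    vec![key as i64] // placeholder for the real S3/state-store read
}

async fn prefetch_then_probe(cache: &mut HashMap<Key, Entry>, chunk_keys: &[Key]) {
    // Phase 1: dedup the chunk's keys and fetch all misses concurrently,
    // so N distinct misses cost roughly one round trip instead of N.
    let misses: Vec<Key> = chunk_keys
        .iter()
        .copied()
        .collect::<HashSet<_>>()
        .into_iter()
        .filter(|k| !cache.contains_key(k))
        .collect();
    let fetched = join_all(misses.iter().map(|&k| fetch_from_storage(k))).await;
    cache.extend(misses.into_iter().zip(fetched));

    // Phase 2: the probe loop stays simple and synchronous; any key updates
    // can likewise be collected here and flushed in a batch after the loop.
    for key in chunk_keys {
        let _entry = &cache[key];
        // ... probe and emit output rows here ...
    }
}
```

With this shape, the loop body needs no shared-state tricks at all, since every await happens before the loop begins.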

skyzh commented 2 years ago

+1 for this! One thing special about join is that it processes one chunk at a time, while HashAgg processes one barrier at a time. We can first improve the `next` function to fetch all join states for a chunk, and later optimize it to apply to all chunks in one barrier.

https://github.com/singularity-data/risingwave/pull/568 should also be a good example and a good reference. As everything is done on a single thread, we can use `Option::take` to transfer ownership, so we won't need dashmap. I previously found several soundness issues in dashmap, and I'm not sure whether they have been fully fixed by now.
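For illustration, a minimal single-threaded sketch of the `Option::take` idea (all names here are hypothetical, not the actual executor code): each state is stored as an `Option`, so ownership can be moved out of the map into a concurrent future and handed back afterwards, with no locks and no dashmap:

```rust
use std::collections::HashMap;
use futures::future::join_all;

// Placeholder for a real async state update (e.g. a remote read-modify-write).
async fn apply_remote_update(key: u64, mut state: Vec<i64>) -> Vec<i64> {
    state.push(key as i64);
    state
}

async fn flush_dirty(states: &mut HashMap<u64, Option<Vec<i64>>>, dirty: &[u64]) {
    let mut futs = Vec::with_capacity(dirty.len());
    for &key in dirty {
        // Move the state out of the map, leaving `None` in its slot; the
        // future below owns it outright, so nothing aliases the map.
        let state = states
            .get_mut(&key)
            .and_then(Option::take)
            .unwrap_or_default();
        futs.push(async move { (key, apply_remote_update(key, state).await) });
    }
    // `join_all` only interleaves the futures on the current thread.
    for (key, state) in join_all(futs).await {
        states.insert(key, Some(state)); // hand ownership back
    }
}
```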

yuhao-su commented 2 years ago

Another difference between agg and join is that one group key in agg corresponds to one row, while one hash key in join corresponds to multiple rows. Anyway, would you like to give it a try? You can assign this issue to yourself if so.

jon-chuang commented 2 years ago

Ok, but I'll wait for https://github.com/singularity-data/risingwave/pull/2357 to be merged first.

jon-chuang commented 2 years ago

> Yes, I agree with the approach of fetching all unique keys in advance first. This helps us avoid complicated concurrent async logic in the loop body.

Cache locality is probably better as well if we have no awaits in the loop body.