
feat(executor): Stateful operators should stream newly arriving messages #2897

Open jon-chuang opened 2 years ago

jon-chuang commented 2 years ago

With https://github.com/singularity-data/risingwave/issues/2428 we observe that there is a need to fetch entries per update, but sometimes (actually, quite frequently for TPC-H) updates come in batches of 1 or 2. Thus, we cannot exploit concurrency because the system is still written in a batch-update fashion.

Hence, we should properly exploit concurrent streaming by allowing new messages to be added to a queue instantly and multiplexed into sync_queue and async_queue: sync_queue is a deque of work that can be performed instantly, while async_queue is an unbounded queue of work that completes asynchronously. Hopefully, we can reduce the overall latency this way.
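
For illustration, a minimal sketch of the multiplexing step (all names here are hypothetical, not existing executor APIs): rows whose state is already in the local cache go to sync_queue, the rest go to async_queue and record which keys need an async state fetch.

use std::collections::{HashMap, VecDeque};

/// Illustrative sketch: split incoming rows into work that can be done
/// immediately against the local cache (sync_queue) and work that must wait
/// for an async state fetch (async_queue). Returns the keys that need fetching.
fn multiplex(
    rows: Vec<(u64, String)>,                 // (key, payload) pairs of a chunk
    cache: &HashMap<u64, String>,             // local state cache
    sync_queue: &mut VecDeque<(u64, String)>, // processed instantly
    async_queue: &mut VecDeque<(u64, String)>, // processed once the fetch completes
) -> Vec<u64> {
    let mut to_fetch = Vec::new();
    for row in rows {
        if cache.contains_key(&row.0) {
            sync_queue.push_back(row);
        } else {
            // Deduplicate the keys we still have to fetch from the state store.
            if !to_fetch.contains(&row.0) {
                to_fetch.push(row.0);
            }
            async_queue.push_back(row);
        }
    }
    to_fetch
}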

Notes:

  1. This pattern is probably best applied to all stateful operators with async fetch and local cache (e.g. hash agg, hash join)

Here is a diagram illustrating how diverting the I/O from the main dataflow path can reduce the overall latency of the system. [diagram attached to the original issue]

Here is a more detailed diagram taking into account stream chunk builder timeout and PK op dependency queue. The select_all priority is top to bottom.

[diagram attached to the original issue]

TennyZhuang commented 2 years ago

updates come in batches of 1 or 2.

How does it happen? Which executor produces the small chunk?

fuyufjh commented 2 years ago

updates come in batches of 1 or 2.

Same question. Can this be solved by making the checkpoint interval larger?

lmatz commented 2 years ago

I guess the point is that executors should not wait for an input chunk to be fully processed before processing the next input chunk, if CPU is idle.

To generalize a bit, we should do things as if vectorization provides 0 benefits. But since vectorization actually makes things faster, we should have a tradeoff/balance here.

yuhao-su commented 2 years ago

To generalize a bit, we should do things as if vectorization provides 0 benefits. But since vectorization actually makes things faster, we should have a tradeoff/balance here.

Vectorization can be preserved in sync_queue here. For async_queue there might be a tradeoff.

jon-chuang commented 2 years ago

But since vectorization actually makes things faster, we should have a tradeoff/balance here.

The tradeoff is latency vs. throughput. So the solution I thought best was to wait for a fixed timeout (proposed to be 1ms) since the last message before forwarding accumulated chunks. Every operator that does accumulation should follow this behaviour. There is already an issue for this theme: https://github.com/singularity-data/risingwave/issues/2755
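
For illustration, a minimal sketch of that flush-on-timeout behaviour (hypothetical channel plumbing, not the actual stream chunk builder code): accumulate rows and forward the partial chunk once the chunk is full, or once the timeout has elapsed since the last message.

use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::sleep;

/// Accumulate rows into chunks; flush when the chunk is full, or when no new
/// row has arrived for `timeout` (proposed: 1ms) and something is buffered.
async fn build_chunks<R>(
    mut rows: mpsc::Receiver<R>,
    out: mpsc::Sender<Vec<R>>,
    cap: usize,
    timeout: Duration,
) {
    let mut buf: Vec<R> = Vec::with_capacity(cap);
    loop {
        tokio::select! {
            maybe_row = rows.recv() => match maybe_row {
                Some(row) => {
                    buf.push(row);
                    if buf.len() >= cap {
                        // Chunk is full: forward it immediately.
                        let _ = out.send(std::mem::take(&mut buf)).await;
                    }
                }
                // Upstream closed: leave the loop and flush the remainder.
                None => break,
            },
            // No new row arrived within `timeout`: forward the partial chunk.
            _ = sleep(timeout), if !buf.is_empty() => {
                let _ = out.send(std::mem::take(&mut buf)).await;
            }
        }
    }
    if !buf.is_empty() {
        let _ = out.send(buf).await;
    }
}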

How does it happen? Which executor produces the small chunk?

I believe it was TPC-H q14 and q3, and probably q5 as well. We should have output metrics for chunk sizes at each operator...

Vectorization can be preserved in sync_queue here. For async_queue there might be a tradeoff.

I don't think vectorization makes that much of a difference for stateful operators, which must spend time doing random lookups at best and async I/O at worst. Correct me if I'm wrong. Vectorization is probably only useful for simple agg/simple arithmetic expressions (e.g. projection expressions) or other stateless operators?

jon-chuang commented 2 years ago

Same question. Can this be solved by making the checkpoint interval larger?

How to do this?

skyzh commented 2 years ago

Same question. Can this be solved by making the checkpoint interval larger?

How to do this?

In risingwave.toml we have a checkpoint interval config. Simply update it and start the risedev cluster.

jon-chuang commented 2 years ago

I had the checkpoint interval set to 100ms.

What I found was that it was the updates which would result in cache hit that were very few in number. I guess it was usually one side of the updates (the right side prior to the select bias fix?) that was producing very few rows/s.

Should be investigated further... (with metrics)

yuhao-su commented 2 years ago

Also might cause a problem producing duplicate pks... can be resolved if we make join key part of pk?

pk:       1st col
Join key: 2nd col

I 1 2 unmatched
D 1 2 unmatched
I 1 3 matched
D 1 3 matched

After reorder due to async fetching

I 1 3 matched
I 1 2 unmatched
D 1 2 unmatched
D 1 3 matched

This produces 2 identical insert PKs to the downstream.

fuyufjh commented 2 years ago

I guess the point is that executors should not wait for an input chunk to be fully processed before processing the next input chunk, if CPU is idle.

Yes, I agree this is his point. But I think this proposal would not improve the latency directly, because latency is determined by the barrier, and the barrier cannot pass until all pending messages in async_queue are processed. In other words, the quick messages do not help to improve the latency of barriers.

Instead, this might improve the throughput by batching all "cache-missed" data together to emit requests to S3, hopefully. But as you know, performance issues are always very tricky; it will only work as expected when the bottleneck is actually there.

jon-chuang commented 2 years ago

Actually some of the discussion from my end might be wrong, there is indeed vectorization for hash tables (vertical vectorization). It's basically fine to apply it in our case as we always need to first probe the hashmap cache prior to multiplexing into sync/async. The probing itself can be vectorized, as long as the cache size is small and fits in L3 cache... See e.g. https://www.cockroachlabs.com/blog/vectorized-hash-joiner/ for reference.
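
For illustration, a batched probe over a whole chunk (hypothetical types; this only shows the batch-probe structure, not actual SIMD vertical vectorization): look up every join key of the chunk against the in-memory cache in one pass, then multiplex hits and misses.

use std::collections::HashMap;

/// Probe all join keys of a chunk against the in-memory cache in one pass,
/// partitioning them into cache hits and misses before multiplexing.
fn probe_chunk<'a>(
    join_keys: &'a [u64],
    cache: &HashMap<u64, Vec<String>>,
) -> (Vec<&'a u64>, Vec<&'a u64>) {
    // Hits can go straight to the sync path; misses need an async fetch.
    join_keys.iter().partition(|k| cache.contains_key(*k))
}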

Though, it probably doesn't matter much since our system currently switches contexts so frequently that most of that cache is going to be polluted anyway... but it may still be worth trying in the future.

If the system is doing a lot of work, it may be worth pinning stateful operators to particular threads. I'm not sure if this is currently the situation in tokio... (whether tasks are pinned to particular threads for their entire lifespan)
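
For illustration, one way to pin an operator's work to a particular thread is to give it its own single-threaded runtime on a dedicated OS thread (a sketch only, not necessarily how the actors are scheduled today):

use std::thread;
use tokio::runtime::Builder;

/// Run the given future on its own OS thread with a current-thread runtime,
/// so all of its tasks stay pinned to that thread.
fn spawn_pinned_operator<F>(fut: F) -> thread::JoinHandle<()>
where
    F: std::future::Future<Output = ()> + Send + 'static,
{
    thread::spawn(move || {
        let rt = Builder::new_current_thread()
            .enable_all()
            .build()
            .expect("failed to build runtime");
        rt.block_on(fut);
    })
}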

jon-chuang commented 2 years ago

In other words, the quick messages do not help to improve the latency of barriers.

Actually, I disagree here.

Consider the following:

arrival time: 0ms message(10 sync, 10 async) async I/O: latency 5ms

arrival time: 1ms message(10 sync, 10 async) async I/O: latency 5ms

Total time for main: 5 * 10 * 2 = 100ms

Total time for concurrent fetch (https://github.com/singularity-data/risingwave/issues/2428): 5 + 5 = 10ms

Total time for the streaming approach (this issue): 5 + 1 = 6ms

Basically, even with concurrent fetch, if your message inter-arrival time is shorter than the async I/O latency (due to high streaming throughput), each message's processing time expands out to the async I/O latency. Rather than combating this with more parallelism (more shards), you could just process more messages' async I/O concurrently by adopting the streaming approach.

I also tend to think that the faster we allow the downstream operators to do useful work, the better utilization and latency we can get from our system. For instance, downstream async I/O could begin earlier as a result.

Probably there is some math to show this by modelling as fluid flows...

jon-chuang commented 2 years ago

Also might cause a problem producing duplicate pks... can be resolved if we make join key part of pk?

From our discussion offline @yuhao-su and I concluded this will cause unnecessary columns to be present in downstream output of joins, especially bad for many-way joins.

We discussed it and I think I have a solution; I will write an RFC/design doc. Basically, it's just having updates whose PK already has an in-flight update wait in a special queue until their prior update is completed, while also allowing prefetching of the relevant join rows for those stalled rows.

fuyufjh commented 2 years ago

In other words, the quick messages do not help to improve the latency of barriers.

Actually, I disagree here.

Consider the following:

arrival time: 0ms message(10 sync, 10 async) async I/O: latency 5ms

arrival time: 1ms message(10 sync, 10 async) async I/O: latency 5ms

Total time for main: 5 * 10 * 2 = 100ms

Total time for concurrent fetch (https://github.com/singularity-data/risingwave/issues/2428): 5 + 5 = 10ms

Total time for the streaming approach (this issue): 5 + 1 = 6ms

Basically, even with concurrent fetch, if your message inter-arrival time is shorter than the async I/O latency (due to high streaming throughput), each message's processing time expands out to the async I/O latency. Rather than combating this with more parallelism (more shards), you could just process more messages' async I/O concurrently by adopting the streaming approach.

I also tend to think that the faster we allow the downstream operators to do useful work, the better utilization and latency we can get from our system. For instance, downstream async I/O could begin earlier as a result.

Probably there is some math to show this by modelling as fluid flows...

OK, I misunderstood it before. This approach seems like an enhanced version of concurrent fetch to me.

Furthermore, I think it's possible to preserve the message order in this case. Similar to the TCP protocol, a later packet/fetch may be received first, but we can just hold it in the queue until all its prior packets are done.
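
For illustration, a minimal sketch of that reorder buffer (purely hypothetical, not an existing executor): stamp each row/fetch with a sequence number, and hold back results that complete early until all earlier sequence numbers have been emitted.

use std::collections::BTreeMap;

/// Releases completed items strictly in sequence order, buffering any result
/// that finishes before its predecessors.
struct ReorderBuffer<T> {
    next_to_emit: u64,
    pending: BTreeMap<u64, T>,
}

impl<T> ReorderBuffer<T> {
    fn new() -> Self {
        Self { next_to_emit: 0, pending: BTreeMap::new() }
    }

    /// Record a completed item and return everything that is now emittable in order.
    fn complete(&mut self, seq: u64, item: T) -> Vec<T> {
        self.pending.insert(seq, item);
        let mut out = Vec::new();
        // Drain the buffer as long as the next expected sequence number is present.
        while let Some(item) = self.pending.remove(&self.next_to_emit) {
            out.push(item);
            self.next_to_emit += 1;
        }
        out
    }
}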

jon-chuang commented 2 years ago

a later packet/fetch may be received first, but we can just hold it in the queue until all its prior packets are done

Yes, but actually, all we need to preserve is update ordering for a given PK. Would you agree?

fuyufjh commented 2 years ago

a later packet/fetch may be received first, but we can just hold it in the queue until all its prior packets are done

Yes, but actually, all we need to preserve is update ordering for a given PK. Would you agree?

In other words, only insert messages (without any delete/update events between them) are reorderable?

jon-chuang commented 2 years ago

In other words, only insert messages (without any delete/update events between them) are reorderable?

Actually, no: individual insert, delete, and update messages can be reordered as long as the order of row + op is preserved for a given PK. That is, unless the PK can change for a given row...

fuyufjh commented 2 years ago

In other words, only insert messages (without any delete/update events between them) are reorderable?

Actually, no: individual insert, delete, and update messages can be reordered as long as the order of row + op is preserved for a given PK. That is, unless the PK can change for a given row...

Yes, but this algorithm needs to maintain a set of PKs in this epoch, which seems a little complex to me.

In practice, I prefer the simplest solution - just preserve the order of all rows, no matter what they are. Will this harm the performance?

jon-chuang commented 2 years ago

Well, it would if the majority of rows hit the cache but a small number result in async I/O. Then there would be a large fraction of rows waiting for the rows ahead of them.

I agree however that that's a simpler solution.

jon-chuang commented 2 years ago

That is unless PK can change for a given row...

Actually, is this even possible? Seems like no, right? An update only occurs if a row identified by its PK is overridden with a new value for the given PK.

jon-chuang commented 2 years ago

Yes, but this algorithm needs to maintain a set of PKs in this epoch, which seems a little complex to me.

It's slightly complex, but maybe I will write up a solution and we can judge if it's worth testing out. My sense is that it's not significantly complex.

lmatz commented 2 years ago

Yes, but this algorithm needs to maintain a set of PKs in this epoch, which seems a little complex to me.

It's slightly complex, but maybe I will write up a solution and we can judge if it's worth testing out. My sense is that it's not significantly complex.

Is it possible to implement it in a way that we still can disallow the reordering of rows but only do concurrent state fetching for consecutive data chunks?

I think it would be very helpful for us to understand what each technique can bring to us 🤔

jon-chuang commented 2 years ago

Is it possible to implement it in a way that we still can disallow the reordering of rows but only do concurrent state fetching for consecutive data chunks?

Yes. But as discussed:

Well, it would if the majority of rows hit the cache but a small number result in async I/O. Then there would be a large fraction of rows waiting for the rows ahead of them.

Basically, if you expect occasional cache misses but a high cache hit rate, as well as a low fraction of PKs concurrently in progress, the downstream can process new rows much more quickly if you allow out-of-order processing.


Also, here is the pseudocode for hashjoin.

Now that I've written it out, it's really not that complex!

Just an additional small hashmap (probably < 1000 entries) for pending PK entries:

async fn fetch_and_process_join_key(join_key: Row) {
    let jk_entries = self.cache.fetch_both_sides(join_key).await;

    // Once we have fetched the JK entries via async I/O, we can process every
    // row waiting on this join key in the out-of-order queue, and then process
    // or queue any rows that were blocked on the same PK.
    for row in self.async_queue.remove(join_key) {
        self.process(row, jk_entries);
        let pk_entry = self.pk_in_progress.get_mut(row.pk).unwrap();
        if !pk_entry.is_empty() {
            // Process the next row for this PK.
            self.process_or_queue(pk_entry.pop_front());
        } else {
            // There were no pending rows for this PK.
            self.pk_in_progress.remove(row.pk);
        }
    }
}

fn process_chunk(&mut self, chunk: StreamChunk) {
    for row in chunk {
        // Delay processing until the in-flight update for this PK completes.
        if let Some(in_progress_rows) = self.pk_in_progress.get_mut(row.pk) {
            // Clone the row so we can throw away this chunk.
            in_progress_rows.push(row.to_owned_row());
            if self.cache.get_both_sides(row.join_key).is_none()
                && !self.async_queue.contains(row.join_key)
            {
                // Trigger a prefetch of the JKE (fetch without adding the row
                // to the async queue).
                self.task_stream.push(fetch_and_process_join_key(row.join_key.clone()));
            }
        } else {
            // We are OK to process the row in any order because there is no
            // outstanding task for that PK preceding this row.
            self.process_or_queue(row);
        }
    }
}

fn process_or_queue(&mut self, row: &RowRef) {
    if let Some(entries) = self.cache.get_both_sides(row.join_key) {
        // If available in cache, process instantly.
        self.process(row, entries);
    } else {
        // Not available locally: mark this PK as in progress so that later
        // rows with the same PK wait for it, then add the row to the async queue.
        self.pk_in_progress.entry(row.pk).or_default();
        let entry = self.async_queue.entry(row.join_key).or_insert_with(Vec::new);
        if entry.is_empty() {
            // Trigger the fetch of the join key data.
            self.task_stream.push(fetch_and_process_join_key(row.join_key.clone()));
        }
        // Clone this row and add it to the rows queued for the given join key.
        entry.push(row.to_owned_row());
    }
}

In summary, we maintain two maps for pending rows: JK pending rows (processed once the JK entries are fetched) and PK pending rows (queued or processed once the in-flight update for their PK completes).

jon-chuang commented 2 years ago

Note that Flink also has a knob for trading off latency against throughput via the buffer timeout:

https://www.ververica.com/blog/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink