facebookincubator / velox

A composable and fully extensible C++ execution engine library for data management systems.
https://velox-lib.io/

Maintain memory budget in Exchange #6005

Open mbasmanova opened 1 year ago

mbasmanova commented 1 year ago

Description

In Prestissimo, we are seeing OOMs caused by Exchange buffering up too much data. Often, the TableWriter stage runs on a single node (or a few nodes) and pulls data from fast producers in the previous stage (final aggregation). TableWriter runs single-threaded and is not able to consume data fast enough. This leads to OOMs.

To solve this issue we want to introduce flow control in Exchange. We want to keep track of how much data is already buffered and only ask for more data if there is space. We also want to limit the number of upstream producers (sources) we ask at the same time.
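A minimal sketch of what such flow control could look like is below. The names (RemoteSource, maxQueuedBytes_, maxConcurrentRequests_) are hypothetical, not the actual Velox ExchangeClient API; the real change is in the PR linked below.

```cpp
#include <cstdint>
#include <memory>
#include <vector>

// Illustrative stand-in for a remote producer we can request data from.
struct RemoteSource {
  virtual void request(int64_t maxBytes) = 0;
  virtual bool atEnd() const = 0;
  virtual ~RemoteSource() = default;
};

class FlowControlledExchangeClient {
 public:
  FlowControlledExchangeClient(int64_t maxQueuedBytes, int32_t maxConcurrentRequests)
      : maxQueuedBytes_{maxQueuedBytes},
        maxConcurrentRequests_{maxConcurrentRequests} {}

  void addSource(std::shared_ptr<RemoteSource> source) {
    sources_.push_back(std::move(source));
  }

  // Ask producers for more data only while the local queue has room and the
  // number of outstanding requests stays under the limit.
  void maybeRequestMore() {
    for (auto& source : sources_) {
      if (queuedBytes_ >= maxQueuedBytes_ ||
          pendingRequests_ >= maxConcurrentRequests_) {
        break;
      }
      if (source->atEnd()) {
        continue;
      }
      // Cap the request so a single response cannot blow the remaining budget.
      source->request(maxQueuedBytes_ - queuedBytes_);
      ++pendingRequests_;
    }
  }

  // Called when a response of 'bytes' is enqueued locally.
  void onResponse(int64_t bytes) {
    queuedBytes_ += bytes;
    --pendingRequests_;
    maybeRequestMore();
  }

  // Called when a downstream Exchange operator dequeues 'bytes'.
  void onDequeue(int64_t bytes) {
    queuedBytes_ -= bytes;
    maybeRequestMore();
  }

 private:
  const int64_t maxQueuedBytes_;
  const int32_t maxConcurrentRequests_;
  std::vector<std::shared_ptr<RemoteSource>> sources_;
  int64_t queuedBytes_{0};
  int32_t pendingRequests_{0};
};
```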

See https://github.com/facebookincubator/velox/pull/5953

CC: @pranjalssh @oerling @xiaoxmeng

mbasmanova commented 1 year ago

First, we want to fix Exchange to report stats (from ExchangeClient and ExchangeQueue) live. Currently, these are reported only at the end of processing, and if a query fails (e.g. with an OOM) they are not reported at all.

Next, we want to fix the PartitionedOutput operator to generate pages of no more than 1MB (give or take) to allow honoring the maxBytes limit specified in get-results requests from the downstream workers.
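As a sketch of that idea (the Serializer interface here is a hypothetical stand-in, not the actual Velox serializer API), the serialized output would be flushed into a new page whenever it reaches a ~1MB target:

```cpp
#include <cstdint>
#include <vector>

// Illustrative only: a serialized "page" of output data.
struct Page {
  std::vector<char> data;
};

constexpr int64_t kTargetPageBytes = 1 << 20; // ~1MB target per page.

// Flush the serializer into a new page whenever it reaches the target size so
// that downstream get-results requests can honor small maxBytes limits.
template <typename Serializer>
std::vector<Page> serializeInPages(Serializer& serializer, int64_t numRows) {
  std::vector<Page> pages;
  for (int64_t row = 0; row < numRows; ++row) {
    serializer.append(row);
    if (serializer.byteSize() >= kTargetPageBytes) {
      pages.push_back(Page{serializer.flush()});
    }
  }
  if (serializer.byteSize() > 0) {
    pages.push_back(Page{serializer.flush()});
  }
  return pages;
}
```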

mbasmanova commented 1 year ago

PartitionedOutput::getOutput has logic to limit the size of each page to 32MB / number of destinations (see the code below). When all data goes to the same partition (a single destination), each page ends up being pretty large, up to 32MB. Hence, a get-results request is not able to honor the maxBytes limit specified in the request unless that limit is >= 32MB.

      // PartitionedOutput::getOutput() sizes each page based on the number of
      // destinations:
      blockingReason_ = destination->advance(
          maxBufferedBytes_ / destinations_.size(),

      // PartitionedOutput constructor: maxBufferedBytes_ comes from the query
      // config:
      maxBufferedBytes_(ctx->task->queryCtx()
                            ->queryConfig()
                            .maxPartitionedOutputBufferSize()) {

  // QueryConfig:
  // Returns the target size for a Task's buffered output. The
  // producer Drivers are blocked when the buffered size exceeds
  // this. The Drivers are resumed when the buffered size goes below
  // PartitionedOutputBufferManager::kContinuePct % of this.
  uint64_t maxPartitionedOutputBufferSize() const {
    static constexpr uint64_t kDefault = 32UL << 20;
    return get<uint64_t>(kMaxPartitionedOutputBufferSize, kDefault);
  }
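To make the effect concrete, here is a toy calculation with the 32MB default (the destination counts are illustrative):

```cpp
#include <cstdint>
#include <iostream>

int main() {
  // Default from QueryConfig::maxPartitionedOutputBufferSize().
  constexpr uint64_t kMaxBufferedBytes = 32UL << 20; // 32MB

  // With many destinations the per-destination page target is small...
  std::cout << kMaxBufferedBytes / 100 << "\n"; // 335544 bytes (~0.3MB)

  // ...but with a single destination (e.g. a gather feeding TableWriter) each
  // page can grow to the full 32MB, far above a 1MB maxBytes request.
  std::cout << kMaxBufferedBytes / 1 << "\n"; // 33554432 bytes (32MB)
  return 0;
}
```
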
mbasmanova commented 1 year ago

Also, on the receiving end we have a single ExchangeClient with a single ExchangeQueue, but multiple Exchange operators fetching pages from the queue. Each "page" in the queue may contain multiple PrestoPages. Exchange operators process these PrestoPages one at a time and wait for downstream operators to consume each one before processing the next. Hence, Exchange operators may hold pages in memory for a long time.

ExchangeClient may need to check the memory pool's used bytes in order to decide whether it can request more data from the producers.
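One possible shape for such a check, with MemoryPoolLike as a hypothetical stand-in for the memory pool interface (illustrative only, not the actual ExchangeClient code):

```cpp
#include <algorithm>
#include <cstdint>

// Illustrative stand-in for a memory pool that can report its used bytes.
struct MemoryPoolLike {
  virtual int64_t currentBytes() const = 0;
  virtual ~MemoryPoolLike() = default;
};

// Gate new data requests on the total memory attributed to the exchange, not
// just on bytes sitting in the ExchangeQueue, since pages popped from the
// queue may still be held by Exchange operators.
class ExchangeBudget {
 public:
  ExchangeBudget(const MemoryPoolLike& pool, int64_t maxBytes)
      : pool_{pool}, maxBytes_{maxBytes} {}

  // Returns how many bytes we can still request from producers; zero means
  // "do not ask for more data yet".
  int64_t availableBytes(int64_t queuedBytes) const {
    // Count both queued pages and pages already handed to Exchange operators
    // (approximated here by the pool's used bytes).
    const int64_t used = std::max(queuedBytes, pool_.currentBytes());
    return std::max<int64_t>(0, maxBytes_ - used);
  }

 private:
  const MemoryPoolLike& pool_;
  const int64_t maxBytes_;
};
```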

Example plan:

Exchange runs multi-threaded and pulls data from a single ExchangeQueue. It feeds pages into LocalExchange's queue. TableWriter runs single-threaded and consumes these pages slowly.

It is not enough for ExchangeClient to enforce a memory cap on the queue as pages fetched from the queue may sit in Exchange operators for a long time.

pranjalssh commented 1 year ago

Thanks for the detailed writeup.

It is not enough for ExchangeClient to enforce a memory cap on the queue as pages fetched from the queue may sit in Exchange operators for a long time.

Is this memory usage by Exchange operators really significant? It should be no more than 32MB * number of drivers, while the ExchangeClient can easily use several GBs.

pranjalssh commented 1 year ago

In the Java implementation:

The PartitionedOutput operator also has a max page size of 1MB, like the current proposal: Code

The PartitionedOutput operator breaks a page up if its size exceeds the requested size, so the response is always within the limit: Code.

On the exchange side, it tracks the average response size over a small window (not a global average). It tracks both empty responses and responses with data, so it can request more data when many sources reply with empty responses.
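A small C++ sketch of that windowed-average idea (all names here are illustrative; the actual logic lives in the Presto Java exchange client):

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <numeric>

// Track the average response size over a small sliding window, counting empty
// responses as well, so the client can ask more sources at once when many of
// them reply empty.
class ResponseSizeTracker {
 public:
  explicit ResponseSizeTracker(size_t windowSize) : windowSize_{windowSize} {}

  // Record one response, including responses with zero bytes.
  void record(int64_t responseBytes) {
    window_.push_back(responseBytes);
    if (window_.size() > windowSize_) {
      window_.pop_front();
    }
  }

  // Average over the recent window; empty responses pull the average down,
  // which lets the client issue more concurrent requests within its budget.
  int64_t averageBytes() const {
    if (window_.empty()) {
      return 0;
    }
    const int64_t total =
        std::accumulate(window_.begin(), window_.end(), int64_t{0});
    return total / static_cast<int64_t>(window_.size());
  }

  // How many sources we can ask at once given the remaining memory budget.
  int64_t concurrentRequests(int64_t availableBytes) const {
    const int64_t avg = std::max<int64_t>(averageBytes(), 1);
    return std::max<int64_t>(1, availableBytes / avg);
  }

 private:
  const size_t windowSize_;
  std::deque<int64_t> window_;
};
```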