apache / incubator-gluten

Gluten is a middle layer responsible for offloading JVM-based SQL engines' execution to native engines.
https://gluten.apache.org/
Apache License 2.0

[VL] Distinct aggregation OOM when getOutput #8025

Open ccat3z opened 3 days ago

ccat3z commented 3 days ago

Backend

VL (Velox)

Bug description

Distinct aggregation merges all sorted spill files in getOutput() (SpillPartition::createOrderedReader). If there are too many spill files, reading the first batch of each file into memory consumes a significant amount of memory. In one of our internal cases, a single task generated 300 spill files, which required close to 3 GB of memory.


Possible workarounds:

  1. Increase kMaxSpillRunRows; the current default of 1M generates too many spill files for hundreds of millions of input rows. https://github.com/apache/incubator-gluten/pull/7531
  2. Reduce kSpillWriteBufferSize to 1M or lower. Why is it set to 4M by default? Is there any performance-tuning experience behind that value?

Spark version

None

Spark configurations

No response

System information

No response

Relevant logs

No response

FelixYBW commented 2 days ago

Looks like it's the same issue as shuffle spill. All spill merges should have the same issue; we should solve them in a similar way.

1) is the tradeoff between the number of spill files and overhead memory. 2) was set by PR #5088; I didn't run any tests on it and am not sure why I set it to 4M while Velox defaults to 1M. Let's decrease the default value to 1M, and also check the other configs.

What's vanilla Spark's spill buffer size? Is it configurable? In theory vanilla Spark has the same issue as Gluten. @jinchengchenghh do you know?

FelixYBW commented 2 days ago

I can only find the configuration: spark.shuffle.spill.diskWriteBufferSize.

There is no spill-merge one.

FelixYBW commented 2 days ago

Thank you, @ccat3z. I encountered the same issue in the orderby operator and spent several days debugging it!

jinchengchenghh commented 23 hours ago

kSpillReadBufferSize controls the size used to read each file: the ordered reader creates a FileInputStream per file and allocates kSpillReadBufferSize (default 1 MB) for it. Can you try adjusting this value? Maybe we should add a new config to bound the total read buffer size across all files, i.e. numFiles * bufferSize < threshold.

for (auto& fileInfo : files_) {
  streams.push_back(FileSpillMergeStream::create(
      SpillReadFile::create(fileInfo, bufferSize, pool, spillStats)));
}

input_ = std::make_unique<common::FileInputStream>(
      std::move(file), bufferSize, pool_);

FileInputStream::FileInputStream(
    std::unique_ptr<ReadFile>&& file,
    uint64_t bufferSize,
    memory::MemoryPool* pool)
    : file_(std::move(file)),
      fileSize_(file_->size()),
      bufferSize_(std::min(fileSize_, bufferSize)),
      pool_(pool),
      readAheadEnabled_((bufferSize_ < fileSize_) && file_->hasPreadvAsync()) {
  VELOX_CHECK_NOT_NULL(pool_);
  VELOX_CHECK_GT(fileSize_, 0, "Empty FileInputStream");

  buffers_.push_back(AlignedBuffer::allocate<char>(bufferSize_, pool_)); // allocating this buffer causes the OOM
  if (readAheadEnabled_) {
    buffers_.push_back(AlignedBuffer::allocate<char>(bufferSize_, pool_));
  }
  readNextRange();
}

kSpillWriteBufferSize controls the serialization buffer: once it reaches this threshold, the buffer is flushed and compressed.

jinchengchenghh commented 22 hours ago

Spark also opens all the spill files for reading.

final UnsafeSorterSpillMerger spillMerger = new UnsafeSorterSpillMerger(
    recordComparatorSupplier.get(), prefixComparator, spillWriters.size());
for (UnsafeSorterSpillWriter spillWriter : spillWriters) {
  spillMerger.addSpillIfNotEmpty(spillWriter.getReader(serializerManager));
}
if (inMemSorter != null) {
  readingIterator = new SpillableIterator(inMemSorter.getSortedIterator());
  spillMerger.addSpillIfNotEmpty(readingIterator);
}
return spillMerger.getSortedIterator();

Spark uses a PriorityQueue<UnsafeSorterIterator> to pick the next record to merge.

Comparator<UnsafeSorterIterator> comparator = (left, right) -> {
      int prefixComparisonResult =
        prefixComparator.compare(left.getKeyPrefix(), right.getKeyPrefix());
      if (prefixComparisonResult == 0) {
        return recordComparator.compare(
          left.getBaseObject(), left.getBaseOffset(), left.getRecordLength(),
          right.getBaseObject(), right.getBaseOffset(), right.getRecordLength());
      } else {
        return prefixComparisonResult;
      }
    };
    priorityQueue = new PriorityQueue<>(numSpills, comparator);

It has configs to control read-ahead and the read buffer size (default 1 MB), as follows:

  private[spark] val UNSAFE_SORTER_SPILL_READ_AHEAD_ENABLED =
    ConfigBuilder("spark.unsafe.sorter.spill.read.ahead.enabled")
      .internal()
      .version("2.3.0")
      .booleanConf
      .createWithDefault(true)

  private[spark] val UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE =
    ConfigBuilder("spark.unsafe.sorter.spill.reader.buffer.size")
      .internal()
      .version("2.1.0")
      .bytesConf(ByteUnit.BYTE)
      .checkValue(v => 1024 * 1024 <= v && v <= MAX_BUFFER_SIZE_BYTES,
        s"The value must be in allowed range [1,048,576, ${MAX_BUFFER_SIZE_BYTES}].")
      .createWithDefault(1024 * 1024)

class UnsafeSorterSpillReader

if (readAheadEnabled) {
  this.in = new ReadAheadInputStream(serializerManager.wrapStream(blockId, bs),
      bufferSizeBytes);
} else {
  this.in = serializerManager.wrapStream(blockId, bs);
}
this.din = new DataInputStream(this.in);

It only needs to load one record at a time; after a record is consumed, the UnsafeSorterIterator reader is put back into the priorityQueue to load the next record.

jinchengchenghh commented 22 hours ago

In that case, we need to respect UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE. Velox spills RowVectors, so we must read a buffer-sized chunk ahead. But it's better to add a config for the total buffer size when UNSAFE_SORTER_SPILL_READ_AHEAD_ENABLED is false.

FelixYBW commented 1 hour ago

Thank you, @jinchengchenghh. After tuning kMaxSpillRunRows and kSpillWriteBufferSize, one of my tasks succeeded but the other still fails. It looks like there is still some large memory allocation in getOutput.

FelixYBW commented 41 minutes ago

> kSpillReadBufferSize controls the size used to read each file: the ordered reader creates a FileInputStream per file and allocates kSpillReadBufferSize (default 1 MB) for it. Can you try adjusting this value? Maybe we should add a new config to bound the total read buffer size across all files, i.e. numFiles * bufferSize < threshold.

Can you add it as config in Gluten?

FelixYBW commented 9 minutes ago

> kSpillReadBufferSize controls the size used to read each file: the ordered reader creates a FileInputStream per file and allocates kSpillReadBufferSize (default 1 MB) for it. Can you try adjusting this value? Maybe we should add a new config to bound the total read buffer size across all files, i.e. numFiles * bufferSize < threshold.

Should we adopt the approach of https://github.com/apache/incubator-gluten/pull/7861?

FelixYBW commented 2 minutes ago

> In that case, we need to respect UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE. Velox spills RowVectors, so we must read a buffer-sized chunk ahead. But it's better to add a config for the total buffer size when UNSAFE_SORTER_SPILL_READ_AHEAD_ENABLED is false.

So the worst case for vanilla Spark is also a 1 MB buffer per file, right? Let's honor the value of spark.unsafe.sorter.spill.reader.buffer.size then. It may be set per query.