apache / incubator-gluten

Gluten is a middle layer responsible for offloading JVM-based SQL engines' execution to native engines.
https://gluten.apache.org/
Apache License 2.0

Arrow CSV reader peak memory is very large #5766

Open liujiayi771 opened 3 months ago

liujiayi771 commented 3 months ago

Backend

VL (Velox)

Bug description

When reading large CSV files, the peak memory usage of the Arrow memory pool is very high. For example, when a single CSV file in a table is 300 MB, the peak during single-threaded reading can reach 500 MB. With a 2 GB CSV file, the peak rises to 1.7 GB. There does not appear to be a memory leak, but the peak usage is very high.

Judging from the Arrow Dataset code, we appear to be using the streaming reader, so in theory memory consumption should not grow in proportion to the size of the CSV file.

I added some code to the release method of ArrowNativeMemoryPool to check the peak memory.

@Override
public void release() throws Exception {
  System.out.println("peak=" + listener.peak() + ", current=" + listener.current());
  if (arrowPool.getBytesAllocated() != 0) {
    LOGGER.warn(
        String.format(
            "Arrow pool still reserved non-zero bytes, "
                + "which may cause memory leak, size: %s. ",
            Utils.bytesToString(arrowPool.getBytesAllocated())));
  }
  arrowPool.close();
}

I also added some logging to the Arrow code to check the peak memory.

Result<RecordBatchGenerator> CsvFileFormat::ScanBatchesAsync(
    const std::shared_ptr<ScanOptions>& scan_options,
    const std::shared_ptr<FileFragment>& file) const {
  auto this_ = checked_pointer_cast<const CsvFileFormat>(shared_from_this());
  auto source = file->source();
  auto reader_fut =
      OpenReaderAsync(source, *this, scan_options, ::arrow::internal::GetCpuThreadPool());
  auto generator = GeneratorFromReader(std::move(reader_fut), scan_options->batch_size);
  WRAP_ASYNC_GENERATOR_WITH_CHILD_SPAN(
      generator, "arrow::dataset::CsvFileFormat::ScanBatchesAsync::Next");
  std::cout << "memory=" << default_memory_pool()->bytes_allocated()
            << ", max=" << default_memory_pool()->max_memory() << std::endl;
  return generator;
}

Spark version

None

Spark configurations

No response

System information

No response

Relevant logs

No response

liujiayi771 commented 3 months ago

cc @jinchengchenghh @zhztheplayer, thanks.

FelixYBW commented 3 months ago

As I recall, Arrow cached all record batches before streaming them to Spark. In Gazelle we initially had the same issue and had to customize some logic to do real streaming. @zhztheplayer, do you remember?

zhztheplayer commented 3 months ago

I can't recall that. But it doesn't make sense to buffer all data for a reader.

I suppose @jinchengchenghh is looking into this.

jinchengchenghh commented 3 months ago

I could not reproduce this issue. I tested TPC-H Q6 with 600 GB of data and printed the peak every time Arrow reserves memory.

  public void reserve(long size) {
    synchronized (this) {
      sharedUsage.inc(size);
    }
    System.out.println(sharedUsage.peak());
  }

This is the test result:

18350080
17825792
18350080
17825792
18350080
17825792
18350080
17825792
18350080
17825792
18350080
17825792
18350080
17825792
18350080
17825792
18350080
17825792

After I changed --master from local[18] to local[2], the peak memory was the same.

liujiayi771 commented 3 months ago

@jinchengchenghh I will test the latest code.

FelixYBW commented 3 months ago

@jinchengchenghh Can you add prints to the record batch constructor and destructor to confirm? There should be only one record batch alive at a time, and no more than three.

liujiayi771 commented 2 months ago

@jinchengchenghh Have you checked the size of a single CSV file?

jinchengchenghh commented 2 months ago

I assume you are using an intermediate commit of the CSV reader. An intermediate version (not the merged one) had a redundant colVector.retain() in ArrowUtil.loadBatch(), which could keep the vector from being released even after the columnar batch is closed. I removed that colVector.retain() while fixing another issue, so I'm not sure whether it is the root cause here. @liujiayi771

jinchengchenghh commented 2 months ago

The values are printed each time we request memory from the Arrow memory pool, not once per record batch. On the Java side the batch consists of ArrowWritableColumnVector instances. It is bridged to the C++ side through ArrowArray, converted to a Velox Vector, and then released immediately. @FelixYBW

liujiayi771 commented 2 months ago

@jinchengchenghh I will test the latest code today.

liujiayi771 commented 2 months ago

@jinchengchenghh I tested the latest code, and the peak memory usage is still relatively high. I did not add logs in ArrowReservationListener.reserve. Printing logs there did not output anything in my case. I added two methods in ArrowReservationListener, and printed peak and current in ArrowNativeMemoryPool.release.

public long peak() {
  return sharedUsage.peak();
}

public long current() {
  return sharedUsage.current();
}

@Override
public void release() throws Exception {
  System.out.println("peak=" + listener.peak() + ", current=" + listener.current());
  if (arrowPool.getBytesAllocated() != 0) {
    LOGGER.warn(
        String.format(
            "Arrow pool still reserved non-zero bytes, "
                + "which may cause memory leak, size: %s. ",
            Utils.bytesToString(arrowPool.getBytesAllocated())));
  }
  arrowPool.close();
}

I created a Parquet table and used spark-sql --local to read the data from a CSV table and insert overwrite it into the Parquet table. My dataset is the 100 GB TPC-DS dataset. I first tested the store_sales table, where each CSV file is 700 MB in size. The log output is below; the peak memory is about 920 MB:

peak=964689920, current=8388608
peak=964689920, current=8388608
peak=964689920, current=8388608
peak=964689920, current=8388608
peak=964689920, current=8388608
peak=956301312, current=8388608

I continued testing with the catalog_sales table, where each CSV file is 1.15 GB in size. The log output is below; the peak memory ranges from about 1064 MB to 1096 MB:

peak=1124073472, current=8388608
peak=1140850688, current=8388608
peak=1115684864, current=8388608
peak=1124073472, current=8388608
peak=1149239296, current=8388608
peak=1115684864, current=8388608

I constructed a larger catalog_sales table with a single 30 GB CSV file. The log output is below; the peak memory is about 6 GB:

peak=6601834496, current=8388608

The peak memory I logged should be attributable to the CSV reader alone. This issue is not urgent for me at the moment, though: after splitting the large CSV file into smaller files, everything works normally.
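
For readers comparing the peak and current values in the logs above, here is a minimal sketch of how a high-water mark can diverge from current usage. The class PeakUsageSketch and its methods are hypothetical and are not Gluten's actual sharedUsage implementation. The allocation pattern in main is invented purely so that the printed line has the same shape as the store_sales log line.

```java
// Hypothetical counter for illustration only; not Gluten's implementation.
class PeakUsageSketch {
  private long current;
  private long peak;

  // Record a reservation and raise the high-water mark if needed.
  synchronized void inc(long bytes) {
    current += bytes;
    if (current > peak) {
      peak = current;
    }
  }

  // Record a release; the peak is never lowered.
  synchronized void dec(long bytes) {
    current -= bytes;
  }

  synchronized long current() {
    return current;
  }

  synchronized long peak() {
    return peak;
  }

  // Convert bytes to whole MiB (1 MiB = 1024 * 1024 bytes).
  static long toMiB(long bytes) {
    return bytes / (1024L * 1024L);
  }

  public static void main(String[] args) {
    PeakUsageSketch usage = new PeakUsageSketch();
    usage.inc(8L << 20);   // assumed: an 8 MiB buffer that stays reserved
    usage.inc(912L << 20); // assumed: transient allocations while reading
    usage.dec(912L << 20); // transient allocations are released again
    // Prints: peak=964689920, current=8388608
    System.out.println("peak=" + usage.peak() + ", current=" + usage.current());
    // Prints: 920 MiB peak, 8 MiB current
    System.out.println(
        toMiB(usage.peak()) + " MiB peak, " + toMiB(usage.current()) + " MiB current");
  }
}
```

The point of the sketch is that a small current value at release time says nothing about how much was held mid-read, which is why the logs can show no leak and a high peak at the same time.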

jinchengchenghh commented 2 months ago

I think this is because Arrow does not support specifying a file start offset and length to split a file, so its peak memory is high for a very large CSV file.

FelixYBW commented 2 months ago

Do you mean the Arrow CSV reader doesn't support splits? That is, each partition must consist of one or more whole CSV files rather than part of a large CSV file?

jinchengchenghh commented 2 months ago

Yes.

jinchengchenghh commented 2 months ago

It is easy to make Arrow support a file offset and length: we just need to use RandomAccessFile to generate an InputStream. The FileSource class constructor is:

  using CustomOpen = std::function<Result<std::shared_ptr<io::RandomAccessFile>>()>;

  FileSource(std::shared_ptr<io::RandomAccessFile> file, int64_t size,
             Compression::type compression = Compression::UNCOMPRESSED)
      : custom_open_([=] { return ToResult(file); }),
        custom_size_(size),
        compression_(compression) {}

  // Declared on io::RandomAccessFile:
  static Result<std::shared_ptr<InputStream>> GetStream(
      std::shared_ptr<RandomAccessFile> file, int64_t file_offset, int64_t nbytes);

https://github.com/apache/arrow/blob/main/cpp/src/arrow/dataset/file_base.cc#L110

I can help implement it on demand.
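
To illustrate what passing a start and length means for a line-oriented format, here is a minimal sketch in Java. It uses java.io.RandomAccessFile, not Arrow's io::RandomAccessFile, and the class CsvSplitSketch and method readSplit are hypothetical. It assumes the common convention that a split owns every line whose first byte falls inside its byte range, and it assumes ASCII data with no header row and no newlines inside quoted fields. It is not how Arrow or Gluten implements this.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only.
class CsvSplitSketch {

  // Returns the lines whose first byte lies in [start, start + length).
  static List<String> readSplit(Path file, long start, long length) throws IOException {
    List<String> lines = new ArrayList<>();
    long end = start + length;
    try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r")) {
      if (start > 0) {
        // Step back one byte and discard up to the next newline. If the byte
        // before start is a newline, the line at start is kept; otherwise the
        // partial line belongs to the previous split and is skipped.
        raf.seek(start - 1);
        raf.readLine();
      }
      // A line that begins before end is read in full, even if it crosses end.
      while (raf.getFilePointer() < end) {
        String line = raf.readLine(); // unbuffered; fine for a sketch
        if (line == null) {
          break; // end of file
        }
        lines.add(line);
      }
    }
    return lines;
  }

  public static void main(String[] args) throws IOException {
    Path file = Files.createTempFile("split-sketch", ".csv");
    Files.write(file, "a,1\nb,2\nc,3\nd,4\n".getBytes(StandardCharsets.US_ASCII));
    long size = Files.size(file);
    // Read the file as fixed-size byte ranges that ignore line boundaries.
    for (long start = 0; start < size; start += 5) {
      System.out.println("split@" + start + " -> " + readSplit(file, start, 5));
    }
    Files.delete(file);
  }
}
```

With this convention every line is read by exactly one split, so ranges can be chosen without looking at the data, and each reader only needs to hold its own range in memory.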

FelixYBW commented 2 months ago

Thank you, Chengcheng. Let's hold off until we get requests.

liujiayi771 commented 2 months ago

@jinchengchenghh Spark will split a single CSV file into multiple partitions for reading. We need to pass start and length to Arrow. I have currently resolved this issue through some hacks; otherwise, it would cause the same CSV file to be read multiple times.

FelixYBW commented 2 months ago

@jinchengchenghh Do we pass the CSV file to Arrow multiple times if it is split by Spark? @zhztheplayer, how did we solve this issue in Gazelle?

jinchengchenghh commented 2 months ago

I marked this format as not splittable, so Spark should not split it. https://github.com/apache/incubator-gluten/blob/main/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVFileFormat.scala#L60