prestodb / presto

The official home of the Presto distributed SQL query engine for big data
http://prestodb.io

Writer scaling fails for Parquet with smaller files #23063

Open ZacBlanco opened 6 days ago

ZacBlanco commented 6 days ago


Expected Behavior

INSERT statements that write large amounts of data in Parquet format should properly scale writers when scale-writers=true is set, regardless of the size of the files or the format.

Current Behavior

In internal benchmarking we have observed that, depending on the data source and writer options, there are cases where the Parquet writer does not properly scale out to make use of the entire cluster. This occurs specifically when the Parquet writer is writing to partitioned tables.

We discovered this issue while comparing hardware metrics for INSERTs run against different formats and datasets. For example, the following two charts are from writing to the tpcds.sf100.inventory table, where one target table is partitioned and the other is not.

Non-partitioned Insert

Partitioned Insert

The big difference between these two charts is that in the partitioned insert only one worker is utilized, indicated by its large amount of network RX compared to every other node in the cluster. This has been traced to an issue with the way ScaledWriterScheduler works in conjunction with the ParquetWriter.

The ScaledWriterScheduler increases the number of writers only when there are full tasks and the amount of data written per node meets a minimum threshold. What we see when using the Parquet writer is that the amount of data reported as written by the nodes is almost always 0, and when it is greater than 0, the fullTasks threshold never seems to be met.
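
To make that condition concrete, the scaling decision amounts to roughly the sketch below. This is a simplified illustration rather than the actual ScaledWriterScheduler code; SourceTask, WriterTask, shouldAddWriterTask, and the threshold values are hypothetical stand-ins for the task state the scheduler actually inspects.

    import java.util.List;

    // Hypothetical stand-ins for the task state the scheduler inspects.
    interface SourceTask
    {
        boolean isDone();

        boolean isOutputBufferFull();
    }

    interface WriterTask
    {
        long getWrittenBytes();
    }

    final class WriterScalingSketch
    {
        private WriterScalingSketch() {}

        // A new writer task is added only when enough of the upstream (source) tasks report
        // full output buffers AND the total written bytes meet a per-node minimum threshold.
        // The 0.5 fraction here is illustrative, not the exact value Presto uses.
        static boolean shouldAddWriterTask(
                List<SourceTask> sourceTasks,
                List<WriterTask> writerTasks,
                long minWrittenBytesPerNode,
                int writerNodeCount)
        {
            double fullTaskFraction = sourceTasks.stream()
                    .filter(task -> !task.isDone())
                    .mapToDouble(task -> task.isOutputBufferFull() ? 1.0 : 0.0)
                    .average()
                    .orElse(0.0);

            // With the Parquet writer this sum stays near 0 while data sits in the writer's
            // in-memory buffer, so the second condition is effectively never satisfied.
            long totalWrittenBytes = writerTasks.stream()
                    .mapToLong(WriterTask::getWrittenBytes)
                    .sum();

            return fullTaskFraction > 0.5
                    && totalWrittenBytes >= minWrittenBytesPerNode * writerNodeCount;
        }
    }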

The reason this occurs is that the ParquetWriter buffers data in memory before flushing it to disk, and only bytes that have been flushed to disk are counted as written. If you are writing to a partitioned table where the file(s) in each partition are smaller than the writer's buffer, the written bytes are not reported until the writer is closed, and only at that point does a new task become available. Hence the writes never scale out to additional workers, because the written bytes are only reported each time a worker finishes writing a file.
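
As a toy illustration of that accounting (made-up code, not Presto's ParquetWriter), a writer that only counts flushed bytes reports 0 for any partition file that fits under its buffer, and the bytes only show up when close() forces the final flush:

    // Toy model: data is buffered in memory and only counted once it is flushed.
    // With a ~128MB flush threshold and partition files smaller than that,
    // getWrittenBytes() stays at 0 until close().
    class BufferingWriterToy
    {
        private static final long FLUSH_THRESHOLD_BYTES = 128L * 1024 * 1024;

        private long bufferedBytes;
        private long flushedBytes;

        void write(int chunkSizeInBytes)
        {
            bufferedBytes += chunkSizeInBytes;
            if (bufferedBytes >= FLUSH_THRESHOLD_BYTES) {
                flush();
            }
        }

        void close()
        {
            flush();
        }

        private void flush()
        {
            flushedBytes += bufferedBytes;
            bufferedBytes = 0;
        }

        // This is all the scheduler effectively sees; buffered-but-unflushed data is invisible.
        long getWrittenBytes()
        {
            return flushedBytes;
        }
    }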

Possible Solution

I see a few solutions here:

  1. Change the Parquet writer to report written bytes that have not yet been flushed to disk (see the sketch after this list)
  2. Add a config to change the Parquet writer's buffer size so we can set it to a smaller value for specific queries
  3. Introduce or use a separate metric for write scaling? We already have optimized-scale-writer-producer-buffer, but that did not seem to help scale out writers in this case either.
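
For illustration, option 1 could be as small as having the Parquet file writer include buffered bytes in what it reports, mirroring what OrcFileWriter does (see the snippets quoted further down in this thread). A rough sketch of the changed method, not a tested patch:

    // Sketch of option 1: count buffered bytes alongside flushed bytes, the same way
    // OrcFileWriter combines getWrittenBytes() and getBufferedBytes().
    @Override
    public long getWrittenBytes()
    {
        return parquetWriter.getWrittenBytes() + parquetWriter.getBufferedBytes();
    }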

Steps to Reproduce

This can be reproduced on a laptop using the HiveQueryRunner:

  1. Run an instance of MariaDB and the HMS
  2. Start the HiveQueryRunner with scale-writers=true and hive.parquet-optimized-writer-enabled=true
  3. Create three tables: a partitioned Parquet table, a partitioned ORC table, and an unpartitioned Parquet table.
CREATE TABLE IF NOT EXISTS inventory (
  inv_item_sk INT, -- identifier not null primary key foreign key i_item_sk
  inv_warehouse_sk INT, -- identifier not null primary key foreign key w_warehouse_sk
  inv_quantity_on_hand INT,
  inv_date_sk INT) -- identifier not null primary key foreign key d_date_sk
WITH (
  format = 'PARQUET',
  partitioned_by = array['inv_date_sk']
);
CREATE TABLE IF NOT EXISTS inventory_orc (
  inv_item_sk INT, -- identifier not null primary key foreign key i_item_sk
  inv_warehouse_sk INT, -- identifier not null primary key foreign key w_warehouse_sk
  inv_quantity_on_hand INT,
  inv_date_sk INT) -- identifier not null primary key foreign key d_date_sk
WITH (
  format = 'ORC',
  partitioned_by = array['inv_date_sk']
);
CREATE TABLE IF NOT EXISTS inventory_unp (
  inv_item_sk INT, -- identifier not null primary key foreign key i_item_sk
  inv_warehouse_sk INT, -- identifier not null primary key foreign key w_warehouse_sk
  inv_quantity_on_hand INT,
  inv_date_sk INT) -- identifier not null primary key foreign key d_date_sk
WITH (
  format = 'PARQUET'
);
  4. Run an INSERT from the tpcds connector:
    INSERT INTO <table>
    SELECT
    cast(inv_item_sk as INT), -- identifier not null primary key foreign key i_item_sk
    cast(inv_warehouse_sk as INT), -- identifier not null primary key foreign key w_warehouse_sk
    cast(inv_quantity_on_hand as INT),
    cast(inv_date_sk as INT) FROM tpcds.sf1000.inventory;
  5. The write performance for the partitioned Parquet table will be much lower than for the ORC or unpartitioned tables.

Context

This issue has prevented us from generating larger benchmark datasets using only Presto.

ZacBlanco commented 6 days ago

cc: @aaneja @imjalpreet

yingsu00 commented 6 days ago

@ZacBlanco Nice findings. Do you know why the ORC partitioned table doesn't have this problem? When does the ORC writer update the stats?

ZacBlanco commented 6 days ago

I think it's just that the Parquet default row group size is larger than the ORC one, and that causes issues for this particular query/dataset.

With Parquet, we don't flush to disk until hitting the row group limit within the writer. By default it is set to 128MB.

With ORC, I believe there are multiple different limits which are configurable and default to values lower than the Parquet ones. The default max stripe size for ORC is 64MB: https://github.com/prestodb/presto/blob/50084588fa98ab9b9dbb98485247ea37453c419c/presto-orc/src/main/java/com/facebook/presto/orc/DefaultOrcWriterFlushPolicy.java#L50-L61 (see top of this file for the defaults)

So if you were to run an insert on both ORC and Parquet where the files in each partition were below the maximum stripe size (say, ORC's default of 64MB), both ORC and Parquet would probably have this issue, as they both use the same type of output stream to retrieve the written byte count.

yingsu00 commented 6 days ago

I saw that in OrcFileWriter

    @Override
    public long getWrittenBytes()
    {
        return orcWriter.getWrittenBytes() + orcWriter.getBufferedBytes();
    }

But in ParquetFileWriter

    @Override
    public long getWrittenBytes()
    {
        return parquetWriter.getWrittenBytes();
    }

and ParquetWriter

    public long getWrittenBytes()
    {
        return outputStream.longSize();
    }

    public long getBufferedBytes()
    {
        return columnWriters.stream().mapToLong(ColumnWriter::getBufferedBytes).sum();
    }

imjalpreet commented 6 days ago

+1, this is one difference I also observed.