trinodb / trino

Official repository of Trino, the distributed SQL query engine for big data, formerly known as PrestoSQL (https://trino.io)
Apache License 2.0

High CPU usage when reading parquet with wide schema #22434

Open davseitsev opened 1 week ago

davseitsev commented 1 week ago

We have a use case where we store big structures in Iceberg tables. Recently we found a case where a structure has 45K+ nested fields in total. Simple select queries fail with "Exceeded CPU limit of 48.00h". For example:

select * from events.trms.trm_crud
  where (
     laid='3f263312-22f0-22eb-868e-ee4cccb49114'
  or taid='3f263312-22f0-22eb-868e-ee4cccb49114'
  or luid='3f263312-22f0-22eb-868e-ee4cccb49114'
  )
  and ts >= from_unixtime(1672531200)
  and ts <  from_unixtime(1718235603)
order by ts;

A select count(*) with the same filters returns 0 records, which means data filtering consumes all the CPU.

[screenshot]

We ran a CPU profiler on one of the workers and it showed that all the CPU is consumed by the IcebergPageSourceProvider.createParquetPageSource method.

[flame graph screenshot]

There are 2 loops which take all the CPU time. In the first one, we try to find the ColumnChunkMetaData by iterating through all the columns: https://github.com/trinodb/trino/blob/ad97087b3939818e33511712b48b05419f4463cb/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java#L496-L500
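
The pattern there is essentially a linear scan per requested column (a simplified sketch, not the exact Trino code; the classes are from parquet-mr):

import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ColumnPath;

// For every projected leaf column, scan all column chunks of the row group
// and compare paths until a match is found.
static ColumnChunkMetaData findColumnChunk(BlockMetaData block, ColumnDescriptor descriptor)
{
    ColumnPath wantedPath = ColumnPath.get(descriptor.getPath());
    for (ColumnChunkMetaData column : block.getColumns()) {
        if (column.getPath().equals(wantedPath)) {
            return column;
        }
    }
    throw new IllegalStateException("Column chunk not found: " + wantedPath);
}

With ~45K leaf columns this scan is quadratic in the schema width.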

In the second one, we iterate over all the columns to find the column index: https://github.com/trinodb/trino/blob/ad97087b3939818e33511712b48b05419f4463cb/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetTypeUtils.java#L134-L151
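
The second loop follows the same shape (again a simplified sketch, not the exact Trino code): finding a field's index means a case-insensitive scan over all fields of the group.

import org.apache.parquet.schema.GroupType;

// Finding the index of a field by name scans every field of the group,
// comparing names case-insensitively.
static int findFieldIndexIgnoreCase(GroupType group, String name)
{
    for (int i = 0; i < group.getFieldCount(); i++) {
        if (group.getFieldName(i).equalsIgnoreCase(name)) {
            return i;
        }
    }
    return -1;
}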

As we do this for all the primitive fields, in total we call equals/equalsIgnoreCase more than 2B times (roughly 45K fields × 45K comparisons per lookup). And this happens at least once for each file (there are 1400 files in the table).

We are discussing changing the schema of this table to use JSON. But it may still be worth improving this for smaller schemas anyway.

Trino version: 444

wendigo commented 1 week ago

cc @raunaqmorarka

raunaqmorarka commented 1 week ago

@davseitsev could you try benchmarking your case with https://github.com/trinodb/trino/pull/22451 ? There could be further improvements made, but I wanted to see if those changes are sufficient for your scenario.

davseitsev commented 1 week ago

Sure I will check it

davseitsev commented 1 week ago

Now it's a little bit better.

[screenshot]

Compared to the previous results, I would say it's about 1.35x better.

The flame graph looks similar:

[flame graph screenshot]

raunaqmorarka commented 1 week ago

That still looks strange, because I removed the usage of ColumnPath#get from getColumnChunkMetaData, but it's still somehow showing up in your flame graph.

davseitsev commented 1 week ago

I will check it

davseitsev commented 1 week ago

Ok, it's my bad, I didn't deploy the change properly.

[screenshots]

raunaqmorarka commented 1 week ago

Okay, so we have improved significantly with the new changes, but this still needs further improvement. Would you be able to share a sample file, table definition, and test query with me so that I can assess potential optimizations more easily?

raunaqmorarka commented 5 days ago

@davseitsev I've updated my PR with some more changes; I think they should help some more. Can you try out the updated PR as well?

davseitsev commented 4 days ago

Sure, I will test it and post the results today. I'm also trying to write a unit test with our wide schema that can be used to evaluate the changes.

davseitsev commented 4 days ago

Here is the result with the latest changes:

[screenshots]

raunaqmorarka commented 4 days ago

Thanks @davseitsev. We've essentially removed the bottleneck from the getDescriptors call, but getColumnChunkMetaData still needs improvement. I will look for ways to address that in a different PR.

davseitsev commented 3 days ago

Here is a test you can use to understand the use case and profile it locally: TestIcebergWideSchemaRead.java.txt. And here is the table schema used in the test to create the test table and generate the test data: create_table_no_formatting.txt

The test flame graph looks similar to the production one:

[flame graph screenshot]

If you have new changes, I can test them on real data to compare total CPU time with the previous results.

BTW, I just had a thought: what about building a HashMap index for all the columns in a block here: https://github.com/trinodb/trino/blob/ad97087b3939818e33511712b48b05419f4463cb/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java#L180-L184, something like:

Map<ArrayWrapper, ColumnChunkMetaData> columnsByPath = metadata.getColumns().stream()
        .collect(Collectors.toMap(
                column -> new ArrayWrapper(column.getPath().toArray()),
                column -> column));
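
Here ArrayWrapper would just be a hypothetical helper (not existing Trino code) that gives the String[] path value-based equals/hashCode so it can serve as a map key; a minimal sketch:

import java.util.Arrays;

// Hypothetical helper: wraps the path array so the map key uses
// value-based equality instead of array identity.
final class ArrayWrapper
{
    private final String[] path;

    ArrayWrapper(String[] path)
    {
        this.path = path;
    }

    @Override
    public boolean equals(Object o)
    {
        return o instanceof ArrayWrapper other && Arrays.equals(path, other.path);
    }

    @Override
    public int hashCode()
    {
        return Arrays.hashCode(path);
    }
}

Alternatively, I think ColumnPath itself already implements equals/hashCode over the path, so it could probably be used as the key directly.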

We could then obtain the necessary ColumnChunkMetaData from the map, avoiding iterating over all the columns again and again. Could this cause any side effects?

raunaqmorarka commented 23 hours ago

@davseitsev PTAL at https://github.com/trinodb/trino/pull/22538 . I think this should resolve the remaining bottlenecks as well. Please try it out and let me know.