influxdata / influxdb

Scalable datastore for metrics, events, and real-time analytics
https://influxdata.com
Apache License 2.0
28.16k stars 3.51k forks source link

Investigate performance querying Parquet files #25067

Open pauldix opened 1 week ago

pauldix commented 1 week ago

I noticed that querying a small number of Parquet files takes longer than I would expect. I think it's worth looking into where the time is being spent and building some tooling around doing profiling.

To reproduce, run the server with 1m segment durations (--segment-duration=1m). Then run the write side of the load generator on the one_mil with 10 writers (--builtin-spec=one_mil --w=10). This will write 100k rows/sec and will produce Parquet files in the segments with 6M rows. Run the generator for about 10 minutes to let it create a number of Parquet files, then stop the generator and let the sever run until it has persisted everything to disk.

Run queries that look for a single series in a 1m window of time. Also run queries that look for the entire full 10 minutes of time the generator was running. It might also be worth trying this with only a single full minute of persisted data (i.e. one Parquet file, only one persisted segment) to see how that compares.

You can use tracing or profiling or both. Add whatever we need to get some detailed instrumentation about where it's spending its time. Write up a PROFILING.md doc in the repo detailing how to run tests locally and gather profiling information for future users & developers.

hiltontj commented 1 week ago

I am still working with profiling and getting a hang of the tools / tricks around that, but thought I would share some of the things I tried/found earlier today.

Query Latency

I ran the load generator to write as described in the issue, then performed two query loads once all data was persisted (both with --querier-count=1), against influxdb3 servers compiled using the release and quick-bench1 profiles, respectively.

Query Latency (ms)

The query run was:

SELECT COUNT(*) FROM measurement_data
WHERE series_id = $sid
AND time < '2024-06-18T13:07:00.00'
AND time > '2024-06-18T13:06:00.00'

The series_id column is the only tag and a parameter ($sid) was used here to inject a random value. So, the query latency sits around 350ms for the release build and degrades to ~370ms for the quick-bench build (5% degradation).

@pauldix - is that similar to what you were seeing in terms of latency?

EXPLAIN ANALYZE

I ran the above query with EXPLAIN ANALYZE to see metrics on the different parts of the query plan. All parts except for ParquetExec have elapsed_compute < 30 µs. Here is the plan:

AggregateExec: mode=Final, gby=[], aggr=[COUNT(*)], metrics=[output_rows=1, elapsed_compute=24.792µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   
  CoalescePartitionsExec, metrics=[output_rows=14, elapsed_compute=7.208µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             
    AggregateExec: mode=Partial, gby=[], aggr=[COUNT(*)], metrics=[output_rows=14, elapsed_compute=22.459µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            
      ProjectionExec: expr=[], metrics=[output_rows=6, elapsed_compute=472ns]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           
        CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=6, elapsed_compute=1.198µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   
          FilterExec: series_id@0 = series-number-10 AND time@1 < 1718716020000000000 AND time@1 > 1718715960000000000, metrics=[output_rows=6, elapsed_compute=27.43µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                
            CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=6, elapsed_compute=5.558µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               
              FilterExec: series_id@0 = series-number-10 AND time@1 < 1718716020000000000 AND time@1 > 1718715960000000000, metrics=[output_rows=6, elapsed_compute=29.971µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           
                ParquetExec: file_groups={14 groups: [[...]}, projection=[series_id, time], predicate=series_id@20 = series-number-10 AND time@21 < 1718716020000000000 AND time@21 > 1718715960000000000, pruning_predicate=CASE WHEN series_id_null_count@2 = series_id_row_count@3 THEN false ELSE series_id_min@0 <= series-number-10 AND series-number-10 <= series_id_max@1 END AND CASE WHEN time_null_count@5 = time_row_count@6 THEN false ELSE time_min@4 < 1718716020000000000 END AND CASE WHEN time_null_count@5 = time_row_count@6 THEN false ELSE time_max@7 > 1718715960000000000 END, required_guarantees=[series_id in (series-number-10)], metrics=[
    output_rows=6,
    elapsed_compute=14ns,
    row_groups_pruned_bloom_filter=0,
    row_groups_pruned_statistics=60,
    pushdown_rows_filtered=264186,
    row_groups_matched_bloom_filter=0,
    predicate_evaluation_errors=0,
    num_predicate_creation_errors=0,
    bytes_scanned=775190,
    page_index_rows_filtered=784384,
    file_scan_errors=0,
    file_open_errors=0,
    row_groups_matched_statistics=1,
    time_elapsed_processing=343.043747ms,
    time_elapsed_scanning_total=7.47275ms,
    time_elapsed_opening=4.19587217s,
    time_elapsed_scanning_until_data=7.435623ms,
    page_index_eval_time=56.9µs,
    pushdown_eval_time=1.013507ms
]

I also tried the same query with different conditions placed on time, i.e., to query across the entire 10 minute span of generated data. The ParquetExec's time_elapsed_opening2 metric is always around 4s. This is CPU time, so is split across cores, but given this number is always about the same, I suspect it is opening all files on each query, regardless of the bounds placed on time. time_elapsed_processing3 is also around 350ms for both time bounds.

My Mac has 14 cores and 4.19 seconds / 14 is ~300ms, which, combined with other elapsed times in the plan should get pretty close to making up the ~350ms query latencies seen above.

1 - quick-bench is the same as the existing quick-release profile, but with debuginfo turned on, I will open a PR with this. 2 - time_elapsed_openeing refers to:

Wall clock time elapsed for file opening. Time between when FileOpener::open is called and when the FileStream receives a stream for reading.

3 - time_elapsed_processing refers to:

Wall clock time elapsed for data decompression + decoding. Time spent waiting for the FileStream's input.

pauldix commented 1 week ago

Yeah, that's around what I was seeing, I think. I'm curious how it looks if you remove the segment info files for all but one segment and then restart the server and test again. Then it would only have a single Parquet file to open.

It definitely opens all Parquet files as we don't have a pre-filter step. It likely opens them up to at least parse the metadata to determine which files can be pruned. So testing with just a single file would be more interesting.

hiltontj commented 1 week ago

@pauldix ripped out all the segments but one, so that there was a single, full parquet file with 6M rows.

Query Latency

Latency was ~220ms. Surprisingly bad, but that is down ~40% from the ~370 ms when there were 10 files.

image

EXPLAIN ANALYZE

The analyzed query plan also still has surprisingly large numbers for the time_elapsed_opening metric in the ParquetExec, which is also down ~40% from the previous run.

AggregateExec: mode=Final, gby=[], aggr=[COUNT(*)], metrics=[output_rows=1, elapsed_compute=14.543µs]
  CoalescePartitionsExec, metrics=[output_rows=14, elapsed_compute=7µs]
    AggregateExec: mode=Partial, gby=[], aggr=[COUNT(*)], metrics=[output_rows=14, elapsed_compute=23.916µs]
      ProjectionExec: expr=[], metrics=[output_rows=6, elapsed_compute=430ns]
        CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=6, elapsed_compute=805ns]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      
          FilterExec: series_id@0 = series-number-10, metrics=[output_rows=6, elapsed_compute=26.138µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  
            CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=6, elapsed_compute=8.336µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                
              FilterExec: series_id@0 = series-number-10, metrics=[output_rows=6, elapsed_compute=30.263µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              
                ParquetExec: file_groups={14 groups: [[dbs/load_test/measurement_data/2024-06-18T13-04/4294967292.parquet:0..28876795], [dbs/load_test/measurement_data/2024-06-18T13-04/4294967292.parquet:28876795..57753590], [dbs/load_test/measurement_data/2024-06-18T13-04/4294967292.parquet:57753590..86630385], [dbs/load_test/measurement_data/2024-06-18T13-04/4294967292.parquet:86630385..115507180], [dbs/load_test/measurement_data/2024-06-18T13-04/4294967292.parquet:115507180..144383975], ...]}, projection=[series_id], predicate=series_id@20 = series-number-10, pruning_predicate=CASE WHEN series_id_null_count@2 = series_id_row_count@3 THEN false ELSE series_id_min@0 <= series-number-10 AND series-number-10 <= series_id_max@1 END, required_guarantees=[series_id in (series-number-10)],
metrics=[
output_rows=6,
elapsed_compute=14ns,
row_groups_pruned_statistics=5,
bytes_scanned=491843,
file_scan_errors=0,
num_predicate_creation_errors=0,
row_groups_pruned_bloom_filter=0,
row_groups_matched_bloom_filter=0,
pushdown_rows_filtered=264186,
page_index_rows_filtered=784384,
predicate_evaluation_errors=0,
row_groups_matched_statistics=1,
file_open_errors=0,
time_elapsed_scanning_until_data=6.414416ms,
pushdown_eval_time=1.17086ms,
time_elapsed_opening=2.552581209s,
page_index_eval_time=19.556µs,
time_elapsed_scanning_total=6.449458ms,
time_elapsed_processing=131.954461ms
]