apache / drill

Apache Drill is a distributed MPP query layer for self describing data
https://drill.apache.org/
Apache License 2.0

dir/*.parquet UNION ALL dir/*.json reading slow #2814

Closed pandalanax closed 1 year ago

pandalanax commented 1 year ago

Describe the bug
We have a 20 GB Parquet dataset which we store in HDFS and currently regenerate monthly. It is written in 128 MB parts, so ~160 .parquet files; we call this our history_data. We UNION these files with .json files that contain newer data. For example:

The Parquet data ranges from 2020-01-31 up to 2023-06-29; the JSON files range from 2023-07-01 up to the current date.

View Creation:

CREATE VIEW sensor_data AS (
  SELECT
    `timestamp`,
    measurement_count,
    version,
    serial_number,
    co2_ppm,
    temperature_celsius,
    relative_humidity_percent
  FROM dfs.storage.history_data
UNION
  SELECT
    CAST(d AS INT) AS `timestamp`,
    CAST(m AS INT) AS measurement_count,
    CAST(v AS VARCHAR) AS version, 
    CAST(s AS VARCHAR) AS serial_number,
    CAST(c AS INT) AS co2_ppm,
    CAST(t AS DOUBLE) AS temperature_celsius,
    CAST(r AS DOUBLE) AS relative_humidity_percent
  FROM dfs.storage.`raw_monthly/2023-07/*.json`)

When issuing a query like:

SELECT * FROM dfs.storage.sensor_data ORDER BY `timestamp` LIMIT 10

we observed that the Major Fragment 03-xx-xx (the one containing the JSON and Parquet scans feeding the UNION) only runs with MIN(# parquet files, # json files) threads/processes. E.g.:

160 parquet files & 9 JSON files == 9/9 Minor Fragments Reporting (super slow)
160 parquet files & 160 JSON files == 160/160 Minor Fragments Reporting (fastest)
160 parquet files & 320 JSON files == 160/160 Minor Fragments Reporting (also acceptably fast)
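For reference, Drill's implicit filename column (available on file-based storage plugins) gives a quick way to confirm how many files each side of the UNION actually scans; a minimal sketch, using the same paths as in the view above:

-- count the distinct files scanned on each side of the UNION
SELECT COUNT(DISTINCT filename) AS parquet_files
FROM dfs.storage.history_data;

SELECT COUNT(DISTINCT filename) AS json_files
FROM dfs.storage.`raw_monthly/2023-07/*.json`;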

Is there a config option for this? Right now we cannot get more read parallelism than MIN(# parquet files, # json files). I also tried reversing the order (SELECT * FROM JSON UNION ALL SELECT * FROM PARQUET), but with no effect.

Expected behavior
Thoughts: in our setup MIN(# parquet files, # json files) fails to deliver performance because the JSON files are < 110 MB each, which read quickly, but the same few threads then also have to read the 20 GB of Parquet data, which takes very long. Instead of determining the number of fragments from the number of files, would it make more sense to look at the actual HDFS blocks occupied by the files and go from there? Especially when trying to fit one file into one HDFS block.

Drill version 1.17.0

Additional context
4 Drillbits.

ALTER SESSION SET `planner.width.max_per_query` = 400;
ALTER SESSION SET `planner.width.max_per_node` = 100;
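As a sanity check (a minimal sketch; the exact sys.options columns vary slightly between Drill versions), the effective width settings can be listed with:

SELECT *
FROM sys.options
WHERE name LIKE 'planner.width%';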

160/160 Minor Fragments Reporting: [profile screenshot]

9/9 Minor Fragments Reporting: [profile screenshot]

pandalanax commented 1 year ago

To summarize: if planner.width.max_per_query > (actual # of files for one side of the union), the effective parallelism shrinks to (actual # of files for one side of the union). This could make sense now that I think about it, since only a limited amount of parallel reading is possible per file.

However: the files are replicated throughout the HDFS cluster (also 4 nodes, replication factor 4). In my understanding it should be possible to read one file with more than one thread. Am I wrong here?

Another idea: would it be possible to split JSON_SUB_SCAN and PARQUET_ROW_GROUP_SCAN into their own Major Fragments when doing a UNION ALL? (Same with KAFKA_SUB_SCAN, by the way.)

cgivre commented 1 year ago

Hi @pandalanax, can you please try with an updated version of Drill to see if this issue still exists? Drill 1.17 is several years old at this point and there have been MANY improvements to it since then.

One more thing, have you tried using the Drill metastore? That might help in query planning.
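For what it's worth, a minimal sketch of populating the Drill Metastore for the Parquet side, assuming the Metastore is configured and using the table path from the view above (syntax per the ANALYZE TABLE ... REFRESH METADATA command documented for Drill 1.17+):

-- enable the Metastore for the session, then collect/refresh table metadata
SET `metastore.enabled` = true;
ANALYZE TABLE dfs.storage.history_data REFRESH METADATA;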

pandalanax commented 1 year ago

We created a workaround for the time being while we are upgrading the Drill version in our cluster.
Workaround: create n empty json files via

for n in {1..160}; do hdfs dfs -touch /path/to/json/files/tmp${n}.json; done

with n then matching the number of parquet files. Will close for now and reopen if the issue persists with 1.21. Will also try out the Drill metastore, thanks for the suggestion!