Open · jlowe opened this issue 2 weeks ago
While investigating some NDS queries, I noticed that Parquet scans sometimes produce more batches than required for the target batch size. For example, this screenshot of a query plan snippet from query23a running on Dataproc shows a Parquet scan producing 447 batches, averaging over two batches per task, followed by a filter that removes no rows, and then a coalesce that reduces the batch count from 447 to 156. That implies many tasks produced more batches during the scan than necessary, and it is likely we would get sub-linear scaling if we processed the Parquet data in one shot rather than in many batches.
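A minimal sketch of how this kind of observation can be reproduced in a spark-shell session with the RAPIDS plugin enabled. The config name spark.rapids.sql.batchSizeBytes, the table path, and the filter column are assumptions for illustration, not taken from the actual query23a run:

```scala
// Sketch only: set an explicit target batch size, run a scan + filter similar
// in shape to the plan above, and compare the scan's output-batches metric in
// the SQL UI to the number of tasks.
// spark.rapids.sql.batchSizeBytes is assumed to be the target batch size
// config; the path and column below are placeholders.
spark.conf.set("spark.rapids.sql.batchSizeBytes", (1L << 30).toString) // ~1 GiB target

val df = spark.read.parquet("/data/nds/store_sales")   // placeholder path
df.filter("ss_sold_date_sk IS NOT NULL").count()        // filter that drops no rows

// If each task's input fits within the target batch size, the scan should emit
// roughly one batch per task; noticeably more suggests the reader is splitting
// its input unnecessarily.
```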
The first suspect was the Parquet chunked batch reader, but when the query was re-run with spark.rapids.sql.reader.chunked=false, the extra batches in the scan persisted.
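For reference, the re-run amounts to something like the following in a spark-shell session with the plugin enabled; the query here is a stand-in, not the actual NDS query:

```scala
// Disable the chunked Parquet batch reader and re-run the scan to rule it out.
// spark.rapids.sql.reader.chunked is the config named above; the query is a
// placeholder for the real NDS query.
spark.conf.set("spark.rapids.sql.reader.chunked", "false")
spark.sql("SELECT COUNT(*) FROM store_sales WHERE ss_sold_date_sk IS NOT NULL").show()
// The extra batches in the scan persisted even with the chunked reader disabled.
```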
At this point I think it is likely related to the different reader types. This is happening in the cloud, and it looks like either the multi-threaded combining reader or the plain multi-threaded reader is the cause.
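One way to confirm which reader is responsible is to pin the Parquet reader type per run. A sketch, assuming spark.rapids.sql.format.parquet.reader.type is the override config and that PERFILE, MULTITHREADED, COALESCING, and AUTO are its accepted values (worth double-checking against RapidsConf):

```scala
// Force each Parquet reader implementation in turn and compare the scan's
// output-batches metric across runs. The config name and values here are
// assumptions based on the plugin's documented reader-type override.
for (readerType <- Seq("PERFILE", "MULTITHREADED", "COALESCING", "AUTO")) {
  spark.conf.set("spark.rapids.sql.format.parquet.reader.type", readerType)
  spark.sql("SELECT COUNT(*) FROM store_sales WHERE ss_sold_date_sk IS NOT NULL").show()
}
```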
This is definitely the multithreaded coalescing reader, as @tgravescs confirmed. There are two configs that relate to this, see https://github.com/NVIDIA/spark-rapids/blob/branch-24.12/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala#L1119-L1139. I suspect the code does not account for the fact that more data can arrive while a task is waiting on the GPU semaphore, so the reader could be more efficient by working with everything it has at the time it finally wakes up with the semaphore held.
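For experimenting with those two settings, something like the sketch below can be used. The config names are my recollection of the ones at the linked RapidsConf lines and the values are arbitrary, so verify both against branch-24.12 before relying on them:

```scala
// Adjust the multithreaded combine thresholds to see how they interact with
// time spent waiting on the GPU semaphore. Both config names and both values
// are assumptions, not copied from the linked RapidsConf lines.
spark.conf.set("spark.rapids.sql.reader.multithreaded.combine.sizeBytes",
  (256L * 1024 * 1024).toString)                                        // combine up to ~256 MiB
spark.conf.set("spark.rapids.sql.reader.multithreaded.combine.waitTime", "200") // ms to wait for more data
```

If these thresholds are only evaluated before the semaphore wait rather than when the task actually wakes up with the semaphore held, that would line up with the suspicion above that data arriving during the wait is not being combined.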