Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0

The ColumnarToRow Spark optimization is not applied when using nested fields from an Iceberg table #10828

Open cccs-jc opened 3 months ago

cccs-jc commented 3 months ago

Feature Request / Improvement

Prior to the introduction of ColumnarToRow in Spark 3.0.0, columnar data was converted into Spark's internal rows using generated code that copies data from the ColumnarBatch vectors into an internal row.

In Spark 3.0.0 an optimization was introduced which provides a row iterator that retrieves values directly from the ColumnarBatch vectors, thus avoiding the copy.
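
A quick way to verify whether the optimization kicked in for a given query (a sketch, not part of the original repro; it goes through _jdf, Spark's internal JVM handle on the DataFrame) is to look for a ColumnarToRow node in the executed plan:

# assumes an existing DataFrame `df`; queryExecution()/executedPlan() are internal Spark APIs
plan = df._jdf.queryExecution().executedPlan().toString()
print("ColumnarToRow" in plan)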

This optimization is not always used when reading from Iceberg tables, specifically when filters are applied to nested structures. When reading the underlying Parquet files directly as a Spark table, the optimization is applied in every case. When using Iceberg, the optimization is applied for filters on top-level columns, but not if you put a filter on a sub-field of a struct. The code below reproduces the issue.

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import time

spark = (SparkSession
    .builder
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.local.warehouse", "file:///tmp/iceberg")
    .config("spark.sql.catalog.local.type", "hadoop")
    .getOrCreate()
    )

spark.sql("drop table if exists local.jcc.test")

spark.sql("""
create table local.jcc.test (
    id long,
    src_start_num long,
    source struct<sub1 string, start_num long> )
using iceberg
""")

# place the value of src_start_num inside a struct
spark.sql("""
insert into table local.jcc.test
select
    id,
    src_start_num,
    named_struct(
        'sub1', uuid(),
        'start_num', src_start_num
    ) as source
from (
    select
        id,
        floor(rand() * 100000000) as src_start_num
    from
        range(1, 1000000000)
)
""")
print("------ warm up ------")
df = spark.read.parquet("file:///tmp/iceberg/jcc/test/data")
df.count()
df = spark.read.format("iceberg").load("local.jcc.test")
df.count()

print("--------- parquet root filter, explain plan contains a ColumnarToRow ----------")
start = time.time()
df = spark.read.parquet("file:///tmp/iceberg/jcc/test/data")
df = df.select("id").where("src_start_num = 300000")
df.write.format("noop").mode("append").save("/dev/null")
end = time.time()
print(f"took: {end-start} seconds")
df.explain(mode="formatted")

print("--------- parquet sub filter, explain plan contains a ColumnarToRow ----------")
start = time.time()
df = spark.read.parquet("file:///tmp/iceberg/jcc/test/data")
df = df.select("id").where("source.start_num = 300000")
df.write.format("noop").mode("append").save("/dev/null")
end = time.time()
print(f"took: {end-start} seconds")
df.explain(mode="formatted")

print("--------- iceberg root filter, explain plan contains a ColumnarToRow ----------")
start = time.time()
df = spark.read.format("iceberg").load("local.jcc.test")
df = df.select("id").where("src_start_num = 300000")
df.write.format("noop").mode("append").save("/dev/null")
end = time.time()
print(f"took: {end-start} seconds")
df.explain(mode="formatted")

print("--------- iceberg sub filter, explain plan DOES NOT contain ColumnarToRow ----------")
start = time.time()
df = spark.read.format("iceberg").load("local.jcc.test")
df = df.select("id").where("source.start_num = 300000")
df.write.format("noop").mode("append").save("/dev/null")
end = time.time()
print(f"took: {end-start} seconds")
df.explain(mode="formatted")
Output:

------ warm up ------
--------- parquet root filter ----------
took: 22.319010496139526 seconds
== Physical Plan ==
* Project (4)
+- * Filter (3)
   +- * ColumnarToRow (2)
      +- Scan parquet  (1)

(1) Scan parquet 
Output [2]: [id#37L, src_start_num#38L]
Batched: true
Location: InMemoryFileIndex [file:/tmp/iceberg/jcc/test/data]
PushedFilters: [IsNotNull(src_start_num), EqualTo(src_start_num,300000)]
ReadSchema: struct<id:bigint,src_start_num:bigint>

(2) ColumnarToRow [codegen id : 1]
Input [2]: [id#37L, src_start_num#38L]

(3) Filter [codegen id : 1]
Input [2]: [id#37L, src_start_num#38L]
Condition : (isnotnull(src_start_num#38L) AND (src_start_num#38L = 300000))

(4) Project [codegen id : 1]
Output [1]: [id#37L]
Input [2]: [id#37L, src_start_num#38L]

--------- parquet sub filter ----------
took: 23.818222522735596 seconds
== Physical Plan ==
* Project (4)
+- * Filter (3)
   +- * ColumnarToRow (2)
      +- Scan parquet  (1)

(1) Scan parquet 
Output [2]: [id#46L, source#48]
Batched: true
Location: InMemoryFileIndex [file:/tmp/iceberg/jcc/test/data]
PushedFilters: [IsNotNull(source.start_num), EqualTo(source.start_num,300000)]
ReadSchema: struct<id:bigint,source:struct<start_num:bigint>>

(2) ColumnarToRow [codegen id : 1]
Input [2]: [id#46L, source#48]

(3) Filter [codegen id : 1]
Input [2]: [id#46L, source#48]
Condition : (isnotnull(source#48.start_num) AND (source#48.start_num = 300000))

(4) Project [codegen id : 1]
Output [1]: [id#46L]
Input [2]: [id#46L, source#48]

--------- iceberg root filter ----------
took: 24.27478837966919 seconds
== Physical Plan ==
* Project (4)
+- * Filter (3)
   +- * ColumnarToRow (2)
      +- BatchScan local.jcc.test (1)

(1) BatchScan local.jcc.test
Output [2]: [id#62L, src_start_num#63L]
local.jcc.test (branch=null) [filters=src_start_num IS NOT NULL, src_start_num = 300000, groupedBy=]

(2) ColumnarToRow [codegen id : 1]
Input [2]: [id#62L, src_start_num#63L]

(3) Filter [codegen id : 1]
Input [2]: [id#62L, src_start_num#63L]
Condition : (isnotnull(src_start_num#63L) AND (src_start_num#63L = 300000))

(4) Project [codegen id : 1]
Output [1]: [id#62L]
Input [2]: [id#62L, src_start_num#63L]

--------- iceberg sub filter ----------
took: 75.11750912666321 seconds
== Physical Plan ==
* Project (3)
+- * Filter (2)
   +- BatchScan local.jcc.test (1)

(1) BatchScan local.jcc.test
Output [2]: [id#79L, source#81]
local.jcc.test (branch=null) [filters=source.start_num IS NOT NULL, source.start_num = 300000, groupedBy=]

(2) Filter [codegen id : 1]
Input [2]: [id#79L, source#81]
Condition : (isnotnull(source#81.start_num) AND (source#81.start_num = 300000))

(3) Project [codegen id : 1]
Output [1]: [id#79L]
Input [2]: [id#79L, source#81]

The query "iceberg sub filter" does not use the ColumnarToRow optimization and it takes much longer to execute.

took: 75.11750912666321 seconds == Physical Plan ==

I'm using Spark 3.5.0 and Iceberg 1.5.

Query engine

Spark

amogh-jahagirdar commented 3 months ago

Thanks for the repro steps. I did some debugging with it, and it seems this has to do with vectorized reads not being supported for nested fields. See https://github.com/apache/iceberg/blob/main/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java#L154: if you trace from there, you'll see that if there's a nested type, we don't perform a vectorized read, and we end up surfacing to Spark that columnar execution is not supported.

It certainly seems like this is an area for improvement, although I don't recall the context as to why it's not supported today.
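
For illustration, here is a rough Python sketch of the kind of check being described (the actual logic lives in Iceberg's Java code, so the helper below is hypothetical): when the projected schema contains a nested type, the scan falls back to a row-based read, and Spark then has no columnar batches to wrap with a ColumnarToRow node.

from pyspark.sql.types import ArrayType, LongType, MapType, StructField, StructType

def has_nested_field(schema: StructType) -> bool:
    # hypothetical stand-in for the nested-type check reached from SparkBatch
    return any(isinstance(f.dataType, (StructType, MapType, ArrayType))
               for f in schema.fields)

# projected schema of the "iceberg sub filter" query in the repro above
projected = StructType([
    StructField("id", LongType()),
    StructField("source", StructType([StructField("start_num", LongType())])),
])
print(has_nested_field(projected))  # True -> non-vectorized read, no ColumnarToRow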

RussellSpitzer commented 3 months ago

It isn't supported because we basically copied the Spark vectorized read code, and at the time we copied it, nested structs weren't supported.

cccs-jc commented 3 months ago

Ha okay, so there is probably no showstopper in replicating what has been done in the Spark vectorized read since then.

Are there plans to update the Iceberg SparkBatch soon?

cccs-jc commented 2 months ago

@RussellSpitzer Is this something scheduled to be implemented in the next version of Iceberg?