NVIDIA / spark-rapids-tools

User tools for Spark RAPIDS

[BUG] Include Additional Metadata Fields in PhotonScan Node Parsing #1385

Open parthosa opened 1 month ago

parthosa commented 1 month ago

Photon Scan nodes contain additional metadata fields not present in Spark Scan nodes, such as RequiredDataFilters and DictionaryFilters.

Excerpt of the JSON representation of a PhotonScan node:

  "nodeName" : "PhotonScan parquet ",
  "simpleString" : "PhotonScan parquet [ss_sold_time_sk#806,ss_hdemo_sk#810,ss_store_sk#812,ss_sold_date_sk#828] DataFilters: [isnotnull(ss_hdemo_sk#810), isnotnull(ss_sold_time_sk#806), isnotnull(ss_store_sk#812)], DictionaryFilters: [], Format: parquet, Location: InMemoryFileIndex(1 paths)[s3://ndsv2-data/parquet_sf3000/store_sales], PartitionFilters: [], ReadSchema: struct<ss_sold_time_sk:int,ss_hdemo_sk:int,ss_store_sk:int>, RequiredDataFilters: [isnotnull(ss_hdemo_sk#810), isnotnull(ss_sold_time_sk#806), isnotnull(ss_store_sk#812)]",
  "metadata" : {
    "Location" : "InMemoryFileIndex(1 paths)[s3://*****/parquet_sf3000/store_sales]",
    "ReadSchema" : "struct<ss_sold_time_sk:int,ss_hdemo_sk:int,ss_store_sk:int>",
    "Format" : "parquet",
    "RequiredDataFilters" : "[isnotnull(ss_hdemo_sk#810), isnotnull(ss_sold_time_sk#806), isnotnull(ss_store_sk#812)]",
    "DictionaryFilters" : "[]",
    "PartitionFilters" : "[]",
    "DataFilters" : "[isnotnull(ss_hdemo_sk#810), isnotnull(ss_sold_time_sk#806), isnotnull(ss_store_sk#812)]"
  },

We store this metadata in data_source_information.csv and should ensure these additional fields are included when parsing the PhotonScan node.
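
A rough sketch of how these fields could be picked up during scan-node parsing, assuming the node's metadata is available as a plain Map[String, String]; the object and method names below are illustrative, not the Tools' actual parser:

object PhotonScanMetadata {
  // Photon-only keys observed in the metadata block above.
  private val RequiredDataFiltersKey = "RequiredDataFilters"
  private val DictionaryFiltersKey = "DictionaryFilters"

  // Returns the Photon-only fields, falling back to an empty filter list ("[]")
  // when a key is absent, e.g. for regular Spark Scan nodes.
  def photonOnlyFields(metadata: Map[String, String]): (String, String) = {
    (metadata.getOrElse(RequiredDataFiltersKey, "[]"),
     metadata.getOrElse(DictionaryFiltersKey, "[]"))
  }
}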

parthosa commented 1 month ago

The metadata properties RequiredDataFilters and DictionaryFilters are present only in Photon event logs. This metadata is saved in the data_source_information.csv file.

Current schema of data_source_information.csv:

root
 |-- appIndex: integer (nullable = true)
 |-- sqlID: integer (nullable = true)
 |-- sql_plan_version: integer (nullable = true)
 |-- nodeId: integer (nullable = true)
 |-- format: string (nullable = true)
 |-- buffer_time: integer (nullable = true)
 |-- scan_time: integer (nullable = true)
 |-- data_size: long (nullable = true)
 |-- decode_time: integer (nullable = true)
 |-- location: string (nullable = true)
 |-- pushedFilters: string (nullable = true)
 |-- schema: string (nullable = true)
 |-- data_filters: string (nullable = true)
 |-- partition_filters: string (nullable = true)
 |-- from_final_plan: boolean (nullable = true)

After adding these fields, the schema will include two new columns (a sketch of the extended record follows below):

root
 |-- appIndex: integer (nullable = true)
..
 |-- data_filters: string (nullable = true)
 |-- partition_filters: string (nullable = true)
 |-- required_data_filters: string (nullable = true)
 |-- dictionary_filters: string (nullable = true)
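
A minimal sketch of what the extended record could look like, assuming one case class per data_source_information.csv row; the class and field names below simply mirror the columns above and are not the Tools' actual internal types:

case class DataSourceRecord(
    appIndex: Int,
    sqlID: Int,
    sqlPlanVersion: Int,
    nodeId: Int,
    format: String,
    bufferTime: Int,
    scanTime: Int,
    dataSize: Long,
    decodeTime: Int,
    location: String,
    pushedFilters: String,
    schema: String,
    dataFilters: String,
    partitionFilters: String,
    fromFinalPlan: Boolean,
    // New Photon-only columns; default to an empty filter list for non-Photon scans.
    requiredDataFilters: String = "[]",
    dictionaryFilters: String = "[]")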

Questions

cc: @amahussein @tgravescs @mattahrens