voltrondata / spark-substrait-gateway

Implements a gateway that speaks the SparkConnect protocol and drives a backend using Substrait (over ADBC Flight SQL).
Apache License 2.0
16 stars 9 forks source link

Support Spark's "struct" function #67

Open pthatte1-bb opened 3 months ago

pthatte1-bb commented 3 months ago

Usages of the SparkSQL function def struct(cols: Column*): Column fail with an error message - "Exception iterating responses: Function struct not found in the Spark to Substrait mapping table."

EpsilonPrime commented 3 months ago

Support for structured data types is not strong in the engines with Substrait support. We will need to improve the backend support before we can make further progress here. In addition to supporting the type itself there are a myriad of operations (including access as if the struct was a dictionary using square brackets) that should also be implemented.

pat70 commented 3 months ago

The linked PR uses ExtensionFunctions to map unresolved_function when a backend supports a specific function (in this case - the struct function).

I've set the PR to draft for now and I'm trying to grok the comment about "supporting the type itself". Does this refer to Compound Types on this page: https://substrait.io/types/type_classes/#compound-types

EpsilonPrime commented 3 months ago

The struct type is defined (and documented in the compound types section). Implementing struct() on its own is fine -- it will need to be done eventually. The problem is what comes after you have the struct as we don't have anything that works on them.

pthatte1-bb commented 3 months ago

Re: what comes after you have the struct

The requested functionality unblocks some struct usages in our existing code for ColumnGroup-style handling. This snippet shows an oversimplified example of what is requested, and it runs locally using the linked-draft-PR's changes

(
    get_customer_database(spark_session)
    .select(struct(col('c_custkey'), col('c_name')).alias('test_struct'))
    .agg(min(col('test_struct').getField('c_custkey')))
    .show()
)
EpsilonPrime commented 3 months ago

DuckDB did add more struct support to Substrait this week but I believe we need nested expressions to handle this properly. Turns out the substrait-validator doesn't have support for nested expressions either. So support will need to be added to the validator and DuckDB. I've filed a request for DuckDB to look into nested expressions.

EpsilonPrime commented 3 months ago

Nested support has been added DuckDB's Substrait implementation today. It will be in their next release. So now just the validator and the gateway need updating.

pat70 commented 3 months ago

I see the commits. I'm taking a crack at another PR for the gateway.

pat70 commented 3 months ago

Re: Nested support has been added DuckDB's Substrait implementation today.

FYI I tested locally and this is the generated substrait that DuckDB accepts - image

EpsilonPrime commented 2 months ago

When DuckDB's release lands (in the next week or two) I'll take a crack at this.

EpsilonPrime commented 2 months ago

The problem with struct_pack is that you would need a version of the function with varying output signatures (one for every output type you could generate). Substrait introduced a feature called nested expressions that handles this.

Here's what the pyspark code above will look like with one change. DuckDB doesn't have any names for the constructed structure so struct_extract (used to implement get_field) has to use number references instead of string references. That's fixable since I have the names when I'm converting get_field. The other hidden gotcha is that the output names now needs a more complicated datastructure as each of the struct's fields needs to also be there.

extension_uris {
  extension_uri_anchor: 1
  uri: "/unknown.yaml"
}
extension_uris {
  extension_uri_anchor: 2
  uri: "/functions_structs.yaml"
}
extensions {
  extension_function {
    extension_uri_reference: 1
    function_anchor: 1
    name: "min:i64"
  }
}
extensions {
  extension_function {
    extension_uri_reference: 2
    function_anchor: 2
    name: "struct_extract:any_str"
  }
}
relations {
  root {
    input {
      project {
        common {
          emit {
            output_mapping: 0
          }
        }
        input {
          aggregate {
            common {
              direct {
              }
            }
            input {
              project {
                common {
                  emit {
                    output_mapping: 8
                  }
                }
                input {
                  read {
                    common {
                      direct {
                      }
                    }
                    base_schema {
                      names: "c_custkey"
                      names: "c_name"
                      names: "c_address"
                      names: "c_nationkey"
                      names: "c_phone"
                      names: "c_acctbal"
                      names: "c_mktsegment"
                      names: "c_comment"
                      struct {
                        types {
                          i64 {
                            nullability: NULLABILITY_NULLABLE
                          }
                        }
                        types {
                          string {
                            nullability: NULLABILITY_NULLABLE
                          }
                        }
                        types {
                          string {
                            nullability: NULLABILITY_NULLABLE
                          }
                        }
                        types {
                          i32 {
                            nullability: NULLABILITY_NULLABLE
                          }
                        }
                        types {
                          string {
                            nullability: NULLABILITY_NULLABLE
                          }
                        }
                        types {
                          decimal {
                            scale: 2
                            precision: 15
                            nullability: NULLABILITY_NULLABLE
                          }
                        }
                        types {
                          string {
                            nullability: NULLABILITY_NULLABLE
                          }
                        }
                        types {
                          string {
                            nullability: NULLABILITY_NULLABLE
                          }
                        }
                        nullability: NULLABILITY_REQUIRED
                      }
                    }
                    named_table {
                      names: "customer"
                    }
                  }
                }
                expressions {
                  nested {
                    struct {
                      fields {
                        selection {
                          direct_reference {
                            struct_field {
                            }
                          }
                          root_reference {
                          }
                        }
                      }
                      fields {
                        selection {
                          direct_reference {
                            struct_field {
                              field: 1
                            }
                          }
                          root_reference {
                          }
                        }
                      }
                    }
                  }
                }
              }
            }
            groupings {
            }
            measures {
              measure {
                function_reference: 1
                phase: AGGREGATION_PHASE_INITIAL_TO_RESULT
                output_type {
                  i64 {
                    nullability: NULLABILITY_REQUIRED
                  }
                }
                arguments {
                  value {
                    scalar_function {
                      function_reference: 2
                      arguments {
                        value {
                          selection {
                            direct_reference {
                              struct_field {
                              }
                            }
                            root_reference {
                            }
                          }
                        }
                      }
                      arguments {
                        value {
                          literal {
                            string: "c_custkey"
                          }
                        }
                      }
                    }
                  }
                }
              }
            }
          }
        }
        expressions {
          selection {
            direct_reference {
              struct_field {
              }
            }
            root_reference {
            }
          }
        }
      }
    }
    names: "result"
  }
}
version {
  minor_number: 52
  producer: "spark-substrait-gateway"
}
EpsilonPrime commented 1 month ago

Update: I have a PR that should work except for the returned data from DuckDB does not have names on the struct fields (they are empty strings). I've filed an issue but it should be possible to work around the issue by repackaging the data before returning it.

EpsilonPrime commented 1 month ago

91 is now ready for review. I will need to find someone who can review and merge it. However in the meantime it is available for playing with.

Note that there are two current limitations:

  1. Only one level of struct depth is supported.
  2. getField does not yet work so it is difficult to access subfields.

To address these two shortfalls requires keeping track of the types (and the associated names) throughout the plan conversion process. The refactoring in this PR makes that possible.