apache / arrow

Apache Arrow is the universal columnar format and multi-language toolbox for fast data interchange and in-memory analytics
https://arrow.apache.org/
Apache License 2.0

[Python][Substrait] Acero consumer is unable to consume count function from substrait query plan #32369

Open asfimport opened 2 years ago

asfimport commented 2 years ago

SQL


SELECT
    o_orderpriority,
    count(*) AS order_count
FROM
    orders
GROUP BY
    o_orderpriority

The Substrait plan was generated from this SQL using Isthmus.

 

The Substrait definition of count:

https://github.com/substrait-io/substrait/blob/main/extensions/functions_aggregate_generic.yaml

 

Running the Substrait plan with Acero fails with this error:


E   pyarrow.lib.ArrowInvalid: JsonToBinaryStream returned INVALID_ARGUMENT:(relations[0].root.input.aggregate.measures[0].measure) arguments: Cannot find field.  

 

From the Substrait query plan:

relations[0].root.input.aggregate.measures[0].measure


"measure": {
  "functionReference": 0,
  "args": [],
  "sorts": [],
  "phase": "AGGREGATION_PHASE_INITIAL_TO_RESULT",
  "outputType": {
    "i64": {
      "typeVariationReference": 0,
      "nullability": "NULLABILITY_REQUIRED"
    }
  },
  "invocation": "AGGREGATION_INVOCATION_ALL",
  "arguments": []
}

"extensions": [{
  "extensionFunction": {
    "extensionUriReference": 1,
    "functionAnchor": 0,
    "name": "count:opt"
  }
}],

count is a unary aggregate function and should be consumable by Acero, but it is not in this case.
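The error message points at the field named arguments under the measure path, and the measure above carries both an args and an arguments list. As a quick way to see which of the two spellings a plan uses before handing it to a consumer, here is a minimal sketch in plain Python (standard library only). The helper name iter_measures is hypothetical and is not part of pyarrow or Substrait. Which spelling a given Arrow build accepts depends on the Substrait version it was built against, and this sketch does not determine that.

```python
import json


def iter_measures(node, path=""):
    """Yield (path, measure_dict) for every aggregate measure in a plan.

    Hypothetical helper: it only walks the parsed JSON and knows nothing
    about the Substrait schema beyond the "measures"/"measure" key names.
    """
    if isinstance(node, dict):
        for key, value in node.items():
            child = f"{path}.{key}" if path else key
            if key == "measures" and isinstance(value, list):
                for i, item in enumerate(value):
                    if isinstance(item, dict) and "measure" in item:
                        yield f"{child}[{i}].measure", item["measure"]
            else:
                yield from iter_measures(value, child)
    elif isinstance(node, list):
        for i, item in enumerate(node):
            yield from iter_measures(item, f"{path}[{i}]")


# Trimmed-down stand-in for the plan in this issue (assumption: only the
# fields needed to reach the measure are kept).
plan = json.loads("""
{"relations": [{"root": {"input": {"aggregate": {"measures": [
  {"measure": {"functionReference": 0, "args": [], "arguments": []}}
]}}}}]}
""")

# For each measure, list which argument-carrying keys are present.
report = [
    (path, sorted(k for k in ("args", "arguments") if k in measure))
    for path, measure in iter_measures(plan)
]

for path, keys in report:
    print(path, keys)
```

If both keys show up, as they do here, removing the one the consumer does not recognize is one thing to try. That is a guess based on the error text, not a confirmed fix.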

Reporter: Richard Tia / @richtia

Note: This issue was originally created as ARROW-17061. Please see the migration documentation for further details.

asfimport commented 2 years ago

Vibhatha Lakmal Abeykoon / @vibhatha: @richtia, could you please try the following?

https://github.com/vibhatha/arrow/blob/arrow-17061/python/pyarrow/tests/test_substrait.py#L177-L274

 

asfimport commented 2 years ago

Richard Tia / @richtia: I tried again using the example in the issue, and now get a different error:

>   ???
E   pyarrow.lib.ArrowNotImplementedError: Only unary aggregate functions are currently supported 

Here's the plan:


{
  "extensionUris": [{
    "extensionUriAnchor": 1,
    "uri": "AGGREGATE_URI_PLACEHOLDER"
  }],
  "extensions": [{
    "extensionFunction": {
      "extensionUriReference": 1,
      "functionAnchor": 0,
      "name": "count"
    }
  }],
  "relations": [{
    "root": {
      "input": {
        "aggregate": {
          "common": {
            "direct": {
            }
          },
          "input": {
            "project": {
              "common": {
                "emit": {
                  "outputMapping": [9]
                }
              },
              "input": {
                "read": {
                  "common": {
                    "direct": {
                    }
                  },
                  "baseSchema": {
                    "names": ["O_ORDERKEY", "O_CUSTKEY", "O_ORDERSTATUS", "O_TOTALPRICE", "O_ORDERDATE", "O_ORDERPRIORITY", "O_CLERK", "O_SHIPPRIORITY", "O_COMMENT"],
                    "struct": {
                      "types": [{
                        "i32": {
                          "typeVariationReference": 0,
                          "nullability": "NULLABILITY_REQUIRED"
                        }
                      }, {
                        "i32": {
                          "typeVariationReference": 0,
                          "nullability": "NULLABILITY_REQUIRED"
                        }
                      }, {
                        "string": {
                          "typeVariationReference": 0,
                          "nullability": "NULLABILITY_REQUIRED"
                        }
                      }, {
                        "decimal": {
                          "scale": 2,
                          "precision": 15,
                          "typeVariationReference": 0,
                          "nullability": "NULLABILITY_REQUIRED"
                        }
                      }, {
                        "date": {
                          "typeVariationReference": 0,
                          "nullability": "NULLABILITY_REQUIRED"
                        }
                      }, {
                        "string": {
                          "typeVariationReference": 0,
                          "nullability": "NULLABILITY_REQUIRED"
                        }
                      }, {
                        "string": {
                          "typeVariationReference": 0,
                          "nullability": "NULLABILITY_REQUIRED"
                        }
                      }, {
                        "i32": {
                          "typeVariationReference": 0,
                          "nullability": "NULLABILITY_REQUIRED"
                        }
                      }, {
                        "string": {
                          "typeVariationReference": 0,
                          "nullability": "NULLABILITY_REQUIRED"
                        }
                      }],
                      "typeVariationReference": 0,
                      "nullability": "NULLABILITY_REQUIRED"
                    }
                  },
                    "local_files": {
                      "items": [
                        {
                          "uri_file": "file://FILENAME_PLACEHOLDER_0",
                          "parquet": {}
                        }
                      ]
                    }
                }
              },
              "expressions": [{
                "selection": {
                  "directReference": {
                    "structField": {
                      "field": 5
                    }
                  },
                  "rootReference": {
                  }
                }
              }]
            }
          },
          "groupings": [{
            "groupingExpressions": [{
              "selection": {
                "directReference": {
                  "structField": {
                    "field": 0
                  }
                },
                "rootReference": {
                }
              }
            }]
          }],
          "measures": [{
            "measure": {
              "functionReference": 0,
              "args": [],
              "sorts": [],
              "phase": "AGGREGATION_PHASE_INITIAL_TO_RESULT",
              "outputType": {
                "i64": {
                  "typeVariationReference": 0,
                  "nullability": "NULLABILITY_REQUIRED"
                }
              },
              "invocation": "AGGREGATION_INVOCATION_ALL",
              "arguments": []
            }
          }]
        }
      },
      "names": ["O_ORDERPRIORITY", "ORDER_COUNT"]
    }
  }],
  "expectedTypeUrls": []
} 
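The second error says that only unary aggregate functions are supported, while the measure in this plan has an empty arguments list, which is how count(*) appears here since it takes no input column. To make that mismatch visible, here is a minimal sketch in plain Python that resolves each measure's function name through the extensions list and counts its arguments. The name measure_arity is hypothetical, and the sketch assumes the aggregate sits directly under root.input, as it does in this plan.

```python
def measure_arity(plan):
    """Return [(function_name, number_of_arguments)] for each measure.

    Hypothetical helper; assumes the aggregate relation is located at
    relations[i].root.input.aggregate, as in the plan from this issue.
    """
    # Map function anchors to the names declared in "extensions".
    names = {
        ext["extensionFunction"]["functionAnchor"]: ext["extensionFunction"]["name"]
        for ext in plan.get("extensions", [])
        if "extensionFunction" in ext
    }
    result = []
    for rel in plan.get("relations", []):
        aggregate = rel.get("root", {}).get("input", {}).get("aggregate")
        if aggregate is None:
            continue
        for item in aggregate.get("measures", []):
            measure = item["measure"]
            # An absent functionReference is treated as anchor 0.
            anchor = measure.get("functionReference", 0)
            name = names.get(anchor, "<unknown>")
            result.append((name, len(measure.get("arguments", []))))
    return result


# Trimmed-down stand-in for the plan above (assumption: unrelated
# fields such as the read relation and groupings are dropped).
plan = {
    "extensions": [{
        "extensionFunction": {
            "extensionUriReference": 1,
            "functionAnchor": 0,
            "name": "count",
        }
    }],
    "relations": [{
        "root": {"input": {"aggregate": {"measures": [{
            "measure": {"functionReference": 0, "arguments": []}
        }]}}}
    }],
}

arity = measure_arity(plan)
# Measures that would trip an "only unary aggregates" check.
non_unary = [name for name, n_args in arity if n_args != 1]
print(arity, non_unary)
```

A measure listed in non_unary is a candidate for the ArrowNotImplementedError above. Whether giving count an explicit column argument avoids the error is not confirmed in this thread.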


asfimport commented 1 year ago

Apache Arrow JIRA Bot: This issue was last updated over 90 days ago, which may be an indication it is no longer being actively worked. To better reflect the current state, the issue is being unassigned per project policy. Please feel free to re-take assignment of the issue if it is being actively worked, or if you plan to start that work soon.