facebookincubator / velox

A composable and fully extensible C++ execution engine library for data management systems.
https://velox-lib.io/
Apache License 2.0

SparkSQL decode 2nd signature cannot be supported with current simple function signature API #8528

Open codyschierbeck opened 10 months ago

codyschierbeck commented 10 months ago

Description

Thank you to @kgpai for discussing this issue with me beforehand. I'm currently experimenting with some of the suggestions you gave.

This is in reference to a design issue I ran into while implementing SparkSQL decode, draft PR found here: #8456

I chose to exclude a SparkSQL decode signature in this PR as it seemed to be too complicated to be supported with Velox as it currently exists. I'm hoping there is something that I've missed that will actually allow me to include the following function signature in this PR, found here: https://spark.apache.org/docs/latest/api/sql/index.html#decode

decode(expr, search, result [, search, result ] ... [, default]) - Compares expr to each search value in order. If expr is equal to a search value, decode returns the corresponding result. If no match is found, then it returns default. If default is omitted, it returns null.

In my local pyspark testing of this function, I've found that the type of each "result" argument must match, but each "search" argument can be of any type and doesn't need to match the "expr" type or the result type. This prevents a proper signature from specifying the types of the Variadic args.
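To make the matching rules above concrete, here is a minimal standalone C++17 sketch of the decode semantics. It does not use Velox: `AnyValue` and `decodeSketch` are hypothetical names, `std::variant` stands in for an untyped search value, and `std::nullopt` stands in for NULL. Treating values of different types as never equal is an assumption of the sketch, not something confirmed for Spark.

```cpp
#include <cstdint>
#include <optional>
#include <string>
#include <utility>
#include <variant>
#include <vector>

// Hypothetical stand-in for an untyped "Any" value; not a Velox type.
using AnyValue = std::variant<std::int64_t, std::string, bool>;

// Returns the result paired with the first search value equal to expr,
// otherwise the default, otherwise std::nullopt (standing in for NULL).
// Search values may hold any alternative; all results share the type Result.
// Assumption: values holding different alternatives never compare equal.
template <typename Result>
std::optional<Result> decodeSketch(
    const AnyValue& expr,
    const std::vector<std::pair<AnyValue, Result>>& cases,
    const std::optional<Result>& defaultValue = std::nullopt) {
  for (const auto& [search, result] : cases) {
    if (search == expr) {
      return result;
    }
  }
  return defaultValue;
}
```

Calling `decodeSketch<std::string>(expr, cases)` with no default yields an empty optional when nothing matches, which mirrors "if default is omitted, it returns null".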

If expr needed to match result I could just do the following

registerFunction<sparksql::DecodeFunction, Varchar, Any, Variadic<Varchar>>(
      {prefix + "decode"});
registerFunction<sparksql::DecodeFunction, Varbinary, Any, Variadic<Varbinary>>... etc

but unfortunately I can't if I want to match the functionality exactly.

My current signature for this is:

registerFunction<sparksql::DecodeFunction, Varchar, Any, Variadic<Any>>(
      {prefix + "decode"});

Where the Varchar is a string encoding of the returned value, copied almost exactly from how ToStringFunctionCastTo implements this in https://github.com/facebookincubator/velox/blob/main/velox/expression/tests/GenericViewTest.cpp.

My ideal signature includes a lot of quality of life features that are not necessary or maybe even possible, but would be along the lines of

registerFunction<sparksql::DecodeFunction, T1, Any, VariadicPair<Any, T1>, Optional<T1>>(
      {prefix + "decode"});

Where T1 would ensure that the return, result, and default values all share the same type. "VariadicPair" would ensure the number of Variadic arguments is even, and Optional would spare me from registering this function signature twice to represent an optional argument.
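As a rough illustration of the constraint this ideal signature expresses, the following standalone C++17 sketch checks at compile time that every "result" position and the optional trailing default share one type, while "search" positions stay unconstrained. It is not Velox code: `positionOk`, `allPositionsOk`, and `kDecodeArgsValid` are hypothetical names, and the argument list is assumed to be everything after `expr`.

```cpp
#include <cstddef>
#include <tuple>
#include <type_traits>
#include <utility>

// Hypothetical check for one position in the list search, result, ..., [default].
template <typename Result, typename Tuple, std::size_t I>
constexpr bool positionOk() {
  constexpr std::size_t count = std::tuple_size_v<Tuple>;
  // Odd positions are results; the last position of an odd-length list is the
  // default. Every other position is a search value and may be any type.
  constexpr bool isResult = (I % 2 == 1);
  constexpr bool isDefault = (count % 2 == 1) && (I == count - 1);
  if constexpr (isResult || isDefault) {
    return std::is_same_v<std::tuple_element_t<I, Tuple>, Result>;
  } else {
    return true;
  }
}

// Folds the per-position check over every index in the argument list.
template <typename Result, typename Tuple, std::size_t... Is>
constexpr bool allPositionsOk(std::index_sequence<Is...>) {
  return (positionOk<Result, Tuple, Is>() && ...);
}

// True when Args is a non-empty list whose result and default positions all
// have type Result.
template <typename Result, typename... Args>
constexpr bool kDecodeArgsValid =
    sizeof...(Args) > 0 &&
    allPositionsOk<Result, std::tuple<Args...>>(
        std::index_sequence_for<Args...>{});
```

For example, `kDecodeArgsValid<double, int, double, bool, double>` holds, while `kDecodeArgsValid<double, int, double, bool, int>` does not, because the second result is not a `double`.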

What I believe that Velox is missing which prevents me from implementing this signature is the following:

  1. return type "Any". I understand that returning "Any" does not make sense, but the ability to register a function once with a placeholder that matches the type of one of its arguments would be very helpful, instead of having to register a signature for each possible type that result can be, including nested complex types.
  2. A "VariadicPair" or some ability to specify an attribute of the Variadic args. Instead of VariadicPair, maybe a VariadicTuple<Any, T1>(2) to represent this case, which would allow for VariadicTuple<Varchar, Int, Varbinary>(3) later if a future use case calls for it. This can already be somewhat emulated within the current framework, so it is just quality of life.
  3. Args following a variadic arg. My understanding is that Velox leverages the DuckDB parser for parsing, and that recognizing the end of variadic args is a difficult problem to solve (which I assume is why they must be the last arguments of a function). Even if recognizing where variadic args end weren't already a difficult problem, implementing this has additional complexity because of the dependency.
  4. Optional. Just quality of life, not necessary.

This signature may be already supportable by Velox, and if that's the case I would love to learn how.

Additional Notes

Every suggestion after returning "Any" type can already be emulated within a call. For example:

  TypeKind resultType = TypeKind::INVALID;
  FOLLY_ALWAYS_INLINE bool call(
      out_type<Varchar>& result,
      const arg_type<Any>& expr,
      const arg_type<Variadic<Any>>& args) {
    // args holds search, result [, search, result] ... [, default].
    if (resultType == TypeKind::INVALID) {
      if (args.size() == 1) {
        // Only a default value was passed.
        resultType = args[0].kind();
        return makeItem(result, args[0]);
      } else if (args.size() > 1) {
        // The first result follows the first search value.
        resultType = args[1].kind();
      } else {
        VELOX_USER_FAIL("Placeholder error");
      }
    }

    const size_t numArgs = args.size();
    // Compare expr against each search value and return the paired result.
    for (size_t i = 0; i + 1 < numArgs; i += 2) {
      if (checkEqual(expr, args[i])) {
        return makeItem(result, args[i + 1]);
      }
    }
    // An odd argument count means the last argument is the default.
    if (numArgs % 2 == 1) {
      return makeItem(result, args[numArgs - 1]);
    }
    // No match and no default: the result is null.
    return false;
  }

Where the first block initializes a query-global return type. Before each call to makeItem we can check the respective arg's .kind() value against resultType and throw an exception if they don't match. There's a possibility this signature wouldn't be fuzzable though, since dictionary vector encodings could lead to exceptions being thrown at different points of execution between common and simplified eval. Even though this exception would be argument dependent, whether it throws or not depends on which row was passed to evaluation first.
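The order dependence described here can be shown with a small standalone C++17 sketch. It is not Velox code: `Kind`, `Row`, `LazyResultTypeChecker`, and `firstFailingRow` are hypothetical names, and the checker only models the "first evaluated row fixes the expected type" idea.

```cpp
#include <cstddef>
#include <optional>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-ins for a type tag and a per-row result; not Velox types.
enum class Kind { kBigint, kVarchar };

struct Row {
  Kind kind;
  std::string text;
};

// The expected kind is captured from the first row evaluated, and every later
// row is validated against it.
class LazyResultTypeChecker {
 public:
  void check(const Row& row) {
    if (!expected_.has_value()) {
      expected_ = row.kind;
      return;
    }
    if (row.kind != *expected_) {
      throw std::invalid_argument("decode result types do not match");
    }
  }

 private:
  std::optional<Kind> expected_;
};

// Returns the index of the first row that fails the check, or -1 if none does.
inline int firstFailingRow(const std::vector<Row>& rows) {
  LazyResultTypeChecker checker;
  for (std::size_t i = 0; i < rows.size(); ++i) {
    try {
      checker.check(rows[i]);
    } catch (const std::invalid_argument&) {
      return static_cast<int>(i);
    }
  }
  return -1;
}
```

Evaluating the same rows in a different order changes which row raises the error, which is the kind of divergence a fuzzer comparing two evaluation paths could report.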

mbasmanova commented 10 months ago

This would need to be implemented as a VectorFunction, I think.

return type "Any". I understand that returning "Any" does not make sense, but the ability to register a function once with a placeholder that matches the type of one of its arguments would be very helpful, instead of having to register a signature for each possible type that result can be, including nested complex types.

This is possible using Generic<T1>. See flatten function:

  registerFunction<
      ArrayFlattenFunction,
      Array<Generic<T1>>,
      Array<Array<Generic<T1>>>>({prefix + "flatten"});

codyschierbeck commented 10 months ago

Thank you, I'll take a look now and try to implement it.

mbasmanova commented 10 months ago

@codyschierbeck The decode function looks like a special case of a SWITCH expression. I wonder if it would be simpler to just re-write decode as switch.

CC: @rui-mo

rui-mo commented 10 months ago

@mbasmanova I found that Spark generates a CaseWhen expression to compute decode, in createExpr. Maybe we could do something similar.

codyschierbeck commented 10 months ago

@codyschierbeck The decode function looks like a special case of a SWITCH expression. I wonder if it would be simpler to just re-write decode as switch.

CC: @rui-mo

Hi @mbasmanova,

In rewriting decode as switch, would that mean treating decode as a Spark-specific special form?

mbasmanova commented 10 months ago

@codyschierbeck I expect there is some code in Gluten that converts Spark's expression tree into Velox's expression tree. Here, we would convert the decode call not into core::CallTypedExpr("decode"), but into core::CallTypedExpr("switch"). @rui-mo Rui may have some pointers. I expect all the code will be outside of Velox.

codyschierbeck commented 10 months ago

@PHILO-HE Thoughts? I think in this case Gluten may have to parse the input before modifying the expression tree.

  1. Extract the first argument as "expr" for example
  2. Replace every second argument with something like "arg == expr" to conform to switch's condition requirement

./_build/debug/velox/expression/tests/velox_expression_runner_test --sql "switch(2, 'hello', false, 'bye')"
E0125 15:32:32.370146 179580 Exceptions.h:68] Line: /home/sdp/pinterest/velox/velox/expression/SwitchExpr.cpp:240, Function:resolveType, Expression: conditionType->kind() == TypeKind::BOOLEAN (BIGINT vs. BOOLEAN) Condition of  SWITCH statement is not bool, Source: RUNTIME, ErrorCode: INVALID_STATE
terminate called after throwing an instance of 'facebook::velox::VeloxRuntimeError'
  what():  Exception: VeloxRuntimeError
Error Source: RUNTIME
Error Code: INVALID_STATE
Reason: (BIGINT vs. BOOLEAN) Condition of  SWITCH statement is not bool

rui-mo commented 10 months ago

@codyschierbeck Spark has implemented the needed replacement at createExpr. After that takes place, Gluten receives a CaseWhen expression with EqualNullSafe as the condition, which can be converted to a Velox SWITCH expression for computing.
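The shape of that replacement can be sketched in standalone C++17. This is not Spark, Gluten, or Velox code: `Expr`, `leaf`, `rewriteDecodeAsSwitch`, and `toString` are hypothetical, and the node names "equalnullsafe" and "switch" are placeholders for whatever the real expression trees use. Each search value becomes a boolean comparison against expr, which is what the SWITCH condition check above requires.

```cpp
#include <cstddef>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical minimal expression node: a function name with children, or a
// leaf token (literal or column) with no children.
struct Expr {
  std::string name;
  std::vector<Expr> children;
};

inline Expr leaf(std::string token) {
  return Expr{std::move(token), {}};
}

// Rewrites decode(expr, s1, r1, ..., [default]) into
// switch(equalnullsafe(expr, s1), r1, ..., [default]).
inline Expr rewriteDecodeAsSwitch(const Expr& decode) {
  const auto& args = decode.children;
  if (decode.name != "decode" || args.size() < 3) {
    throw std::invalid_argument("expected decode(expr, search, result, ...)");
  }
  const Expr& expr = args[0];
  Expr result{"switch", {}};
  std::size_t i = 1;
  // Each (search, result) pair becomes a (condition, value) pair.
  for (; i + 1 < args.size(); i += 2) {
    result.children.push_back(Expr{"equalnullsafe", {expr, args[i]}});
    result.children.push_back(args[i + 1]);
  }
  // A leftover trailing argument is the default and becomes the else branch.
  if (i < args.size()) {
    result.children.push_back(args[i]);
  }
  return result;
}

// Renders the tree as name(child, child, ...) for inspection.
inline std::string toString(const Expr& e) {
  if (e.children.empty()) {
    return e.name;
  }
  std::string out = e.name + "(";
  for (std::size_t i = 0; i < e.children.size(); ++i) {
    if (i > 0) {
      out += ", ";
    }
    out += toString(e.children[i]);
  }
  return out + ")";
}
```

The sketch rejects calls with fewer than three arguments, so a two-argument decode(expr, charSet) call is left to a separate code path.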

PHILO-HE commented 10 months ago

Hi @codyschierbeck, I note Spark has two implementations for decode function, depending on the input pattern.

  1. decode(expr, { key1, value1 } [, ...] [, defValue]) For this input pattern, if key1 matches expr, the function returns value1. Otherwise, it goes on to compare expr with the key of the next pair. As @rui-mo mentioned, Spark actually translates it to CaseWhen while creating the Spark plan node. So Gluten will receive CaseWhen for this case, and will then translate it to Velox's switch expression. CaseWhen/switch is already supported by Gluten & Velox. I just confirmed the above statement by testing the decode function with this input pattern.
  2. decode(expr, charSet) The decode function has a different meaning for this input pattern. It decodes expr using charSet. I think you only need to implement this one in Velox. I also note that Velox has implemented the from_utf8 Presto function, which decodes a UTF-8 string from binary. It may be similar to the 2nd decode function, except that Spark decode supports more charsets than just UTF-8. Since UTF-8 is commonly used, I think you can consider only UTF-8 in the first PR.

See Spark source code for decode function.