linkedin / transport

A framework for writing performant user-defined functions (UDFs) that are portable across a variety of engines including Apache Spark, Apache Hive, and Presto.

Generated Hive UDF for working with struct values fails #120

Open mohit10verma opened 1 year ago

mohit10verma commented 1 year ago

I created a simple UDF and a unit test for it in the transport-udfs-examples module. The UDF increments the first integer field of a struct by 1. Here is the UDF:

import java.util.List;

import com.google.common.collect.ImmutableList;
import com.linkedin.transport.api.data.StdInteger;
import com.linkedin.transport.api.data.StdStruct;
import com.linkedin.transport.api.udf.StdUDF1;
import com.linkedin.transport.api.udf.TopLevelStdUDF;

public class StructElementIncrementByOneFunction extends StdUDF1<StdStruct, StdStruct> implements TopLevelStdUDF {

  @Override
  public List<String> getInputParameterSignatures() {
    return ImmutableList.of(
        "row(integer, integer)"
    );
  }

  @Override
  public String getOutputParameterSignature() {
    return "row(integer, integer)";
  }

  @Override
  public StdStruct eval(StdStruct myStruct) {
    int currVal = ((StdInteger) myStruct.getField(0)).get();
    myStruct.setField(0, getStdFactory().createInteger(currVal + 1));
    return myStruct;
  }

  @Override
  public String getFunctionName() {
    return "struct_element_increment_by_one";
  }

  @Override
  public String getFunctionDescription() {
    return "increment first element by one";
  }
}

A unit test for it:

// Import paths below are assumed from the transport test framework;
// the functionCall(...) and row(...) helpers come from the test base classes.
import java.util.List;
import java.util.Map;

import org.testng.annotations.Test;

import com.google.common.collect.ImmutableMap;
import com.linkedin.transport.api.udf.StdUDF;
import com.linkedin.transport.api.udf.TopLevelStdUDF;
import com.linkedin.transport.test.AbstractStdUDFTest;
import com.linkedin.transport.test.spi.StdTester;

public class TestStructElementIncrementByOneFunction extends AbstractStdUDFTest {

  @Override
  protected Map<Class<? extends TopLevelStdUDF>, List<Class<? extends StdUDF>>> getTopLevelStdUDFClassesAndImplementations() {
    return ImmutableMap.of(
        StructElementIncrementByOneFunction.class, ImmutableList.of(StructElementIncrementByOneFunction.class));
  }

  @Test
  public void testStructElementIncrementByOneFunction() {
    StdTester tester = getTester();
    tester.check(functionCall("struct_element_increment_by_one", row(1, 3)), row(2, 3), "row(integer,integer)");
    tester.check(functionCall("struct_element_increment_by_one", row(-2, 3)), row(-1, 3), "row(integer,integer)");
  }
}

The generated spark_2.11 and spark_2.12 UDFs run correctly, but the generated Hive artifact fails with the exception below (I am running the Gradle task ./gradlew hiveTask for testing):

java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to java.util.ArrayList
    at org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector.setStructFieldData(StandardStructObjectInspector.java:221)
    at com.linkedin.transport.hive.data.HiveStruct.setField(HiveStruct.java:53)
    at com.linkedin.transport.examples.StructElementIncrementByOneFunction.eval(StructElementIncrementByOneFunction.java:33)
    at com.linkedin.transport.examples.StructElementIncrementByOneFunction.eval(StructElementIncrementByOneFunction.java:16)
    at com.linkedin.transport.hive.StdUdfWrapper.evaluate(StdUdfWrapper.java:162)
    at org.apache.hadoop.hive.ql.exec.ExprNodeGenericFuncEvaluator._evaluate(ExprNodeGenericFuncEvaluator.java:186)
    at org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator.evaluate(ExprNodeEvaluator.java:77)
    at org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator.evaluate(ExprNodeEvaluator.java:65)
    at org.apache.hadoop.hive.ql.exec.SelectOperator.process(SelectOperator.java:81)
    at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:838)
    at org.apache.hadoop.hive.ql.exec.TableScanOperator.process(TableScanOperator.java:97)
    at org.apache.hadoop.hive.ql.exec.FetchOperator.pushRow(FetchOperator.java:425)
    at org.apache.hadoop.hive.ql.exec.FetchOperator.pushRow(FetchOperator.java:417)
    at org.apache.hadoop.hive.ql.exec.FetchTask.fetch(FetchTask.java:140)
    at org.apache.hadoop.hive.ql.Driver.getResults(Driver.java:1693)
    at org.apache.hive.service.cli.operation.SQLOperation.getNextRowSet(SQLOperation.java:347)
    at org.apache.hive.service.cli.operation.OperationManager.getOperationNextRowSet(OperationManager.java:220)
    at org.apache.hive.service.cli.session.HiveSessionImpl.fetchResults(HiveSessionImpl.java:685)
    at org.apache.hive.service.cli.CLIService.fetchResults(CLIService.java:455)
    at org.apache.hive.service.cli.CLIService.fetchResults(CLIService.java:447)
    at com.linkedin.transport.test.hive.HiveTester.assertFunctionCall(HiveTester.java:123)
    at com.linkedin.transport.test.spi.SqlStdTester.check(SqlStdTester.java:31)
    at com.linkedin.transport.test.spi.StdTester.check(StdTester.java:38)
    at com.linkedin.transport.examples.TestStructElementIncrementByOneFunction.testStructElementIncrementByOneFunction(TestStructElementIncrementByOneFunction.java:30)
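The first two frames show the root cause: HiveStruct.setField passes the struct's underlying data to StandardStructObjectInspector.setStructFieldData, which casts it to ArrayList, but the data arrives as an Object[]. Here is a minimal standalone sketch of that failing cast (illustrative only, not Transport code; the class name is made up):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CastFailureSketch {
  public static void main(String[] args) {
    // A struct's underlying data as the Hive deferred object yields it: an Object[].
    Object structData = new Object[]{1, 3};

    try {
      // What StandardStructObjectInspector.setStructFieldData effectively does:
      List<Object> fields = (ArrayList<Object>) structData;
      fields.set(0, 2);
    } catch (ClassCastException e) {
      // java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to java.util.ArrayList
      System.out.println(e);
    }

    // Converting the array to a List first, as attempted in the next comment,
    // avoids the failing cast:
    List<Object> converted = new ArrayList<>(Arrays.asList((Object[]) structData));
    converted.set(0, 2);
    System.out.println(converted); // prints [2, 3]
  }
}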

I don't have any local changes. Am I doing something wrong, or is this a bug that needs fixing?

mohit10verma commented 1 year ago

I tried to fix this by making the following change in the Hive implementation of StdUdfWrapper::wrap():

try {
  Object hiveObject = hiveDeferredObject.get();
  if (hiveObject != null) {
    if (stdData instanceof StdStruct) {
      // Struct data can arrive as an Object[], but Hive's
      // StandardStructObjectInspector.setStructFieldData expects a List,
      // so convert the array before setting it as the underlying data.
      if (hiveObject.getClass().isArray()) {
        Object[] hiveObjects = (Object[]) hiveObject;
        ((PlatformData) stdData).setUnderlyingData(new ArrayList<>(Arrays.asList(hiveObjects)));
      }
      // NOTE: if the struct data is already a List, this branch leaves the
      // underlying data unset, so this fix is likely incomplete.
    } else {
      ((PlatformData) stdData).setUnderlyingData(hiveObject);
    }
    return stdData;
  } else {
    return null;
  }
} catch (HiveException e) {
  // Chain the cause so the original Hive failure is not swallowed.
  throw new RuntimeException("Cannot extract Hive Object from Deferred Object", e);
}

But after the above fix, I ran into further ClassCastExceptions: the HiveInteger created in HiveFactory is backed by a JavaIntObjectInspector, which breaks during result serialization in Hive's FetchFormatter (SerDeUtils.toThriftPayload -> SerDeUtils.buildJSONString), because JavaIntObjectInspector::get tries to cast the underlying IntWritable directly to Integer.
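To make that second mismatch concrete, here is a minimal sketch (assuming hadoop-common on the classpath; the inspector behavior described in the comments is paraphrased, not Hive's literal source):

import org.apache.hadoop.io.IntWritable;

public class InspectorMismatchSketch {
  public static void main(String[] args) {
    // The value flowing out of the UDF is a Writable, not a boxed Integer.
    Object value = new IntWritable(42);

    try {
      // JavaIntObjectInspector::get essentially does ((Integer) o).intValue(),
      // which fails when handed an IntWritable.
      int i = ((Integer) value).intValue();
      System.out.println(i);
    } catch (ClassCastException e) {
      System.out.println(e); // IntWritable cannot be cast to Integer
    }

    // A writable-backed inspector unwraps the Writable instead:
    System.out.println(((IntWritable) value).get()); // prints 42
  }
}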

I'll leave it to the experts on this codebase to fix this issue or guide me.

aastha25 commented 1 year ago

Mohit, please share the minimal code and steps to reproduce the error locally. You can raise a PR in transport and add the steps to the PR description.

mohit10verma commented 1 year ago

Hello, I posted all the details here already. To reproduce this issue, you can create a new UDF in the transport-udfs-examples module and then write a unit test for it. The code for both is in the bug description.

The previous comment was my attempt to fix it, but I believe it was not complete.

aastha25 commented 1 year ago

From the error trace, I'm not sure which line is throwing the error. For example, it would help to know what StructElementIncrementByOneFunction.java:33 is.

Can you refer to the struct fields by name instead of by index? E.g. ((StdInteger) myStruct.getField("int_field")).get()
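Something like this (a sketch only; the field names are hypothetical and would need matching names declared in the row type signatures, e.g. "row(int_field integer, other_field integer)"):

@Override
public StdStruct eval(StdStruct myStruct) {
  // Hypothetical: assumes the signature declares a field named "int_field".
  int currVal = ((StdInteger) myStruct.getField("int_field")).get();
  myStruct.setField("int_field", getStdFactory().createInteger(currVal + 1));
  return myStruct;
}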

mohit10verma commented 1 year ago

Created a PR to repro the issue: https://github.com/linkedin/transport/pull/123/files