apache / pinot

Apache Pinot - A realtime distributed OLAP datastore
https://pinot.apache.org/
Apache License 2.0

[multistage] Query with Anti Semi-Join Fails #10628

Open ankitsultana opened 1 year ago

ankitsultana commented 1 year ago

This query:

SELECT COUNT(*) FROM userAttributes_OFFLINE 
  WHERE daysSinceFirstTrip NOT IN (SELECT daysSinceFirstTrip FROM userAttributes_OFFLINE)

fails with:

2023/04/17 15:08:16.508 ERROR [OpChainSchedulerService] [query_worker_on_60908_port-7-thread-5] (OpChain{8_0_1}): Completed erroneously (8_0_1) Queued Count: 2, Executing Time: 21ms, Queued Time: 0ms {1000=class java.lang.Boolean cannot be cast to class java.lang.Number (java.lang.Boolean and java.lang.Number are in module java.base of loader 'bootstrap')
java.lang.ClassCastException: class java.lang.Boolean cannot be cast to class java.lang.Number (java.lang.Boolean and java.lang.Number are in module java.base of loader 'bootstrap')
        at org.apache.pinot.query.runtime.operator.utils.AggregationUtils.mergeMin(AggregationUtils.java:62)
        at org.apache.pinot.query.runtime.operator.utils.AggregationUtils$Accumulator.lambda$static$6(AggregationUtils.java:116)
        at org.apache.pinot.query.runtime.operator.utils.AggregationUtils$Accumulator.accumulate(AggregationUtils.java:174)
        at org.apache.pinot.query.runtime.operator.AggregateOperator.consumeInputBlocks(AggregateOperator.java:205)}

Calcite defines an ordering between boolean values (see SqlFunctions::compare) which is the same as Boolean.compareTo, i.e. false is less than true. I think we need to update AggregationUtils to add support for a boolean merger for min/max functions.
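
To make the failure mode concrete, here is a minimal sketch (not Pinot's actual code) of a numeric-only merge that casts both sides to Number, next to a boolean-aware merge that follows Boolean.compare. The class name BooleanMinRepro and the methods numericMin and booleanMin are hypothetical, chosen only for illustration.

```java
// Hypothetical stand-in for the failing merge path; names are illustrative only.
class BooleanMinRepro {

  // Numeric-only merge: both sides are cast to Number, so a Boolean input
  // fails at runtime with ClassCastException.
  static Object numericMin(Object left, Object right) {
    return Math.min(((Number) left).doubleValue(), ((Number) right).doubleValue());
  }

  // Boolean-aware merge that follows Boolean.compare, where false sorts before true.
  static Object booleanMin(Object left, Object right) {
    return Boolean.compare((Boolean) left, (Boolean) right) <= 0 ? left : right;
  }

  public static void main(String[] args) {
    try {
      numericMin(true, true);
    } catch (ClassCastException e) {
      System.out.println("numeric merge rejects booleans: " + e.getMessage());
    }
    System.out.println("boolean min of (true, false): " + booleanMin(true, false));
  }
}
```

With this ordering, MIN over booleans is false as soon as any input is false, which is the behavior a boolean merger would need.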

@somandal : Can you lmk your thoughts?

Edit: I think the issue can happen for all anti semi-join queries if the input to the aggregation doesn't already have unique values and hits the else block in AggregationUtils::accumulate.

somandal commented 1 year ago

Yes, I came across this issue earlier (this function was the same before I moved it out from the AggregateOperator to AggregationUtils) when I tried to fix the initialize function for all aggregations to cast to double (like the merge functions do today). Didn't try to find the right fix for this since this was an existing issue at the time.

I think the problem here is that the aggregation functions typecast to double, and that doesn't work with boolean, so we should handle booleans separately for this case. For MIN, the result should be 'false' if any value is 'false', otherwise 'true'. Similarly, for MAX the result should be 'true' if any value is 'true', otherwise 'false'. I can take up a fix for this. cc @walterddr any additional thoughts?

Tried generating a plan for a similar query:

Query: SELECT COUNT(*) FROM a WHERE a.col1 NOT IN (SELECT a.col1 FROM a)
Explain: 
Execution Plan
LogicalAggregate(group=[{}], EXPR$0=[$SUM0($0)])
  LogicalExchange(distribution=[hash])
    LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
      LogicalFilter(condition=[IS NOT TRUE($2)])
        LogicalJoin(condition=[=($0, $1)], joinType=[left])
          LogicalExchange(distribution=[hash[0]])
            LogicalProject(col10=[$0])
              LogicalTableScan(table=[[a]])
          LogicalExchange(distribution=[hash[0]])
            LogicalAggregate(group=[{0}], agg#0=[MIN($1)])
              LogicalExchange(distribution=[hash[0]])
                LogicalAggregate(group=[{0}], agg#0=[MIN($1)])
                  LogicalProject(col1=[$0], $f1=[true])
                    LogicalTableScan(table=[[a]])

somandal commented 1 year ago

@ankitsultana @walterddr we could potentially do something like this:

  public interface Merger {
    /**
     * Initializes the merger based on the first input
     */
    default Object initialize(Object other, DataSchema.ColumnDataType dataType) {
      // TODO: Initialize as a double so that if only one row is returned it matches the type when many rows are
      //       returned
      return other == null ? dataType.getNullPlaceholder() : other;
    }

    /**
     * Merges the existing aggregate (the result of {@link #initialize(Object, DataSchema.ColumnDataType)}) with
     * the new value coming in (which may be an aggregate in and of itself).
     */
    Object merge(Object agg, Object value);
  }

  public abstract static class BaseMerger implements AggregationUtils.Merger {
    public final DataSchema.ColumnDataType _dataType;

    BaseMerger(DataSchema.ColumnDataType dataType) {
      _dataType = dataType;
    }
  }

We can have all the mergers extend the above BaseMerger so that they get the _dataType. Then we can use the _dataType within the min and max functions to implement it differently:

  private static class MergeMins extends BaseMerger {

    MergeMins(DataSchema.ColumnDataType dataType) {
      super(dataType);
    }

    @Override
    public Object merge(Object left, Object right) {
      if (_dataType == DataSchema.ColumnDataType.BOOLEAN) {
        return (Boolean) left && (Boolean) right;
      }
      return Math.min(((Number) left).doubleValue(), ((Number) right).doubleValue());
    }
  }

wdyt?

I checked how this works in the v1 engine, and it looks like we set up the ColumnType Boolean with storedType as INT, and the v1 aggregation functions look at the stored type to decide how to aggregate.
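
As a rough illustration of the stored-type idea described above (not the v1 engine's actual code), the sketch below assumes booleans are stored as the ints 1 and 0, aggregates on the stored ints, and converts back at the end. The class StoredTypeBooleanAgg and its helpers are hypothetical.

```java
// Hypothetical sketch: aggregate booleans through an int stored type.
class StoredTypeBooleanAgg {

  // Assumption: true is stored as 1 and false as 0.
  static int toStored(boolean value) {
    return value ? 1 : 0;
  }

  static boolean fromStored(int stored) {
    return stored != 0;
  }

  // MIN on the stored ints; the result is false if any input is false.
  static boolean min(boolean... values) {
    if (values.length == 0) {
      throw new IllegalArgumentException("MIN needs at least one value");
    }
    int agg = toStored(values[0]);
    for (int i = 1; i < values.length; i++) {
      agg = Math.min(agg, toStored(values[i]));
    }
    return fromStored(agg);
  }

  // MAX on the stored ints; the result is true if any input is true.
  static boolean max(boolean... values) {
    if (values.length == 0) {
      throw new IllegalArgumentException("MAX needs at least one value");
    }
    int agg = toStored(values[0]);
    for (int i = 1; i < values.length; i++) {
      agg = Math.max(agg, toStored(values[i]));
    }
    return fromStored(agg);
  }

  public static void main(String[] args) {
    System.out.println(min(true, false, true));
    System.out.println(max(false, false, true));
  }
}
```

Because the aggregation runs on ints, the numeric min/max path never sees a Boolean object, so the cast problem does not arise.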

walterddr commented 1 year ago

hmm. can we explain the plan with ANTI JOIN and see what node it is generating?

ankitsultana commented 1 year ago

what prompts the aggregate function to be used here as MIN(bool)?

As an example take this query:

explain plan for select userUUID, deviceOS from userAttributes_OFFLINE where userUUID NOT IN (SELECT userUUID FROM userGroups_OFFLINE WHERE groupUUID = 'group-1')

This will have this plan:

Execution Plan
LogicalProject(userUUID=[$1], deviceOS=[$0])
  LogicalFilter(condition=[IS NOT TRUE($4)])
    LogicalJoin(condition=[=($2, $3)], joinType=[left])
      LogicalExchange(distribution=[hash[2]])
        LogicalProject(deviceOS=[$4], userUUID=[$6], userUUID0=[$6])
          LogicalTableScan(table=[[userAttributes_OFFLINE]])
      LogicalExchange(distribution=[hash[0]])
        LogicalAggregate(group=[{0}], agg#0=[MIN($1)])
          LogicalExchange(distribution=[hash[0]])
            LogicalAggregate(group=[{0}], agg#0=[MIN($1)])
              LogicalProject(userUUID=[$4], $f1=[true])
                LogicalFilter(condition=[=($3, 'group-1')])
                  LogicalTableScan(table=[[userGroups_OFFLINE]])

For NOT IN, Calcite creates a left join and adds a virtual column called $f1 to the right side, which has all values as true. After the join, only rows where this column is not true are kept (the IS NOT TRUE($4) filter). In this particular case, since the columns are assumed to be non-nullable, if the left side has a userUUID that is not in the result set of the right side of the join, the value of this virtual column is assumed to be false.

I think nullability might also have a role to play here. One could ask why the default value should be false rather than null (in this case it could be because all columns are non-nullable on master right now).
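
The shape of that plan can be mimicked in memory. This is only a sketch under the same non-nullable assumption as above: the right side is grouped by key with a MIN over a constant true marker, the left side is left-joined on the key, and rows whose marker is not true are kept. The class NotInAsLeftJoin and the method notIn are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory imitation of the NOT IN plan; assumes no null keys.
class NotInAsLeftJoin {

  static List<String> notIn(List<String> left, List<String> right) {
    // Right side: GROUP BY key with MIN over a marker that is always true.
    // For booleans, MIN behaves like a logical AND.
    Map<String, Boolean> markerByKey = new HashMap<>();
    for (String key : right) {
      markerByKey.merge(key, Boolean.TRUE, (a, b) -> a && b);
    }

    // Left join on the key, then keep rows where the marker IS NOT TRUE.
    // An unmatched left row gets no marker (null here), so it passes the filter.
    List<String> result = new ArrayList<>();
    for (String key : left) {
      Boolean marker = markerByKey.get(key);
      if (!Boolean.TRUE.equals(marker)) {
        result.add(key);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    System.out.println(notIn(List.of("u1", "u2", "u3"), List.of("u2", "u2")));
  }
}
```

The duplicate key on the right side is what forces the MIN merge to run on two boolean values, which is the step that fails in the stack trace.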

somandal commented 1 year ago

We could probably map to the BOOL_AND/BOOL_OR within the AggregateOperator based on the column data type. I don't think modifying the logical plan to use BOOL_AND/BOOL_OR will be easy (and there may be many corner cases to worry about)

    public static final Map<String, Function<DataSchema.ColumnDataType, AggregationUtils.Merger>> MERGERS =
        ImmutableMap.<String, Function<DataSchema.ColumnDataType, AggregationUtils.Merger>>builder()
            .put("SUM", cdt -> AggregationUtils::mergeSum)
            .put("$SUM", cdt -> AggregationUtils::mergeSum)
            .put("$SUM0", cdt -> AggregationUtils::mergeSum)
            .put("MIN", cdt -> {
              if (cdt == DataSchema.ColumnDataType.BOOLEAN) {
                return AggregationUtils::mergeBoolAnd;
              } else {
                return AggregationUtils::mergeMin;
              }
            })
...
...
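
A self-contained version of the same idea, for experimenting outside Pinot, could look like the sketch below. The ColumnType enum, the Merger interface, and the class TypedMergers are hypothetical stand-ins for DataSchema.ColumnDataType and AggregationUtils.Merger; only the dispatch-on-type pattern is taken from the snippet above.

```java
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-ins for Pinot's types; only the dispatch pattern matters.
class TypedMergers {

  enum ColumnType { BOOLEAN, DOUBLE }

  interface Merger {
    Object merge(Object agg, Object value);
  }

  static Object mergeMin(Object left, Object right) {
    return Math.min(((Number) left).doubleValue(), ((Number) right).doubleValue());
  }

  static Object mergeMax(Object left, Object right) {
    return Math.max(((Number) left).doubleValue(), ((Number) right).doubleValue());
  }

  // MIN over booleans: false if either side is false.
  static Object mergeBoolAnd(Object left, Object right) {
    return (Boolean) left && (Boolean) right;
  }

  // MAX over booleans: true if either side is true.
  static Object mergeBoolOr(Object left, Object right) {
    return (Boolean) left || (Boolean) right;
  }

  // Each function name maps to a factory that picks a merger by column type.
  static final Map<String, Function<ColumnType, Merger>> MERGERS = Map.of(
      "MIN", type -> {
        if (type == ColumnType.BOOLEAN) {
          return TypedMergers::mergeBoolAnd;
        }
        return TypedMergers::mergeMin;
      },
      "MAX", type -> {
        if (type == ColumnType.BOOLEAN) {
          return TypedMergers::mergeBoolOr;
        }
        return TypedMergers::mergeMax;
      });

  public static void main(String[] args) {
    Merger boolMin = MERGERS.get("MIN").apply(ColumnType.BOOLEAN);
    Merger numMin = MERGERS.get("MIN").apply(ColumnType.DOUBLE);
    System.out.println(boolMin.merge(true, false));
    System.out.println(numMin.merge(3, 1.5));
  }
}
```

The plan and operator tree stay unchanged with this approach; only the function chosen at merger-creation time differs by type.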

walterddr commented 1 year ago

ah. i think this was the issue

  LogicalFilter(condition=[IS NOT TRUE($4)])

...
            LogicalAggregate(group=[{0}], agg#0=[MIN($1)])
              LogicalProject(userUUID=[$4], $f1=[true])

^ the only reason this is generated as a LEFT JOIN is to handle the IS_NOT_TRUE: it relies on the nullability introduced by the LEFT JOIN so that the value checked by IS_NOT_TRUE can be something other than true. Otherwise, based on the plan, it will always be true and the filter is useless.

however in Pinot's use case this should actually be sufficient to use INNER JOIN b/c the left userUUID we can guarantee to be non-nullable (otherwise we can throw?) thus we don't need to handle this situation. was wondering if that's better.

@somandal does adding BOOL_AND and BOOL_OR resolve the issue? it is "one" solution but i don't think it is ideal b/c it makes several extra operators that do basically nothing :-/

somandal commented 1 year ago

> however in Pinot's use case this should actually be sufficient to use INNER JOIN b/c the left userUUID we can guarantee to be non-nullable (otherwise we can throw?) thus we don't need to handle this situation. was wondering if that's better.
>
> @somandal does adding BOOL_AND and BOOL_OR resolve the issue? it is "one" solution but i don't think it is ideal b/c it makes several extra operators that do basically nothing :-/

BOOL_AND/BOOL_OR should solve the issue (have to actually try it out) but yes this may not be the ideal solution for this problem. Are you recommending a rule rewrite for this scenario?

In general though, if a plan tries to run MIN/MAX on a Boolean column, shouldn't the aggregation functions handle this scenario correctly? I still think handling bools correctly is a useful fix to add. Won't queries that run MIN()/MAX() on a boolean column fail today?

> but i don't think it is ideal b/c it makes several extra operators that do basically nothing :-/

I didn't understand this part. Why do you say it makes extra operators that do basically nothing? The nodes and operators will still be exactly the same as specified in the plan; we just decide which function to use based on the type.