filodb / FiloDB

Distributed Prometheus time series database
Apache License 2.0

perf(query): pushdown aggregations when inner plan preserves tschema/shard-key labels #1761

Closed alextheimer closed 5 months ago

alextheimer commented 6 months ago

We can push down an aggregation when the inner plan's materialization is guaranteed to produce ExecPlans that each yield disjoint sets of series. The conditions under which this holds differ for each planner.

For example, consider:

sum(max(foo{shardKeyLabel=~".*"}) by (shardKeyLabel))

Suppose this data lives on two partitions. Then this might have its pushdown-optimized execution planned as:

  Sum                   // 6
    Sum                 // 5
      Max(local_data)   // shardKeyLabel=A -> 3, shardKeyLabel=B -> 2
    Sum                 // 1
      Max(remote_data)  // shardKeyLabel=C -> 1

By inspection, the above result is correct. But suppose a query instead does not preserve shard-key labels. If data lives on a single partition, the result is always trivially correct:

  Sum                 // 5
    Max(local_data)   // label1=A -> 3, label1=B -> 2

However, if the data is housed in more than one partition, the result may be incorrect:

sum(max(foo{shardKeyLabel=~".*"}) by (label1))

  Sum                   // 6
    Sum                 // 5
      Max(local_data)   // label1=A -> 3, label1=B -> 2
    Sum                 // 1
      Max(remote_data)  // label1=A -> 1

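This discrepancy happens because the outer aggregation sums partial results that belong to the same group (in this case, label1=A). The correct result is 5, since the max for label1=A across both partitions is 3, but the pushed-down plan counts the local 3 and the remote 1 separately and returns 6. To prevent this, pushdowns must occur only when the inner vector tree preserves labels that guarantee the sets of series returned by all materialized child plans are disjoint.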
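The arithmetic in the two trees above can be reproduced with a small standalone simulation. This is only an illustrative sketch, not FiloDB code: the function names and the in-memory series representation are hypothetical, and the data is chosen to mirror the values in the diagrams.

```python
def max_by(series, label):
    """max(...) by (label): group series on one label, keep the largest value."""
    out = {}
    for labels, value in series:
        key = labels[label]
        out[key] = max(value, out.get(key, value))
    return out

def sum_of_max_global(partitions, label):
    """Reference result: max-by over all series from all partitions, then sum."""
    all_series = [s for part in partitions for s in part]
    return sum(max_by(all_series, label).values())

def sum_of_max_pushdown(partitions, label):
    """Pushed-down plan: sum(max by) inside each partition, then sum the partials."""
    return sum(sum(max_by(part, label).values()) for part in partitions)

# Hypothetical series mirroring the diagrams: (labels, value) pairs.
local = [({"shardKeyLabel": "A", "label1": "A"}, 3),
         ({"shardKeyLabel": "B", "label1": "B"}, 2)]
remote = [({"shardKeyLabel": "C", "label1": "A"}, 1)]
partitions = [local, remote]

# Grouping on the shard-key label: each group lives in exactly one partition.
print(sum_of_max_global(partitions, "shardKeyLabel"),
      sum_of_max_pushdown(partitions, "shardKeyLabel"))  # 6 6

# Grouping on label1: the group label1=A spans both partitions.
print(sum_of_max_global(partitions, "label1"),
      sum_of_max_pushdown(partitions, "label1"))  # 5 6
```

The two strategies agree only when every group is confined to a single partition, which is the disjointness condition described above.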

Example planner behavior:

// Can be pushed down because shard-key labels are preserved.
count(sum(foo{_ws_="demo",_ns_=~".*Ns"}) by (_ws_,_ns_))
~~~~~~~~~~~~~~~~ OLD PLAN ~~~~~~~~~~~~~~~~
T~AggregatePresenter(aggrOp=Count, aggrParams=List(), rangeParams=RangeParams(1634774330,300,1634777330))
-E~LocalPartitionReduceAggregateExec(aggrOp=Count, aggrParams=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0)))
--T~AggregateMapReduce(aggrOp=Count, aggrParams=List(), without=List(), by=List())
---T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(100,1,1000))
----E~MultiPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0)))
-----E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-661271212],raw)
------T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List(_ws_, _ns_))
-------T~PeriodicSamplesMapper(start=1634774330000, step=300000, end=1634777330000, window=None, functionId=None, rawSource=true, offsetMs=None)
--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634774030000,1634777330000), filters=List(ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-661271212],raw)
------T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List(_ws_, _ns_))
-------T~PeriodicSamplesMapper(start=1634774330000, step=300000, end=1634777330000, window=None, functionId=None, rawSource=true, offsetMs=None)
--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634774030000,1634777330000), filters=List(ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-661271212],raw)
-----E~PromQlRemoteExec(PromQlQueryParams(sum(foo{_ws_="demo",_ns_="remoteNs"}) by (_ws_,_ns_),1634774330,300,1634777330,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,true,true,false,false,true,10,false), queryEndpoint=remotePartition-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0)))
~~~~~~~~~~~~~~~ NEW PLAN ~~~~~~~~~~~~~~~
T~AggregatePresenter(aggrOp=Count, aggrParams=List(), rangeParams=RangeParams(100,1,1000))
-E~MultiPartitionReduceAggregateExec(aggrOp=Count, aggrParams=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0)))
--E~LocalPartitionReduceAggregateExec(aggrOp=Count, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1467305314],raw)
---T~AggregateMapReduce(aggrOp=Count, aggrParams=List(), without=List(), by=List())
----E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1467305314],raw)
-----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List(_ws_, _ns_))
------T~PeriodicSamplesMapper(start=1634774330000, step=300000, end=1634777330000, window=None, functionId=None, rawSource=true, offsetMs=None)
-------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634774030000,1634777330000), filters=List(ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1467305314],raw)
-----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List(_ws_, _ns_))
------T~PeriodicSamplesMapper(start=1634774330000, step=300000, end=1634777330000, window=None, functionId=None, rawSource=true, offsetMs=None)
-------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634774030000,1634777330000), filters=List(ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1467305314],raw)
--E~PromQlRemoteExec(PromQlQueryParams(count(sum(foo{_ws_="demo",_ns_="remoteNs"}) by (_ws_,_ns_)),1634774330,300,1634777330,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,true,true,false,false,true,10,false), queryEndpoint=remotePartition-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0)))
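The decision behind the comment "Can be pushed down because shard-key labels are preserved" can be approximated by a simple check on the inner aggregation's grouping clause. The sketch below is a hypothetical simplification, not the planner's actual logic: `preserves_shard_keys` is an invented name, the shard-key labels are assumed to be the `_ws_` and `_ns_` labels used in the example query, and target-schema handling is ignored.

```python
def preserves_shard_keys(shard_keys, by=None, without=None):
    """Return True if an aggregation clause keeps every shard-key label.

    by:      labels listed in a `by (...)` clause, or None if absent
    without: labels listed in a `without (...)` clause, or None if absent
    """
    if by is not None:
        # `by` keeps only the listed labels, so all shard keys must be listed.
        return set(shard_keys) <= set(by)
    if without is not None:
        # `without` drops the listed labels, so no shard key may be listed.
        return set(shard_keys).isdisjoint(without)
    # No grouping clause: every label is aggregated away.
    return False

# Assumed shard-key labels, taken from the example query's `by` clause.
SHARD_KEYS = ["_ws_", "_ns_"]

print(preserves_shard_keys(SHARD_KEYS, by=["_ws_", "_ns_"]))  # True
print(preserves_shard_keys(SHARD_KEYS, by=["label1"]))        # False
print(preserves_shard_keys(SHARD_KEYS, without=["_ns_"]))     # False
```

Under these assumptions, the inner `sum(...) by (_ws_,_ns_)` in the example passes the check, which is consistent with the new plan sending the whole `count(sum(...))` expression to the remote partition.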