Closed: harry671003 closed this issue 2 weeks ago.
@fpetkovski please correct me if I'm wrong. The approach that this repo takes is basically tailored to the situation where you have a natural sharding key (i.e. your leaf queriers are all distinguished by some region label) that you can shard aggregations with. That roughly corresponds to your images if you replace "partition=N" with "region=region_N". So in the ideal case the root querier only works with low-cardinality pre-aggregated results, so that materializing them should already be most of the cost of the query. I don't fully understand the first bullet point; can you please elaborate on what serializing and fragmenting mean and how it differs from the current approach?
Let me try to summarize my understanding of the approach in Thanos. Please correct me if I'm wrong.
- Thanos exposes a `Query` gRPC service (code) with `Query()` and `QueryRange()` methods.
- The `DistributedExecutionOptimizer` injects `RemoteExecutions` into the query plan as `remote.Execution` operators.
- A `remote.Execution` operator calls the `Query()` or `QueryRange()` gRPC APIs exposed by the remote querier, passing it the sub-query string.

The difference between this approach and the POC approach is that in the POC the sub-query results are never fully materialized: the child queriers expose a `QueryServer` service with `Series()` and `Next()` methods, and the parent calls `Next()` multiple times to read all of the `[]model.StepVector` results.
In Cortex, we're discussing using this approach, and I'm curious to understand whether there are any trade-offs. I'm also interested in learning how the Thanos community feels about this approach.
> "An optimizer embedded in the query-frontend is aware of how the data in the storage layer (ingesters and store-gateways) are partitioned. The optimizer creates an optimized query plan."

> I don't fully understand the first bullet point, can you please elaborate what serializing and fragmenting mean and how it differs from the current approach?
The query planner embedded in the query-frontend in Cortex would use the `PartitionInfo` to create an optimized logical plan (`logicalplan.Plan`). In my POC, the data stored in Cortex was split into disjoint partitions; a time series is present in only one partition.

The output of the planner is a `logicalplan.Plan`, which then has to be split into query fragments. A fragment is a portion of the query plan. The `logicalplan.Plan` fragments get serialized into protobuf format and are assigned to multiple queriers.

Note the difference: the plan itself is serialized and sent to the queriers.
I think these are the main differences between the approach in Thanos and the POC approach:

- In Thanos, the sub-query is sent to the remote querier as a query string; in the POC, the serialized plan fragment itself is sent.
- In the POC, the sub-query results are never fully materialized: the child queriers stream the `[]model.StepVector` results to the parent querier.
I recently did a proof of concept for distributed query execution using the Thanos promql-engine in Cortex. In my POC, I followed the methods outlined in https://howqueryengineswork.com/13-distributed-query.html.
How did the POC work?
I'll try to outline at a high level how the POC worked.
The parent querier calls `Series()` and `Next()` on the child queriers and streams the series and the step vectors directly from the child queriers.

Query plan
Original query:
count({__name__=~".+"})
Fragmented plan
Each of the fragments is denoted in a different color.
Comparing to the Thanos approach
The original proposal in Thanos for distributed query execution tackles this problem in a different way:
Questions