Question about distributed query execution in Thanos #405

Open
harry671003 opened this issue Jan 29, 2024 · 3 comments

@harry671003

harry671003 commented Jan 29, 2024

I recently did a proof of concept for distributed query execution using the Thanos promql-engine in Cortex. In my POC, I followed the methods outlined in https://howqueryengineswork.com/13-distributed-query.html.

How did the POC work?

I'll try to outline, at a high level, how the POC worked.

  • An optimizer embedded in the query-frontend is aware of how the data in the storage layer (ingesters and store-gateways) is partitioned. The optimizer creates an optimized query plan.
  • The query-frontend then serializes the query plan into protobuf format and sends it to the query-scheduler.
  • The query-scheduler splits the plan into multiple query plan fragments.
  • The query-scheduler assigns the query plan fragments to different queriers.
  • One of the queriers is assigned the root fragment and orchestrates the query execution (see the sketch after this list).
    • It calls Series() and Next() on the child queriers and streams the series and step vectors directly from them.
    • Once the query execution is complete, it responds with the results directly to the query-frontend.
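
To make this concrete, here is a minimal sketch of how the root fragment streams from its children. All names (QueryServer, SeriesMeta, StepVector) are illustrative stand-ins for the POC's gRPC service and the promql-engine's model.StepVector, not actual Cortex or promql-engine APIs.

```go
// Minimal sketch of the POC's fragment execution protocol; all types here
// are hypothetical stand-ins, not real Cortex or promql-engine types.
package sketch

import "context"

// StepVector is a batch of samples for one evaluation step (a stand-in for
// the promql-engine's model.StepVector).
type StepVector struct {
	T       int64
	Samples []float64
}

// SeriesMeta identifies one output series of a fragment.
type SeriesMeta struct {
	Labels map[string]string
}

// QueryServer is the interface each child querier exposes for its assigned
// plan fragment.
type QueryServer interface {
	// Series returns the metadata of the series the fragment will produce.
	Series(ctx context.Context) ([]SeriesMeta, error)
	// Next streams the next batch of step vectors; a nil batch means the
	// fragment is exhausted.
	Next(ctx context.Context) ([]StepVector, error)
}

// runRootFragment shows how the root querier pulls results from its child
// fragments without the children ever materializing the full result.
func runRootFragment(ctx context.Context, children []QueryServer) error {
	for _, child := range children {
		if _, err := child.Series(ctx); err != nil {
			return err
		}
	}
	for {
		exhausted := true
		for _, child := range children {
			batch, err := child.Next(ctx)
			if err != nil {
				return err
			}
			if batch != nil {
				exhausted = false
				// Merge the batch into the root operator here,
				// e.g. a running count for count({__name__=~".+"}).
			}
		}
		if exhausted {
			return nil
		}
	}
}
```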

Query plan

Original query: count({__name__=~".+"})

Optimized plan (image)

Fragmented plan (image); each fragment is denoted in a different color.

Comparing to the Thanos approach

The original proposal in Thanos for distributed query execution tackles this problem in a different way:

  • Instead of serializing the plan, it opts for query rewrites.
  • Instead of streaming from child queriers, the "central" querier reads the fully materialized query results from the child queriers.

Questions

  • Were there any particular trade-offs considered in opting for query rewrites instead of serializing and fragmenting the plan?
  • Similarly, why did Thanos opt for fully materializing the query results in the child queriers?
  • Would Thanos be open to considering the serialization and fragmenting approach in the future?
@MichaHoffmann
Contributor

@fpetkovski please correct me if I'm wrong. The approach that this repo takes is basically tailored to the situation where you have a natural sharding key (i.e. your leaf queriers are all distinguished by some region label) that you can shard aggregations with; that roughly corresponds to your images if you replace "partition=N" with "region=region_N".
So in the ideal case the root querier only works with low-cardinality, pre-aggregated results, and by the time those are materialized most of the cost of the query has already been paid in the leaf queriers (roughly as sketched below).
I don't fully understand the first bullet point, can you please elaborate on what serializing and fragmenting mean and how they differ from the current approach?
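
For the example query in the issue, that sharding could look roughly like the sketch below. The query strings are illustrative only (made-up region values), not output of the actual DistributedExecutionOptimizer.

```go
// Hedged illustration of the query-rewrite approach, assuming two leaf
// queriers distinguished by a "region" external label. Illustrative only.
package sketch

// Original query sent to the central querier.
const original = `count({__name__=~".+"})`

// Sub-queries shipped as strings to the leaf queriers. Because the regions
// partition the data disjointly, each leaf can compute its own partial count.
var leafQueries = []string{
	`count({__name__=~".+", region="region_1"})`,
	`count({__name__=~".+", region="region_2"})`,
}

// The central querier sums the fully materialized partial counts it reads
// back from the leaves; this is valid because the partitions are disjoint,
// so the global count equals the sum of the per-region counts.
const centralMerge = `sum(<partial count per region>)`
```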

@harry671003
Author

Let me try to summarize my understanding of the approach in Thanos. Please correct me if I'm wrong.

  • Queriers in Thanos implement the Query gRPC service (code).
    • It has Query() and QueryRange() methods.
    • The query is passed as a string.
  • The "central" querier creates a distributed engine.
  • Plan phase:
    • The DistributedExecutionOptimizer injects RemoteExecutions into the query plan.
    • The physical plan contains remote.Execution operators.
  • Execution phase:
    • During execution, the remote.Execution operator calls the Query() or QueryRange() gRPC APIs exposed by the remote querier, passing the sub-query string.
    • The remote querier executes the query and returns the fully materialized query results (sketched below).
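
To make the contrast with the POC explicit, here is a rough paraphrase of that execution phase, assuming a simplified QueryRangeClient interface. This is not the actual remote.Execution code in the promql-engine, only an illustration of the "fully materialized" property.

```go
// Rough paraphrase of the execution phase; names and signatures are
// simplified assumptions, not the real promql-engine API.
package sketch

import (
	"context"
	"time"
)

// Sample is one (timestamp, value) pair of a materialized series.
type Sample struct {
	T int64
	V float64
}

// Series is a fully materialized series returned by a remote querier.
type Series struct {
	Labels  map[string]string
	Samples []Sample
}

// QueryRangeClient stands in for the Query gRPC service: the sub-query is
// shipped as a string and the whole result comes back at once.
type QueryRangeClient interface {
	QueryRange(ctx context.Context, query string, start, end time.Time, step time.Duration) ([]Series, error)
}

// remoteExec mirrors the role of the remote.Execution operator: it runs the
// sub-query remotely once and then serves the materialized result to its
// parent operator.
type remoteExec struct {
	client     QueryRangeClient
	query      string
	start, end time.Time
	step       time.Duration
	result     []Series
	fetched    bool
}

func (r *remoteExec) Next(ctx context.Context) ([]Series, error) {
	if !r.fetched {
		res, err := r.client.QueryRange(ctx, r.query, r.start, r.end, r.step)
		if err != nil {
			return nil, err
		}
		// The remote querier has already evaluated the whole sub-query;
		// from here on everything lives in memory on the central querier.
		r.result = res
		r.fetched = true
		return r.result, nil
	}
	return nil, nil // exhausted
}
```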

The difference between this approach and the POC approach is that in the POC the sub-query results are never fully materialized.

  • The remote queriers in the POC implement the QueryServer service.
    • They expose Series() and Next() methods.
  • The root querier calls Next() multiple times to read all of the []model.StepVector batches.
  • In the child queriers, the partial query results are not fully materialized.

In Cortex, we're discussing using this approach, and I'm curious to understand whether there are any trade-offs. I'm also interested in learning how the Thanos community feels about it.

@harry671003
Author

"An optimizer embedded in the query-frontend is aware of how the data in the storage layer (ingesters and store-gateways) are partitioned. The optimizer creates an optimized query plan."
I dont fully understand the first bullet point, can you please elaborate what serializing and fragmenting mean and how it differs from the current approach?

QueryPlanner

The query planner embedded in the query-frontend in Cortex would use the PartitionInfo to create an optimized logical plan (logicalplan.Plan). In my POC, the data stored in Cortex was split into disjoint partitions; a time series is present in only one partition.

The output of the planner is a logicalplan.Plan, which then has to be split into query fragments. A fragment is a portion of the query plan. The logicalplan.Plan fragments get serialized into protobuf format and are assigned to multiple queriers.
Note that the difference is that the plan itself is serialized and sent to the queriers (see the sketch below).
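
As an illustration of the fragmenting step, here is a minimal sketch under these assumptions: PartitionInfo and Fragment are hypothetical POC types, and the serialized sub-plans are assumed to already come out of the planner.

```go
// Minimal sketch of the fragmenting step. PartitionInfo, Fragment, and the
// byte-slice "protobuf" payloads are hypothetical POC names, not part of
// Cortex or the promql-engine.
package sketch

// PartitionInfo describes how series are split across the storage layer;
// in the POC every series lives in exactly one partition.
type PartitionInfo struct {
	Partitions []string // partition identifiers covering ingesters/store-gateways
}

// Fragment is one portion of the query plan, assigned to a single querier.
type Fragment struct {
	ID        int
	Partition string // partition this fragment reads from ("" for the root)
	PlanProto []byte // the fragment's sub-plan, serialized to protobuf
	ChildIDs  []int  // fragments the root streams partial results from
}

// fragmentPlan builds one leaf fragment per partition plus a root fragment
// that merges the streamed partial results.
func fragmentPlan(info PartitionInfo, leafPlans map[string][]byte, rootPlan []byte) []Fragment {
	fragments := make([]Fragment, 0, len(info.Partitions)+1)
	childIDs := make([]int, 0, len(info.Partitions))
	for i, p := range info.Partitions {
		fragments = append(fragments, Fragment{
			ID:        i + 1,
			Partition: p,
			PlanProto: leafPlans[p],
		})
		childIDs = append(childIDs, i+1)
	}
	// Fragment 0 is the root: it carries the merge/aggregate part of the
	// plan and references the leaf fragments it will stream from.
	fragments = append(fragments, Fragment{
		ID:        0,
		PlanProto: rootPlan,
		ChildIDs:  childIDs,
	})
	return fragments
}
```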

I think these are the main differences between the approach in Thanos and the POC approach:

  • There is a central query planner that creates the optimized logical plan.
  • The plan is serialized, fragmented, and assigned to the child queriers.
  • The child queriers do not materialize the query results; they stream batches of []model.StepVector to the parent querier.
