Add query sharding support to ingesters #140

Merged

merged 1 commit on Aug 17, 2021
CHANGELOG.md: 2 changes (1 addition, 1 deletion)
@@ -19,7 +19,7 @@
* [CHANGE] Ingester: default `-ingester.min-ready-duration` reduced from 1m to 15s. #126
* [CHANGE] Ingester: `-ingester.min-ready-duration` now starts counting the delay after the ring's health checks have passed instead of when the ring client was started. #126
* [FEATURE] Query Frontend: Add `cortex_query_fetched_chunks_total` per-user counter to expose the number of chunks fetched as part of queries. This metric can be enabled with the `-frontend.query-stats-enabled` flag (or its respective YAML config option `query_stats_enabled`). #31
* [FEATURE] Query Frontend: Add experimental querysharding for the blocks storage. You can now enable querysharding for blocks storage (`-store.engine=blocks`) by setting `-querier.parallelise-shardable-queries` to `true`. The following additional config and exported metrics have been added. #79 #80 #100
* [FEATURE] Query Frontend: Add experimental querysharding for the blocks storage. You can now enable querysharding for blocks storage (`-store.engine=blocks`) by setting `-querier.parallelise-shardable-queries` to `true`. The following additional config and exported metrics have been added. #79 #80 #100 #140
* New config options:
* `-querier.total-shards`: The number of shards to use when doing parallelisation via query sharding.
* `-blocks-storage.bucket-store.series-hash-cache-max-size-bytes`: Max size - in bytes - of the in-memory series hash cache in the store-gateway.
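
For orientation, a minimal sketch of how the feature described in the changelog entry above might be switched on. Only the flag names come from this excerpt; the example values for the shard count and cache size are illustrative assumptions, not defaults introduced by this PR:

```
# Query sharding is experimental and applies to the blocks storage only.
-store.engine=blocks
-querier.parallelise-shardable-queries=true
# Shards per shardable query (example value, not a documented default).
-querier.total-shards=16
# Store-gateway in-memory series hash cache size, in bytes (example value).
-blocks-storage.bucket-store.series-hash-cache-max-size-bytes=134217728
```
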
pkg/ingester/ingester_v2.go: 46 changes (39 additions, 7 deletions)
@@ -39,6 +39,7 @@ import (
"github.com/grafana/mimir/pkg/chunk/encoding"
"github.com/grafana/mimir/pkg/ingester/client"
"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/querier/querysharding"
"github.com/grafana/mimir/pkg/ring"
"github.com/grafana/mimir/pkg/storage/bucket"
mimir_tsdb "github.com/grafana/mimir/pkg/storage/tsdb"
@@ -999,6 +1000,13 @@ func (i *Ingester) v2Query(ctx context.Context, req *client.QueryRequest) (*clie
return nil, err
}

// Check if query sharding is enabled for this query. If so, we need to remove the
// query sharding label from matchers and pass the shard info down the query execution path.
shard, matchers, err := querysharding.RemoveShardFromMatchers(matchers)
if err != nil {
return nil, err
}

i.metrics.queries.Inc()

db := i.getTSDB(userID)
@@ -1013,7 +1021,8 @@ func (i *Ingester) v2Query(ctx context.Context, req *client.QueryRequest) (*clie
defer q.Close()

// It's not required to return sorted series because series are sorted by the Mimir querier.
ss := q.Select(false, nil, matchers...)
hints := getSelectHintsForShard(int64(from), int64(through), shard)
ss := q.Select(false, hints, matchers...)
if ss.Err() != nil {
return nil, ss.Err()
}
@@ -1320,6 +1329,13 @@ func (i *Ingester) v2QueryStream(req *client.QueryRequest, stream client.Ingeste
return err
}

// Check if query sharding is enabled for this query. If so, we need to remove the
// query sharding label from matchers and pass the shard info down the query execution path.
shard, matchers, err := querysharding.RemoveShardFromMatchers(matchers)
if err != nil {
return err
}

i.metrics.queries.Inc()

db := i.getTSDB(userID)
@@ -1349,10 +1365,10 @@ func (i *Ingester) v2QueryStream(req *client.QueryRequest, stream client.Ingeste

if streamType == QueryStreamChunks {
level.Debug(spanlog).Log("msg", "using v2QueryStreamChunks")
numSeries, numSamples, err = i.v2QueryStreamChunks(ctx, db, int64(from), int64(through), matchers, stream)
numSeries, numSamples, err = i.v2QueryStreamChunks(ctx, db, int64(from), int64(through), matchers, shard, stream)
} else {
level.Debug(spanlog).Log("msg", "using v2QueryStreamSamples")
numSeries, numSamples, err = i.v2QueryStreamSamples(ctx, db, int64(from), int64(through), matchers, stream)
numSeries, numSamples, err = i.v2QueryStreamSamples(ctx, db, int64(from), int64(through), matchers, shard, stream)
}
if err != nil {
return err
@@ -1364,15 +1380,16 @@ func (i *Ingester) v2QueryStream(req *client.QueryRequest, stream client.Ingeste
return nil
}

func (i *Ingester) v2QueryStreamSamples(ctx context.Context, db *userTSDB, from, through int64, matchers []*labels.Matcher, stream client.Ingester_QueryStreamServer) (numSeries, numSamples int, _ error) {
func (i *Ingester) v2QueryStreamSamples(ctx context.Context, db *userTSDB, from, through int64, matchers []*labels.Matcher, shard *querysharding.ShardSelector, stream client.Ingester_QueryStreamServer) (numSeries, numSamples int, _ error) {
q, err := db.Querier(ctx, from, through)
if err != nil {
return 0, 0, err
}
defer q.Close()

// It's not required to return sorted series because series are sorted by the Mimir querier.
ss := q.Select(false, nil, matchers...)
hints := getSelectHintsForShard(from, through, shard)
ss := q.Select(false, hints, matchers...)
if ss.Err() != nil {
return 0, 0, ss.Err()
}
@@ -1433,15 +1450,16 @@ func (i *Ingester) v2QueryStreamSamples(ctx context.Context, db *userTSDB, from,
}

// v2QueryStreamChunks streams metrics from a TSDB as chunks. This implements the client.IngesterServer interface
func (i *Ingester) v2QueryStreamChunks(ctx context.Context, db *userTSDB, from, through int64, matchers []*labels.Matcher, stream client.Ingester_QueryStreamServer) (numSeries, numSamples int, _ error) {
func (i *Ingester) v2QueryStreamChunks(ctx context.Context, db *userTSDB, from, through int64, matchers []*labels.Matcher, shard *querysharding.ShardSelector, stream client.Ingester_QueryStreamServer) (numSeries, numSamples int, _ error) {
q, err := db.ChunkQuerier(ctx, from, through)
if err != nil {
return 0, 0, err
}
defer q.Close()

// It's not required to return sorted series because series are sorted by the Mimir querier.
ss := q.Select(false, nil, matchers...)
hints := getSelectHintsForShard(from, through, shard)
ss := q.Select(false, hints, matchers...)
if ss.Err() != nil {
return 0, 0, ss.Err()
}
@@ -2296,3 +2314,17 @@ func (i *Ingester) getInstanceLimits() *InstanceLimits {

return l
}

func getSelectHintsForShard(start, end int64, shard *querysharding.ShardSelector) *storage.SelectHints {
if shard == nil {
return nil
}

// If query sharding is enabled, we need to pass it along with hints.
return &storage.SelectHints{
Start: start,
End: end,
ShardIndex: shard.ShardIndex,
ShardCount: shard.ShardCount,
}
}
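
To make the new ingester path easier to follow, here is a minimal, self-contained sketch of the contract that `querysharding.RemoveShardFromMatchers` appears to fulfil in this diff: strip a synthetic shard matcher out of the query's label matchers and return the decoded shard selector (nil when the query is not sharded). The `__query_shard__` label name and the `<index>_of_<count>` value encoding are assumptions for illustration only; the real package may encode the shard differently.

```go
// Sketch only: illustrates the assumed behaviour of RemoveShardFromMatchers.
// The "__query_shard__" label name and the "<index>_of_<count>" value format
// are hypothetical; they are not confirmed by the diff above.
package querysharding

import (
	"fmt"
	"strconv"
	"strings"

	"github.com/prometheus/prometheus/pkg/labels"
)

// ShardSelector mirrors the two fields the ingester copies into
// storage.SelectHints in getSelectHintsForShard above.
type ShardSelector struct {
	ShardIndex uint64
	ShardCount uint64
}

// RemoveShardFromMatchers returns the shard selector encoded in the matchers
// (nil if the query is not sharded) and the matchers with the shard label removed.
func RemoveShardFromMatchers(matchers []*labels.Matcher) (*ShardSelector, []*labels.Matcher, error) {
	const shardLabel = "__query_shard__" // assumed label name

	var shard *ShardSelector
	out := make([]*labels.Matcher, 0, len(matchers))

	for _, m := range matchers {
		if m.Name != shardLabel {
			out = append(out, m)
			continue
		}

		// Assumed encoding: "<index>_of_<count>", e.g. "3_of_16".
		parts := strings.Split(m.Value, "_of_")
		if len(parts) != 2 {
			return nil, nil, fmt.Errorf("invalid shard label value %q", m.Value)
		}
		index, err := strconv.ParseUint(parts[0], 10, 64)
		if err != nil {
			return nil, nil, err
		}
		count, err := strconv.ParseUint(parts[1], 10, 64)
		if err != nil {
			return nil, nil, err
		}
		shard = &ShardSelector{ShardIndex: index, ShardCount: count}
	}

	return shard, out, nil
}
```

In the ingester, the two fields of the returned selector are copied into `storage.SelectHints` by `getSelectHintsForShard` above; the idea is that the querier underneath uses them to return only the series belonging to the requested shard, so each sharded sub-query touches a fraction of the series.
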