Skip to content

Commit

Permalink
Add query sharding support to ingesters
Browse files Browse the repository at this point in the history
Signed-off-by: Marco Pracucci <marco@pracucci.com>
  • Loading branch information
pracucci committed Aug 17, 2021
1 parent c084db8 commit c3f4560
Show file tree
Hide file tree
Showing 3 changed files with 281 additions and 62 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
* [CHANGE] Ingester: default `-ingester.min-ready-duration` reduced from 1m to 15s. #126
* [CHANGE] Ingester: `-ingester.min-ready-duration` now starts counting the delay after the ring's health checks have passed instead of when the ring client was started. #126
* [FEATURE] Query Frontend: Add `cortex_query_fetched_chunks_total` per-user counter to expose the number of chunks fetched as part of queries. This metric can be enabled with the `-frontend.query-stats-enabled` flag (or its respective YAML config option `query_stats_enabled`). #31
* [FEATURE] Query Frontend: Add experimental querysharding for the blocks storage. You can now enable querysharding for blocks storage (`-store.engine=blocks`) by setting `-querier.parallelise-shardable-queries` to `true`. The following additional config and exported metrics have been added. #79 #80 #100
* [FEATURE] Query Frontend: Add experimental querysharding for the blocks storage. You can now enable querysharding for blocks storage (`-store.engine=blocks`) by setting `-querier.parallelise-shardable-queries` to `true`. The following additional config and exported metrics have been added. #79 #80 #100 #140
* New config options:
* `-querier.total-shards`: The amount of shards to use when doing parallelisation via query sharding.
* `-blocks-storage.bucket-store.series-hash-cache-max-size-bytes`: Max size - in bytes - of the in-memory series hash cache in the store-gateway.
Expand Down
46 changes: 39 additions & 7 deletions pkg/ingester/ingester_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/grafana/mimir/pkg/chunk/encoding"
"github.com/grafana/mimir/pkg/ingester/client"
"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/querier/querysharding"
"github.com/grafana/mimir/pkg/ring"
"github.com/grafana/mimir/pkg/storage/bucket"
mimir_tsdb "github.com/grafana/mimir/pkg/storage/tsdb"
Expand Down Expand Up @@ -999,6 +1000,13 @@ func (i *Ingester) v2Query(ctx context.Context, req *client.QueryRequest) (*clie
return nil, err
}

// Check if query sharding is enabled for this query. If so, we need to remove the
// query sharding label from matchers and pass the shard info down the query execution path.
shard, matchers, err := querysharding.RemoveShardFromMatchers(matchers)
if err != nil {
return nil, err
}

i.metrics.queries.Inc()

db := i.getTSDB(userID)
Expand All @@ -1013,7 +1021,8 @@ func (i *Ingester) v2Query(ctx context.Context, req *client.QueryRequest) (*clie
defer q.Close()

// It's not required to return sorted series because series are sorted by the Mimir querier.
ss := q.Select(false, nil, matchers...)
hints := getSelectHintsForShard(int64(from), int64(through), shard)
ss := q.Select(false, hints, matchers...)
if ss.Err() != nil {
return nil, ss.Err()
}
Expand Down Expand Up @@ -1320,6 +1329,13 @@ func (i *Ingester) v2QueryStream(req *client.QueryRequest, stream client.Ingeste
return err
}

// Check if query sharding is enabled for this query. If so, we need to remove the
// query sharding label from matchers and pass the shard info down the query execution path.
shard, matchers, err := querysharding.RemoveShardFromMatchers(matchers)
if err != nil {
return err
}

i.metrics.queries.Inc()

db := i.getTSDB(userID)
Expand Down Expand Up @@ -1349,10 +1365,10 @@ func (i *Ingester) v2QueryStream(req *client.QueryRequest, stream client.Ingeste

if streamType == QueryStreamChunks {
level.Debug(spanlog).Log("msg", "using v2QueryStreamChunks")
numSeries, numSamples, err = i.v2QueryStreamChunks(ctx, db, int64(from), int64(through), matchers, stream)
numSeries, numSamples, err = i.v2QueryStreamChunks(ctx, db, int64(from), int64(through), matchers, shard, stream)
} else {
level.Debug(spanlog).Log("msg", "using v2QueryStreamSamples")
numSeries, numSamples, err = i.v2QueryStreamSamples(ctx, db, int64(from), int64(through), matchers, stream)
numSeries, numSamples, err = i.v2QueryStreamSamples(ctx, db, int64(from), int64(through), matchers, shard, stream)
}
if err != nil {
return err
Expand All @@ -1364,15 +1380,16 @@ func (i *Ingester) v2QueryStream(req *client.QueryRequest, stream client.Ingeste
return nil
}

func (i *Ingester) v2QueryStreamSamples(ctx context.Context, db *userTSDB, from, through int64, matchers []*labels.Matcher, stream client.Ingester_QueryStreamServer) (numSeries, numSamples int, _ error) {
func (i *Ingester) v2QueryStreamSamples(ctx context.Context, db *userTSDB, from, through int64, matchers []*labels.Matcher, shard *querysharding.ShardSelector, stream client.Ingester_QueryStreamServer) (numSeries, numSamples int, _ error) {
q, err := db.Querier(ctx, from, through)
if err != nil {
return 0, 0, err
}
defer q.Close()

// It's not required to return sorted series because series are sorted by the Mimir querier.
ss := q.Select(false, nil, matchers...)
hints := getSelectHintsForShard(from, through, shard)
ss := q.Select(false, hints, matchers...)
if ss.Err() != nil {
return 0, 0, ss.Err()
}
Expand Down Expand Up @@ -1433,15 +1450,16 @@ func (i *Ingester) v2QueryStreamSamples(ctx context.Context, db *userTSDB, from,
}

// v2QueryStreamChunks streams metrics from a TSDB; it is called by v2QueryStream, which implements the client.IngesterServer interface
func (i *Ingester) v2QueryStreamChunks(ctx context.Context, db *userTSDB, from, through int64, matchers []*labels.Matcher, stream client.Ingester_QueryStreamServer) (numSeries, numSamples int, _ error) {
func (i *Ingester) v2QueryStreamChunks(ctx context.Context, db *userTSDB, from, through int64, matchers []*labels.Matcher, shard *querysharding.ShardSelector, stream client.Ingester_QueryStreamServer) (numSeries, numSamples int, _ error) {
q, err := db.ChunkQuerier(ctx, from, through)
if err != nil {
return 0, 0, err
}
defer q.Close()

// It's not required to return sorted series because series are sorted by the Mimir querier.
ss := q.Select(false, nil, matchers...)
hints := getSelectHintsForShard(from, through, shard)
ss := q.Select(false, hints, matchers...)
if ss.Err() != nil {
return 0, 0, ss.Err()
}
Expand Down Expand Up @@ -2296,3 +2314,17 @@ func (i *Ingester) getInstanceLimits() *InstanceLimits {

return l
}

// getSelectHintsForShard builds the hints handed to the querier's Select call
// for a possibly-sharded query.
//
// When shard is nil (the query is not sharded) it returns nil, so no hints are
// passed at all. Otherwise it returns hints carrying the given start/end
// bounds together with the shard's index and count.
func getSelectHintsForShard(start, end int64, shard *querysharding.ShardSelector) *storage.SelectHints {
	var hints *storage.SelectHints

	// Hints are only populated when query sharding applies to this query;
	// the shard info has to travel alongside the time range.
	if shard != nil {
		hints = &storage.SelectHints{
			Start:      start,
			End:        end,
			ShardIndex: shard.ShardIndex,
			ShardCount: shard.ShardCount,
		}
	}

	return hints
}

0 comments on commit c3f4560

Please sign in to comment.