Expose tsdb status in receiver #5402

Merged
merged 3 commits into from Jun 20, 2022
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -20,6 +20,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#5352](https://github.com/thanos-io/thanos/pull/5352) Cache: Add cache metrics to groupcache.
- [#5391](https://github.com/thanos-io/thanos/pull/5391) Receive: Add relabeling support.
- [#5408](https://github.com/thanos-io/thanos/pull/5408) Receive: Add support for consistent hashrings.
- [#5402](https://github.com/thanos-io/thanos/pull/5402) Receive: Implement api/v1/status/tsdb.

### Changed

1 change: 1 addition & 0 deletions cmd/thanos/receive.go
@@ -212,6 +212,7 @@ func runReceive(
TLSConfig: rwTLSConfig,
DialOpts: dialOpts,
ForwardTimeout: time.Duration(*conf.forwardTimeout),
GetTSDBStats: dbs.Stats,
})

grpcProbe := prober.NewGRPC()
6 changes: 6 additions & 0 deletions docs/components/receive.md
@@ -12,6 +12,12 @@ For more information please check out [initial design proposal](../proposals-don

> NOTE: As the block producer it's important to set correct "external labels" that will identify data block across Thanos clusters. See [external labels](../storage.md#external-labels) docs for details.

## TSDB stats

Thanos Receive exposes TSDB stats through the `/api/v1/status/tsdb` endpoint. Set the `THANOS-TENANT` HTTP header to get stats for an individual tenant. The output format of the endpoint is compatible with the [Prometheus API](https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats).

Note that each Thanos Receive will only expose local stats and replicated series will not be included in the response.
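As a rough illustration of the paragraph above, a client could query the endpoint as sketched below. The `fetchTSDBStatus` and `newFakeReceive` helpers, the tenant name `tenant-a`, and the canned payload are hypothetical; the sketch also assumes the response uses the usual Prometheus `status`/`data` envelope and reads only one field of it. A local `httptest` server stands in for a real Receive instance.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// stat mirrors one name/value entry of the TSDB status response.
type stat struct {
	Name  string `json:"name"`
	Value uint64 `json:"value"`
}

// tsdbStatus holds only the response field this sketch reads.
type tsdbStatus struct {
	SeriesCountByMetricName []stat `json:"seriesCountByMetricName"`
}

// apiResponse is the assumed Prometheus-style envelope around the payload.
type apiResponse struct {
	Status string     `json:"status"`
	Data   tsdbStatus `json:"data"`
}

// fetchTSDBStatus (hypothetical helper) requests TSDB stats for one tenant.
func fetchTSDBStatus(client *http.Client, baseURL, tenant string) (*tsdbStatus, error) {
	req, err := http.NewRequest(http.MethodGet, baseURL+"/api/v1/status/tsdb", nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("THANOS-TENANT", tenant)

	resp, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("unexpected status %d", resp.StatusCode)
	}
	var out apiResponse
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		return nil, err
	}
	return &out.Data, nil
}

// newFakeReceive (hypothetical) returns a stand-in server with a canned payload.
func newFakeReceive() *httptest.Server {
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Path != "/api/v1/status/tsdb" || r.Header.Get("THANOS-TENANT") != "tenant-a" {
			http.Error(w, "unknown tenant", http.StatusBadRequest)
			return
		}
		w.Header().Set("Content-Type", "application/json")
		fmt.Fprint(w, `{"status":"success","data":{"seriesCountByMetricName":[{"name":"up","value":3}]}}`)
	}))
}

func main() {
	srv := newFakeReceive()
	defer srv.Close()

	st, err := fetchTSDBStatus(srv.Client(), srv.URL, "tenant-a")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, s := range st.SeriesCountByMetricName {
		fmt.Println(s.Name, s.Value)
	}
}
```

Against a real deployment, `baseURL` would point at the HTTP address of a single Receive instance, and the result would cover only that instance's local TSDB for the tenant.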

## Example

```bash
126 changes: 126 additions & 0 deletions pkg/api/status/v1.go
@@ -0,0 +1,126 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package status

import (
"fmt"
"math"
"net/http"

"github.com/go-kit/log"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/index"
v1 "github.com/prometheus/prometheus/web/api/v1"
"github.com/thanos-io/thanos/pkg/api"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/logging"
)

// Stat holds the information about individual cardinality.
type Stat struct {
Name string `json:"name"`
Value uint64 `json:"value"`
}

func convertStats(stats []index.Stat) []Stat {
result := make([]Stat, 0, len(stats))
for _, item := range stats {
item := Stat{Name: item.Name, Value: item.Count}
result = append(result, item)
}
return result
}

// TSDBStatus has information of cardinality statistics from postings.
type TSDBStatus struct {
Contributor Author

We could remove these structs if prometheus/prometheus#10783 gets merged.

Member

It was merged, we can re-use the struct now 👍

Contributor Author

Do I need to go get the latest prometheus version from master/main, or do we need to wait for a new point release? I'm not sure how Thanos manages dependencies.

Contributor @yeya24, Jun 9, 2022

We can use the latest main directly.

Contributor Author

I am getting the following conflict

# github.com/cortexproject/cortex/pkg/querier/queryrange
../../go/pkg/mod/github.com/akanshat/cortex@v1.10.1-0.20211222182735-328fbeedd424/pkg/querier/queryrange/querysharding.go:221:3: not enough arguments in call to qs.engine.NewRangeQuery
	have (storage.Queryable, string, time.Time, time.Time, time.Duration)
	want (storage.Queryable, *promql.QueryOpts, string, time.Time, time.Time, time.Duration)

Seems that Prometheus has introduced some changes in public functions, and the cortex fork that Thanos uses is not up to date yet. Should I submit a PR there to update to latest prometheus?

Contributor

Yes, so while adding groupcache we had to make some changes in Thanos, which affected cortex imports just a little. Then we decided to update my fork of cortex and point Thanos to it, and once groupcache changes were merged we were going to create a PR to Cortex, to have those changes merged and point Thanos back to the original cortex.

Now #4651 is open, it's been reviewed by @GiedriusS and @bwplotka but not by any cortex maintainer yet. I am not sure what the next step is. @GiedriusS would you like to help me out with this?

Contributor Author

While waiting for that to get merged, should we maybe rebase the fork against the latest master in Cortex in order to get this PR unblocked? Otherwise, we will be blocked whenever we try to update the prometheus/prometheus dependency.

Member

No need for a release.

Member

We could rebase cortex fork.

Member

I think it's time to remove dep on Cortex, it's quite painful recently (:

HeadStats v1.HeadStats `json:"headStats"`
SeriesCountByMetricName []Stat `json:"seriesCountByMetricName"`
LabelValueCountByLabelName []Stat `json:"labelValueCountByLabelName"`
MemoryInBytesByLabelName []Stat `json:"memoryInBytesByLabelName"`
SeriesCountByLabelValuePair []Stat `json:"seriesCountByLabelValuePair"`
}

type GetStatsFunc func(r *http.Request, statsByLabelName string) (*tsdb.Stats, error)

type Options struct {
GetStats GetStatsFunc
Registry *prometheus.Registry
}

// TODO(fpetkovski): replace with upstream struct after dependency update.
type StatusAPI struct {
getTSDBStats GetStatsFunc
options Options
}

func New(opts Options) *StatusAPI {
return &StatusAPI{
getTSDBStats: opts.GetStats,
options: opts,
}
}

func (sapi *StatusAPI) Register(r *route.Router, tracer opentracing.Tracer, logger log.Logger, ins extpromhttp.InstrumentationMiddleware, logMiddleware *logging.HTTPServerMiddleware) {
instr := api.GetInstr(tracer, logger, ins, logMiddleware, false)
r.Get("/api/v1/status/tsdb", instr("tsdb_status", sapi.httpServeStats))
Contributor

Other federated APIs like rules, targets, and exemplars are exposed as gRPC APIs at the store, and federation is done at the Querier level.
For this API, do we have any plan to implement a gRPC version of it and then aggregate the results at the Querier level?

Contributor Author @fpetkovski, Jun 9, 2022

So we have this issue at the moment: #5395

Maybe we can discuss federation there, but I think it can be a good idea to do it. We just have to see how merging is going to work, and whether we want to merge the results in a single json object, or have one object per store target. I think we have the same question if we want to expose tsdb stats for all tenants.

Member

Regarding merging, I'm wondering how data replication for Receive will affect it. AFAIU, we use quorum-based logic for replication, so maybe merging this would lead to some inaccuracies in TSDB statistics for a tenant?

Contributor Author

Yes, agreed. The stats can get very inaccurate if we merge across replicas.
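To make the replication concern concrete, here is a minimal sketch with made-up data: three receivers hold copies of the same tenant's series, and one replica missed a write that still met quorum. The receiver layout, series names, and both helper functions are hypothetical and only illustrate why summing per-receiver counts differs from counting distinct series.

```go
package main

import "fmt"

// naiveSeriesCount sums per-receiver series counts, as a simple merge would.
func naiveSeriesCount(receivers [][]string) int {
	total := 0
	for _, series := range receivers {
		total += len(series)
	}
	return total
}

// distinctSeriesCount counts each series once, no matter how many receivers hold it.
func distinctSeriesCount(receivers [][]string) int {
	seen := make(map[string]struct{})
	for _, series := range receivers {
		for _, s := range series {
			seen[s] = struct{}{}
		}
	}
	return len(seen)
}

func main() {
	// Hypothetical layout: replication factor 3, and the third receiver
	// missed one series because the write already met quorum on two replicas.
	receivers := [][]string{
		{`up{job="a"}`, `up{job="b"}`},
		{`up{job="a"}`, `up{job="b"}`},
		{`up{job="a"}`},
	}
	fmt.Println(naiveSeriesCount(receivers), distinctSeriesCount(receivers)) // prints "5 2"
}
```

Dividing the naive sum by the replication factor would not recover the distinct count here either (5/3 is not 2), which is the kind of inaccuracy raised in this thread.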

}

func (sapi *StatusAPI) httpServeStats(r *http.Request) (interface{}, []error, *api.ApiError) {
s, err := sapi.getTSDBStats(r, labels.MetricName)
Contributor

So this API can only fetch the TSDB stats for a specific tenant. Admins cannot use this API to see the global (cross-tenant) TSDB stats.

Contributor Author @fpetkovski, Jun 2, 2022

Yes, that's correct. We could extend the endpoint to provide a global view by traversing all tenants and merging the results. This could maybe happen when an explicit parameter is set.

Contributor

Sounds good. Let's open an issue to track it after this PR is merged.
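One way the "traverse all tenants and merge" idea from this thread might look is sketched below. It is not part of this PR: `mergeStats`, the `limit` parameter, and the sample data are hypothetical, and the local `Stat` type only mirrors the shape of the struct added in `pkg/api/status/v1.go`. The sketch sums values by name across tenants, sorts by descending value, and keeps the top entries.

```go
package main

import (
	"fmt"
	"sort"
)

// Stat mirrors the name/value pair used in the TSDB status response.
type Stat struct {
	Name  string
	Value uint64
}

// mergeStats (hypothetical) sums per-tenant stats by name and returns the
// largest entries first. A negative limit disables truncation.
func mergeStats(perTenant map[string][]Stat, limit int) []Stat {
	sums := make(map[string]uint64)
	for _, stats := range perTenant {
		for _, s := range stats {
			sums[s.Name] += s.Value
		}
	}

	merged := make([]Stat, 0, len(sums))
	for name, value := range sums {
		merged = append(merged, Stat{Name: name, Value: value})
	}

	// Sort by value descending; break ties by name for a stable result.
	sort.Slice(merged, func(i, j int) bool {
		if merged[i].Value != merged[j].Value {
			return merged[i].Value > merged[j].Value
		}
		return merged[i].Name < merged[j].Name
	})

	if limit >= 0 && len(merged) > limit {
		merged = merged[:limit]
	}
	return merged
}

func main() {
	perTenant := map[string][]Stat{
		"tenant-a": {{Name: "http_requests_total", Value: 10}, {Name: "up", Value: 2}},
		"tenant-b": {{Name: "up", Value: 5}, {Name: "http_requests_total", Value: 1}},
	}
	for _, s := range mergeStats(perTenant, 10) {
		fmt.Println(s.Name, s.Value)
	}
}
```

If each per-tenant list is itself already truncated to its top entries, the merged totals would be lower bounds rather than exact counts, so a real implementation would likely need to merge before truncating.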

if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorInternal, Err: err}
}

if s == nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: fmt.Errorf("unknown tenant")}
}

metrics, err := sapi.options.Registry.Gather()
if err != nil {
return nil, []error{err}, nil
}

chunkCount := int64(math.NaN())
for _, mF := range metrics {
if *mF.Name == "prometheus_tsdb_head_chunks" {
m := *mF.Metric[0]
if m.Gauge != nil {
chunkCount = int64(m.Gauge.GetValue())
break
}
}
}

return TSDBStatus{
HeadStats: v1.HeadStats{
NumSeries: s.NumSeries,
ChunkCount: chunkCount,
MinTime: s.MinTime,
MaxTime: s.MaxTime,
NumLabelPairs: s.IndexPostingStats.NumLabelPairs,
},
SeriesCountByMetricName: convertStats(s.IndexPostingStats.CardinalityMetricsStats),
LabelValueCountByLabelName: convertStats(s.IndexPostingStats.CardinalityLabelStats),
MemoryInBytesByLabelName: convertStats(s.IndexPostingStats.LabelValueStats),
SeriesCountByLabelValuePair: convertStats(s.IndexPostingStats.LabelValuePairsStats),
}, nil, nil

}
36 changes: 32 additions & 4 deletions pkg/receive/handler.go
@@ -17,6 +17,9 @@ import (
"sync"
"time"

statusapi "github.com/thanos-io/thanos/pkg/api/status"
"github.com/thanos-io/thanos/pkg/logging"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gogo/protobuf/proto"
@@ -72,7 +75,7 @@ var (
type Options struct {
Writer *Writer
ListenAddress string
Registry prometheus.Registerer
Registry *prometheus.Registry
TenantHeader string
DefaultTenantID string
ReplicaHeader string
@@ -84,6 +87,7 @@ type Options struct {
DialOpts []grpc.DialOption
ForwardTimeout time.Duration
RelabelConfigs []*relabel.Config
GetTSDBStats GetStatsFunc
}

// Handler serves a Prometheus remote write receiving HTTP endpoint.
@@ -111,6 +115,11 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
logger = log.NewNopLogger()
}

var registerer prometheus.Registerer = nil
if o.Registry != nil {
registerer = o.Registry
}

h := &Handler{
logger: logger,
writer: o.Writer,
@@ -124,19 +133,19 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
Max: 30 * time.Second,
Jitter: true,
},
forwardRequests: promauto.With(o.Registry).NewCounterVec(
forwardRequests: promauto.With(registerer).NewCounterVec(
prometheus.CounterOpts{
Name: "thanos_receive_forward_requests_total",
Help: "The number of forward requests.",
}, []string{"result"},
),
replications: promauto.With(o.Registry).NewCounterVec(
replications: promauto.With(registerer).NewCounterVec(
prometheus.CounterOpts{
Name: "thanos_receive_replications_total",
Help: "The number of replication operations done by the receiver. The success of replication is fulfilled when a quorum is met.",
}, []string{"result"},
),
replicationFactor: promauto.With(o.Registry).NewGauge(
replicationFactor: promauto.With(registerer).NewGauge(
prometheus.GaugeOpts{
Name: "thanos_receive_replication_factor",
Help: "The number of times to replicate incoming write requests.",
@@ -173,6 +182,12 @@ func NewHandler(logger log.Logger, o *Options) *Handler {

h.router.Post("/api/v1/receive", instrf("receive", readyf(middleware.RequestID(http.HandlerFunc(h.receiveHTTP)))))

statusAPI := statusapi.New(statusapi.Options{
GetStats: h.getStats,
Registry: o.Registry,
})
statusAPI.Register(h.router, o.Tracer, logger, ins, logging.NewHTTPServerMiddleware(logger))

return h
}

@@ -214,6 +229,19 @@ func (h *Handler) testReady(f http.HandlerFunc) http.HandlerFunc {
}
}

func (h *Handler) getStats(r *http.Request, statsByLabelName string) (*tsdb.Stats, error) {
if !h.isReady() {
return nil, fmt.Errorf("service unavailable")
}

tenantID := r.Header.Get(h.options.TenantHeader)
if tenantID == "" {
tenantID = h.options.DefaultTenantID
Member

Hmmmmmm this is going to hit us IMO, better to fail 🤔 I can imagine lots of confusion here.

But the trade-off is that it works for default receive with no configuration I guess... Maybe fixable with documentation.

}

return h.options.GetTSDBStats(tenantID, statsByLabelName), nil
}

// Close stops the Handler.
func (h *Handler) Close() {
if h.listener != nil {
14 changes: 14 additions & 0 deletions pkg/receive/multitsdb.go
@@ -31,6 +31,8 @@ import (
"github.com/thanos-io/thanos/pkg/store/labelpb"
)

type GetStatsFunc func(tenantID, statsByLabelName string) *tsdb.Stats

type MultiTSDB struct {
dataDir string
logger log.Logger
@@ -289,6 +291,18 @@ func (t *MultiTSDB) TSDBExemplars() map[string]*exemplars.TSDB {
return res
}

func (t *MultiTSDB) Stats(tenantID, statsByLabelName string) *tsdb.Stats {
t.mtx.RLock()
defer t.mtx.RUnlock()

tenant, ok := t.tenants[tenantID]
if !ok {
return nil
}

return tenant.readyS.get().db.Head().Stats(statsByLabelName)
fpetkovski marked this conversation as resolved.
}

func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant) error {
reg := prometheus.WrapRegistererWith(prometheus.Labels{"tenant": tenantID}, t.reg)
lset := labelpb.ExtendSortedLabels(t.labels, labels.FromStrings(t.tenantLabelName, tenantID))