Skip to content

Commit

Permalink
receiver: Expose stats for all tenants
Browse files Browse the repository at this point in the history
Thanos Receiver supports the Prometheus tsdb status API and can expose
TSDB stats for a single tenant.

This commit extends that functionality and allows users to request
TSDB stats for all tenants using the all_tenants=true query parameter.
  • Loading branch information
fpetkovski committed Jul 5, 2022
1 parent 4de555d commit 8f5030d
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 49 deletions.
2 changes: 1 addition & 1 deletion cmd/thanos/receive.go
Expand Up @@ -213,7 +213,7 @@ func runReceive(
TLSConfig: rwTLSConfig,
DialOpts: dialOpts,
ForwardTimeout: time.Duration(*conf.forwardTimeout),
GetTSDBStats: dbs.Stats,
TSDBStats: dbs,
})

grpcProbe := prober.NewGRPC()
Expand Down
69 changes: 29 additions & 40 deletions pkg/api/status/v1.go
Expand Up @@ -18,15 +18,14 @@ package status

import (
"fmt"
"math"
"net/http"

"github.com/prometheus/prometheus/tsdb"

"github.com/go-kit/log"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/index"
v1 "github.com/prometheus/prometheus/web/api/v1"
"github.com/thanos-io/thanos/pkg/api"
Expand All @@ -49,32 +48,35 @@ func convertStats(stats []index.Stat) []Stat {
return result
}

type TenantStats struct {
Tenant string
Stats *tsdb.Stats
}

// TSDBStatus has information of cardinality statistics from postings.
// TODO(fpetkovski): replace with upstream struct after dependency update.
type TSDBStatus struct {
Tenant string `json:"tenant"`
HeadStats v1.HeadStats `json:"headStats"`
SeriesCountByMetricName []Stat `json:"seriesCountByMetricName"`
LabelValueCountByLabelName []Stat `json:"labelValueCountByLabelName"`
MemoryInBytesByLabelName []Stat `json:"memoryInBytesByLabelName"`
SeriesCountByLabelValuePair []Stat `json:"seriesCountByLabelValuePair"`
}

type GetStatsFunc func(r *http.Request, statsByLabelName string) (*tsdb.Stats, error)
type GetStatsFunc func(r *http.Request, statsByLabelName string) ([]TenantStats, *api.ApiError)

type Options struct {
GetStats GetStatsFunc
Registry *prometheus.Registry
}

// TODO(fpetkovski): replace with upstream struct after dependency update.
type StatusAPI struct {
getTSDBStats GetStatsFunc
options Options
}

func New(opts Options) *StatusAPI {
return &StatusAPI{
getTSDBStats: opts.GetStats,
options: opts,
}
}

Expand All @@ -84,43 +86,30 @@ func (sapi *StatusAPI) Register(r *route.Router, tracer opentracing.Tracer, logg
}

func (sapi *StatusAPI) httpServeStats(r *http.Request) (interface{}, []error, *api.ApiError) {
s, err := sapi.getTSDBStats(r, labels.MetricName)
stats, err := sapi.getTSDBStats(r, labels.MetricName)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorInternal, Err: err}
return nil, nil, err
}

if s == nil {
if stats == nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: fmt.Errorf("unknown tenant")}
}

metrics, err := sapi.options.Registry.Gather()
if err != nil {
return nil, []error{err}, nil
}

chunkCount := int64(math.NaN())
for _, mF := range metrics {
if *mF.Name == "prometheus_tsdb_head_chunks" {
m := *mF.Metric[0]
if m.Gauge != nil {
chunkCount = int64(m.Gauge.GetValue())
break
}
}
result := make([]TSDBStatus, 0, len(stats))
for _, s := range stats {
result = append(result, TSDBStatus{
Tenant: s.Tenant,
HeadStats: v1.HeadStats{
NumSeries: s.Stats.NumSeries,
MinTime: s.Stats.MinTime,
MaxTime: s.Stats.MaxTime,
NumLabelPairs: s.Stats.IndexPostingStats.NumLabelPairs,
},
SeriesCountByMetricName: convertStats(s.Stats.IndexPostingStats.CardinalityMetricsStats),
LabelValueCountByLabelName: convertStats(s.Stats.IndexPostingStats.CardinalityLabelStats),
MemoryInBytesByLabelName: convertStats(s.Stats.IndexPostingStats.LabelValueStats),
SeriesCountByLabelValuePair: convertStats(s.Stats.IndexPostingStats.LabelValuePairsStats),
})
}

return TSDBStatus{
HeadStats: v1.HeadStats{
NumSeries: s.NumSeries,
ChunkCount: chunkCount,
MinTime: s.MinTime,
MaxTime: s.MaxTime,
NumLabelPairs: s.IndexPostingStats.NumLabelPairs,
},
SeriesCountByMetricName: convertStats(s.IndexPostingStats.CardinalityMetricsStats),
LabelValueCountByLabelName: convertStats(s.IndexPostingStats.CardinalityLabelStats),
MemoryInBytesByLabelName: convertStats(s.IndexPostingStats.LabelValueStats),
SeriesCountByLabelValuePair: convertStats(s.IndexPostingStats.LabelValuePairsStats),
}, nil, nil

return result, nil, nil
}
28 changes: 23 additions & 5 deletions pkg/receive/handler.go
Expand Up @@ -17,6 +17,8 @@ import (
"sync"
"time"

"github.com/thanos-io/thanos/pkg/api"

statusapi "github.com/thanos-io/thanos/pkg/api/status"
"github.com/thanos-io/thanos/pkg/logging"

Expand Down Expand Up @@ -57,6 +59,8 @@ const (
DefaultTenantLabel = "tenant_id"
// DefaultReplicaHeader is the default header used to designate the replica count of a write request.
DefaultReplicaHeader = "THANOS-REPLICA"
// AllTenantsQueryParam is the query parameter for getting TSDB stats for all tenants
AllTenantsQueryParam = "all_tenants"
// Labels for metrics.
labelSuccess = "success"
labelError = "error"
Expand Down Expand Up @@ -95,7 +99,7 @@ type Options struct {
DialOpts []grpc.DialOption
ForwardTimeout time.Duration
RelabelConfigs []*relabel.Config
GetTSDBStats GetStatsFunc
TSDBStats TSDBStats
}

// Handler serves a Prometheus remote write receiving HTTP endpoint.
Expand Down Expand Up @@ -226,7 +230,6 @@ func NewHandler(logger log.Logger, o *Options) *Handler {

statusAPI := statusapi.New(statusapi.Options{
GetStats: h.getStats,
Registry: o.Registry,
})
statusAPI.Register(h.router, o.Tracer, logger, ins, logging.NewHTTPServerMiddleware(logger))

Expand Down Expand Up @@ -271,17 +274,32 @@ func (h *Handler) testReady(f http.HandlerFunc) http.HandlerFunc {
}
}

func (h *Handler) getStats(r *http.Request, statsByLabelName string) (*tsdb.Stats, error) {
func (h *Handler) getStats(r *http.Request, statsByLabelName string) ([]statusapi.TenantStats, *api.ApiError) {
if !h.isReady() {
return nil, fmt.Errorf("service unavailable")
return nil, &api.ApiError{Typ: api.ErrorInternal, Err: fmt.Errorf("service unavailable")}
}

tenantID := r.Header.Get(h.options.TenantHeader)
getAllTenantStats := r.FormValue(AllTenantsQueryParam) == "true"
if getAllTenantStats && tenantID != "" {
err := fmt.Errorf("using both the %s parameter and the %s header is not supported", AllTenantsQueryParam, h.options.TenantHeader)
return nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}

if getAllTenantStats {
return h.options.TSDBStats.AllTenantStats(statsByLabelName), nil
}

if tenantID == "" {
tenantID = h.options.DefaultTenantID
}

return h.options.GetTSDBStats(tenantID, statsByLabelName), nil
stats := h.options.TSDBStats.SingleTenantStats(tenantID, statsByLabelName)
if stats == nil {
return nil, nil
}

return []statusapi.TenantStats{*stats}, nil
}

// Close stops the Handler.
Expand Down
43 changes: 40 additions & 3 deletions pkg/receive/multitsdb.go
Expand Up @@ -12,6 +12,8 @@ import (
"sync"
"time"

"github.com/thanos-io/thanos/pkg/api/status"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
Expand All @@ -32,7 +34,10 @@ import (
"github.com/thanos-io/thanos/pkg/store/labelpb"
)

type GetStatsFunc func(tenantID, statsByLabelName string) *tsdb.Stats
type TSDBStats interface {
SingleTenantStats(tenantID, statsByLabelName string) *status.TenantStats
AllTenantStats(statsByLabelName string) []status.TenantStats
}

type MultiTSDB struct {
dataDir string
Expand Down Expand Up @@ -375,7 +380,7 @@ func (t *MultiTSDB) TSDBExemplars() map[string]*exemplars.TSDB {
return res
}

func (t *MultiTSDB) Stats(tenantID, statsByLabelName string) *tsdb.Stats {
func (t *MultiTSDB) SingleTenantStats(tenantID, statsByLabelName string) *status.TenantStats {
t.mtx.RLock()
defer t.mtx.RUnlock()

Expand All @@ -384,7 +389,39 @@ func (t *MultiTSDB) Stats(tenantID, statsByLabelName string) *tsdb.Stats {
return nil
}

return tenant.readyS.get().db.Head().Stats(statsByLabelName)
stats := tenant.readyS.get().db.Head().Stats(statsByLabelName)
return &status.TenantStats{
Tenant: tenantID,
Stats: stats,
}
}

func (t *MultiTSDB) AllTenantStats(statsByLabelName string) []status.TenantStats {
t.mtx.RLock()
defer t.mtx.RUnlock()

var (
mu sync.Mutex
wg sync.WaitGroup
result = make([]status.TenantStats, 0, len(t.tenants))
)
for tenantID, tenantInstance := range t.tenants {
wg.Add(1)
go func(tenantID string, tenantInstance *tenant) {
defer wg.Done()
stats := tenantInstance.readyS.get().db.Head().Stats(statsByLabelName)

mu.Lock()
defer mu.Unlock()
result = append(result, status.TenantStats{
Tenant: tenantID,
Stats: stats,
})
}(tenantID, tenantInstance)
}
wg.Wait()

return result
}

func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant) error {
Expand Down

0 comments on commit 8f5030d

Please sign in to comment.