Skip to content

Commit

Permalink
receiver: Expose stats for all tenants (#5470)
Browse files Browse the repository at this point in the history
* receiver: Expose stats for all tenants

Thanos Receiver supports the Prometheus tsdb status API and can expose
TSDB stats for a single tenant.

This commit extends that functionality and allows users to request
TSDB stats for all tenants using the all_tenants=true query parameter.

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>

* Add back chunk count

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>

* Simplify TSDBStats interface

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>

* Return empty result for no stats

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>
  • Loading branch information
fpetkovski committed Jul 12, 2022
1 parent 90484c3 commit f8ef962
Show file tree
Hide file tree
Showing 6 changed files with 171 additions and 46 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -20,6 +20,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#5420](https://github.com/thanos-io/thanos/pull/5420) Receive: Automatically remove stale tenants.
- [#5472](https://github.com/thanos-io/thanos/pull/5472) Receive: add new tenant metrics to example dashboard.
- [#5475](https://github.com/thanos-io/thanos/pull/5475) Compact/Store: Added `--block-files-concurrency` to configure the number of goroutines used to download/upload block files during compaction.
- [#5470](https://github.com/thanos-io/thanos/pull/5470) Receive: Expose TSDB stats for all tenants via the `all_tenants=true` query parameter.

### Changed

Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/receive.go
Expand Up @@ -213,7 +213,7 @@ func runReceive(
TLSConfig: rwTLSConfig,
DialOpts: dialOpts,
ForwardTimeout: time.Duration(*conf.forwardTimeout),
GetTSDBStats: dbs.Stats,
TSDBStats: dbs,
})

grpcProbe := prober.NewGRPC()
Expand Down
85 changes: 51 additions & 34 deletions pkg/api/status/v1.go
Expand Up @@ -17,16 +17,15 @@
package status

import (
"fmt"
"math"
"net/http"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/tsdb"

"github.com/go-kit/log"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/index"
v1 "github.com/prometheus/prometheus/web/api/v1"
"github.com/thanos-io/thanos/pkg/api"
Expand All @@ -49,32 +48,38 @@ func convertStats(stats []index.Stat) []Stat {
return result
}

// TenantStats associates a tenant with the TSDB stats collected for it.
type TenantStats struct {
// Tenant is the identifier of the tenant the stats belong to.
Tenant string
// Stats holds the TSDB stats reported for the tenant.
Stats *tsdb.Stats
}

// TSDBStatus has information of cardinality statistics from postings.
// It is the JSON representation of the TSDB stats of a single tenant.
// TODO(fpetkovski): replace with upstream struct after dependency update.
type TSDBStatus struct {
// Tenant is the tenant these statistics belong to.
Tenant string `json:"tenant"`
// HeadStats carries the series count, chunk count, min/max time and
// label pair count.
HeadStats v1.HeadStats `json:"headStats"`
// SeriesCountByMetricName is converted from the index postings
// CardinalityMetricsStats.
SeriesCountByMetricName []Stat `json:"seriesCountByMetricName"`
// LabelValueCountByLabelName is converted from the index postings
// CardinalityLabelStats.
LabelValueCountByLabelName []Stat `json:"labelValueCountByLabelName"`
// MemoryInBytesByLabelName is converted from the index postings
// LabelValueStats.
MemoryInBytesByLabelName []Stat `json:"memoryInBytesByLabelName"`
// SeriesCountByLabelValuePair is converted from the index postings
// LabelValuePairsStats.
SeriesCountByLabelValuePair []Stat `json:"seriesCountByLabelValuePair"`
}

type GetStatsFunc func(r *http.Request, statsByLabelName string) (*tsdb.Stats, error)
type GetStatsFunc func(r *http.Request, statsByLabelName string) ([]TenantStats, *api.ApiError)

// Options holds the dependencies passed to New when constructing a StatusAPI.
type Options struct {
// GetStats is the callback invoked to fetch TSDB stats for an HTTP request.
GetStats GetStatsFunc
// Registry is the Prometheus registry whose metrics are gathered when
// serving stats.
Registry *prometheus.Registry
}

// TODO(fpetkovski): replace with upstream struct after dependency update.
type StatusAPI struct {
getTSDBStats GetStatsFunc
options Options
registry *prometheus.Registry
}

func New(opts Options) *StatusAPI {
return &StatusAPI{
getTSDBStats: opts.GetStats,
options: opts,
registry: opts.Registry,
}
}

Expand All @@ -84,43 +89,55 @@ func (sapi *StatusAPI) Register(r *route.Router, tracer opentracing.Tracer, logg
}

func (sapi *StatusAPI) httpServeStats(r *http.Request) (interface{}, []error, *api.ApiError) {
s, err := sapi.getTSDBStats(r, labels.MetricName)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorInternal, Err: err}
stats, sterr := sapi.getTSDBStats(r, labels.MetricName)
if sterr != nil {
return nil, nil, sterr
}

if s == nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: fmt.Errorf("unknown tenant")}
result := make([]TSDBStatus, 0, len(stats))
if len(stats) == 0 {
return result, nil, nil
}

metrics, err := sapi.options.Registry.Gather()
metrics, err := sapi.registry.Gather()
if err != nil {
return nil, []error{err}, nil
}

chunkCount := int64(math.NaN())
tenantChunks := make(map[string]int64)
for _, mF := range metrics {
if *mF.Name == "prometheus_tsdb_head_chunks" {
m := *mF.Metric[0]
if m.Gauge != nil {
chunkCount = int64(m.Gauge.GetValue())
break
if *mF.Name != "prometheus_tsdb_head_chunks" {
continue
}

for _, metric := range mF.Metric {
for _, lbl := range metric.Label {
if *lbl.Name == "tenant" {
tenantChunks[*lbl.Value] = int64(metric.Gauge.GetValue())
}
}
}
}

return TSDBStatus{
HeadStats: v1.HeadStats{
NumSeries: s.NumSeries,
ChunkCount: chunkCount,
MinTime: s.MinTime,
MaxTime: s.MaxTime,
NumLabelPairs: s.IndexPostingStats.NumLabelPairs,
},
SeriesCountByMetricName: convertStats(s.IndexPostingStats.CardinalityMetricsStats),
LabelValueCountByLabelName: convertStats(s.IndexPostingStats.CardinalityLabelStats),
MemoryInBytesByLabelName: convertStats(s.IndexPostingStats.LabelValueStats),
SeriesCountByLabelValuePair: convertStats(s.IndexPostingStats.LabelValuePairsStats),
}, nil, nil

for _, s := range stats {
var chunkCount int64
if c, ok := tenantChunks[s.Tenant]; ok {
chunkCount = c
}
result = append(result, TSDBStatus{
Tenant: s.Tenant,
HeadStats: v1.HeadStats{
NumSeries: s.Stats.NumSeries,
ChunkCount: chunkCount,
MinTime: s.Stats.MinTime,
MaxTime: s.Stats.MaxTime,
NumLabelPairs: s.Stats.IndexPostingStats.NumLabelPairs,
},
SeriesCountByMetricName: convertStats(s.Stats.IndexPostingStats.CardinalityMetricsStats),
LabelValueCountByLabelName: convertStats(s.Stats.IndexPostingStats.CardinalityLabelStats),
MemoryInBytesByLabelName: convertStats(s.Stats.IndexPostingStats.LabelValueStats),
SeriesCountByLabelValuePair: convertStats(s.Stats.IndexPostingStats.LabelValuePairsStats),
})
}
return result, nil, nil
}
23 changes: 18 additions & 5 deletions pkg/receive/handler.go
Expand Up @@ -17,6 +17,7 @@ import (
"sync"
"time"

"github.com/thanos-io/thanos/pkg/api"
statusapi "github.com/thanos-io/thanos/pkg/api/status"
"github.com/thanos-io/thanos/pkg/logging"

Expand Down Expand Up @@ -57,6 +58,8 @@ const (
DefaultTenantLabel = "tenant_id"
// DefaultReplicaHeader is the default header used to designate the replica count of a write request.
DefaultReplicaHeader = "THANOS-REPLICA"
// AllTenantsQueryParam is the query parameter for getting TSDB stats for all tenants.
AllTenantsQueryParam = "all_tenants"
// Labels for metrics.
labelSuccess = "success"
labelError = "error"
Expand Down Expand Up @@ -95,7 +98,7 @@ type Options struct {
DialOpts []grpc.DialOption
ForwardTimeout time.Duration
RelabelConfigs []*relabel.Config
GetTSDBStats GetStatsFunc
TSDBStats TSDBStats
}

// Handler serves a Prometheus remote write receiving HTTP endpoint.
Expand Down Expand Up @@ -226,7 +229,7 @@ func NewHandler(logger log.Logger, o *Options) *Handler {

statusAPI := statusapi.New(statusapi.Options{
GetStats: h.getStats,
Registry: o.Registry,
Registry: h.options.Registry,
})
statusAPI.Register(h.router, o.Tracer, logger, ins, logging.NewHTTPServerMiddleware(logger))

Expand Down Expand Up @@ -271,17 +274,27 @@ func (h *Handler) testReady(f http.HandlerFunc) http.HandlerFunc {
}
}

func (h *Handler) getStats(r *http.Request, statsByLabelName string) (*tsdb.Stats, error) {
func (h *Handler) getStats(r *http.Request, statsByLabelName string) ([]statusapi.TenantStats, *api.ApiError) {
if !h.isReady() {
return nil, fmt.Errorf("service unavailable")
return nil, &api.ApiError{Typ: api.ErrorInternal, Err: fmt.Errorf("service unavailable")}
}

tenantID := r.Header.Get(h.options.TenantHeader)
getAllTenantStats := r.FormValue(AllTenantsQueryParam) == "true"
if getAllTenantStats && tenantID != "" {
err := fmt.Errorf("using both the %s parameter and the %s header is not supported", AllTenantsQueryParam, h.options.TenantHeader)
return nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}

if getAllTenantStats {
return h.options.TSDBStats.TenantStats(statsByLabelName), nil
}

if tenantID == "" {
tenantID = h.options.DefaultTenantID
}

return h.options.GetTSDBStats(tenantID, statsByLabelName), nil
return h.options.TSDBStats.TenantStats(statsByLabelName, tenantID), nil
}

// Close stops the Handler.
Expand Down
47 changes: 41 additions & 6 deletions pkg/receive/multitsdb.go
Expand Up @@ -9,6 +9,7 @@ import (
"os"
"path"
"path/filepath"
"sort"
"sync"
"time"

Expand All @@ -19,6 +20,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/thanos-io/thanos/pkg/api/status"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"

Expand All @@ -32,7 +34,11 @@ import (
"github.com/thanos-io/thanos/pkg/store/labelpb"
)

type GetStatsFunc func(tenantID, statsByLabelName string) *tsdb.Stats
// TSDBStats provides TSDB head statistics for one or more tenants.
type TSDBStats interface {
// TenantStats returns TSDB head stats for the given tenants.
// If no tenantIDs are provided, stats for all tenants are returned.
TenantStats(statsByLabelName string, tenantIDs ...string) []status.TenantStats
}

type MultiTSDB struct {
dataDir string
Expand Down Expand Up @@ -384,16 +390,45 @@ func (t *MultiTSDB) TSDBExemplars() map[string]*exemplars.TSDB {
return res
}

func (t *MultiTSDB) Stats(tenantID, statsByLabelName string) *tsdb.Stats {
func (t *MultiTSDB) TenantStats(statsByLabelName string, tenantIDs ...string) []status.TenantStats {
t.mtx.RLock()
defer t.mtx.RUnlock()
if len(tenantIDs) == 0 {
for tenantID := range t.tenants {
tenantIDs = append(tenantIDs, tenantID)
}
}

tenant, ok := t.tenants[tenantID]
if !ok {
return nil
var (
mu sync.Mutex
wg sync.WaitGroup
result = make([]status.TenantStats, 0, len(t.tenants))
)
for _, tenantID := range tenantIDs {
tenantInstance, ok := t.tenants[tenantID]
if !ok {
continue
}

wg.Add(1)
go func(tenantID string, tenantInstance *tenant) {
defer wg.Done()
stats := tenantInstance.readyS.get().db.Head().Stats(statsByLabelName)

mu.Lock()
defer mu.Unlock()
result = append(result, status.TenantStats{
Tenant: tenantID,
Stats: stats,
})
}(tenantID, tenantInstance)
}
wg.Wait()

return tenant.readyS.get().db.Head().Stats(statsByLabelName)
sort.Slice(result, func(i, j int) bool {
return result[i].Tenant < result[j].Tenant
})
return result
}

func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant) error {
Expand Down
59 changes: 59 additions & 0 deletions pkg/receive/multitsdb_test.go
Expand Up @@ -477,6 +477,65 @@ func TestMultiTSDBPrune(t *testing.T) {
}
}

// TestMultiTSDBStats verifies that MultiTSDB.TenantStats returns one entry for
// every requested tenant that exists, skips tenants that are unknown, and
// falls back to all tenants when no tenant IDs are passed.
func TestMultiTSDBStats(t *testing.T) {
	tests := []struct {
		name          string
		tenants       []string
		expectedStats int
	}{
		{
			name:          "single tenant",
			tenants:       []string{"foo"},
			expectedStats: 1,
		},
		{
			name:          "missing tenant",
			tenants:       []string{"missing-foo"},
			expectedStats: 0,
		},
		{
			name:          "multiple tenants with missing tenant",
			tenants:       []string{"foo", "missing-foo"},
			expectedStats: 1,
		},
		{
			name:          "all tenants",
			tenants:       []string{"foo", "bar", "baz"},
			expectedStats: 3,
		},
		{
			// Passing no tenant IDs must return stats for every known tenant;
			// this is the path used for the all_tenants=true request.
			name:          "no tenants specified",
			tenants:       nil,
			expectedStats: 3,
		},
	}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			dir, err := ioutil.TempDir("", "tsdb-stats")
			testutil.Ok(t, err)
			// Deferred calls run in LIFO order, so the MultiTSDB below is
			// closed before its data directory is removed.
			defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

			m := NewMultiTSDB(dir, log.NewNopLogger(), prometheus.NewRegistry(),
				&tsdb.Options{
					MinBlockDuration:  (2 * time.Hour).Milliseconds(),
					MaxBlockDuration:  (2 * time.Hour).Milliseconds(),
					RetentionDuration: (6 * time.Hour).Milliseconds(),
				},
				labels.FromStrings("replica", "test"),
				"tenant_id",
				nil,
				false,
				metadata.NoneFunc,
			)
			defer func() { testutil.Ok(t, m.Close()) }()

			// Create three tenants by appending one sample to each.
			testutil.Ok(t, appendSample(m, "foo", time.Now()))
			testutil.Ok(t, appendSample(m, "bar", time.Now()))
			testutil.Ok(t, appendSample(m, "baz", time.Now()))
			testutil.Equals(t, 3, len(m.TSDBStores()))

			stats := m.TenantStats(labels.MetricName, test.tenants...)
			testutil.Equals(t, test.expectedStats, len(stats))
		})
	}
}

func appendSample(m *MultiTSDB, tenant string, timestamp time.Time) error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
Expand Down

0 comments on commit f8ef962

Please sign in to comment.