Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

receiver: Expose stats for all tenants #5470

Merged
merged 4 commits into from Jul 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -20,6 +20,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#5420](https://github.com/thanos-io/thanos/pull/5420) Receive: Automatically remove stale tenants.
- [#5472](https://github.com/thanos-io/thanos/pull/5472) Receive: add new tenant metrics to example dashboard.
- [#5475](https://github.com/thanos-io/thanos/pull/5475) Compact/Store: Added `--block-files-concurrency` allowing to configure number of go routines for download/upload block files during compaction.
- [#5470](https://github.com/thanos-io/thanos/pull/5470) Receive: Implement exposing TSDB stats for all tenants

### Changed

Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/receive.go
Expand Up @@ -213,7 +213,7 @@ func runReceive(
TLSConfig: rwTLSConfig,
DialOpts: dialOpts,
ForwardTimeout: time.Duration(*conf.forwardTimeout),
GetTSDBStats: dbs.Stats,
TSDBStats: dbs,
})

grpcProbe := prober.NewGRPC()
Expand Down
85 changes: 51 additions & 34 deletions pkg/api/status/v1.go
Expand Up @@ -17,16 +17,15 @@
package status

import (
"fmt"
"math"
"net/http"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/tsdb"
yeya24 marked this conversation as resolved.
Show resolved Hide resolved

"github.com/go-kit/log"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/index"
v1 "github.com/prometheus/prometheus/web/api/v1"
"github.com/thanos-io/thanos/pkg/api"
Expand All @@ -49,32 +48,38 @@ func convertStats(stats []index.Stat) []Stat {
return result
}

type TenantStats struct {
Tenant string
Stats *tsdb.Stats
}

// TSDBStatus has information of cardinality statistics from postings.
// TODO(fpetkovski): replace with upstream struct after dependency update.
type TSDBStatus struct {
Tenant string `json:"tenant"`
HeadStats v1.HeadStats `json:"headStats"`
SeriesCountByMetricName []Stat `json:"seriesCountByMetricName"`
LabelValueCountByLabelName []Stat `json:"labelValueCountByLabelName"`
MemoryInBytesByLabelName []Stat `json:"memoryInBytesByLabelName"`
SeriesCountByLabelValuePair []Stat `json:"seriesCountByLabelValuePair"`
}

type GetStatsFunc func(r *http.Request, statsByLabelName string) (*tsdb.Stats, error)
type GetStatsFunc func(r *http.Request, statsByLabelName string) ([]TenantStats, *api.ApiError)

type Options struct {
GetStats GetStatsFunc
Registry *prometheus.Registry
}

// TODO(fpetkovski): replace with upstream struct after dependency update.
type StatusAPI struct {
getTSDBStats GetStatsFunc
options Options
registry *prometheus.Registry
}

func New(opts Options) *StatusAPI {
return &StatusAPI{
getTSDBStats: opts.GetStats,
options: opts,
registry: opts.Registry,
}
}

Expand All @@ -84,43 +89,55 @@ func (sapi *StatusAPI) Register(r *route.Router, tracer opentracing.Tracer, logg
}

func (sapi *StatusAPI) httpServeStats(r *http.Request) (interface{}, []error, *api.ApiError) {
s, err := sapi.getTSDBStats(r, labels.MetricName)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorInternal, Err: err}
stats, sterr := sapi.getTSDBStats(r, labels.MetricName)
if sterr != nil {
return nil, nil, sterr
}

if s == nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: fmt.Errorf("unknown tenant")}
result := make([]TSDBStatus, 0, len(stats))
if len(stats) == 0 {
return result, nil, nil
}

metrics, err := sapi.options.Registry.Gather()
metrics, err := sapi.registry.Gather()
if err != nil {
return nil, []error{err}, nil
}

chunkCount := int64(math.NaN())
fpetkovski marked this conversation as resolved.
Show resolved Hide resolved
tenantChunks := make(map[string]int64)
for _, mF := range metrics {
if *mF.Name == "prometheus_tsdb_head_chunks" {
m := *mF.Metric[0]
if m.Gauge != nil {
chunkCount = int64(m.Gauge.GetValue())
break
if *mF.Name != "prometheus_tsdb_head_chunks" {
continue
}

for _, metric := range mF.Metric {
for _, lbl := range metric.Label {
if *lbl.Name == "tenant" {
tenantChunks[*lbl.Value] = int64(metric.Gauge.GetValue())
}
}
}
}

return TSDBStatus{
HeadStats: v1.HeadStats{
NumSeries: s.NumSeries,
ChunkCount: chunkCount,
MinTime: s.MinTime,
MaxTime: s.MaxTime,
NumLabelPairs: s.IndexPostingStats.NumLabelPairs,
},
SeriesCountByMetricName: convertStats(s.IndexPostingStats.CardinalityMetricsStats),
LabelValueCountByLabelName: convertStats(s.IndexPostingStats.CardinalityLabelStats),
MemoryInBytesByLabelName: convertStats(s.IndexPostingStats.LabelValueStats),
SeriesCountByLabelValuePair: convertStats(s.IndexPostingStats.LabelValuePairsStats),
}, nil, nil

for _, s := range stats {
var chunkCount int64
if c, ok := tenantChunks[s.Tenant]; ok {
chunkCount = c
}
result = append(result, TSDBStatus{
Tenant: s.Tenant,
HeadStats: v1.HeadStats{
NumSeries: s.Stats.NumSeries,
ChunkCount: chunkCount,
MinTime: s.Stats.MinTime,
MaxTime: s.Stats.MaxTime,
NumLabelPairs: s.Stats.IndexPostingStats.NumLabelPairs,
},
SeriesCountByMetricName: convertStats(s.Stats.IndexPostingStats.CardinalityMetricsStats),
LabelValueCountByLabelName: convertStats(s.Stats.IndexPostingStats.CardinalityLabelStats),
MemoryInBytesByLabelName: convertStats(s.Stats.IndexPostingStats.LabelValueStats),
SeriesCountByLabelValuePair: convertStats(s.Stats.IndexPostingStats.LabelValuePairsStats),
})
}
return result, nil, nil
}
23 changes: 18 additions & 5 deletions pkg/receive/handler.go
Expand Up @@ -17,6 +17,7 @@ import (
"sync"
"time"

"github.com/thanos-io/thanos/pkg/api"
statusapi "github.com/thanos-io/thanos/pkg/api/status"
yeya24 marked this conversation as resolved.
Show resolved Hide resolved
"github.com/thanos-io/thanos/pkg/logging"

Expand Down Expand Up @@ -57,6 +58,8 @@ const (
DefaultTenantLabel = "tenant_id"
// DefaultReplicaHeader is the default header used to designate the replica count of a write request.
DefaultReplicaHeader = "THANOS-REPLICA"
// AllTenantsQueryParam is the query parameter for getting TSDB stats for all tenants.
AllTenantsQueryParam = "all_tenants"
// Labels for metrics.
labelSuccess = "success"
labelError = "error"
Expand Down Expand Up @@ -95,7 +98,7 @@ type Options struct {
DialOpts []grpc.DialOption
ForwardTimeout time.Duration
RelabelConfigs []*relabel.Config
GetTSDBStats GetStatsFunc
TSDBStats TSDBStats
}

// Handler serves a Prometheus remote write receiving HTTP endpoint.
Expand Down Expand Up @@ -226,7 +229,7 @@ func NewHandler(logger log.Logger, o *Options) *Handler {

statusAPI := statusapi.New(statusapi.Options{
GetStats: h.getStats,
Registry: o.Registry,
Registry: h.options.Registry,
})
statusAPI.Register(h.router, o.Tracer, logger, ins, logging.NewHTTPServerMiddleware(logger))

Expand Down Expand Up @@ -271,17 +274,27 @@ func (h *Handler) testReady(f http.HandlerFunc) http.HandlerFunc {
}
}

func (h *Handler) getStats(r *http.Request, statsByLabelName string) (*tsdb.Stats, error) {
func (h *Handler) getStats(r *http.Request, statsByLabelName string) ([]statusapi.TenantStats, *api.ApiError) {
if !h.isReady() {
return nil, fmt.Errorf("service unavailable")
return nil, &api.ApiError{Typ: api.ErrorInternal, Err: fmt.Errorf("service unavailable")}
}

tenantID := r.Header.Get(h.options.TenantHeader)
getAllTenantStats := r.FormValue(AllTenantsQueryParam) == "true"
if getAllTenantStats && tenantID != "" {
err := fmt.Errorf("using both the %s parameter and the %s header is not supported", AllTenantsQueryParam, h.options.TenantHeader)
return nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}

if getAllTenantStats {
return h.options.TSDBStats.TenantStats(statsByLabelName), nil
}

if tenantID == "" {
tenantID = h.options.DefaultTenantID
}

return h.options.GetTSDBStats(tenantID, statsByLabelName), nil
return h.options.TSDBStats.TenantStats(statsByLabelName, tenantID), nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is still no way to query multiple tenants (not all) from the API. Do we want to provide this capability to users?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My thought was that getting stats for all tenants should be good enough for most users that want to identify where high cardinality series come from. But we can add it if you think the use case is going to be common. We can also wait to see whether users open an issue for it.

Copy link
Contributor

@yeya24 yeya24 Jul 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be great if we can make it more extensible to support those use cases at this stage. If you think the current implementation is good to support it in the future then I am good.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think we should be good to extend functionality in the future. We can pass multiple tenant IDs here. So the main question will be how the users will provide the tenant IDs, whether as comma-separated values in the THANOS-TENANT header or something else.

}

// Close stops the Handler.
Expand Down
47 changes: 41 additions & 6 deletions pkg/receive/multitsdb.go
Expand Up @@ -9,6 +9,7 @@ import (
"os"
"path"
"path/filepath"
"sort"
"sync"
"time"

Expand All @@ -19,6 +20,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/thanos-io/thanos/pkg/api/status"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"

Expand All @@ -32,7 +34,11 @@ import (
"github.com/thanos-io/thanos/pkg/store/labelpb"
)

type GetStatsFunc func(tenantID, statsByLabelName string) *tsdb.Stats
type TSDBStats interface {
// TenantStats returns TSDB head stats for the given tenants.
// If no tenantIDs are provided, stats for all tenants are returned.
TenantStats(statsByLabelName string, tenantIDs ...string) []status.TenantStats
}

type MultiTSDB struct {
dataDir string
Expand Down Expand Up @@ -384,16 +390,45 @@ func (t *MultiTSDB) TSDBExemplars() map[string]*exemplars.TSDB {
return res
}

func (t *MultiTSDB) Stats(tenantID, statsByLabelName string) *tsdb.Stats {
func (t *MultiTSDB) TenantStats(statsByLabelName string, tenantIDs ...string) []status.TenantStats {
t.mtx.RLock()
defer t.mtx.RUnlock()
if len(tenantIDs) == 0 {
for tenantID := range t.tenants {
tenantIDs = append(tenantIDs, tenantID)
}
}

tenant, ok := t.tenants[tenantID]
if !ok {
return nil
var (
mu sync.Mutex
wg sync.WaitGroup
result = make([]status.TenantStats, 0, len(t.tenants))
)
for _, tenantID := range tenantIDs {
tenantInstance, ok := t.tenants[tenantID]
if !ok {
continue
}

wg.Add(1)
go func(tenantID string, tenantInstance *tenant) {
defer wg.Done()
stats := tenantInstance.readyS.get().db.Head().Stats(statsByLabelName)

mu.Lock()
defer mu.Unlock()
result = append(result, status.TenantStats{
Tenant: tenantID,
Stats: stats,
})
}(tenantID, tenantInstance)
}
wg.Wait()

return tenant.readyS.get().db.Head().Stats(statsByLabelName)
sort.Slice(result, func(i, j int) bool {
return result[i].Tenant < result[j].Tenant
})
return result
yeya24 marked this conversation as resolved.
Show resolved Hide resolved
}

func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant) error {
Expand Down
59 changes: 59 additions & 0 deletions pkg/receive/multitsdb_test.go
Expand Up @@ -477,6 +477,65 @@ func TestMultiTSDBPrune(t *testing.T) {
}
}

func TestMultiTSDBStats(t *testing.T) {
tests := []struct {
name string
tenants []string
expectedStats int
}{
{
name: "single tenant",
tenants: []string{"foo"},
expectedStats: 1,
},
{
name: "missing tenant",
tenants: []string{"missing-foo"},
expectedStats: 0,
},
{
name: "multiple tenants with missing tenant",
tenants: []string{"foo", "missing-foo"},
expectedStats: 1,
},
{
name: "all tenants",
tenants: []string{"foo", "bar", "baz"},
expectedStats: 3,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
dir, err := ioutil.TempDir("", "tsdb-stats")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

m := NewMultiTSDB(dir, log.NewNopLogger(), prometheus.NewRegistry(),
&tsdb.Options{
MinBlockDuration: (2 * time.Hour).Milliseconds(),
MaxBlockDuration: (2 * time.Hour).Milliseconds(),
RetentionDuration: (6 * time.Hour).Milliseconds(),
},
labels.FromStrings("replica", "test"),
"tenant_id",
nil,
false,
metadata.NoneFunc,
)
defer func() { testutil.Ok(t, m.Close()) }()

testutil.Ok(t, appendSample(m, "foo", time.Now()))
testutil.Ok(t, appendSample(m, "bar", time.Now()))
testutil.Ok(t, appendSample(m, "baz", time.Now()))
testutil.Equals(t, 3, len(m.TSDBStores()))

stats := m.TenantStats(labels.MetricName, test.tenants...)
testutil.Equals(t, test.expectedStats, len(stats))
})
}
}

func appendSample(m *MultiTSDB, tenant string, timestamp time.Time) error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
Expand Down