Skip to content

Commit

Permalink
Expose tsdb status in receiver
Browse files Browse the repository at this point in the history
This commit implements the api/v1/status/tsdb API in the Receiver.

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>
  • Loading branch information
fpetkovski committed Jun 8, 2022
1 parent 586449e commit 9d598ab
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -18,6 +18,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

- [#5352](https://github.com/thanos-io/thanos/pull/5352) Cache: Add cache metrics to groupcache.
- [#5391](https://github.com/thanos-io/thanos/pull/5391) Receive: Add relabeling support.
- [#5391](https://github.com/thanos-io/thanos/pull/5391) Receive: Implement api/v1/status/tsdb.

### Changed

Expand Down
1 change: 1 addition & 0 deletions cmd/thanos/receive.go
Expand Up @@ -211,6 +211,7 @@ func runReceive(
TLSConfig: rwTLSConfig,
DialOpts: dialOpts,
ForwardTimeout: time.Duration(*conf.forwardTimeout),
GetTSDBStats: dbs.Stats,
})

grpcProbe := prober.NewGRPC()
Expand Down
125 changes: 125 additions & 0 deletions pkg/api/status/v1.go
@@ -0,0 +1,125 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package status

import (
"fmt"
"math"
"net/http"

"github.com/go-kit/log"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/index"
v1 "github.com/prometheus/prometheus/web/api/v1"
"github.com/thanos-io/thanos/pkg/api"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/logging"
)

// Stat is one named cardinality entry of the TSDB status response.
// convertStats builds it from an index.Stat by copying Name and mapping
// Count to Value.
type Stat struct {
// Name identifies the entry; it is copied verbatim from index.Stat.Name.
Name string `json:"name"`
// Value is the count taken from index.Stat.Count.
Value uint64 `json:"value"`
}

// convertStats translates TSDB index statistics into the API representation.
// The returned slice is always non-nil and has exactly one entry per input
// element, in the same order.
func convertStats(stats []index.Stat) []Stat {
	converted := make([]Stat, len(stats))
	for i := range stats {
		converted[i] = Stat{
			Name:  stats[i].Name,
			Value: stats[i].Count,
		}
	}
	return converted
}

// TSDBStatus is the payload returned by the /api/v1/status/tsdb endpoint.
// It combines head statistics with cardinality statistics taken from the
// index posting stats of a tsdb.Stats value (see httpServeStats).
type TSDBStatus struct {
// HeadStats carries series count, chunk count, time range and label pair count of the head.
HeadStats v1.HeadStats `json:"headStats"`
// SeriesCountByMetricName is filled from IndexPostingStats.CardinalityMetricsStats.
SeriesCountByMetricName []Stat `json:"seriesCountByMetricName"`
// LabelValueCountByLabelName is filled from IndexPostingStats.CardinalityLabelStats.
LabelValueCountByLabelName []Stat `json:"labelValueCountByLabelName"`
// MemoryInBytesByLabelName is filled from IndexPostingStats.LabelValueStats.
MemoryInBytesByLabelName []Stat `json:"memoryInBytesByLabelName"`
// SeriesCountByLabelValuePair is filled from IndexPostingStats.LabelValuePairsStats.
SeriesCountByLabelValuePair []Stat `json:"seriesCountByLabelValuePair"`
}

// GetStatsFunc fetches TSDB statistics for the given HTTP request.
// statsByLabelName is forwarded to the stats provider; httpServeStats passes
// labels.MetricName. A nil result with a nil error is reported by
// httpServeStats as an "unknown tenant" bad-data error.
type GetStatsFunc func(r *http.Request, statsByLabelName string) (*tsdb.Stats, error)

// Options configures a StatusAPI.
type Options struct {
// GetStats is called on every request to obtain the TSDB statistics to serve.
GetStats GetStatsFunc
// Registry is gathered on every request to look up the
// prometheus_tsdb_head_chunks gauge used for the head chunk count.
Registry *prometheus.Registry
}

// StatusAPI serves the /api/v1/status/tsdb endpoint.
// Instances are created with New and attached to a router with Register.
type StatusAPI struct {
// getTSDBStats is the stats provider; New sets it from Options.GetStats.
getTSDBStats GetStatsFunc
// options is the full Options value passed to New.
options Options
}

// New builds a StatusAPI from opts. The stats provider is taken from
// opts.GetStats and the complete options value is retained for later use.
func New(opts Options) *StatusAPI {
	sapi := new(StatusAPI)
	sapi.options = opts
	sapi.getTSDBStats = opts.GetStats
	return sapi
}

// Register attaches the TSDB status endpoint to r as
// GET /api/v1/status/tsdb. The handler is wrapped with the instrumentation
// produced by api.GetInstr under the name "tsdb_status".
func (sapi *StatusAPI) Register(r *route.Router, tracer opentracing.Tracer, logger log.Logger, ins extpromhttp.InstrumentationMiddleware, logMiddleware *logging.HTTPServerMiddleware) {
	instrument := api.GetInstr(tracer, logger, ins, logMiddleware, false)
	statsHandler := instrument("tsdb_status", sapi.httpServeStats)
	r.Get("/api/v1/status/tsdb", statsHandler)
}

// httpServeStats handles GET /api/v1/status/tsdb.
//
// It asks the configured stats provider for TSDB statistics keyed by the
// metric name label, looks up the head chunk count in the configured
// registry, and returns a TSDBStatus.
//
// Errors:
//   - api.ErrorInternal when the stats provider fails or the registry cannot
//     be gathered.
//   - api.ErrorBadData ("unknown tenant") when the provider returns no stats.
func (sapi *StatusAPI) httpServeStats(r *http.Request) (interface{}, []error, *api.ApiError) {
	s, err := sapi.getTSDBStats(r, labels.MetricName)
	if err != nil {
		return nil, nil, &api.ApiError{Typ: api.ErrorInternal, Err: err}
	}

	if s == nil {
		return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: fmt.Errorf("unknown tenant")}
	}

	chunkCount, err := headChunkCount(sapi.options.Registry)
	if err != nil {
		// A gather failure used to be returned as a warning next to nil data,
		// which clients see as a successful response with a null payload.
		// Report it as an internal error instead.
		return nil, nil, &api.ApiError{Typ: api.ErrorInternal, Err: fmt.Errorf("error gathering runtime status: %w", err)}
	}

	return TSDBStatus{
		HeadStats: v1.HeadStats{
			NumSeries:     s.NumSeries,
			ChunkCount:    chunkCount,
			MinTime:       s.MinTime,
			MaxTime:       s.MaxTime,
			NumLabelPairs: s.IndexPostingStats.NumLabelPairs,
		},
		SeriesCountByMetricName:     convertStats(s.IndexPostingStats.CardinalityMetricsStats),
		LabelValueCountByLabelName:  convertStats(s.IndexPostingStats.CardinalityLabelStats),
		MemoryInBytesByLabelName:    convertStats(s.IndexPostingStats.LabelValueStats),
		SeriesCountByLabelValuePair: convertStats(s.IndexPostingStats.LabelValuePairsStats),
	}, nil, nil
}

// headChunkCount returns the value of the prometheus_tsdb_head_chunks gauge
// found in reg, or a sentinel when reg is nil or no such gauge is present.
func headChunkCount(reg *prometheus.Registry) (int64, error) {
	// NOTE(review): the result of converting NaN to int64 is
	// implementation-dependent per the Go spec. The sentinel is kept as-is so
	// the response does not change; confirm whether a fixed value is wanted.
	chunkCount := int64(math.NaN())
	if reg == nil {
		return chunkCount, nil
	}

	families, err := reg.Gather()
	if err != nil {
		return chunkCount, err
	}

	for _, mf := range families {
		if mf.GetName() != "prometheus_tsdb_head_chunks" {
			continue
		}
		// Only the first sample of the family is inspected; an empty family
		// is skipped rather than indexed into.
		samples := mf.GetMetric()
		if len(samples) == 0 {
			continue
		}
		if gauge := samples[0].GetGauge(); gauge != nil {
			chunkCount = int64(gauge.GetValue())
			break
		}
	}
	return chunkCount, nil
}
25 changes: 24 additions & 1 deletion pkg/receive/handler.go
Expand Up @@ -17,6 +17,9 @@ import (
"sync"
"time"

statusapi "github.com/thanos-io/thanos/pkg/api/status"
"github.com/thanos-io/thanos/pkg/logging"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -72,7 +75,7 @@ var (
type Options struct {
Writer *Writer
ListenAddress string
Registry prometheus.Registerer
Registry *prometheus.Registry
TenantHeader string
DefaultTenantID string
ReplicaHeader string
Expand All @@ -84,6 +87,7 @@ type Options struct {
DialOpts []grpc.DialOption
ForwardTimeout time.Duration
RelabelConfigs []*relabel.Config
GetTSDBStats GetStatsFunc
}

// Handler serves a Prometheus remote write receiving HTTP endpoint.
Expand Down Expand Up @@ -173,6 +177,12 @@ func NewHandler(logger log.Logger, o *Options) *Handler {

h.router.Post("/api/v1/receive", instrf("receive", readyf(middleware.RequestID(http.HandlerFunc(h.receiveHTTP)))))

statusAPI := statusapi.New(statusapi.Options{
GetStats: h.getStats,
Registry: o.Registry,
})
statusAPI.Register(h.router, o.Tracer, logger, ins, logging.NewHTTPServerMiddleware(logger))

return h
}

Expand Down Expand Up @@ -214,6 +224,19 @@ func (h *Handler) testReady(f http.HandlerFunc) http.HandlerFunc {
}
}

// getStats resolves the tenant for the request and returns that tenant's TSDB
// statistics from the configured GetTSDBStats callback.
//
// The tenant is read from the header named by options.TenantHeader; when the
// header is empty, options.DefaultTenantID is used. An error is returned
// while the handler is not ready.
func (h *Handler) getStats(r *http.Request, statsByLabelName string) (*tsdb.Stats, error) {
	if ready := h.isReady(); !ready {
		return nil, fmt.Errorf("service unavailable")
	}

	tenant := h.options.DefaultTenantID
	if fromHeader := r.Header.Get(h.options.TenantHeader); fromHeader != "" {
		tenant = fromHeader
	}

	stats := h.options.GetTSDBStats(tenant, statsByLabelName)
	return stats, nil
}

// Close stops the Handler.
func (h *Handler) Close() {
if h.listener != nil {
Expand Down
14 changes: 14 additions & 0 deletions pkg/receive/multitsdb.go
Expand Up @@ -31,6 +31,8 @@ import (
"github.com/thanos-io/thanos/pkg/store/labelpb"
)

// GetStatsFunc returns TSDB statistics for the tenant identified by tenantID.
// statsByLabelName is passed through to the underlying stats call.
// MultiTSDB.Stats has this signature and returns nil for an unknown tenant.
type GetStatsFunc func(tenantID, statsByLabelName string) *tsdb.Stats

type MultiTSDB struct {
dataDir string
logger log.Logger
Expand Down Expand Up @@ -289,6 +291,18 @@ func (t *MultiTSDB) TSDBExemplars() map[string]*exemplars.TSDB {
return res
}

// Stats returns the head statistics of the TSDB belonging to tenantID, or nil
// when no such tenant is registered. The tenant map is read under the read
// lock, which is held until the statistics have been computed.
func (t *MultiTSDB) Stats(tenantID, statsByLabelName string) *tsdb.Stats {
	t.mtx.RLock()
	defer t.mtx.RUnlock()

	if tn, found := t.tenants[tenantID]; found {
		// NOTE(review): assumes readyS.get() and its db are non-nil for every
		// registered tenant — confirm for tenants whose TSDB is still starting.
		head := tn.readyS.get().db.Head()
		return head.Stats(statsByLabelName)
	}
	return nil
}

func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant) error {
reg := prometheus.WrapRegistererWith(prometheus.Labels{"tenant": tenantID}, t.reg)
lset := labelpb.ExtendSortedLabels(t.labels, labels.FromStrings(t.tenantLabelName, tenantID))
Expand Down

0 comments on commit 9d598ab

Please sign in to comment.