Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

metrics: remove metrics proxy #6948

Merged
merged 3 commits into from Sep 6, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 1 addition & 3 deletions dm/dumpling/dumpling.go
Expand Up @@ -30,7 +30,6 @@ import (
"go.uber.org/atomic"
"go.uber.org/zap"

"github.com/pingcap/tiflow/dm/pkg/metricsproxy"
"github.com/pingcap/tiflow/engine/pkg/promutil"

"github.com/pingcap/tiflow/dm/config"
Expand Down Expand Up @@ -81,8 +80,7 @@ func (m *Dumpling) Init(ctx context.Context) error {
// will register and deregister metrics, so we must use NoopRegistry
// to avoid duplicated registration.
m.metricProxies = &metricProxies{}
m.metricProxies.dumplingExitWithErrorCounter = metricsproxy.NewCounterVec(
m.cfg.MetricsFactory,
m.metricProxies.dumplingExitWithErrorCounter = m.cfg.MetricsFactory.NewCounterVec(
prometheus.CounterOpts{
Namespace: "dm",
Subsystem: "dumpling",
Expand Down
10 changes: 5 additions & 5 deletions dm/dumpling/metrics.go
Expand Up @@ -14,18 +14,18 @@
package dumpling

import (
"github.com/pingcap/tiflow/dm/pkg/metricsproxy"
"github.com/pingcap/tiflow/engine/pkg/promutil"
"github.com/prometheus/client_golang/prometheus"
)

type metricProxies struct {
dumplingExitWithErrorCounter *metricsproxy.CounterVecProxy
dumplingExitWithErrorCounter *prometheus.CounterVec
}

var f = &promutil.PromFactory{}

var defaultMetricProxies = &metricProxies{
dumplingExitWithErrorCounter: metricsproxy.NewCounterVec(
&promutil.PromFactory{},
dumplingExitWithErrorCounter: f.NewCounterVec(
prometheus.CounterOpts{
Namespace: "dm",
Subsystem: "dumpling",
Expand All @@ -41,5 +41,5 @@ func RegisterMetrics(registry *prometheus.Registry) {

func (m *Dumpling) removeLabelValuesWithTaskInMetrics(task, source string) {
labels := prometheus.Labels{"task": task, "source_id": source}
m.metricProxies.dumplingExitWithErrorCounter.DeleteAllAboutLabels(labels)
m.metricProxies.dumplingExitWithErrorCounter.DeletePartialMatch(labels)
}
43 changes: 21 additions & 22 deletions dm/loader/metrics.go
Expand Up @@ -16,21 +16,20 @@ package loader
import (
"github.com/pingcap/tiflow/engine/pkg/promutil"
"github.com/prometheus/client_golang/prometheus"

"github.com/pingcap/tiflow/dm/pkg/metricsproxy"
)

var (
f = &promutil.PromFactory{}
// should error.
tidbExecutionErrorCounter = metricsproxy.NewCounterVec(&promutil.PromFactory{},
tidbExecutionErrorCounter = f.NewCounterVec(
prometheus.CounterOpts{
Namespace: "dm",
Subsystem: "loader",
Name: "tidb_execution_error",
Help: "Total count of tidb execution errors",
}, []string{"task", "source_id"})

queryHistogram = metricsproxy.NewHistogramVec(&promutil.PromFactory{},
queryHistogram = f.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "dm",
Subsystem: "loader",
Expand All @@ -39,7 +38,7 @@ var (
Buckets: prometheus.ExponentialBuckets(0.000005, 2, 25),
}, []string{"task", "source_id"})

txnHistogram = metricsproxy.NewHistogramVec(&promutil.PromFactory{},
txnHistogram = f.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "dm",
Subsystem: "loader",
Expand All @@ -48,7 +47,7 @@ var (
Buckets: prometheus.ExponentialBuckets(0.000005, 2, 25),
}, []string{"task", "worker", "source_id", "target_schema", "target_table"})

stmtHistogram = metricsproxy.NewHistogramVec(&promutil.PromFactory{},
stmtHistogram = f.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "dm",
Subsystem: "loader",
Expand All @@ -57,31 +56,31 @@ var (
Buckets: prometheus.ExponentialBuckets(0.000005, 2, 25),
}, []string{"type", "task"})

dataFileGauge = metricsproxy.NewGaugeVec(&promutil.PromFactory{},
dataFileGauge = f.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dm",
Subsystem: "loader",
Name: "data_file_gauge",
Help: "data files in total",
}, []string{"task", "source_id"})

tableGauge = metricsproxy.NewGaugeVec(&promutil.PromFactory{},
tableGauge = f.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dm",
Subsystem: "loader",
Name: "table_gauge",
Help: "tables in total",
}, []string{"task", "source_id"})

dataSizeGauge = metricsproxy.NewGaugeVec(&promutil.PromFactory{},
dataSizeGauge = f.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dm",
Subsystem: "loader",
Name: "data_size_gauge",
Help: "data size in total",
}, []string{"task", "source_id"})

progressGauge = metricsproxy.NewGaugeVec(&promutil.PromFactory{},
progressGauge = f.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dm",
Subsystem: "loader",
Expand All @@ -90,15 +89,15 @@ var (
}, []string{"task", "source_id"})

// should alert.
loaderExitWithErrorCounter = metricsproxy.NewCounterVec(&promutil.PromFactory{},
loaderExitWithErrorCounter = f.NewCounterVec(
prometheus.CounterOpts{
Namespace: "dm",
Subsystem: "loader",
Name: "exit_with_error_count",
Help: "counter for loader exits with error",
}, []string{"task", "source_id"})

remainingTimeGauge = metricsproxy.NewGaugeVec(&promutil.PromFactory{},
remainingTimeGauge = f.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dm",
Subsystem: "loader",
Expand All @@ -122,14 +121,14 @@ func RegisterMetrics(registry *prometheus.Registry) {
}

func (l *Loader) removeLabelValuesWithTaskInMetrics(task string) {
tidbExecutionErrorCounter.DeleteAllAboutLabels(prometheus.Labels{"task": task})
txnHistogram.DeleteAllAboutLabels(prometheus.Labels{"task": task})
queryHistogram.DeleteAllAboutLabels(prometheus.Labels{"task": task})
stmtHistogram.DeleteAllAboutLabels(prometheus.Labels{"task": task})
dataFileGauge.DeleteAllAboutLabels(prometheus.Labels{"task": task})
tableGauge.DeleteAllAboutLabels(prometheus.Labels{"task": task})
dataSizeGauge.DeleteAllAboutLabels(prometheus.Labels{"task": task})
progressGauge.DeleteAllAboutLabels(prometheus.Labels{"task": task})
loaderExitWithErrorCounter.DeleteAllAboutLabels(prometheus.Labels{"task": task})
remainingTimeGauge.DeleteAllAboutLabels(prometheus.Labels{"task": task})
tidbExecutionErrorCounter.DeletePartialMatch(prometheus.Labels{"task": task})
txnHistogram.DeletePartialMatch(prometheus.Labels{"task": task})
queryHistogram.DeletePartialMatch(prometheus.Labels{"task": task})
stmtHistogram.DeletePartialMatch(prometheus.Labels{"task": task})
dataFileGauge.DeletePartialMatch(prometheus.Labels{"task": task})
tableGauge.DeletePartialMatch(prometheus.Labels{"task": task})
dataSizeGauge.DeletePartialMatch(prometheus.Labels{"task": task})
progressGauge.DeletePartialMatch(prometheus.Labels{"task": task})
loaderExitWithErrorCounter.DeletePartialMatch(prometheus.Labels{"task": task})
remainingTimeGauge.DeletePartialMatch(prometheus.Labels{"task": task})
}
15 changes: 7 additions & 8 deletions dm/master/metrics/metrics.go
Expand Up @@ -20,8 +20,6 @@ import (
cpu "github.com/pingcap/tidb-tools/pkg/utils"
"github.com/pingcap/tiflow/engine/pkg/promutil"
"github.com/prometheus/client_golang/prometheus"

"github.com/pingcap/tiflow/dm/pkg/metricsproxy"
)

// used for ddlPendingCounter, no "Resolved" lock because they will be
Expand All @@ -48,7 +46,8 @@ const (
)

var (
workerState = metricsproxy.NewGaugeVec(&promutil.PromFactory{},
f = &promutil.PromFactory{}
workerState = f.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dm",
Subsystem: "master",
Expand All @@ -64,23 +63,23 @@ var (
Help: "the cpu usage of master",
})

ddlPendingCounter = metricsproxy.NewGaugeVec(&promutil.PromFactory{},
ddlPendingCounter = f.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dm",
Subsystem: "master",
Name: "ddl_state_number",
Help: "number of pending DDL in different states, Un-synced (waiting all upstream), Synced (all upstream finished, waiting all downstream)",
}, []string{"task", "type"})

ddlErrCounter = metricsproxy.NewCounterVec(&promutil.PromFactory{},
ddlErrCounter = f.NewCounterVec(
prometheus.CounterOpts{
Namespace: "dm",
Subsystem: "master",
Name: "shard_ddl_error",
Help: "number of shard DDL lock/operation error",
}, []string{"task", "type"})

workerEventErrCounter = metricsproxy.NewCounterVec(&promutil.PromFactory{},
workerEventErrCounter = f.NewCounterVec(
prometheus.CounterOpts{
Namespace: "dm",
Subsystem: "master",
Expand Down Expand Up @@ -137,7 +136,7 @@ func ReportWorkerStage(name string, state float64) {

// RemoveWorkerState cleans state of deleted worker.
func RemoveWorkerState(name string) {
workerState.DeleteAllAboutLabels(prometheus.Labels{"worker": name})
workerState.DeletePartialMatch(prometheus.Labels{"worker": name})
}

// ReportDDLPending inc/dec by 1 to ddlPendingCounter.
Expand All @@ -152,7 +151,7 @@ func ReportDDLPending(task, oldStatus, newStatus string) {

// RemoveDDLPending removes all counter of this task.
func RemoveDDLPending(task string) {
ddlPendingCounter.DeleteAllAboutLabels(prometheus.Labels{"task": task})
ddlPendingCounter.DeletePartialMatch(prometheus.Labels{"task": task})
}

// ReportDDLError is a setter for ddlErrCounter.
Expand Down
6 changes: 3 additions & 3 deletions dm/pkg/conn/baseconn.go
Expand Up @@ -23,11 +23,11 @@ import (
gmysql "github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-sql-driver/mysql"
"github.com/pingcap/failpoint"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"

tcontext "github.com/pingcap/tiflow/dm/pkg/context"
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/dm/pkg/metricsproxy"
"github.com/pingcap/tiflow/dm/pkg/retry"
"github.com/pingcap/tiflow/dm/pkg/terror"
"github.com/pingcap/tiflow/dm/pkg/utils"
Expand Down Expand Up @@ -117,7 +117,7 @@ func (conn *BaseConn) QuerySQL(tctx *tcontext.Context, query string, args ...int
// return
// 1. failed: (the index of sqls executed error, error)
// 2. succeed: (len(sqls), nil).
func (conn *BaseConn) ExecuteSQLWithIgnoreError(tctx *tcontext.Context, hVec *metricsproxy.HistogramVecProxy, task string, ignoreErr func(error) bool, queries []string, args ...[]interface{}) (int, error) {
func (conn *BaseConn) ExecuteSQLWithIgnoreError(tctx *tcontext.Context, hVec *prometheus.HistogramVec, task string, ignoreErr func(error) bool, queries []string, args ...[]interface{}) (int, error) {
// inject an error to trigger retry, this should be placed before the real execution of the SQL statement.
failpoint.Inject("retryableError", func(val failpoint.Value) {
if mark, ok := val.(string); ok {
Expand Down Expand Up @@ -216,7 +216,7 @@ func (conn *BaseConn) ExecuteSQLWithIgnoreError(tctx *tcontext.Context, hVec *me
// return
// 1. failed: (the index of sqls executed error, error)
// 2. succeed: (len(sqls), nil).
func (conn *BaseConn) ExecuteSQL(tctx *tcontext.Context, hVec *metricsproxy.HistogramVecProxy, task string, queries []string, args ...[]interface{}) (int, error) {
func (conn *BaseConn) ExecuteSQL(tctx *tcontext.Context, hVec *prometheus.HistogramVec, task string, queries []string, args ...[]interface{}) (int, error) {
return conn.ExecuteSQLWithIgnoreError(tctx, hVec, task, nil, queries, args...)
}

Expand Down
5 changes: 3 additions & 2 deletions dm/pkg/conn/baseconn_test.go
Expand Up @@ -19,7 +19,6 @@ import (
"testing"

tcontext "github.com/pingcap/tiflow/dm/pkg/context"
"github.com/pingcap/tiflow/dm/pkg/metricsproxy"
"github.com/pingcap/tiflow/dm/pkg/retry"
"github.com/pingcap/tiflow/dm/pkg/terror"
"github.com/pingcap/tiflow/engine/pkg/promutil"
Expand All @@ -37,7 +36,9 @@ var _ = Suite(&testBaseConnSuite{})

type testBaseConnSuite struct{}

var testStmtHistogram = metricsproxy.NewHistogramVec(&promutil.PromFactory{},
var f = &promutil.PromFactory{}

var testStmtHistogram = f.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "dm",
Subsystem: "conn",
Expand Down
105 changes: 0 additions & 105 deletions dm/pkg/metricsproxy/countervec.go

This file was deleted.