Skip to content

Commit

Permalink
Merge pull request #124256 from cockroachdb/blathers/backport-release…
Browse files Browse the repository at this point in the history
…-24.1-124231

release-24.1: pgwire: add a metric for number of pipelined requests
  • Loading branch information
rafiss committed May 17, 2024
2 parents ef92f86 + 051500f commit 90981c0
Show file tree
Hide file tree
Showing 5 changed files with 194 additions and 13 deletions.
1 change: 1 addition & 0 deletions docs/generated/metrics/metrics.html
Original file line number Diff line number Diff line change
Expand Up @@ -1479,6 +1479,7 @@
<tr><td>APPLICATION</td><td>sql.optimizer.plan_cache.hits.internal</td><td>Number of non-prepared statements for which a cached plan was used (internal queries)</td><td>SQL Internal Statements</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.optimizer.plan_cache.misses</td><td>Number of non-prepared statements for which a cached plan was not used</td><td>SQL Statements</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.optimizer.plan_cache.misses.internal</td><td>Number of non-prepared statements for which a cached plan was not used (internal queries)</td><td>SQL Internal Statements</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.pgwire.pipeline.count</td><td>Number of pgwire commands received by the server that have not yet begun processing</td><td>Commands</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>sql.pgwire_cancel.ignored</td><td>Number of pgwire query cancel requests that were ignored due to rate limiting</td><td>Requests</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.pgwire_cancel.successful</td><td>Number of pgwire query cancel requests that were successful</td><td>Requests</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.pgwire_cancel.total</td><td>Number of pgwire query cancel requests</td><td>Requests</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
Expand Down
14 changes: 14 additions & 0 deletions pkg/sql/conn_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/ring"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -110,6 +111,10 @@ type StmtBuf struct {
// the buffer.
lastPos CmdPos
}

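// PipelineCount, if non-nil, is a gauge that measures how many commands have
// been pushed into client-facing StmtBuf instances but not yet advanced past.
// It is incremented by Push, decremented by AdvanceOne, and incremented again
// by Rewind for every command that is rewound over.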
PipelineCount *metric.Gauge
}

// Command is an interface implemented by all commands pushed by pgwire into the
Expand Down Expand Up @@ -506,6 +511,9 @@ func (buf *StmtBuf) Push(ctx context.Context, cmd Command) error {
}
buf.mu.data.AddLast(cmd)
buf.mu.lastPos++
if buf.PipelineCount != nil {
buf.PipelineCount.Inc(1)
}

buf.mu.cond.Signal()
return nil
Expand Down Expand Up @@ -593,6 +601,9 @@ func (buf *StmtBuf) AdvanceOne() CmdPos {
defer buf.mu.Unlock()
prev := buf.mu.curPos
buf.mu.curPos++
if buf.PipelineCount != nil {
buf.PipelineCount.Dec(1)
}
return prev
}

Expand Down Expand Up @@ -669,6 +680,9 @@ func (buf *StmtBuf) Rewind(ctx context.Context, pos CmdPos) {
if pos < buf.mu.startPos {
log.Fatalf(ctx, "attempting to rewind below buffer start")
}
if buf.PipelineCount != nil {
buf.PipelineCount.Inc(int64(buf.mu.curPos - pos))
}
buf.mu.curPos = pos
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/pgwire/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ go_library(
"@com_github_cockroachdb_redact//:redact",
"@com_github_go_ldap_ldap_v3//:ldap",
"@com_github_lib_pq//oid",
"@com_github_prometheus_client_model//go",
"@com_github_xdg_go_scram//:scram",
"@io_opentelemetry_go_otel//attribute",
"@org_golang_google_grpc//codes",
Expand Down Expand Up @@ -155,6 +156,7 @@ go_test(
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util",
"//pkg/util/ctxgroup",
"//pkg/util/duration",
"//pkg/util/leaktest",
"//pkg/util/log",
Expand All @@ -174,6 +176,7 @@ go_test(
"@com_github_jackc_pgconn//:pgconn",
"@com_github_jackc_pgproto3_v2//:pgproto3",
"@com_github_jackc_pgx_v4//:pgx",
"@com_github_jackc_pgx_v5//:pgx",
"@com_github_lib_pq//:pq",
"@com_github_lib_pq//oid",
"@com_github_stretchr_testify//require",
Expand Down
168 changes: 160 additions & 8 deletions pkg/sql/pgwire/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,17 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/jackc/pgconn"
pgproto3 "github.com/jackc/pgproto3/v2"
pgx "github.com/jackc/pgx/v4"
"github.com/jackc/pgproto3/v2"
"github.com/jackc/pgx/v4"
pgx5 "github.com/jackc/pgx/v5"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -104,7 +106,7 @@ func TestConn(t *testing.T) {
})

server := newTestServer()
// Wait for the client to connect and perform the handshake.
// Wait for the client to connect.
netConn, err := waitForClientConn(ln)
if err != nil {
t.Fatal(err)
Expand All @@ -118,7 +120,7 @@ func TestConn(t *testing.T) {
)

// Run the conn's loop in the background - it will push commands to the
// buffer.
// buffer. serveImpl also performs the server handshake.
serveCtx, stopServe := context.WithCancel(ctx)
g.Go(func() error {
server.serveImpl(
Expand Down Expand Up @@ -188,6 +190,151 @@ func TestConn(t *testing.T) {
}
}

// TestPipelineMetric checks that we update the metric for commands in the
// stmtBuf: the PipelineCount gauge must grow as pipelined messages are pushed
// by the connection and shrink as the test reads them back out of the buffer.
func TestPipelineMetric(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	// Start a pgwire "server". We use a fake server here since we need to control
	// exactly when statements are removed from the stmtBuf in the server.
	addr := util.TestAddr
	ln, err := net.Listen(addr.Network(), addr.String())
	require.NoError(t, err)
	// Only one connection is ever accepted from this listener; make sure the
	// listening socket is released when the test finishes.
	defer func() { _ = ln.Close() }()
	serverAddr := ln.Addr()
	log.Infof(context.Background(), "started listener on %s", serverAddr)

	ctx, cancelConn := context.WithCancel(context.Background())
	defer cancelConn()
	connectGroup := ctxgroup.WithContext(ctx)

	// The client connects in the background, since Connect only returns once
	// the handshake is finished, and the handshake is performed by serveImpl
	// below.
	var pgxConn *pgx5.Conn
	connectGroup.GoCtx(func(ctx context.Context) error {
		host, ports, err := net.SplitHostPort(serverAddr.String())
		if err != nil {
			return err
		}
		port, err := strconv.Atoi(ports)
		if err != nil {
			return err
		}
		pgxConn, err = pgx5.Connect(
			ctx,
			fmt.Sprintf("postgresql://%s@%s:%d/system?sslmode=disable", username.RootUser, host, port),
		)
		return err
	})

	server := newTestServer()
	// Wait for the client to connect.
	netConn, err := waitForClientConn(ln)
	require.NoError(t, err)
	serverSideConn := server.newConn(
		ctx,
		cancelConn,
		netConn,
		sql.SessionArgs{ConnResultsBufferSize: 16 << 10},
		timeutil.Now(),
	)

	// Run the conn's loop in the background - it will push commands to the
	// buffer. serveImpl also performs the server handshake.
	serveCtx, stopServe := context.WithCancel(context.Background())
	defer stopServe()
	serveGroup := ctxgroup.WithContext(serveCtx)
	serveGroup.GoCtx(func(ctx context.Context) error {
		server.serveImpl(
			ctx,
			serverSideConn,
			&mon.BoundAccount{}, /* reserved */
			authOptions{testingSkipAuth: true, connType: hba.ConnHostAny},
			clusterunique.ID{},
		)
		return nil
	})

	// pgxConn is only safe to read once the connect goroutine has finished.
	err = connectGroup.Wait()
	require.NoError(t, err)
	typeMap := pgxConn.TypeMap()
	pipeline := pgxConn.PgConn().StartPipeline(ctx)

	// Queue two queries followed by a Sync.
	eqb := pgx5.ExtendedQueryBuilder{}
	err = eqb.Build(typeMap, nil, []any{1, 2})
	require.NoError(t, err)
	pipeline.SendQueryParams(`select $1::int + $2::int`, eqb.ParamValues, nil, eqb.ParamFormats, eqb.ResultFormats)
	err = eqb.Build(typeMap, nil, []any{3, 4, 5})
	require.NoError(t, err)
	pipeline.SendQueryParams(`select $1::int + $2::int + $3::int`, eqb.ParamValues, nil, eqb.ParamFormats, eqb.ResultFormats)
	err = pipeline.Sync()
	require.NoError(t, err)

	// We need to poll until all the messages are received by the server. Note
	// that the server will never actually send results back to the client, since
	// we are just testing the stmtBuf, not the actual execution of the queries.
	testutils.SucceedsSoon(t, func() error {
		// Each query sends Prepare, Bind, Describe, and Execute. The Sync brings it
		// to 9 total messages.
		if n := serverSideConn.stmtBuf.PipelineCount.Value(); n != 9 {
			return errors.Errorf("expected 9, got %d", n)
		}
		return nil
	})

	// Now we'll expect to receive the commands corresponding to the
	// pipelined operations. This simulates the server processing the commands.
	// Consuming the four commands of the first query leaves 9 - 4 = 5.
	rd := sql.MakeStmtBufReader(&serverSideConn.stmtBuf)
	expectPrepareStmt(ctx, t, "", "SELECT $1::INT8 + $2::INT8", &rd, serverSideConn)
	expectBindStmt(ctx, t, "", &rd, serverSideConn)
	expectDescribeStmt(ctx, t, "", pgwirebase.PreparePortal, &rd, serverSideConn)
	expectExecPortal(ctx, t, "", &rd, serverSideConn)
	require.EqualValues(t, 5, serverSideConn.stmtBuf.PipelineCount.Value())

	// Send another query in the pipeline.
	err = eqb.Build(typeMap, nil, []any{"abc"})
	require.NoError(t, err)
	pipeline.SendQueryParams(`select $1::text`, eqb.ParamValues, nil, eqb.ParamFormats, eqb.ResultFormats)
	err = pipeline.Sync()
	require.NoError(t, err)

	// Poll again until the new messages have been received by the server.
	testutils.SucceedsSoon(t, func() error {
		// The new query adds Prepare, Bind, Describe, Execute and Sync on top of
		// the 5 commands that are still unconsumed, for a total of 10.
		if n := serverSideConn.stmtBuf.PipelineCount.Value(); n != 10 {
			return errors.Errorf("expected 10, got %d", n)
		}
		return nil
	})

	// Process all of the commands that are in the pipeline. First the rest of
	// the original batch (second query plus its Sync), leaving 10 - 5 = 5.
	expectPrepareStmt(ctx, t, "", "SELECT ($1::INT8 + $2::INT8) + $3::INT8", &rd, serverSideConn)
	expectBindStmt(ctx, t, "", &rd, serverSideConn)
	expectDescribeStmt(ctx, t, "", pgwirebase.PreparePortal, &rd, serverSideConn)
	expectExecPortal(ctx, t, "", &rd, serverSideConn)
	expectSync(ctx, t, &rd)
	require.EqualValues(t, 5, serverSideConn.stmtBuf.PipelineCount.Value())

	// Then the third query and its Sync, which must bring the gauge back to 0.
	expectPrepareStmt(ctx, t, "", "SELECT $1::STRING", &rd, serverSideConn)
	expectBindStmt(ctx, t, "", &rd, serverSideConn)
	expectDescribeStmt(ctx, t, "", pgwirebase.PreparePortal, &rd, serverSideConn)
	expectExecPortal(ctx, t, "", &rd, serverSideConn)
	expectSync(ctx, t, &rd)
	require.EqualValues(t, 0, serverSideConn.stmtBuf.PipelineCount.Value())

	err = pipeline.Close()
	require.NoError(t, err)
	err = pgxConn.Close(ctx)
	require.NoError(t, err)
	err = serveGroup.Wait()
	require.NoError(t, err)
}

func TestConnMessageTooBig(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down Expand Up @@ -554,14 +701,16 @@ func newTestServer() *Server {
// waitForClientConn blocks until a client connects and returns the accepted
// connection. It calls getSessionArgs with sendResponse set to false, so no
// handshake response is sent back to the client from here; callers that need
// the handshake to complete must run it themselves on the returned connection.
func waitForClientConn(ln net.Listener) (net.Conn, error) {
conn, _, err := getSessionArgs(ln, false)
conn, _, err := getSessionArgs(ln, false /* trustRemoteAddr */, false /* sendResponse */)
return conn, err
}

// getSessionArgs blocks until a client connects and returns the connection
// together with session arguments or an error.
// together with session arguments or an error. If sendResponse is true,
// this also sends back a response to the client to indicate that the
// connection is ready.
func getSessionArgs(
ln net.Listener, trustRemoteAddr bool,
ln net.Listener, trustRemoteAddr bool, sendResponse bool,
) (netConn net.Conn, _ sql.SessionArgs, retErr error) {
for {
var err error
Expand All @@ -588,6 +737,9 @@ func getSessionArgs(
return nil, sql.SessionArgs{}, errors.Errorf("unexpected protocol version: %d", version)
}
defer func() {
if !sendResponse {
return
}
// Implement a fake pgwire connection handshake. Send the response
// after parsing the client-sent parameters.
c := &conn{conn: netConn}
Expand Down Expand Up @@ -1661,7 +1813,7 @@ func TestParseClientProvidedSessionParameters(t *testing.T) {
var args sql.SessionArgs
func() {
defer serverReceivedConn.Done()
netConn, args, err = getSessionArgs(ln, true /* trustRemoteAddr */)
netConn, args, err = getSessionArgs(ln, true /* trustRemoteAddr */, true /* sendResponse */)
}()
clientDone.Wait()
tc.assert(t, args, err)
Expand Down
21 changes: 16 additions & 5 deletions pkg/sql/pgwire/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"github.com/cockroachdb/redact"
io_prometheus_client "github.com/prometheus/client_model/go"
"go.opentelemetry.io/otel/attribute"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -165,6 +166,13 @@ var (
Measurement: "Requests",
Unit: metric.Unit_COUNT,
}
// MetaPGWirePipelineCount is the metadata for the sql.pgwire.pipeline.count
// gauge.
MetaPGWirePipelineCount = metric.Metadata{
Name: "sql.pgwire.pipeline.count",
Help: "Number of pgwire commands received by the server that have not yet begun processing",
Measurement: "Commands",
Unit: metric.Unit_COUNT,
MetricType: io_prometheus_client.MetricType_GAUGE,
}
)

const (
Expand Down Expand Up @@ -262,6 +270,7 @@ type tenantSpecificMetrics struct {
Conns *metric.Gauge
NewConns *metric.Counter
ConnsWaitingToHash *metric.Gauge
PGWirePipelineCount *metric.Gauge
ConnLatency metric.IHistogram
ConnFailures *metric.Counter
PGWireCancelTotalCount *metric.Counter
Expand All @@ -275,11 +284,12 @@ func newTenantSpecificMetrics(
sqlMemMetrics sql.MemoryMetrics, histogramWindow time.Duration,
) *tenantSpecificMetrics {
return &tenantSpecificMetrics{
BytesInCount: metric.NewCounter(MetaBytesIn),
BytesOutCount: metric.NewCounter(MetaBytesOut),
Conns: metric.NewGauge(MetaConns),
NewConns: metric.NewCounter(MetaNewConns),
ConnsWaitingToHash: metric.NewGauge(MetaConnsWaitingToHash),
BytesInCount: metric.NewCounter(MetaBytesIn),
BytesOutCount: metric.NewCounter(MetaBytesOut),
Conns: metric.NewGauge(MetaConns),
NewConns: metric.NewCounter(MetaNewConns),
ConnsWaitingToHash: metric.NewGauge(MetaConnsWaitingToHash),
PGWirePipelineCount: metric.NewGauge(MetaPGWirePipelineCount),
ConnLatency: metric.NewHistogram(metric.HistogramOptions{
Mode: metric.HistogramModePreferHdrLatency,
Metadata: MetaConnLatency,
Expand Down Expand Up @@ -843,6 +853,7 @@ func (s *Server) newConn(
alwaysLogAuthActivity: s.testingAuthLogEnabled.Get(),
}
c.stmtBuf.Init()
c.stmtBuf.PipelineCount = s.tenantMetrics.PGWirePipelineCount
c.res.released = true
c.writerState.fi.buf = &c.writerState.buf
c.writerState.fi.lastFlushed = -1
Expand Down

0 comments on commit 90981c0

Please sign in to comment.