From 9dc010f2878594fc03fc6979dad16e8a39480a93 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Wed, 23 Oct 2019 13:46:30 +0800 Subject: [PATCH] Add span.context.destination.* Add span.context fields: - destination.address - destination.port - destination.service.type - destination.service.name - destination.service.resource Implement support for (client) HTTP, Elasticsearch, PostgreSQL and MySQL (via apmsql or GORM) spans. --- internal/apmhttputil/remoteaddr.go | 34 ++++++- internal/apmhttputil/remoteaddr_test.go | 23 +++++ internal/apmhttputil/url.go | 3 + model/marshal_fastjson.go | 89 +++++++++++++++++++ model/model.go | 31 +++++++ modelwriter.go | 8 ++ module/apmelasticsearch/client.go | 8 ++ module/apmelasticsearch/client_test.go | 32 +++++++ .../integration/elastic_integration_test.go | 84 +++++++++++++++++ .../internal/integration/go.mod | 1 + .../internal/integration/go.sum | 2 + module/apmgorm/apmgorm_test.go | 20 ++++- module/apmgorm/context.go | 1 + module/apmhttp/client_test.go | 14 ++- module/apmot/tracer_test.go | 9 ++ module/apmsql/conn.go | 7 ++ module/apmsql/dsn.go | 6 ++ module/apmsql/mysql/mysql_test.go | 9 ++ module/apmsql/mysql/parser.go | 12 +++ module/apmsql/mysql/parser_test.go | 20 ++++- module/apmsql/pq/parser.go | 29 ++++++ module/apmsql/pq/parser_test.go | 65 +++++++++++--- module/apmsql/pq/pq_test.go | 9 ++ spancontext.go | 73 ++++++++++++++- spancontext_test.go | 75 ++++++++++++++++ 25 files changed, 644 insertions(+), 20 deletions(-) create mode 100644 module/apmelasticsearch/internal/integration/elastic_integration_test.go diff --git a/internal/apmhttputil/remoteaddr.go b/internal/apmhttputil/remoteaddr.go index a7cd080cd..e79400e6a 100644 --- a/internal/apmhttputil/remoteaddr.go +++ b/internal/apmhttputil/remoteaddr.go @@ -19,10 +19,42 @@ package apmhttputil import ( "net/http" + "strconv" ) -// RemoteAddr returns the remote (peer) socket address for the HTTP request. +// RemoteAddr returns the remote (peer) socket address for req, +// a server HTTP request. func RemoteAddr(req *http.Request) string { remoteAddr, _ := splitHost(req.RemoteAddr) return remoteAddr } + +// DestinationAddr returns the destination server address and port +// for req, a client HTTP request. +// +// If req.URL.Host contains a port it will be returned, and otherwise +// the default port according to req.URL.Scheme will be returned. If +// the included port is not a valid integer, or no port is included +// and the scheme is unknown, the returned port value will be zero. +func DestinationAddr(req *http.Request) (string, int) { + host, strport := splitHost(req.URL.Host) + var port int + if strport != "" { + port, _ = strconv.Atoi(strport) + } else { + port = SchemeDefaultPort(req.URL.Scheme) + } + return host, port +} + +// SchemeDefaultPort returns the default port for the given URI scheme, +// if known, or 0 otherwise. +func SchemeDefaultPort(scheme string) int { + switch scheme { + case "http": + return 80 + case "https": + return 443 + } + return 0 +} diff --git a/internal/apmhttputil/remoteaddr_test.go b/internal/apmhttputil/remoteaddr_test.go index 184202ad5..73f12a8a9 100644 --- a/internal/apmhttputil/remoteaddr_test.go +++ b/internal/apmhttputil/remoteaddr_test.go @@ -19,9 +19,11 @@ package apmhttputil_test import ( "net/http" + "net/url" "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.elastic.co/apm/internal/apmhttputil" ) @@ -35,3 +37,24 @@ func TestRemoteAddr(t *testing.T) { req.Header.Set("X-Real-IP", "127.1.2.3") assert.Equal(t, "::1", apmhttputil.RemoteAddr(req)) } + +func TestDestinationAddr(t *testing.T) { + test := func(u, expectAddr string, expectPort int) { + t.Run(u, func(t *testing.T) { + url, err := url.Parse(u) + require.NoError(t, err) + + addr, port := apmhttputil.DestinationAddr(&http.Request{URL: url}) + assert.Equal(t, expectAddr, addr) + assert.Equal(t, expectPort, port) + }) + } + test("http://127.0.0.1:80", "127.0.0.1", 80) + test("http://127.0.0.1", "127.0.0.1", 80) + test("https://127.0.0.1:443", "127.0.0.1", 443) + test("https://127.0.0.1", "127.0.0.1", 443) + test("https://[::1]", "::1", 443) + test("https://[::1]:1234", "::1", 1234) + test("gopher://gopher.invalid:70", "gopher.invalid", 70) + test("gopher://gopher.invalid", "gopher.invalid", 0) // default unknown +} diff --git a/internal/apmhttputil/url.go b/internal/apmhttputil/url.go index a11dfb55d..45ea94181 100644 --- a/internal/apmhttputil/url.go +++ b/internal/apmhttputil/url.go @@ -98,6 +98,9 @@ func splitHost(in string) (host, port string) { } host, port, err := net.SplitHostPort(in) if err != nil { + if n := len(in); n > 1 && in[0] == '[' && in[n-1] == ']' { + in = in[1 : n-1] + } return in, "" } return host, port diff --git a/model/marshal_fastjson.go b/model/marshal_fastjson.go index 9f079d8b3..2b0cb7d50 100644 --- a/model/marshal_fastjson.go +++ b/model/marshal_fastjson.go @@ -486,6 +486,18 @@ func (v *SpanContext) MarshalFastJSON(w *fastjson.Writer) error { firstErr = err } } + if v.Destination != nil { + const prefix = ",\"destination\":" + if first { + first = false + w.RawString(prefix[1:]) + } else { + w.RawString(prefix) + } + if err := v.Destination.MarshalFastJSON(w); err != nil && firstErr == nil { + firstErr = err + } + } if v.HTTP != nil { const prefix = ",\"http\":" if first { @@ -514,6 +526,83 @@ func (v *SpanContext) MarshalFastJSON(w *fastjson.Writer) error { return firstErr } +func (v *SpanDestinationContext) MarshalFastJSON(w *fastjson.Writer) error { + var firstErr error + w.RawByte('{') + first := true + if v.Address != "" { + const prefix = ",\"address\":" + if first { + first = false + w.RawString(prefix[1:]) + } else { + w.RawString(prefix) + } + w.String(v.Address) + } + if v.Port != 0 { + const prefix = ",\"port\":" + if first { + first = false + w.RawString(prefix[1:]) + } else { + w.RawString(prefix) + } + w.Int64(int64(v.Port)) + } + if v.Service != nil { + const prefix = ",\"service\":" + if first { + first = false + w.RawString(prefix[1:]) + } else { + w.RawString(prefix) + } + if err := v.Service.MarshalFastJSON(w); err != nil && firstErr == nil { + firstErr = err + } + } + w.RawByte('}') + return firstErr +} + +func (v *SpanDestinationServiceContext) MarshalFastJSON(w *fastjson.Writer) error { + w.RawByte('{') + first := true + if v.Name != "" { + const prefix = ",\"name\":" + if first { + first = false + w.RawString(prefix[1:]) + } else { + w.RawString(prefix) + } + w.String(v.Name) + } + if v.Resource != "" { + const prefix = ",\"resource\":" + if first { + first = false + w.RawString(prefix[1:]) + } else { + w.RawString(prefix) + } + w.String(v.Resource) + } + if v.Type != "" { + const prefix = ",\"type\":" + if first { + first = false + w.RawString(prefix[1:]) + } else { + w.RawString(prefix) + } + w.String(v.Type) + } + w.RawByte('}') + return nil +} + func (v *DatabaseSpanContext) MarshalFastJSON(w *fastjson.Writer) error { w.RawByte('{') first := true diff --git a/model/model.go b/model/model.go index 85e3e7972..8ae3478dd 100644 --- a/model/model.go +++ b/model/model.go @@ -263,6 +263,9 @@ type Span struct { // SpanContext holds contextual information relating to the span. type SpanContext struct { + // Destination holds information about a destination service. + Destination *SpanDestinationContext `json:"destination,omitempty"` + // Database holds contextual information for database // operation spans. Database *DatabaseSpanContext `json:"db,omitempty"` @@ -274,6 +277,34 @@ type SpanContext struct { Tags IfaceMap `json:"tags,omitempty"` } +// SpanDestinationContext holds contextual information about the destination +// for a span that relates to an operation involving an external service. +type SpanDestinationContext struct { + // Address holds the network address of the destination service. + // This may be a hostname, FQDN, or (IPv4 or IPv6) network address. + Address string `json:"address,omitempty"` + + // Port holds the network port for the destination service. + Port int `json:"port,omitempty"` + + // Service holds additional destination service context. + Service *SpanDestinationServiceContext `json:"service,omitempty"` +} + +// SpanDestinationServiceContext holds contextual information about a +// destination service,. +type SpanDestinationServiceContext struct { + // Type holds the destination service type. + Type string `json:"type,omitempty"` + + // Name holds the destination service name. + Name string `json:"name,omitempty"` + + // Resource identifies the destination service + // resource, e.g. a URI or message queue name. + Resource string `json:"resource,omitempty"` +} + // DatabaseSpanContext holds contextual information for database // operation spans. type DatabaseSpanContext struct { diff --git a/modelwriter.go b/modelwriter.go index def2ae617..a3e4c6573 100644 --- a/modelwriter.go +++ b/modelwriter.go @@ -147,6 +147,14 @@ func (w *modelWriter) buildModelSpan(out *model.Span, span *Span, sd *SpanData) out.Duration = sd.Duration.Seconds() * 1000 out.Context = sd.Context.build() + // Copy the span type to context.destination.service.type. + // + // TODO(axw) see if we can omit the field entirely, having + // the server default to using span.type. + if out.Context != nil && out.Context.Destination != nil && out.Context.Destination.Service != nil { + out.Context.Destination.Service.Type = out.Type + } + w.modelStacktrace = appendModelStacktraceFrames(w.modelStacktrace, sd.stacktrace) out.Stacktrace = w.modelStacktrace w.setStacktraceContext(out.Stacktrace) diff --git a/module/apmelasticsearch/client.go b/module/apmelasticsearch/client.go index eda9163ad..36b27ef2f 100644 --- a/module/apmelasticsearch/client.go +++ b/module/apmelasticsearch/client.go @@ -32,6 +32,10 @@ import ( "go.elastic.co/apm/module/apmhttp" ) +const ( + defaultElasticsearchPortSuffix = ":9200" +) + // WrapRoundTripper returns an http.RoundTripper wrapping r, reporting each // request as a span to Elastic APM, if the request's context contains a // sampled transaction. @@ -79,6 +83,10 @@ func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) { ctx = apm.ContextWithSpan(ctx, span) req = apmhttp.RequestWithContext(ctx, req) span.Context.SetHTTPRequest(req) + span.Context.SetDestinationService(apm.DestinationServiceSpanContext{ + Name: "elasticsearch", + Resource: "elasticsearch", + }) span.Context.SetDatabase(apm.DatabaseSpanContext{ Type: "elasticsearch", Statement: statement, diff --git a/module/apmelasticsearch/client_test.go b/module/apmelasticsearch/client_test.go index f9e1982b3..b895bd845 100644 --- a/module/apmelasticsearch/client_test.go +++ b/module/apmelasticsearch/client_test.go @@ -260,6 +260,38 @@ func TestStatementBodyGzipContentEncoding(t *testing.T) { }, spans[0].Context.Database) } +func TestDestinationAddress(t *testing.T) { + var rt roundTripperFunc = func(req *http.Request) (*http.Response, error) { + return httptest.NewRecorder().Result(), nil + } + client := &http.Client{Transport: apmelasticsearch.WrapRoundTripper(rt)} + + test := func(url, destinationAddr string, destinationPort int) { + req, err := http.NewRequest("GET", url, nil) + require.NoError(t, err) + _, spans, _ := apmtest.WithTransaction(func(ctx context.Context) { + resp, err := client.Do(req.WithContext(ctx)) + assert.NoError(t, err) + resp.Body.Close() + }) + require.Len(t, spans, 1) + assert.Equal(t, &model.SpanDestinationContext{ + Address: destinationAddr, + Port: destinationPort, + Service: &model.SpanDestinationServiceContext{ + Type: "db", + Name: "elasticsearch", + Resource: "elasticsearch", + }, + }, spans[0].Context.Destination) + } + test("http://host:9200/_search", "host", 9200) + test("http://host:80/_search", "host", 80) + test("http://127.0.0.1:9200/_search", "127.0.0.1", 9200) + test("http://[2001:db8::1]:9200/_search", "2001:db8::1", 9200) + test("http://[2001:db8::1]:80/_search", "2001:db8::1", 80) +} + type errorReadCloser struct { readError error closed bool diff --git a/module/apmelasticsearch/internal/integration/elastic_integration_test.go b/module/apmelasticsearch/internal/integration/elastic_integration_test.go new file mode 100644 index 000000000..a74170295 --- /dev/null +++ b/module/apmelasticsearch/internal/integration/elastic_integration_test.go @@ -0,0 +1,84 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// +build go1.11 + +package integration_test + +import ( + "context" + "net/http" + "net/url" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + elasticsearch "github.com/elastic/go-elasticsearch/v7" + + "go.elastic.co/apm/apmtest" + "go.elastic.co/apm/model" + "go.elastic.co/apm/module/apmelasticsearch" +) + +func TestElastic(t *testing.T) { + if elasticsearchURL == "" { + t.Skipf("ELASTICSEARCH_URL not specified") + } + + es, err := elasticsearch.NewClient(elasticsearch.Config{ + // Addresses set from ELASTICSEARCH_URL + Transport: apmelasticsearch.WrapRoundTripper(http.DefaultTransport), + }) + require.NoError(t, err) + + _, spans, errs := apmtest.WithTransaction(func(ctx context.Context) { + res, err := es.Search( + es.Search.WithIndex("no_index"), + es.Search.WithContext(ctx), + es.Search.WithBody(strings.NewReader(`{"query":{"match_all":{}}}`)), + ) + require.NoError(t, err) + res.Body.Close() + }) + assert.Empty(t, errs) + require.Len(t, spans, 1) + + esurl, err := url.Parse(elasticsearchURL) + require.NoError(t, err) + esurl.Path = "/no_index/_search" + + // We test the value of destination in unit tests. + require.NotNil(t, spans[0].Context.Destination) + spans[0].Context.Destination = nil + + assert.Equal(t, "Elasticsearch: GET no_index/_search", spans[0].Name) + assert.Equal(t, "db", spans[0].Type) + assert.Equal(t, "elasticsearch", spans[0].Subtype) + assert.Equal(t, "", spans[0].Action) + assert.Equal(t, &model.SpanContext{ + Database: &model.DatabaseSpanContext{ + Type: "elasticsearch", + Statement: `{"query":{"match_all":{}}}`, + }, + HTTP: &model.HTTPSpanContext{ + URL: esurl, + StatusCode: 404, + }, + }, spans[0].Context) +} diff --git a/module/apmelasticsearch/internal/integration/go.mod b/module/apmelasticsearch/internal/integration/go.mod index 43c3f598f..f86b96aad 100644 --- a/module/apmelasticsearch/internal/integration/go.mod +++ b/module/apmelasticsearch/internal/integration/go.mod @@ -1,6 +1,7 @@ module go.elastic.co/apm/module/apmelasticsearch/internal/integration require ( + github.com/elastic/go-elasticsearch/v7 v7.5.0 github.com/fortytw2/leaktest v1.3.0 // indirect github.com/mailru/easyjson v0.0.0-20180823135443-60711f1a8329 // indirect github.com/olivere/elastic v6.2.16+incompatible diff --git a/module/apmelasticsearch/internal/integration/go.sum b/module/apmelasticsearch/internal/integration/go.sum index a074a6091..a8d116565 100644 --- a/module/apmelasticsearch/internal/integration/go.sum +++ b/module/apmelasticsearch/internal/integration/go.sum @@ -3,6 +3,8 @@ github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgI github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/elastic/go-elasticsearch/v7 v7.5.0 h1:kXW+EKls2BwsyQujTmVrxANb3U0qoz9wEq8Zkf/JEco= +github.com/elastic/go-elasticsearch/v7 v7.5.0/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4= github.com/elastic/go-sysinfo v1.0.1 h1:lzGPX2sIXaETeMXitXL2XZU8K4B7k7JBhIKWxdOdUt8= github.com/elastic/go-sysinfo v1.0.1/go.mod h1:O/D5m1VpYLwGjCYzEt63g3Z1uO3jXfwyzzjiW90t8cY= github.com/elastic/go-windows v1.0.0 h1:qLURgZFkkrYyTTkvYpsZIgf83AUsdIHfvlJaqaZ7aSY= diff --git a/module/apmgorm/apmgorm_test.go b/module/apmgorm/apmgorm_test.go index 2c4740811..429a8aeae 100644 --- a/module/apmgorm/apmgorm_test.go +++ b/module/apmgorm/apmgorm_test.go @@ -56,7 +56,12 @@ func TestWithContext(t *testing.T) { } else { t.Run("postgres", func(t *testing.T) { testWithContext(t, - apmsql.DSNInfo{Database: "test_db", User: "postgres"}, + apmsql.DSNInfo{ + Address: os.Getenv("PGHOST"), + Port: 5432, + Database: "test_db", + User: "postgres", + }, "postgres", "user=postgres password=hunter2 dbname=test_db sslmode=disable", ) }) @@ -67,7 +72,12 @@ func TestWithContext(t *testing.T) { } else { t.Run("mysql", func(t *testing.T) { testWithContext(t, - apmsql.DSNInfo{Database: "test_db", User: "root"}, + apmsql.DSNInfo{ + Address: mysqlHost, + Port: 3306, + Database: "test_db", + User: "root", + }, "mysql", "root:hunter2@tcp("+mysqlHost+")/test_db?parseTime=true", ) }) @@ -106,6 +116,12 @@ func testWithContext(t *testing.T, dsnInfo apmsql.DSNInfo, dialect string, args assert.NotEmpty(t, span.Context.Database.Statement) assert.Equal(t, "sql", span.Context.Database.Type) assert.Equal(t, dsnInfo.User, span.Context.Database.User) + if dsnInfo.Address == "" { + assert.Nil(t, span.Context.Destination) + } else { + assert.Equal(t, dsnInfo.Address, span.Context.Destination.Address) + assert.Equal(t, dsnInfo.Port, span.Context.Destination.Port) + } } assert.Equal(t, []string{ "INSERT INTO products", diff --git a/module/apmgorm/context.go b/module/apmgorm/context.go index 53f716078..12f03112c 100644 --- a/module/apmgorm/context.go +++ b/module/apmgorm/context.go @@ -132,6 +132,7 @@ func newAfterCallback(dsnInfo apmsql.DSNInfo) func(*gorm.Scope) { return } span.Name = apmsql.QuerySignature(scope.SQL) + span.Context.SetDestinationAddress(dsnInfo.Address, dsnInfo.Port) span.Context.SetDatabase(apm.DatabaseSpanContext{ Instance: dsnInfo.Database, Statement: scope.SQL, diff --git a/module/apmhttp/client_test.go b/module/apmhttp/client_test.go index cc8905cd2..f365d99c8 100644 --- a/module/apmhttp/client_test.go +++ b/module/apmhttp/client_test.go @@ -22,6 +22,7 @@ import ( "encoding/json" "errors" "io/ioutil" + "net" "net/http" "net/http/httptest" "net/url" @@ -73,11 +74,22 @@ func TestClient(t *testing.T) { transaction := payloads.Transactions[0] span := payloads.Spans[0] + serverAddr := server.Listener.Addr().(*net.TCPAddr) + assert.Equal(t, transaction.ID, span.ParentID) - assert.Equal(t, "GET "+server.Listener.Addr().String(), span.Name) + assert.Equal(t, "GET "+serverAddr.String(), span.Name) assert.Equal(t, "external", span.Type) assert.Equal(t, "http", span.Subtype) assert.Equal(t, &model.SpanContext{ + Destination: &model.SpanDestinationContext{ + Address: serverAddr.IP.String(), + Port: serverAddr.Port, + Service: &model.SpanDestinationServiceContext{ + Type: "external", + Name: "http://" + serverAddr.String(), + Resource: serverAddr.String(), + }, + }, HTTP: &model.HTTPSpanContext{ // Note no user info included in server.URL. URL: serverURL, diff --git a/module/apmot/tracer_test.go b/module/apmot/tracer_test.go index 574815d96..b23306023 100644 --- a/module/apmot/tracer_test.go +++ b/module/apmot/tracer_test.go @@ -180,6 +180,15 @@ func TestHTTPSpan(t *testing.T) { assert.Equal(t, "http", modelSpan.Subtype) assert.Equal(t, &model.SpanContext{ HTTP: &model.HTTPSpanContext{URL: url}, + Destination: &model.SpanDestinationContext{ + Address: "testing.invalid", + Port: 8443, + Service: &model.SpanDestinationServiceContext{ + Type: "external", + Name: "https://testing.invalid:8443", + Resource: "testing.invalid:8443", + }, + }, }, modelSpan.Context) } diff --git a/module/apmsql/conn.go b/module/apmsql/conn.go index 073614325..a0ae7fb0f 100644 --- a/module/apmsql/conn.go +++ b/module/apmsql/conn.go @@ -66,6 +66,13 @@ func (c *conn) startStmtSpan(ctx context.Context, stmt, spanType string) (*apm.S func (c *conn) startSpan(ctx context.Context, name, spanType, stmt string) (*apm.Span, context.Context) { span, ctx := apm.StartSpan(ctx, name, spanType) if !span.Dropped() { + if c.dsnInfo.Address != "" { + span.Context.SetDestinationAddress(c.dsnInfo.Address, c.dsnInfo.Port) + span.Context.SetDestinationService(apm.DestinationServiceSpanContext{ + Name: c.driver.driverName, + Resource: c.driver.driverName, + }) + } span.Context.SetDatabase(apm.DatabaseSpanContext{ Instance: c.dsnInfo.Database, Statement: stmt, diff --git a/module/apmsql/dsn.go b/module/apmsql/dsn.go index b756a72ca..2ac83c837 100644 --- a/module/apmsql/dsn.go +++ b/module/apmsql/dsn.go @@ -19,6 +19,12 @@ package apmsql // DSNInfo contains information from a database-specific data source name. type DSNInfo struct { + // Address is the database server address specified by the DSN. + Address string + + // Port is the database server port specified by the DSN. + Port int + // Database is the name of the specific database identified by the DSN. Database string diff --git a/module/apmsql/mysql/mysql_test.go b/module/apmsql/mysql/mysql_test.go index 6b548346f..fa2ec8244 100644 --- a/module/apmsql/mysql/mysql_test.go +++ b/module/apmsql/mysql/mysql_test.go @@ -56,6 +56,15 @@ func TestQueryContext(t *testing.T) { assert.Equal(t, "SELECT FROM foo", spans[0].Name) assert.Equal(t, "mysql", spans[0].Subtype) assert.Equal(t, &model.SpanContext{ + Destination: &model.SpanDestinationContext{ + Address: mysqlHost, + Port: 3306, + Service: &model.SpanDestinationServiceContext{ + Type: "db", + Name: "mysql", + Resource: "mysql", + }, + }, Database: &model.DatabaseSpanContext{ Instance: "test_db", Statement: "SELECT * FROM foo", diff --git a/module/apmsql/mysql/parser.go b/module/apmsql/mysql/parser.go index a8e76c403..49fc1651d 100644 --- a/module/apmsql/mysql/parser.go +++ b/module/apmsql/mysql/parser.go @@ -18,6 +18,9 @@ package apmmysql import ( + "net" + "strconv" + "github.com/go-sql-driver/mysql" "go.elastic.co/apm/module/apmsql" @@ -31,7 +34,16 @@ func ParseDSN(name string) apmsql.DSNInfo { // so just return a zero value. return apmsql.DSNInfo{} } + var addr string + var port int + if cfg.Net == "tcp" { + host, portstr, _ := net.SplitHostPort(cfg.Addr) + port, _ = strconv.Atoi(portstr) + addr = host + } return apmsql.DSNInfo{ + Address: addr, + Port: port, Database: cfg.DBName, User: cfg.User, } diff --git a/module/apmsql/mysql/parser_test.go b/module/apmsql/mysql/parser_test.go index 6e9bab0c3..c6beff7b8 100644 --- a/module/apmsql/mysql/parser_test.go +++ b/module/apmsql/mysql/parser_test.go @@ -28,8 +28,24 @@ import ( func TestParseDSN(t *testing.T) { info := apmmysql.ParseDSN("user:pass@/dbname") - assert.Equal(t, "dbname", info.Database) - assert.Equal(t, "user", info.User) + assert.Equal(t, apmsql.DSNInfo{ + Address: "127.0.0.1", + Port: 3306, + Database: "dbname", + User: "user", + }, info) +} + +func TestParseDSNAddr(t *testing.T) { + test := func(dsn, addr string, port int) { + parsed := apmmysql.ParseDSN(dsn) + assert.Equal(t, addr, parsed.Address) + assert.Equal(t, port, parsed.Port) + } + test("user:pass@tcp(1.2.3.4)/dbname", "1.2.3.4", 3306) + test("user:pass@tcp(1.2.3.4:1234)/dbname", "1.2.3.4", 1234) + test("user:pass@tcp(::1)/dbname", "::1", 3306) + test("user:pass@tcp([::1]:3306)/dbname", "::1", 3306) } func TestParseDSNError(t *testing.T) { diff --git a/module/apmsql/pq/parser.go b/module/apmsql/pq/parser.go index f30216539..2c4f4db73 100644 --- a/module/apmsql/pq/parser.go +++ b/module/apmsql/pq/parser.go @@ -23,12 +23,17 @@ import ( nurl "net/url" "os" "sort" + "strconv" "strings" "unicode" "go.elastic.co/apm/module/apmsql" ) +const ( + defaultPostgresPort = 5432 +) + // ParseDSN parses the given lib/pq datasource name, which may // be either a URL or connection string. func ParseDSN(name string) apmsql.DSNInfo { @@ -36,12 +41,17 @@ func ParseDSN(name string) apmsql.DSNInfo { name = connStr } opts := make(values) + opts["host"] = os.Getenv("PGHOST") + opts["port"] = os.Getenv("PGPORT") if err := parseOpts(name, opts); err != nil { // pq.Open will fail with the same error, // so just return a zero value. return apmsql.DSNInfo{} } + addr, port := getAddr(opts) info := apmsql.DSNInfo{ + Address: addr, + Port: port, Database: opts["dbname"], User: opts["user"], } @@ -54,6 +64,25 @@ func ParseDSN(name string) apmsql.DSNInfo { return info } +func getAddr(opts values) (string, int) { + hostOpt := opts["host"] + if hostOpt == "" { + hostOpt = "localhost" + } else if strings.HasPrefix(hostOpt, "/") { + // We don't report Unix addresses. + return "", 0 + } else if n := len(hostOpt); n > 1 && hostOpt[0] == '[' && hostOpt[n-1] == ']' { + hostOpt = hostOpt[1 : n-1] + } + port := defaultPostgresPort + if portOpt := opts["port"]; portOpt != "" { + if v, err := strconv.Atoi(portOpt); err == nil { + port = v + } + } + return hostOpt, port +} + // Code below is copied from github.com/lib/pq (see NOTICE). // parseURL no longer needs to be used by clients of this library since supplying a URL as a diff --git a/module/apmsql/pq/parser_test.go b/module/apmsql/pq/parser_test.go index eb39d0936..71a3e4d5b 100644 --- a/module/apmsql/pq/parser_test.go +++ b/module/apmsql/pq/parser_test.go @@ -23,28 +23,71 @@ import ( "github.com/stretchr/testify/assert" + "go.elastic.co/apm/module/apmsql" apmpq "go.elastic.co/apm/module/apmsql/pq" ) +func patchEnv(k, v string) func() { + old, reset := os.LookupEnv(k) + os.Setenv(k, v) + if reset { + return func() { os.Setenv(k, old) } + } else { + return func() { os.Unsetenv(k) } + } +} + func TestParseDSNURL(t *testing.T) { - info := apmpq.ParseDSN("postgresql://user:pass@localhost/dbinst") - assert.Equal(t, "dbinst", info.Database) - assert.Equal(t, "user", info.User) + for _, k := range []string{"PGDATABASE", "PGUSER", "PGHOST", "PGPORT"} { + unpatch := patchEnv(k, "") + defer unpatch() + } + + test := func(url, addr string, port int) { + info := apmpq.ParseDSN(url) + assert.Equal(t, apmsql.DSNInfo{ + Address: addr, + Port: port, + Database: "dbinst", + User: "user", + }, info) + } + test("postgresql://user:pass@localhost/dbinst", "localhost", 5432) + test("postgresql://user:pass@localhost:5432/dbinst", "localhost", 5432) + test("postgresql://user:pass@localhost:5433/dbinst", "localhost", 5433) + test("postgresql://user:pass@127.0.0.1/dbinst", "127.0.0.1", 5432) + test("postgresql://user:pass@[::1]:1234/dbinst", "::1", 1234) + test("postgresql://user:pass@[::1]/dbinst", "::1", 5432) + test("postgresql://user:pass@::1/dbinst", "::1", 5432) } func TestParseDSNConnectionString(t *testing.T) { + for _, k := range []string{"PGDATABASE", "PGUSER", "PGHOST", "PGPORT"} { + unpatch := patchEnv(k, "") + defer unpatch() + } info := apmpq.ParseDSN("dbname=foo\\ bar user='baz'") - assert.Equal(t, "foo bar", info.Database) - assert.Equal(t, "baz", info.User) + assert.Equal(t, apmsql.DSNInfo{ + Address: "localhost", + Port: 5432, + Database: "foo bar", + User: "baz", + }, info) } func TestParseDSNEnv(t *testing.T) { - os.Setenv("PGDATABASE", "dbinst") - os.Setenv("PGUSER", "bob") - defer os.Unsetenv("PGDATABASE") - defer os.Unsetenv("PGUSER") + for _, kv := range [][]string{ + {"PGDATABASE", "dbinst"}, {"PGUSER", "bob"}, {"PGHOST", "postgres"}, {"PGPORT", "2345"}, + } { + unpatch := patchEnv(kv[0], kv[1]) + defer unpatch() + } info := apmpq.ParseDSN("postgres://") - assert.Equal(t, "dbinst", info.Database) - assert.Equal(t, "bob", info.User) + assert.Equal(t, apmsql.DSNInfo{ + Address: "postgres", + Port: 2345, + Database: "dbinst", + User: "bob", + }, info) } diff --git a/module/apmsql/pq/pq_test.go b/module/apmsql/pq/pq_test.go index 72bd1e614..0e38b7a61 100644 --- a/module/apmsql/pq/pq_test.go +++ b/module/apmsql/pq/pq_test.go @@ -54,6 +54,15 @@ func TestQueryContext(t *testing.T) { assert.Equal(t, "SELECT FROM foo", spans[0].Name) assert.Equal(t, "postgresql", spans[0].Subtype) assert.Equal(t, &model.SpanContext{ + Destination: &model.SpanDestinationContext{ + Address: os.Getenv("PGHOST"), + Port: 5432, + Service: &model.SpanDestinationServiceContext{ + Type: "db", + Name: "postgresql", + Resource: "postgresql", + }, + }, Database: &model.DatabaseSpanContext{ Instance: "test_db", Statement: "SELECT * FROM foo", diff --git a/spancontext.go b/spancontext.go index 3bc826b81..1f2d65a42 100644 --- a/spancontext.go +++ b/spancontext.go @@ -18,16 +18,22 @@ package apm import ( + "fmt" "net/http" + "net/url" + "strings" + "go.elastic.co/apm/internal/apmhttputil" "go.elastic.co/apm/model" ) // SpanContext provides methods for setting span context. type SpanContext struct { - model model.SpanContext - database model.DatabaseSpanContext - http model.HTTPSpanContext + model model.SpanContext + destination model.SpanDestinationContext + destinationService model.SpanDestinationServiceContext + database model.DatabaseSpanContext + http model.HTTPSpanContext } // DatabaseSpanContext holds database span context. @@ -46,11 +52,23 @@ type DatabaseSpanContext struct { User string } +// DestinationServiceSpanContext holds destination service span span. +type DestinationServiceSpanContext struct { + // Name holds a name for the destination service, which may be used + // for grouping and labeling in service maps. + Name string + + // Resource holds an identifier for a destination service resource, + // such as a message queue. + Resource string +} + func (c *SpanContext) build() *model.SpanContext { switch { case len(c.model.Tags) != 0: case c.model.Database != nil: case c.model.HTTP != nil: + case c.model.Destination != nil: default: return nil } @@ -106,9 +124,39 @@ func (c *SpanContext) SetDatabase(db DatabaseSpanContext) { // // This function relates to client requests. If the request URL contains // user info, it will be removed and excluded from the stored URL. +// +// SetHTTPRequest makes implicit calls to SetDestinationAddress and +// SetDestinationService, using details from req.URL. func (c *SpanContext) SetHTTPRequest(req *http.Request) { + if req.URL == nil { + return + } c.http.URL = req.URL c.model.HTTP = &c.http + + addr, port := apmhttputil.DestinationAddr(req) + c.SetDestinationAddress(addr, port) + + destinationServiceURL := url.URL{Scheme: req.URL.Scheme, Host: req.URL.Host} + destinationServiceResource := destinationServiceURL.Host + if port != 0 && port == apmhttputil.SchemeDefaultPort(req.URL.Scheme) { + var hasDefaultPort bool + if n := len(destinationServiceURL.Host); n > 0 && destinationServiceURL.Host[n-1] != ']' { + if i := strings.LastIndexByte(destinationServiceURL.Host, ':'); i != -1 { + // Remove the default port from destination.service.name. + destinationServiceURL.Host = destinationServiceURL.Host[:i] + hasDefaultPort = true + } + } + if !hasDefaultPort { + // Add the default port to destination.service.resource. + destinationServiceResource = fmt.Sprintf("%s:%d", destinationServiceResource, port) + } + } + c.SetDestinationService(DestinationServiceSpanContext{ + Name: destinationServiceURL.String(), + Resource: destinationServiceResource, + }) } // SetHTTPStatusCode records the HTTP response status code. @@ -116,3 +164,22 @@ func (c *SpanContext) SetHTTPStatusCode(statusCode int) { c.http.StatusCode = statusCode c.model.HTTP = &c.http } + +// SetDestinationAddress sets the destination address and port in the context. +// +// SetDestinationAddress has no effect when called when an empty addr. +func (c *SpanContext) SetDestinationAddress(addr string, port int) { + if addr != "" { + c.destination.Address = addr + c.destination.Port = port + c.model.Destination = &c.destination + } +} + +// SetDestinationService sets the destination service info in the context. +func (c *SpanContext) SetDestinationService(service DestinationServiceSpanContext) { + c.destinationService.Name = service.Name + c.destinationService.Resource = service.Resource + c.destination.Service = &c.destinationService + c.model.Destination = &c.destination +} diff --git a/spancontext_test.go b/spancontext_test.go index e44a190b9..72a536f9a 100644 --- a/spancontext_test.go +++ b/spancontext_test.go @@ -19,6 +19,8 @@ package apm_test import ( "context" + "net/http" + "net/url" "testing" "github.com/stretchr/testify/assert" @@ -47,3 +49,76 @@ func TestSpanContextSetLabel(t *testing.T) { {Key: "qux", Value: true}, }, spans[0].Context.Tags) } + +func TestSpanContextSetHTTPRequest(t *testing.T) { + type testcase struct { + url string + + addr string + port int + name string + resource string + } + + testcases := []testcase{{ + url: "http://localhost/foo/bar", + addr: "localhost", + port: 80, + name: "http://localhost", + resource: "localhost:80", + }, { + url: "http://localhost:80/foo/bar", + addr: "localhost", + port: 80, + name: "http://localhost", + resource: "localhost:80", + }, { + url: "https://[::1]/foo/bar", + addr: "::1", + port: 443, + name: "https://[::1]", + resource: "[::1]:443", + }, { + url: "https://[::1]:8443/foo/bar", + addr: "::1", + port: 8443, + name: "https://[::1]:8443", + resource: "[::1]:8443", + }, { + url: "gopher://gopher.invalid:70", + addr: "gopher.invalid", + port: 70, + name: "gopher://gopher.invalid:70", + resource: "gopher.invalid:70", + }, { + url: "gopher://gopher.invalid", + addr: "gopher.invalid", + port: 0, + name: "gopher://gopher.invalid", + resource: "gopher.invalid", + }} + + for _, tc := range testcases { + t.Run(tc.url, func(t *testing.T) { + url, err := url.Parse(tc.url) + require.NoError(t, err) + + _, spans, _ := apmtest.WithTransaction(func(ctx context.Context) { + span, _ := apm.StartSpan(ctx, "name", "type") + span.Context.SetHTTPRequest(&http.Request{URL: url}) + span.End() + }) + require.Len(t, spans, 1) + + assert.Equal(t, &model.SpanDestinationContext{ + Address: tc.addr, + Port: tc.port, + Service: &model.SpanDestinationServiceContext{ + Type: spans[0].Type, + Name: tc.name, + Resource: tc.resource, + }, + }, spans[0].Context.Destination) + }) + } +}