From 226f6a656511826e9242974a6e753010a04101af Mon Sep 17 00:00:00 2001 From: Nick Ripley <97066770+nsrip-dd@users.noreply.github.com> Date: Thu, 24 Feb 2022 09:56:09 -0500 Subject: [PATCH] internal/telemetry: add initial telemetry implementation (#1172) Initially support start, stop, heartbeat, and metrics messages. Does everything in the background. Doesn't have integration/dependency update messages, but those might not make sense for Go. Still needs default URLs baked in. --- internal/telemetry/client.go | 421 ++++++++++++++++++++++++++++++ internal/telemetry/client_test.go | 263 +++++++++++++++++++ internal/telemetry/message.go | 119 +++++++++ 3 files changed, 803 insertions(+) create mode 100644 internal/telemetry/client.go create mode 100644 internal/telemetry/client_test.go create mode 100644 internal/telemetry/message.go diff --git a/internal/telemetry/client.go b/internal/telemetry/client.go new file mode 100644 index 0000000000..c3afb69f10 --- /dev/null +++ b/internal/telemetry/client.go @@ -0,0 +1,421 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2022 Datadog, Inc. + +// Package telemetry implements a client for sending telemetry information to +// Datadog regarding usage of an APM library such as tracing or profiling. +package telemetry + +import ( + "bytes" + "encoding/json" + "fmt" + "net" + "net/http" + "os" + "runtime" + "runtime/debug" + "strings" + "sync" + "sync/atomic" + "time" + + "gopkg.in/DataDog/dd-trace-go.v1/internal" + "gopkg.in/DataDog/dd-trace-go.v1/internal/globalconfig" + "gopkg.in/DataDog/dd-trace-go.v1/internal/osinfo" + "gopkg.in/DataDog/dd-trace-go.v1/internal/version" +) + +var ( + // copied from dd-trace-go/profiler + defaultClient = &http.Client{ + // We copy the transport to avoid using the default one, as it might be + // augmented with tracing and we don't want these calls to be recorded. + // See https://golang.org/pkg/net/http/#DefaultTransport . + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + DualStack: true, + }).DialContext, + MaxIdleConns: 100, + IdleConnTimeout: 90 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + }, + Timeout: 5 * time.Second, + } + // TODO: Default telemetry URL? + hostname string +) + +func init() { + h, err := os.Hostname() + if err == nil { + hostname = h + } +} + +// Client buffers and sends telemetry messages to Datadog (possibly through an +// agent). Client.Start should be called before any other methods. +// +// Client is safe to use from multiple goroutines concurrently. The client will +// send all telemetry requests in the background, in order to avoid blocking the +// caller since telemetry should not disrupt an application. Metrics are +// aggregated by the Client. +type Client struct { + // URL for the Datadog agent or Datadog telemetry endpoint + URL string + // APIKey should be supplied if the endpoint is not a Datadog agent, + // i.e. you are sending telemetry directly to Datadog + APIKey string + // How often to send batched requests. Defaults to 60s + SubmissionInterval time.Duration + + // e.g. "tracers", "profilers", "appsec" + Namespace string + + // App-specific information + Service string + Env string + Version string + + // Determines whether telemetry should actually run. + // Defaults to false, but will be overridden by the environment variable + // DD_INSTRUMENTATION_TELEMETRY_ENABLED is set to 0 or false + Disabled bool + + // Optional destination to record submission-related logging events + Logger interface { + Printf(msg string, args ...interface{}) + } + + // mu guards all of the following fields + mu sync.Mutex + // started is true in between when Start() returns and the next call to + // Stop() + started bool + // seqID is a sequence number used to order telemetry messages by + // the back end. + seqID int64 + // t is used to schedule flushing outstanding messages + t *time.Timer + // requests hold all messages which don't need to be immediately sent + requests []*Request + // metrics holds un-sent metrics that will be aggregated the next time + // metrics are sent + metrics map[string]*metric + newMetrics bool +} + +func (c *Client) log(msg string, args ...interface{}) { + if c.Logger == nil { + return + } + c.Logger.Printf(msg, args...) +} + +// Start registers that the app has begun running with the given integrations +// and configuration. +func (c *Client) Start(integrations []Integration, configuration []Configuration) { + c.mu.Lock() + defer c.mu.Unlock() + enabled := os.Getenv("DD_INSTRUMENTATION_TELEMETRY_ENABLED") + if c.Disabled || enabled == "0" || enabled == "false" { + return + } + if c.started { + return + } + c.started = true + + // XXX: Should we let metrics persist between starting and stopping? + c.metrics = make(map[string]*metric) + + payload := &AppStarted{ + Integrations: append([]Integration{}, integrations...), + Configuration: append([]Configuration{}, configuration...), + } + deps, ok := debug.ReadBuildInfo() + if ok { + for _, dep := range deps.Deps { + payload.Dependencies = append(payload.Dependencies, + Dependency{ + Name: dep.Path, + Version: dep.Version, + // TODO: Neither of the types in the API + // docs (this or "SharedSystemLibrary") + // describe Go dependencies well + Type: "PlatformStandard", + }, + ) + } + } + + fromEnvOrDefault := func(key, def string) string { + if v := os.Getenv(key); len(v) > 0 { + return v + } + return def + } + c.Service = fromEnvOrDefault("DD_SERVICE", c.Service) + if len(c.Service) == 0 { + if name := globalconfig.ServiceName(); len(name) != 0 { + c.Service = name + } else { + // I think service *has* to be something? + c.Service = "unnamed-go-service" + } + } + c.Env = fromEnvOrDefault("DD_ENV", c.Env) + c.Version = fromEnvOrDefault("DD_VERSION", c.Version) + + c.APIKey = fromEnvOrDefault("DD_API_KEY", c.APIKey) + // TODO: Initialize URL/endpoint from environment var + + r := c.newRequest(RequestTypeAppStarted) + r.Payload = payload + c.scheduleSubmit(r) + c.flush() + + if c.SubmissionInterval == 0 { + c.SubmissionInterval = 60 * time.Second + } + c.t = time.AfterFunc(c.SubmissionInterval, c.backgroundFlush) +} + +// Stop notifies the telemetry endpoint that the app is closing. All outstanding +// messages will also be sent. No further messages will be sent until the client +// is started again +func (c *Client) Stop() { + c.mu.Lock() + defer c.mu.Unlock() + if !c.started { + return + } + c.started = false + c.t.Stop() + // close request types have no body + r := c.newRequest(RequestTypeAppClosing) + c.scheduleSubmit(r) + c.flush() +} + +type metricKind string + +var ( + metricKindGauge metricKind = "gauge" + metricKindCount metricKind = "count" +) + +type metric struct { + name string + kind metricKind + value float64 + // Unix timestamp + ts float64 + tags []string + common bool +} + +// TODO: Can there be identically named/tagged metrics with a "common" and "not +// common" variant? + +func newmetric(name string, kind metricKind, tags []string, common bool) *metric { + return &metric{ + name: name, + kind: kind, + tags: append([]string{}, tags...), + common: common, + } +} + +func metricKey(name string, tags []string) string { + return name + strings.Join(tags, "-") +} + +// Gauge sets the value for a gauge with the given name and tags. If the metric +// is not language-specific, common should be set to true +func (c *Client) Gauge(name string, value float64, tags []string, common bool) { + c.mu.Lock() + defer c.mu.Unlock() + if !c.started { + return + } + key := metricKey(name, tags) + m, ok := c.metrics[key] + if !ok { + m = newmetric(name, metricKindGauge, tags, common) + c.metrics[key] = m + } + m.value = value + m.ts = float64(time.Now().Unix()) + c.newMetrics = true +} + +// Count adds the value to a count with the given name and tags. If the metric +// is not language-specific, common should be set to true +func (c *Client) Count(name string, value float64, tags []string, common bool) { + c.mu.Lock() + defer c.mu.Unlock() + if !c.started { + return + } + key := metricKey(name, tags) + m, ok := c.metrics[key] + if !ok { + m = newmetric(name, metricKindCount, tags, common) + c.metrics[key] = m + } + m.value += value + m.ts = float64(time.Now().Unix()) + c.newMetrics = true +} + +// flush sends any outstanding telemetry messages and aggregated metrics to be +// sent to the backend. Requests are sent in the background. Should be called +// with c.mu locked +func (c *Client) flush() { + submissions := make([]*Request, 0, len(c.requests)+1) + if c.newMetrics { + c.newMetrics = false + r := c.newRequest(RequestTypeGenerateMetrics) + payload := &Metrics{ + Namespace: c.Namespace, + LibLanguage: "golang", + LibVersion: version.Tag, + } + for _, m := range c.metrics { + s := Series{ + Metric: m.name, + Type: string(m.kind), + Tags: m.tags, + Common: m.common, + } + s.Points = [][2]float64{{m.ts, m.value}} + payload.Series = append(payload.Series, s) + } + r.Payload = payload + submissions = append(submissions, r) + } + + // copy over requests so we can do the actual submission without holding + // the lock. Zero out the old stuff so we don't leak references + for i, r := range c.requests { + submissions = append(submissions, r) + c.requests[i] = nil + } + c.requests = c.requests[:0] + + go func() { + for _, r := range submissions { + err := c.submit(r) + if err != nil { + c.log("telemetry submission failed: %s", err) + } + } + }() +} + +var ( + osName string + osNameOnce sync.Once + osVersion string + osVersionOnce sync.Once +) + +// XXX: is it actually safe to cache osName and osVersion? For example, can the +// kernel be updated without stopping execution? + +func getOSName() string { + osNameOnce.Do(func() { osName = osinfo.OSName() }) + return osName +} + +func getOSVersion() string { + osVersionOnce.Do(func() { osVersion = osinfo.OSVersion() }) + return osVersion +} + +// newRequests populates a request with the common fields shared by all requests +// sent through this Client +func (c *Client) newRequest(t RequestType) *Request { + seqID := atomic.AddInt64(&c.seqID, 1) + return &Request{ + APIVersion: "v1", + RequestType: t, + TracerTime: time.Now().Unix(), + RuntimeID: globalconfig.RuntimeID(), + SeqID: seqID, + Application: Application{ + ServiceName: c.Service, + Env: c.Env, + ServiceVersion: c.Version, + TracerVersion: version.Tag, + LanguageName: "golang", + LanguageVersion: runtime.Version(), + }, + Host: Host{ + Hostname: hostname, + ContainerID: internal.ContainerID(), + OS: getOSName(), + OSVersion: getOSVersion(), + }, + } +} + +// submit posts a telemetry request to the backend +func (c *Client) submit(r *Request) error { + b, err := json.Marshal(r) + if err != nil { + return err + } + + req, err := http.NewRequest(http.MethodPost, c.URL, bytes.NewReader(b)) + if err != nil { + return err + } + req.Header = http.Header{ + "Content-Type": {"application/json"}, + "DD-Telemetry-API-Version": {"v1"}, + "DD-Telemetry-Request-Type": {string(r.RequestType)}, + } + if len(c.APIKey) > 0 { + req.Header.Add("DD-API-Key", c.APIKey) + } + req.ContentLength = int64(len(b)) + + resp, err := defaultClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusAccepted && resp.StatusCode != http.StatusOK { + return errBadStatus(resp.StatusCode) + } + return nil +} + +type errBadStatus int + +func (e errBadStatus) Error() string { return fmt.Sprintf("bad HTTP response status %d", e) } + +// scheduleSubmit queues a request to be sent to the backend. Should be called +// with c.mu locked +func (c *Client) scheduleSubmit(r *Request) { + c.requests = append(c.requests, r) +} + +func (c *Client) backgroundFlush() { + c.mu.Lock() + defer c.mu.Unlock() + if !c.started { + return + } + r := c.newRequest(RequestTypeAppHeartbeat) + c.scheduleSubmit(r) + c.flush() + c.t.Reset(c.SubmissionInterval) +} diff --git a/internal/telemetry/client_test.go b/internal/telemetry/client_test.go new file mode 100644 index 0000000000..b91fbee679 --- /dev/null +++ b/internal/telemetry/client_test.go @@ -0,0 +1,263 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2022 Datadog, Inc. + +package telemetry_test + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "os" + "reflect" + "sort" + "sync" + "sync/atomic" + "testing" + "time" + + "gopkg.in/DataDog/dd-trace-go.v1/internal/telemetry" +) + +func TestClient(t *testing.T) { + ch := make(chan telemetry.RequestType) + var ( + hb sync.WaitGroup + gotheartbeat int64 + ) + hb.Add(1) // signal that we got a heartbeat + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + h := r.Header.Get("DD-Telemetry-Request-Type") + if len(h) == 0 { + t.Fatal("didn't get telemetry request type header") + } + if telemetry.RequestType(h) == telemetry.RequestTypeAppHeartbeat { + // only keep the first heartbeat in case we happen to get + // multiple heartbeats in the waiting interval to avoid flaky + // tests + if !atomic.CompareAndSwapInt64(&gotheartbeat, 0, 1) { + return + } + hb.Done() + } + ch <- telemetry.RequestType(h) + })) + defer server.Close() + + go func() { + client := &telemetry.Client{ + URL: server.URL, + SubmissionInterval: 5 * time.Millisecond, + } + client.Start(nil, nil) + client.Start(nil, nil) // test idempotence + hb.Wait() + client.Stop() + client.Stop() // test idempotence + }() + + var got []telemetry.RequestType + for i := 0; i < 3; i++ { + header := <-ch + got = append(got, header) + } + + want := []telemetry.RequestType{telemetry.RequestTypeAppStarted, telemetry.RequestTypeAppHeartbeat, telemetry.RequestTypeAppClosing} + if !reflect.DeepEqual(want, got) { + t.Fatalf("wanted %v, got %v", want, got) + } +} + +func TestMetrics(t *testing.T) { + var ( + mu sync.Mutex + got []telemetry.Series + ) + closed := make(chan struct{}, 1) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Header.Get("DD-Telemetry-Request-Type") == string(telemetry.RequestTypeAppClosing) { + select { + case closed <- struct{}{}: + default: + } + return + } + req := telemetry.Request{ + Payload: new(telemetry.Metrics), + } + dec := json.NewDecoder(r.Body) + err := dec.Decode(&req) + if err != nil { + t.Fatal(err) + } + if req.RequestType != telemetry.RequestTypeGenerateMetrics { + return + } + v, ok := req.Payload.(*telemetry.Metrics) + if !ok { + t.Fatal("payload set metrics but didn't get metrics") + } + for _, s := range v.Series { + for i, p := range s.Points { + // zero out timestamps + s.Points[i] = [2]float64{0, p[1]} + } + } + mu.Lock() + got = append(got, v.Series...) + mu.Unlock() + })) + defer server.Close() + + go func() { + client := &telemetry.Client{ + URL: server.URL, + } + client.Start(nil, nil) + + // Gauges should have the most recent value + client.Gauge("foobar", 1, nil, false) + client.Gauge("foobar", 2, nil, false) + // Counts should be aggregated + client.Count("baz", 3, nil, true) + client.Count("baz", 1, nil, true) + // Tags should be passed through + client.Count("bonk", 4, []string{"org:1"}, false) + client.Stop() + }() + + <-closed + + want := []telemetry.Series{ + {Metric: "baz", Type: "count", Points: [][2]float64{{0, 4}}, Tags: []string{}, Common: true}, + {Metric: "bonk", Type: "count", Points: [][2]float64{{0, 4}}, Tags: []string{"org:1"}}, + {Metric: "foobar", Type: "gauge", Points: [][2]float64{{0, 2}}, Tags: []string{}}, + } + sort.Slice(got, func(i, j int) bool { + return got[i].Metric < got[j].Metric + }) + if !reflect.DeepEqual(want, got) { + t.Fatalf("want %+v, got %+v", want, got) + } +} + +// testSetEnv is a copy of testing.T.Setenv so we can build this library +// for Go versions prior to 1.17 +func testSetEnv(t *testing.T, key, val string) { + prev, ok := os.LookupEnv(key) + if ok { + t.Cleanup(func() { os.Setenv(key, prev) }) + } else { + t.Cleanup(func() { os.Unsetenv(key) }) + } + os.Setenv(key, val) +} + +func TestDisabledClient(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + t.Fatal("shouldn't have got any requests") + })) + defer server.Close() + testSetEnv(t, "DD_INSTRUMENTATION_TELEMETRY_ENABLED", "0") + + client := &telemetry.Client{ + URL: server.URL, + SubmissionInterval: time.Millisecond, + } + client.Start(nil, nil) + client.Gauge("foobar", 1, nil, false) + client.Count("bonk", 4, []string{"org:1"}, false) + client.Stop() +} + +func TestNonStartedClient(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + t.Fatal("shouldn't have got any requests") + })) + defer server.Close() + + client := &telemetry.Client{ + URL: server.URL, + SubmissionInterval: time.Millisecond, + } + client.Gauge("foobar", 1, nil, false) + client.Count("bonk", 4, []string{"org:1"}, false) + client.Stop() +} + +func TestConcurrentClient(t *testing.T) { + var ( + mu sync.Mutex + got []telemetry.Series + ) + closed := make(chan struct{}, 1) + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + t.Log("foo") + if r.Header.Get("DD-Telemetry-Request-Type") == string(telemetry.RequestTypeAppClosing) { + select { + case closed <- struct{}{}: + default: + return + } + } + req := telemetry.Request{ + Payload: new(telemetry.Metrics), + } + dec := json.NewDecoder(r.Body) + err := dec.Decode(&req) + if err != nil { + t.Fatal(err) + } + if req.RequestType != telemetry.RequestTypeGenerateMetrics { + return + } + v, ok := req.Payload.(*telemetry.Metrics) + if !ok { + t.Fatal("payload set metrics but didn't get metrics") + } + for _, s := range v.Series { + for i, p := range s.Points { + // zero out timestamps + s.Points[i] = [2]float64{0, p[1]} + } + } + mu.Lock() + got = append(got, v.Series...) + mu.Unlock() + })) + defer server.Close() + + go func() { + client := &telemetry.Client{ + URL: server.URL, + } + client.Start(nil, nil) + defer client.Stop() + + var wg sync.WaitGroup + for i := 0; i < 8; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for j := 0; j < 10; j++ { + client.Count("foobar", 1, []string{"tag"}, false) + } + }() + } + wg.Wait() + }() + + <-closed + + want := []telemetry.Series{ + {Metric: "foobar", Type: "count", Points: [][2]float64{{0, 80}}, Tags: []string{"tag"}}, + } + sort.Slice(got, func(i, j int) bool { + return got[i].Metric < got[j].Metric + }) + if !reflect.DeepEqual(want, got) { + t.Fatalf("want %+v, got %+v", want, got) + } +} diff --git a/internal/telemetry/message.go b/internal/telemetry/message.go new file mode 100644 index 0000000000..f8619d70d2 --- /dev/null +++ b/internal/telemetry/message.go @@ -0,0 +1,119 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2022 Datadog, Inc. + +package telemetry + +// Request is the common high-level structure encapsulating a telemetry request +type Request struct { + APIVersion string `json:"api_version"` + RequestType RequestType `json:"request_type"` + TracerTime int64 `json:"tracer_time"` + RuntimeID string `json:"runtime_id"` + SeqID int64 `json:"seq_id"` + Application Application `json:"application"` + Host Host `json:"host"` + Payload interface{} `json:"payload"` +} + +// RequestType determines how the Payload of a request should be handled +type RequestType string + +const ( + // RequestTypeAppStarted is the first message sent by the telemetry + // client, containing the configuration, and integrations and + // dependencies loaded at startup + RequestTypeAppStarted RequestType = "app-started" + // RequestTypeAppHeartbeat is sent periodically by the client to indicate + // that the app is still running + RequestTypeAppHeartbeat RequestType = "app-heartbeat" + // RequestTypeGenerateMetrics contains all metrics accumulated by the + // client, and is sent periodically along with the heartbeat + RequestTypeGenerateMetrics RequestType = "generate-metrics" + // RequestTypeAppClosing is sent when the telemetry client is stopped + RequestTypeAppClosing RequestType = "app-closing" +) + +// Application is identifying information about the app itself +type Application struct { + ServiceName string `json:"service_name"` + Env string `json:"env"` + ServiceVersion string `json:"service_version,omitempty"` + TracerVersion string `json:"tracer_version"` + LanguageName string `json:"language_name"` + LanguageVersion string `json:"language_version"` +} + +// Host is identifying information about the host on which the app +// is running +type Host struct { + ContainerID string `json:"container_id,omitempty"` + Hostname string `json:"hostname,omitempty"` + OS string `json:"os,omitempty"` + OSVersion string `json:"os_version,omitempty"` + // TODO: Do we care about the kernel stuff? internal/osinfo gets most of + // this information in OSName/OSVersion + KernelName string `json:"kernel_name,omitempty"` + KernelRelease string `json:"kernel_release,omitempty"` + KernelVersion string `json:"kernel_version,omitempty"` +} + +// AppStarted corresponds to the "app-started" request type +type AppStarted struct { + Integrations []Integration `json:"integrations"` + Dependencies []Dependency `json:"dependencies"` + Configuration []Configuration `json:"configuration"` +} + +// Integration is an integration that is available within the app and applicable +// to be traced +type Integration struct { + Name string `json:"name"` + Enabled bool `json:"enabled"` + Version string `json:"version,omitempty"` + AutoEnabled bool `json:"auto_enabled,omitempty"` + Compatible bool `json:"compatible,omitempty"` + Error string `json:"error,omitempty"` +} + +// Dependency is a Go module on which the applciation depends. This information +// can be accesed at run-time through the runtime/debug.ReadBuildInfo API. +type Dependency struct { + Name string `json:"name"` + Version string `json:"version"` + Type string `json:"type"` +} + +// Configuration is a library-specific configuration value +type Configuration struct { + Name string `json:"name"` + // Value should have a type that can be marshaled to JSON + Value interface{} `json:"value"` +} + +// Metrics corresponds to the "generate-metrics" request type +type Metrics struct { + Namespace string `json:"namespace"` + LibLanguage string `json:"lib_language"` + LibVersion string `json:"lib_version"` + Series []Series `json:"series"` +} + +// Series is a sequence of observations for a single named metric +type Series struct { + Metric string `json:"metric"` + Points [][2]float64 `json:"points"` + Type string `json:"type"` + Tags []string `json:"tags"` + // Common distinguishes metrics which are cross-language vs. + // language-specific. + // + // NOTE: If this field isn't present in the request, the API assumes + // assumed the metric is common. So we can't "omitempty" even though the + // field is technically optional. + Common bool `json:"common"` +} + +// TODO: app-dependencies-loaded and app-integrations-change? Does this really +// apply to Go?