Skip to content

Commit

Permalink
profiler: upgrade to v2.4 upload format (#1453)
Browse files Browse the repository at this point in the history
Upgrading to format v2.4 lets us add custom attributes to uploaded
profiles, which have more rich type information than tags and allow for
better querying & sorting in the Datadog UI.

Along the way, consolidate some of the duplicated testing code for
creating test servers in the profiler and upload tests.
  • Loading branch information
nsrip-dd committed Sep 2, 2022
1 parent 516e82a commit 1e0594a
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 202 deletions.
116 changes: 72 additions & 44 deletions profiler/profiler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package profiler
import (
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -288,12 +289,67 @@ func unstartedProfiler(opts ...Option) (*profiler, error) {
return p, nil
}

func TestAllUploaded(t *testing.T) {
type profileMeta struct {
tags []string
files []string
type profileMeta struct {
tags []string
headers http.Header
event uploadEvent
attachments map[string][]byte
}

type mockBackend struct {
t *testing.T
profiles chan profileMeta
}

func (m *mockBackend) ServeHTTP(w http.ResponseWriter, r *http.Request) {
profile := profileMeta{
attachments: make(map[string][]byte),
}
defer func() {
select {
case m.profiles <- profile:
default:
}
}()
profile.headers = r.Header.Clone()
if err := r.ParseMultipartForm(50 << 20); err != nil {
w.WriteHeader(http.StatusInternalServerError)
m.t.Fatalf("bad multipart form: %s", err)
return
}
file, _, err := r.FormFile("event")
if err != nil {
w.WriteHeader(http.StatusBadRequest)
m.t.Fatalf("getting event.json: %s", err)
return
}
if err := json.NewDecoder(file).Decode(&profile.event); err != nil {
w.WriteHeader(http.StatusBadRequest)
m.t.Fatalf("decoding event payload: %s", err)
return
}

profile.tags = append(profile.tags, strings.Split(profile.event.Tags, ",")...)
for _, name := range profile.event.Attachments {
f, _, err := r.FormFile(name)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
m.t.Fatalf("event attachment %s is missing from upload: %s", name, err)
return
}
defer f.Close()
data, err := io.ReadAll(f)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
m.t.Fatalf("reading attachment %s: %s", name, err)
return
}
profile.attachments[name] = data
}
w.WriteHeader(http.StatusAccepted)
}

func TestAllUploaded(t *testing.T) {
// This is a kind of end-to-end test that runs the real profiles (i.e.
// not mocking/replacing any internal functions) and verifies that the
// profiles are at least uploaded.
Expand All @@ -309,25 +365,7 @@ func TestAllUploaded(t *testing.T) {
// second batch of profiles is correct in case the profiler gets in a
// bad state after the first round of profiling.
received := make(chan profileMeta, 2)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var profile profileMeta
defer func() {
select {
case received <- profile:
default:
}
}()
if err := r.ParseMultipartForm(50 << 20); err != nil {
t.Fatalf("bad multipart form: %s", err)
return
}
profile.tags = append(profile.tags, r.Form["tags[]"]...)
for k, v := range r.MultipartForm.File {
if v[0].Filename == "pprof-data" {
profile.files = append(profile.files, k)
}
}
}))
server := httptest.NewServer(&mockBackend{t: t, profiles: received})
defer server.Close()

t.Setenv("DD_PROFILING_WAIT_PROFILE", "1")
Expand All @@ -347,14 +385,14 @@ func TestAllUploaded(t *testing.T) {

validateProfile := func(profile profileMeta, seq uint64) {
expected := []string{
"data[cpu.pprof]",
"data[delta-block.pprof]",
"data[delta-heap.pprof]",
"data[delta-mutex.pprof]",
"data[goroutines.pprof]",
"data[goroutineswait.pprof]",
"cpu.pprof",
"delta-block.pprof",
"delta-heap.pprof",
"delta-mutex.pprof",
"goroutines.pprof",
"goroutineswait.pprof",
}
assert.ElementsMatch(t, expected, profile.files)
assert.ElementsMatch(t, expected, profile.event.Attachments)

assert.Contains(t, profile.tags, fmt.Sprintf("profile_seq:%d", seq))
}
Expand All @@ -364,18 +402,8 @@ func TestAllUploaded(t *testing.T) {
}

func TestCorrectTags(t *testing.T) {
got := make(chan []string)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var tags []string
defer func() {
got <- tags
}()
if err := r.ParseMultipartForm(50 << 20); err != nil {
t.Fatalf("bad multipart form: %s", err)
return
}
tags = append(tags, r.Form["tags[]"]...)
}))
got := make(chan profileMeta)
server := httptest.NewServer(&mockBackend{t: t, profiles: got})
defer server.Close()

Start(
Expand Down Expand Up @@ -404,9 +432,9 @@ func TestCorrectTags(t *testing.T) {
// are clobbered due to a bug caused by the same
// profiler-internal tag slice being appended to from different
// goroutines concurrently.
tags := <-got
p := <-got
for _, tag := range expected {
require.Contains(t, tags, tag)
require.Contains(t, p.tags, tag)
}
}
}
Expand Down
58 changes: 38 additions & 20 deletions profiler/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@ package profiler
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"math/rand"
"mime/multipart"
"net/http"
"net/textproto"
"strings"
"time"

"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
Expand Down Expand Up @@ -123,41 +126,56 @@ func (p *profiler) doRequest(bat batch) error {
return errors.New(resp.Status)
}

type uploadEvent struct {
Start string `json:"start"`
End string `json:"end"`
Attachments []string `json:"attachments"`
Tags string `json:"tags_profiler"`
Family string `json:"family"`
Version string `json:"version"`
}

// encode encodes the profile as a multipart mime request.
func encode(bat batch, tags []string) (contentType string, body io.Reader, err error) {
var buf bytes.Buffer

mw := multipart.NewWriter(&buf)
// write all of the profile metadata (including some useless ones)
// with a small helper function that makes error tracking less verbose.
writeField := func(k, v string) {
if err == nil {
err = mw.WriteField(k, v)
}
}
writeField("version", "3")
writeField("family", "go")
writeField("start", bat.start.Format(time.RFC3339))
writeField("end", bat.end.Format(time.RFC3339))

if bat.host != "" {
writeField("tags[]", fmt.Sprintf("host:%s", bat.host))
tags = append(tags, fmt.Sprintf("host:%s", bat.host))
}
writeField("tags[]", "runtime:go")
for _, tag := range tags {
writeField("tags[]", tag)
}
if err != nil {
return "", nil, err
tags = append(tags, "runtime:go")

event := &uploadEvent{
Version: "4",
Family: "go",
Start: bat.start.Format(time.RFC3339),
End: bat.end.Format(time.RFC3339),
Tags: strings.Join(tags, ","),
}

for _, p := range bat.profiles {
formFile, err := mw.CreateFormFile(fmt.Sprintf("data[%s]", p.name), "pprof-data")
event.Attachments = append(event.Attachments, p.name)
f, err := mw.CreateFormFile(p.name, p.name)
if err != nil {
return "", nil, err
}
if _, err := formFile.Write(p.data); err != nil {
if _, err := f.Write(p.data); err != nil {
return "", nil, err
}
}

f, err := mw.CreatePart(textproto.MIMEHeader{
"Content-Disposition": []string{`form-data; name="event"; filename="event.json"`},
"Content-Type": []string{"application/json"},
})
if err != nil {
return "", nil, err
}
if err := json.NewEncoder(f).Encode(event); err != nil {
return "", nil, err
}

if err := mw.Close(); err != nil {
return "", nil, err
}
Expand Down

0 comments on commit 1e0594a

Please sign in to comment.