Skip to content

Commit

Permalink
Merge pull request #76 from profefe/profiles-merge-by-id
Browse files Browse the repository at this point in the history
Update get profile API to allow merging profiles by IDs
  • Loading branch information
narqo committed Mar 7, 2020
2 parents 0a891f9 + 57c6201 commit 5ab457f
Show file tree
Hide file tree
Showing 7 changed files with 214 additions and 70 deletions.
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,19 @@ GET /api/0/profiles/<id>

- `id` - id of stored profile, returned with the request for meta information above

#### Merge a set of individual profiles into a single profile

```
GET /api/0/profiles/<id1>+<id2>+...
< 200 OK
< pprof.pb.gz
```

- `id1`, `id2` - ids of stored profiles

*Note, merging is possible only for profiles of the same type.*

### Get services for which profiling data is stored

```
Expand Down
49 changes: 29 additions & 20 deletions pkg/profefe/profiles_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package profefe
import (
"fmt"
"net/http"
"net/url"
"path"
"strings"

Expand All @@ -28,29 +29,25 @@ func NewProfilesHandler(logger *log.Logger, collector *Collector, querier *Queri

func (h *ProfilesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var (
handler func(http.ResponseWriter, *http.Request) error
urlPath = path.Clean(r.URL.Path)
err error
)

if urlPath == apiProfilesMergePath {
handler = h.HandleMergeProfile
} else if urlPath == apiProfilesPath {
if urlPath == apiProfilesPath {
switch r.Method {
case http.MethodPost:
handler = h.HandleCreateProfile
err = h.HandleCreateProfile(w, r)
case http.MethodGet:
handler = h.HandleFindProfiles
err = h.HandleFindProfiles(w, r)
}
} else if len(urlPath) > len(apiProfilesPath) {
handler = h.HandleGetProfile
}

var err error
if handler != nil {
err = handler(w, r)
} else if urlPath == apiProfilesMergePath {
err = h.HandleMergeProfiles(w, r)
} else if strings.HasPrefix(urlPath, apiProfilesPath) {
err = h.HandleGetProfile(w, r)
} else {
err = ErrNotFound
}

HandleErrorHTTP(h.logger, err, w, r)
}

Expand All @@ -71,22 +68,33 @@ func (h *ProfilesHandler) HandleCreateProfile(w http.ResponseWriter, r *http.Req
}

func (h *ProfilesHandler) HandleGetProfile(w http.ResponseWriter, r *http.Request) error {
pid := r.URL.Path[len(apiProfilesPath):] // id part of the path
pid = strings.Trim(pid, "/")
if pid == "" {
rawPids := r.URL.Path[len(apiProfilesPath):] // id part of the path
rawPids = strings.Trim(rawPids, "/")
if rawPids == "" {
return StatusError(http.StatusBadRequest, "no profile id", nil)
}

rawPids, err := url.PathUnescape(rawPids)
if err != nil {
return StatusError(http.StatusBadRequest, err.Error(), nil)
}

pids, err := profile.SplitIDs(rawPids)
if err != nil {
return err
}

w.Header().Set("Content-Type", "application/octet-stream")
w.Header().Set("Content-Disposition", fmt.Sprintf(`attachment; filename="%s"`, rawPids))

err := h.querier.GetProfileTo(r.Context(), w, profile.ID(pid))
err = h.querier.GetProfilesTo(r.Context(), w, pids)
if err == storage.ErrNotFound {
return ErrNotFound
} else if err == storage.ErrEmpty {
return ErrEmpty
} else if err != nil {
err = xerrors.Errorf("could not get profile by id %q: %w", pid, err)
return StatusError(http.StatusInternalServerError, fmt.Sprintf("failed to get profile by id %q", pid), err)
err = xerrors.Errorf("could not get profile by id %q: %w", rawPids, err)
return StatusError(http.StatusInternalServerError, fmt.Sprintf("failed to get profile by id %q", rawPids), err)
}
return nil
}
Expand All @@ -113,7 +121,7 @@ func (h *ProfilesHandler) HandleFindProfiles(w http.ResponseWriter, r *http.Requ
return nil
}

func (h *ProfilesHandler) HandleMergeProfile(w http.ResponseWriter, r *http.Request) error {
func (h *ProfilesHandler) HandleMergeProfiles(w http.ResponseWriter, r *http.Request) error {
params := &storage.FindProfilesParams{}
if err := parseFindProfileParams(params, r); err != nil {
return err
Expand All @@ -123,6 +131,7 @@ func (h *ProfilesHandler) HandleMergeProfile(w http.ResponseWriter, r *http.Requ
return StatusError(http.StatusMethodNotAllowed, "tracing profiles are not mergeable", nil)
}


w.Header().Set("Content-Type", "application/octet-stream")
w.Header().Set("Content-Disposition", fmt.Sprintf(`attachment; filename="%s"`, params.Type))

Expand Down
68 changes: 33 additions & 35 deletions pkg/profefe/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,51 +23,27 @@ func NewQuerier(logger *log.Logger, sr storage.Reader) *Querier {
}
}

func (q *Querier) GetProfileTo(ctx context.Context, dst io.Writer, pid profile.ID) error {
list, err := q.sr.ListProfiles(ctx, []profile.ID{pid})
func (q *Querier) GetProfilesTo(ctx context.Context, dst io.Writer, pids []profile.ID) error {
list, err := q.sr.ListProfiles(ctx, pids)
if err != nil {
return err
}
defer list.Close()

if !list.Next() {
return storage.ErrNotFound
}
pr, err := list.Profile()
if err != nil {
return err
}
_, err = io.Copy(dst, pr)
return err
}

func (q *Querier) FindProfiles(ctx context.Context, params *storage.FindProfilesParams) ([]Profile, error) {
metas, err := q.sr.FindProfiles(ctx, params)
if err != nil {
return nil, err
}

profModels := make([]Profile, 0, len(metas))
for _, meta := range metas {
profModels = append(profModels, ProfileFromProfileMeta(meta))
}
return profModels, nil
}

func (q *Querier) FindMergeProfileTo(ctx context.Context, dst io.Writer, params *storage.FindProfilesParams) error {
pids, err := q.sr.FindProfileIDs(ctx, params)
if err != nil {
if len(pids) == 1 {
if !list.Next() {
return storage.ErrNotFound
}
pr, err := list.Profile()
if err != nil {
return err
}
_, err = io.Copy(dst, pr)
return err
}

// TODO(narqo): limit maximum number of profiles to merge; as an example,
// Stackdriver merges up to 250 random profiles if query returns more than that
list, err := q.sr.ListProfiles(ctx, pids)
if err != nil {
return err
}
defer list.Close()

pps := make([]*pprofProfile.Profile, 0, len(pids))
for list.Next() {
// exit fast if context canceled
Expand Down Expand Up @@ -95,6 +71,28 @@ func (q *Querier) FindMergeProfileTo(ctx context.Context, dst io.Writer, params
return pp.Write(dst)
}

func (q *Querier) FindProfiles(ctx context.Context, params *storage.FindProfilesParams) ([]Profile, error) {
metas, err := q.sr.FindProfiles(ctx, params)
if err != nil {
return nil, err
}

profModels := make([]Profile, 0, len(metas))
for _, meta := range metas {
profModels = append(profModels, ProfileFromProfileMeta(meta))
}
return profModels, nil
}

func (q *Querier) FindMergeProfileTo(ctx context.Context, dst io.Writer, params *storage.FindProfilesParams) error {
pids, err := q.sr.FindProfileIDs(ctx, params)
if err != nil {
return err
}

return q.GetProfilesTo(ctx, dst, pids)
}

func (q *Querier) ListServices(ctx context.Context) ([]string, error) {
return q.sr.ListServices(ctx)
}
16 changes: 4 additions & 12 deletions pkg/profefe/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"io"
"io/ioutil"
"testing"
"time"

"github.com/profefe/profefe/pkg/log"
"github.com/profefe/profefe/pkg/profile"
Expand All @@ -14,29 +13,22 @@ import (
"go.uber.org/zap/zaptest"
)

func TestQuerier_FindMergeProfileTo_contextCancelled(t *testing.T) {
func TestQuerier_GetProfilesTo_contextCancelled(t *testing.T) {
list := &unboundProfileList{}
sr := &storage.StubReader{
FindProfileIDsFunc: func(ctx context.Context, _ *storage.FindProfilesParams) ([]profile.ID, error) {
return []profile.ID{profile.TestID}, nil
},
ListProfilesfunc: func(ctx context.Context, _ []profile.ID) (storage.ProfileList, error) {
ListProfilesFunc: func(ctx context.Context, _ []profile.ID) (storage.ProfileList, error) {
return list, nil
},
}

testLogger := log.New(zaptest.NewLogger(t))
querier := NewQuerier(testLogger, sr)

// because we cancel the context, the find call below will exit w/o reading the whole (unbound) profile list
// because we cancel the context, the get call below will exit w/o reading the whole (unbound) profile list
ctx, cancel := context.WithCancel(context.Background())
cancel()

params := &storage.FindProfilesParams{
Service: "test-server",
CreatedAtMin: time.Now(),
}
err := querier.FindMergeProfileTo(ctx, ioutil.Discard, params)
err := querier.GetProfilesTo(ctx, ioutil.Discard, []profile.ID{"p1", "p2"})
assert.Equal(t, context.Canceled, err)

assert.True(t, list.closed, "profile list must be closed")
Expand Down
35 changes: 35 additions & 0 deletions pkg/profile/profile.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,48 @@
package profile

import (
"fmt"
"strings"
"time"
)

const TestID = ID("bpc00mript33iv4net00")

type ID string

func JoinIDs(ids ...ID) (string, error) {
if len(ids) == 0 {
return "", nil
}
var buf strings.Builder
for n, id := range ids {
if n > 0 {
buf.WriteByte('+')
}
sid := string(id)
if strings.ContainsRune(sid, '+') {
return "", fmt.Errorf("could not join %v: found recerved char in %q", ids, id)
}
buf.WriteString(sid)
}
return buf.String(), nil
}

func SplitIDs(s string) ([]ID, error) {
if s == "" {
return nil, nil
}
ss := strings.Split(s, "+")
ids := make([]ID, len(ss))
for i, sid := range ss {
if sid == "" {
return nil, fmt.Errorf("could not split %q: found empty id at %d", s, i)
}
ids[i] = ID(sid)
}
return ids, nil
}

type Meta struct {
ProfileID ID `json:"profile_id"`
Service string `json:"service"`
Expand Down

0 comments on commit 5ab457f

Please sign in to comment.