diff --git a/changelog/16146.txt b/changelog/16146.txt new file mode 100644 index 0000000000000..39086b3b044f4 --- /dev/null +++ b/changelog/16146.txt @@ -0,0 +1,3 @@ +```release-note:improvement +core/activity: generate hyperloglogs containing clientIds for each month during precomputation +``` \ No newline at end of file diff --git a/go.mod b/go.mod index adb94e2236262..522f866b6384a 100644 --- a/go.mod +++ b/go.mod @@ -30,6 +30,7 @@ require ( github.com/armon/go-radix v1.0.0 github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a github.com/aws/aws-sdk-go v1.43.4 + github.com/axiomhq/hyperloglog v0.0.0-20220105174342-98591331716a github.com/cenkalti/backoff/v3 v3.2.2 github.com/chrismalek/oktasdk-go v0.0.0-20181212195951-3430665dfaa0 github.com/client9/misspell v0.3.4 @@ -253,6 +254,7 @@ require ( github.com/couchbase/gocbcore/v10 v10.0.4 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/denverdino/aliyungo v0.0.0-20190125010748-a747050bb1ba // indirect + github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc // indirect github.com/digitalocean/godo v1.7.5 // indirect github.com/dimchansky/utfbom v1.1.1 // indirect github.com/docker/cli v20.10.9+incompatible // indirect diff --git a/go.sum b/go.sum index 30e03edb4de41..ee4183dc4924d 100644 --- a/go.sum +++ b/go.sum @@ -274,6 +274,8 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.6.1 h1:1Pls85C5CFjhE3aH+h85/hyAk89kQ github.com/aws/aws-sdk-go-v2/service/sts v1.6.1/go.mod h1:hLZ/AnkIKHLuPGjEiyghNEdvJ2PP0MgOxcmv9EBJ4xs= github.com/aws/smithy-go v1.7.0 h1:+cLHMRrDZvQ4wk+KuQ9yH6eEg6KZEJ9RI2IkDqnygCg= github.com/aws/smithy-go v1.7.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E= +github.com/axiomhq/hyperloglog v0.0.0-20220105174342-98591331716a h1:eqjiAL3qooftPm8b9C1GsSSRcmlw7iOva8vdBTmV2PY= +github.com/axiomhq/hyperloglog v0.0.0-20220105174342-98591331716a/go.mod h1:2stgcRjl6QmW+gU2h5E7BQXg4HU0gzxKWDuT5HviN9s= github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f h1:ZNv7On9kyUzm7fvRZumSyy/IUiSC7AzL0I1jKKtwooA= github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f/go.mod h1:AuiFmCCPBSrqvVMvuqFuk0qogytodnVFVSN5CeJB8Gc= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= @@ -500,6 +502,8 @@ github.com/denverdino/aliyungo v0.0.0-20190125010748-a747050bb1ba h1:p6poVbjHDkK github.com/denverdino/aliyungo v0.0.0-20190125010748-a747050bb1ba/go.mod h1:dV8lFg6daOBZbT6/BDGIz6Y3WFGn8juu6G+CQ6LHtl0= github.com/dgrijalva/jwt-go v0.0.0-20170104182250-a601269ab70c/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= +github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc h1:8WFBn63wegobsYAX0YjD+8suexZDga5CctH4CCTx2+8= +github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc/go.mod h1:c9O8+fpSOX1DM8cPNSkX/qsBWdkD4yd2dpciOWQjpBw= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= github.com/digitalocean/godo v1.7.5 h1:JOQbAO6QT1GGjor0doT0mXefX2FgUDPOpYh2RaXA+ko= github.com/digitalocean/godo v1.7.5/go.mod h1:h6faOIcZ8lWIwNQ+DN7b3CgX4Kwby5T+nbpNqkUIozU= @@ -1095,6 +1099,7 @@ github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH github.com/imdario/mergo v0.3.12 h1:b6R2BslTbIEToALKP7LxUvijTsNI9TAe80pLWN2g/HU= github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= +github.com/influxdata/influxdb v1.7.6/go.mod h1:qZna6X/4elxqT3yI9iZYdZrWWdeFOOprn86kgg4+IzY= github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab h1:HqW4xhhynfjrtEiiSGcQUd6vrK23iMam1FO8rI7mwig= github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= github.com/j-keck/arping v0.0.0-20160618110441-2cf9dc699c56/go.mod h1:ymszkNOg6tORTn+6F6j+Jc8TOr5osrynvN6ivFWZ2GA= diff --git a/vault/activity_log.go b/vault/activity_log.go index e11c7c483a634..0af4fc4ed8a2b 100644 --- a/vault/activity_log.go +++ b/vault/activity_log.go @@ -16,6 +16,7 @@ import ( "time" "unicode/utf8" + "github.com/axiomhq/hyperloglog" "github.com/golang/protobuf/proto" log "github.com/hashicorp/go-hclog" "github.com/hashicorp/vault/helper/metricsutil" @@ -36,6 +37,9 @@ const ( activityConfigKey = "config" activityIntentLogKey = "endofmonth" + // sketch for each month that stores hash of client ids + distinctClientsBasePath = "log/distinctclients/" + // for testing purposes (public as needed) ActivityLogPrefix = "sys/counters/activity/log/" ActivityPrefix = "sys/counters/activity/" @@ -363,6 +367,53 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for return nil } +// CreateOrFetchHyperlogLog creates a new hyperlogLog for each startTime (month) if it does not exist in storage. +// hyperlogLog is used here to solve count-distinct problem i.e, to count the number of distinct clients +// In activity log, hyperloglog is a sketch containing clientID's in a given month +func (a *ActivityLog) CreateOrFetchHyperlogLog(ctx context.Context, startTime time.Time) *hyperloglog.Sketch { + monthlyHLLPath := fmt.Sprintf("%s%d", distinctClientsBasePath, startTime.Unix()) + hll := hyperloglog.New() + a.logger.Trace("fetching hyperloglog ", "path", monthlyHLLPath) + data, err := a.view.Get(ctx, monthlyHLLPath) + if err != nil { + a.logger.Error("error fetching hyperloglog", "path", monthlyHLLPath, "error", err) + return hll + } + if data == nil { + a.logger.Trace("creating hyperloglog ", "path", monthlyHLLPath) + err = a.StoreHyperlogLog(ctx, startTime, hll) + if err != nil { + a.logger.Error("error storing hyperloglog", "path", monthlyHLLPath, "error", err) + return hll + } + } else { + err = hll.UnmarshalBinary(data.Value) + if err != nil { + a.logger.Error("error unmarshaling hyperloglog", "path", monthlyHLLPath, "error", err) + return hll + } + } + return hll +} + +// StoreHyperlogLog stores the hyperloglog (a sketch containing client IDs) for startTime (month) in storage +func (a *ActivityLog) StoreHyperlogLog(ctx context.Context, startTime time.Time, newHll *hyperloglog.Sketch) error { + monthlyHLLPath := fmt.Sprintf("%s%d", distinctClientsBasePath, startTime.Unix()) + a.logger.Trace("storing hyperloglog ", "path", monthlyHLLPath) + marshalledHll, err := newHll.MarshalBinary() + if err != nil { + return err + } + err = a.view.Put(ctx, &logical.StorageEntry{ + Key: monthlyHLLPath, + Value: marshalledHll, + }) + if err != nil { + return err + } + return nil +} + // :force: forces a save of tokens/entities even if the in-memory log is empty func (a *ActivityLog) saveCurrentSegmentInternal(ctx context.Context, force bool) error { entityPath := fmt.Sprintf("%s%d/%d", activityEntityBasePath, a.currentSegment.startTimestamp, a.currentSegment.clientSequenceNumber) @@ -515,7 +566,7 @@ func (a *ActivityLog) getLastEntitySegmentNumber(ctx context.Context, startTime } // WalkEntitySegments loads each of the entity segments for a particular start time -func (a *ActivityLog) WalkEntitySegments(ctx context.Context, startTime time.Time, walkFn func(*activity.EntityActivityLog, time.Time) error) error { +func (a *ActivityLog) WalkEntitySegments(ctx context.Context, startTime time.Time, hll *hyperloglog.Sketch, walkFn func(*activity.EntityActivityLog, time.Time, *hyperloglog.Sketch) error) error { basePath := activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/" pathList, err := a.view.List(ctx, basePath) if err != nil { @@ -537,7 +588,7 @@ func (a *ActivityLog) WalkEntitySegments(ctx context.Context, startTime time.Tim if err != nil { return fmt.Errorf("unable to parse segment %v%v: %w", basePath, path, err) } - err = walkFn(out, startTime) + err = walkFn(out, startTime, hll) if err != nil { return fmt.Errorf("unable to walk entities: %w", err) } @@ -2069,10 +2120,21 @@ func (a *ActivityLog) precomputedQueryWorker(ctx context.Context) error { byNamespace := make(map[string]*processByNamespace) byMonth := make(map[int64]*processMonth) - walkEntities := func(l *activity.EntityActivityLog, startTime time.Time) error { + walkEntities := func(l *activity.EntityActivityLog, startTime time.Time, hll *hyperloglog.Sketch) error { for _, e := range l.Clients { + processClientRecord(e, byNamespace, byMonth, startTime) + // We maintain an hyperloglog for each month + // hyperloglog is a sketch (hyperloglog data-structure) containing client ID's in a given month + // hyperloglog is used in activity log to get the approximate number new clients in the current billing month + // by counting the number of distinct clients in all the months including current month + // (this can be done by merging the hyperloglog all months with current month hyperloglog) + // and subtracting the number of distinct clients in the current month + // NOTE: current month here is not the month of startTime but the time period from the start of the current month, + // up until the time that this request was made. + hll.Insert([]byte(e.ClientID)) + // The byMonth map will be filled in the reverse order of time. For // example, if the billing period is from Jan to June, the byMonth // will be filled for June first, May next and so on till Jan. When @@ -2144,11 +2206,17 @@ func (a *ActivityLog) precomputedQueryWorker(ctx context.Context) error { break } - err = a.WalkEntitySegments(ctx, startTime, walkEntities) + hyperloglog := a.CreateOrFetchHyperlogLog(ctx, startTime) + err = a.WalkEntitySegments(ctx, startTime, hyperloglog, walkEntities) if err != nil { a.logger.Warn("failed to load previous segments", "error", err) return err } + // Store the hyperloglog + err = a.StoreHyperlogLog(ctx, startTime, hyperloglog) + if err != nil { + a.logger.Warn("failed to store hyperloglog for month", "start time", startTime, "error", err) + } err = a.WalkTokenSegments(ctx, startTime, walkTokens) if err != nil { a.logger.Warn("failed to load previous token counts", "error", err) @@ -2646,7 +2714,8 @@ func (a *ActivityLog) writeExport(ctx context.Context, rw http.ResponseWriter, f a.logger.Info("starting activity log export", "start_time", startTime, "end_time", endTime, "format", format) dedupedIds := make(map[string]struct{}) - walkEntities := func(l *activity.EntityActivityLog, startTime time.Time) error { + + walkEntities := func(l *activity.EntityActivityLog, startTime time.Time, hll *hyperloglog.Sketch) error { for _, e := range l.Clients { if _, ok := dedupedIds[e.ClientID]; ok { continue @@ -2663,8 +2732,9 @@ func (a *ActivityLog) writeExport(ctx context.Context, rw http.ResponseWriter, f } // For each month in the filtered list walk all the log segments + for _, startTime := range filteredList { - err := a.WalkEntitySegments(ctx, startTime, walkEntities) + err := a.WalkEntitySegments(ctx, startTime, nil, walkEntities) if err != nil { a.logger.Error("failed to load segments for export", "error", err) return fmt.Errorf("failed to load segments for export: %w", err) diff --git a/vault/activity_log_test.go b/vault/activity_log_test.go index ff0b700e35f4e..088134fc83d61 100644 --- a/vault/activity_log_test.go +++ b/vault/activity_log_test.go @@ -17,6 +17,7 @@ import ( "testing" "time" + "github.com/axiomhq/hyperloglog" "github.com/go-test/deep" "github.com/golang/protobuf/proto" "github.com/hashicorp/vault/helper/constants" @@ -472,6 +473,34 @@ func TestActivityLog_SaveEntitiesToStorage(t *testing.T) { expectedEntityIDs(t, out, ids) } +// Test to check store hyperloglog and fetch hyperloglog from storage +func TestActivityLog_StoreAndReadHyperloglog(t *testing.T) { + core, _, _ := TestCoreUnsealed(t) + ctx := context.Background() + + a := core.activityLog + a.SetStandbyEnable(ctx, true) + a.SetStartTimestamp(time.Now().Unix()) // set a nonzero segment + currentMonth := timeutil.StartOfMonth(time.Now()) + currentMonthHll := hyperloglog.New() + currentMonthHll.Insert([]byte("a")) + currentMonthHll.Insert([]byte("a")) + currentMonthHll.Insert([]byte("b")) + currentMonthHll.Insert([]byte("c")) + currentMonthHll.Insert([]byte("d")) + currentMonthHll.Insert([]byte("d")) + + err := a.StoreHyperlogLog(ctx, currentMonth, currentMonthHll) + if err != nil { + t.Fatalf("error storing hyperloglog in storage: %v", err) + } + fetchedHll := a.CreateOrFetchHyperlogLog(ctx, currentMonth) + // check the distinct count stored from hll + if fetchedHll.Estimate() != 4 { + t.Fatalf("wrong number of distinct elements: expected: 5 actual: %v", fetchedHll.Estimate()) + } +} + func TestModifyResponseMonthsNilAppend(t *testing.T) { end := time.Now().UTC() start := timeutil.StartOfMonth(end).AddDate(0, -5, 0)