diff --git a/changelog/16184.txt b/changelog/16184.txt new file mode 100644 index 0000000000000..e7a8b065e3980 --- /dev/null +++ b/changelog/16184.txt @@ -0,0 +1,3 @@ +```release-note:improvement +core/activity: use monthly hyperloglogs to calculate new clients approximation for current month +``` \ No newline at end of file diff --git a/vault/activity_log.go b/vault/activity_log.go index b5cac367bc364..1450bef95cd96 100644 --- a/vault/activity_log.go +++ b/vault/activity_log.go @@ -367,53 +367,6 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for return nil } -// CreateOrFetchHyperlogLog creates a new hyperlogLog for each startTime (month) if it does not exist in storage. -// hyperlogLog is used here to solve count-distinct problem i.e, to count the number of distinct clients -// In activity log, hyperloglog is a sketch containing clientID's in a given month -func (a *ActivityLog) CreateOrFetchHyperlogLog(ctx context.Context, startTime time.Time) *hyperloglog.Sketch { - monthlyHLLPath := fmt.Sprintf("%s%d", distinctClientsBasePath, startTime.Unix()) - hll := hyperloglog.New() - a.logger.Trace("fetching hyperloglog ", "path", monthlyHLLPath) - data, err := a.view.Get(ctx, monthlyHLLPath) - if err != nil { - a.logger.Error("error fetching hyperloglog", "path", monthlyHLLPath, "error", err) - return hll - } - if data == nil { - a.logger.Trace("creating hyperloglog ", "path", monthlyHLLPath) - err = a.StoreHyperlogLog(ctx, startTime, hll) - if err != nil { - a.logger.Error("error storing hyperloglog", "path", monthlyHLLPath, "error", err) - return hll - } - } else { - err = hll.UnmarshalBinary(data.Value) - if err != nil { - a.logger.Error("error unmarshaling hyperloglog", "path", monthlyHLLPath, "error", err) - return hll - } - } - return hll -} - -// StoreHyperlogLog stores the hyperloglog (a sketch containing client IDs) for startTime (month) in storage -func (a *ActivityLog) StoreHyperlogLog(ctx context.Context, startTime time.Time, newHll *hyperloglog.Sketch) error { - monthlyHLLPath := fmt.Sprintf("%s%d", distinctClientsBasePath, startTime.Unix()) - a.logger.Trace("storing hyperloglog ", "path", monthlyHLLPath) - marshalledHll, err := newHll.MarshalBinary() - if err != nil { - return err - } - err = a.view.Put(ctx, &logical.StorageEntry{ - Key: monthlyHLLPath, - Value: marshalledHll, - }) - if err != nil { - return err - } - return nil -} - // :force: forces a save of tokens/entities even if the in-memory log is empty func (a *ActivityLog) saveCurrentSegmentInternal(ctx context.Context, force bool) error { entityPath := fmt.Sprintf("%s%d/%d", activityEntityBasePath, a.currentSegment.startTimestamp, a.currentSegment.clientSequenceNumber) @@ -1613,7 +1566,7 @@ func (a *ActivityLog) handleQuery(ctx context.Context, startTime, endTime time.T distinctEntitiesResponse := totalEntities if computePartial { - currentMonth, err := a.computeCurrentMonthForBillingPeriod(partialByMonth, startTime, endTime) + currentMonth, err := a.computeCurrentMonthForBillingPeriod(ctx, partialByMonth, startTime, endTime) if err != nil { return nil, err } @@ -2067,7 +2020,12 @@ func (a *ActivityLog) precomputedQueryWorker(ctx context.Context) error { break } - hyperloglog := a.CreateOrFetchHyperlogLog(ctx, startTime) + hyperloglog, err := a.CreateOrFetchHyperlogLog(ctx, startTime) + if err != nil { + // We were unable to create or fetch the hll, but we should still + // continue with our precomputation + a.logger.Warn("unable to create or fetch hyperloglog", "start time", startTime, "error", err) + } err = a.WalkEntitySegments(ctx, startTime, hyperloglog, walkEntities) if err != nil { a.logger.Warn("failed to load previous segments", "error", err) diff --git a/vault/activity_log_test.go b/vault/activity_log_test.go index 088134fc83d61..ca434fbb205ce 100644 --- a/vault/activity_log_test.go +++ b/vault/activity_log_test.go @@ -494,7 +494,7 @@ func TestActivityLog_StoreAndReadHyperloglog(t *testing.T) { if err != nil { t.Fatalf("error storing hyperloglog in storage: %v", err) } - fetchedHll := a.CreateOrFetchHyperlogLog(ctx, currentMonth) + fetchedHll, err := a.CreateOrFetchHyperlogLog(ctx, currentMonth) // check the distinct count stored from hll if fetchedHll.Estimate() != 4 { t.Fatalf("wrong number of distinct elements: expected: 5 actual: %v", fetchedHll.Estimate()) diff --git a/vault/activity_log_util_common.go b/vault/activity_log_util_common.go index c12abace2e9f8..aff819528df16 100644 --- a/vault/activity_log_util_common.go +++ b/vault/activity_log_util_common.go @@ -1,12 +1,156 @@ package vault import ( + "context" + "errors" + "fmt" "sort" "time" + "github.com/axiomhq/hyperloglog" + "github.com/hashicorp/vault/helper/timeutil" + "github.com/hashicorp/vault/sdk/logical" "github.com/hashicorp/vault/vault/activity" ) +type HLLGetter func(ctx context.Context, startTime time.Time) (*hyperloglog.Sketch, error) + +// computeCurrentMonthForBillingPeriod computes the current month's data with respect +// to a billing period. +func (a *ActivityLog) computeCurrentMonthForBillingPeriod(ctx context.Context, byMonth map[int64]*processMonth, startTime time.Time, endTime time.Time) (*activity.MonthRecord, error) { + return a.computeCurrentMonthForBillingPeriodInternal(ctx, byMonth, a.CreateOrFetchHyperlogLog, startTime, endTime) +} + +// CreateOrFetchHyperlogLog creates a new hyperlogLog for each startTime (month) if it does not exist in storage. +// hyperlogLog is used here to solve count-distinct problem i.e, to count the number of distinct clients +// In activity log, hyperloglog is a sketch containing clientID's in a given month +func (a *ActivityLog) CreateOrFetchHyperlogLog(ctx context.Context, startTime time.Time) (*hyperloglog.Sketch, error) { + monthlyHLLPath := fmt.Sprintf("%s%d", distinctClientsBasePath, startTime.Unix()) + hll := hyperloglog.New() + data, err := a.view.Get(ctx, monthlyHLLPath) + if err != nil { + // If there is no hll, we should log the error, as having this fire multiple times + // is a sign that something is wrong with hll store/get. However, this is not a + // critical failure (in fact it is expected during the first month rotation after + // this code is deployed), so we will not throw an error. + a.logger.Warn("fetch of hyperloglog threw an error at path", monthlyHLLPath, "error", err) + } + if data == nil { + a.logger.Trace("creating hyperloglog ", "path", monthlyHLLPath) + err = a.StoreHyperlogLog(ctx, startTime, hll) + if err != nil { + return hll, fmt.Errorf("error storing hyperloglog at path %s: error %w", monthlyHLLPath, err) + } + } else { + err = hll.UnmarshalBinary(data.Value) + if err != nil { + return hll, fmt.Errorf("error unmarshaling hyperloglog at path %s: error %w", monthlyHLLPath, err) + } + } + return hll, nil +} + +// StoreHyperlogLog stores the hyperloglog (a sketch containing client IDs) for startTime (month) in storage +func (a *ActivityLog) StoreHyperlogLog(ctx context.Context, startTime time.Time, newHll *hyperloglog.Sketch) error { + monthlyHLLPath := fmt.Sprintf("%s%d", distinctClientsBasePath, startTime.Unix()) + a.logger.Trace("storing hyperloglog ", "path", monthlyHLLPath) + marshalledHll, err := newHll.MarshalBinary() + if err != nil { + return err + } + err = a.view.Put(ctx, &logical.StorageEntry{ + Key: monthlyHLLPath, + Value: marshalledHll, + }) + if err != nil { + return err + } + return nil +} + +func (a *ActivityLog) computeCurrentMonthForBillingPeriodInternal(ctx context.Context, byMonth map[int64]*processMonth, hllGetFunc HLLGetter, startTime time.Time, endTime time.Time) (*activity.MonthRecord, error) { + // Fetch all hyperloglogs for months from startMonth to endMonth. If a month doesn't have an associated + // hll, warn and continue. + + // hllMonthlyTimestamp is the start time of the month corresponding to which a hyperloglog of that month's + // client data is stored. The path at which the hyperloglog for a month is stored containes this timestamp. + hllMonthlyTimestamp := timeutil.StartOfMonth(startTime) + billingPeriodHLL := hyperloglog.New() + for hllMonthlyTimestamp.Before(timeutil.StartOfMonth(endTime)) || hllMonthlyTimestamp.Equal(timeutil.StartOfMonth(endTime)) { + monthSketch, err := hllGetFunc(ctx, hllMonthlyTimestamp) + // If there's an error with the hyperloglog fetch, we should still deduplicate on + // the hlls that we have so we will warn that we couldn't find a hll for the month + // and continue. + if err != nil { + a.logger.Warn("no hyperloglog associated with timestamp", "timestamp", hllMonthlyTimestamp) + hllMonthlyTimestamp = timeutil.StartOfNextMonth(hllMonthlyTimestamp) + continue + } + // Union the monthly hll into the billing period's hll + err = billingPeriodHLL.Merge(monthSketch) + if err != nil { + // In this case we can't afford to fail silently. Since this error indicates + // data corruption, we should not try to do any further deduplication + return nil, err + } + hllMonthlyTimestamp = timeutil.StartOfNextMonth(hllMonthlyTimestamp) + } + + // Now we will add the clients for the current month to a copy of the billing period's hll to + // see how the cardinality grows. + billingPeriodHLLWithCurrentMonthEntityClients := billingPeriodHLL.Clone() + billingPeriodHLLWithCurrentMonthNonEntityClients := billingPeriodHLL.Clone() + + // There's at most one month of data here. We should validate this assumption explicitly + if len(byMonth) > 1 { + return nil, errors.New(fmt.Sprintf("multiple months of data found in partial month's client count breakdowns: %+v\n", byMonth)) + } + + totalEntities := 0 + totalNonEntities := 0 + for _, month := range byMonth { + + if month.NewClients == nil || month.NewClients.Counts == nil || month.Counts == nil { + return nil, errors.New("malformed current month used to calculate current month's activity") + } + + // Note that the following calculations assume that all clients seen are currently in + // the NewClients section of byMonth. It is best to explicitly check this, just verify + // our assumptions about the passed in byMonth argument. + if len(month.Counts.Entities) != len(month.NewClients.Counts.Entities) || + len(month.Counts.NonEntities) != len(month.NewClients.Counts.NonEntities) { + return nil, errors.New("current month clients cache assumes billing period") + } + + // All the clients for the current month are in the newClients section, initially. + // We need to deduplicate these clients across the billing period by adding them + // into the billing period hyperloglogs. + entities := month.NewClients.Counts.Entities + nonEntities := month.NewClients.Counts.NonEntities + if entities != nil { + for entityID := range entities { + billingPeriodHLLWithCurrentMonthEntityClients.Insert([]byte(entityID)) + totalEntities += 1 + } + } + if nonEntities != nil { + for nonEntityID := range nonEntities { + billingPeriodHLLWithCurrentMonthNonEntityClients.Insert([]byte(nonEntityID)) + totalNonEntities += 1 + } + } + } + // The number of new entities for the current month is approximately the size of the hll with + // the current month's entities minus the size of the initial billing period hll. + currentMonthNewEntities := billingPeriodHLLWithCurrentMonthEntityClients.Estimate() - billingPeriodHLL.Estimate() + currentMonthNewNonEntities := billingPeriodHLLWithCurrentMonthNonEntityClients.Estimate() - billingPeriodHLL.Estimate() + + return &activity.MonthRecord{ + NewClients: &activity.NewClientRecord{Counts: &activity.CountsRecord{EntityClients: int(currentMonthNewEntities), NonEntityClients: int(currentMonthNewNonEntities)}}, + Counts: &activity.CountsRecord{EntityClients: totalEntities, NonEntityClients: totalNonEntities}, + }, nil +} + // sortALResponseNamespaces sorts the namespaces for activity log responses. func (a *ActivityLog) sortALResponseNamespaces(byNamespaceResponse []*ResponseNamespace) { sort.Slice(byNamespaceResponse, func(i, j int) bool { @@ -106,15 +250,3 @@ func (a *ActivityLog) sortActivityLogMonthsResponse(months []*ResponseMonth) { } } } - -// TODO -// computeCurrentMonthForBillingPeriod computes the current month's data with respect -// to a billing period. This function is currently a stub with the bare minimum amount -// of data to get the pre-existing tests to pass. It will be filled out in a separate PR -// and this comment will be removed. -func (a *ActivityLog) computeCurrentMonthForBillingPeriod(byMonth map[int64]*processMonth, startTime time.Time, endTime time.Time) (*activity.MonthRecord, error) { - return &activity.MonthRecord{ - NewClients: &activity.NewClientRecord{Counts: &activity.CountsRecord{EntityClients: 0, NonEntityClients: 0}}, - Counts: &activity.CountsRecord{EntityClients: 0, NonEntityClients: 0}, - }, nil -} diff --git a/vault/activity_log_util_common_test.go b/vault/activity_log_util_common_test.go new file mode 100644 index 0000000000000..72addebce0fbf --- /dev/null +++ b/vault/activity_log_util_common_test.go @@ -0,0 +1,128 @@ +package vault + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/axiomhq/hyperloglog" + "github.com/hashicorp/vault/helper/timeutil" +) + +func Test_ActivityLog_ComputeCurrentMonthForBillingPeriodInternal(t *testing.T) { + // populate the first month with clients 1-10 + monthOneHLL := hyperloglog.New() + // populate the second month with clients 5-15 + monthTwoHLL := hyperloglog.New() + // populate the third month with clients 10-20 + monthThreeHLL := hyperloglog.New() + + for i := 0; i < 20; i++ { + clientID := []byte(fmt.Sprintf("client_%d", i)) + if i < 10 { + monthOneHLL.Insert(clientID) + } + if 5 <= i && i < 15 { + monthTwoHLL.Insert(clientID) + } + if 10 <= i && i < 20 { + monthThreeHLL.Insert(clientID) + } + } + mockHLLGetFunc := func(ctx context.Context, startTime time.Time) (*hyperloglog.Sketch, error) { + currMonthStart := timeutil.StartOfMonth(time.Now()) + if startTime.Equal(timeutil.MonthsPreviousTo(3, currMonthStart)) { + return monthThreeHLL, nil + } + if startTime.Equal(timeutil.MonthsPreviousTo(2, currMonthStart)) { + return monthTwoHLL, nil + } + if startTime.Equal(timeutil.MonthsPreviousTo(1, currMonthStart)) { + return monthOneHLL, nil + } + return nil, fmt.Errorf("bad start time") + } + + // Let's add 2 entities exclusive to month 1 (clients 0,1), + // 2 entities shared by month 1 and 2 (clients 5,6), + // 2 entities shared by month 2 and 3 (clients 10,11), and + // 2 entities exclusive to month 3 (15,16). Furthermore, we can add + // 3 new entities (clients 20,21, and 22). + entitiesStruct := make(map[string]struct{}, 0) + entitiesStruct["client_0"] = struct{}{} + entitiesStruct["client_1"] = struct{}{} + entitiesStruct["client_5"] = struct{}{} + entitiesStruct["client_6"] = struct{}{} + entitiesStruct["client_10"] = struct{}{} + entitiesStruct["client_11"] = struct{}{} + entitiesStruct["client_15"] = struct{}{} + entitiesStruct["client_16"] = struct{}{} + entitiesStruct["client_20"] = struct{}{} + entitiesStruct["client_21"] = struct{}{} + entitiesStruct["client_22"] = struct{}{} + + // We will add 3 nonentity clients from month 1 (clients 2,3,4), + // 3 shared by months 1 and 2 (7,8,9), + // 3 shared by months 2 and 3 (12,13,14), and + // 3 exclusive to month 3 (17,18,19). We will also + // add 4 new nonentity clients. + nonEntitiesStruct := make(map[string]struct{}, 0) + nonEntitiesStruct["client_2"] = struct{}{} + nonEntitiesStruct["client_3"] = struct{}{} + nonEntitiesStruct["client_4"] = struct{}{} + nonEntitiesStruct["client_7"] = struct{}{} + nonEntitiesStruct["client_8"] = struct{}{} + nonEntitiesStruct["client_9"] = struct{}{} + nonEntitiesStruct["client_12"] = struct{}{} + nonEntitiesStruct["client_13"] = struct{}{} + nonEntitiesStruct["client_14"] = struct{}{} + nonEntitiesStruct["client_17"] = struct{}{} + nonEntitiesStruct["client_18"] = struct{}{} + nonEntitiesStruct["client_19"] = struct{}{} + nonEntitiesStruct["client_23"] = struct{}{} + nonEntitiesStruct["client_24"] = struct{}{} + nonEntitiesStruct["client_25"] = struct{}{} + nonEntitiesStruct["client_26"] = struct{}{} + + counts := &processCounts{ + Entities: entitiesStruct, + NonEntities: nonEntitiesStruct, + } + + currentMonthClientsMap := make(map[int64]*processMonth, 1) + currentMonthClients := &processMonth{ + Counts: counts, + NewClients: &processNewClients{Counts: counts}, + } + // Technially I think currentMonthClientsMap should have the keys as + // unix timestamps, but for the purposes of the unit test it doesn't + // matter what the values actually are. + currentMonthClientsMap[0] = currentMonthClients + + core, _, _ := TestCoreUnsealed(t) + a := core.activityLog + + endTime := timeutil.StartOfMonth(time.Now()) + startTime := timeutil.MonthsPreviousTo(3, endTime) + + monthRecord, err := a.computeCurrentMonthForBillingPeriodInternal(context.Background(), currentMonthClientsMap, mockHLLGetFunc, startTime, endTime) + if err != nil { + t.Fatal(err) + } + + // We should have 11 entity clients and 16 nonentity clients, and 3 new entity clients + // and 4 new nonentity clients + if monthRecord.Counts.EntityClients != 11 { + t.Fatalf("wrong number of entity clients. Expected 11, got %d", monthRecord.Counts.EntityClients) + } + if monthRecord.Counts.NonEntityClients != 16 { + t.Fatalf("wrong number of non entity clients. Expected 16, got %d", monthRecord.Counts.NonEntityClients) + } + if monthRecord.NewClients.Counts.EntityClients != 3 { + t.Fatalf("wrong number of new entity clients. Expected 3, got %d", monthRecord.NewClients.Counts.EntityClients) + } + if monthRecord.NewClients.Counts.NonEntityClients != 4 { + t.Fatalf("wrong number of new non entity clients. Expected 4, got %d", monthRecord.NewClients.Counts.NonEntityClients) + } +}