Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port: Use Stored Hll to Compute New Clients For Current Month #16184

Merged
merged 2 commits into from Jun 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog/16184.txt
@@ -0,0 +1,3 @@
```release-note:improvement
core/activity: use monthly hyperloglogs to calculate new clients approximation for current month
```
56 changes: 7 additions & 49 deletions vault/activity_log.go
Expand Up @@ -367,53 +367,6 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for
return nil
}

// CreateOrFetchHyperlogLog creates a new hyperlogLog for each startTime (month) if it does not exist in storage.
// hyperlogLog is used here to solve count-distinct problem i.e, to count the number of distinct clients
// In activity log, hyperloglog is a sketch containing clientID's in a given month
func (a *ActivityLog) CreateOrFetchHyperlogLog(ctx context.Context, startTime time.Time) *hyperloglog.Sketch {
monthlyHLLPath := fmt.Sprintf("%s%d", distinctClientsBasePath, startTime.Unix())
hll := hyperloglog.New()
a.logger.Trace("fetching hyperloglog ", "path", monthlyHLLPath)
data, err := a.view.Get(ctx, monthlyHLLPath)
if err != nil {
a.logger.Error("error fetching hyperloglog", "path", monthlyHLLPath, "error", err)
return hll
}
if data == nil {
a.logger.Trace("creating hyperloglog ", "path", monthlyHLLPath)
err = a.StoreHyperlogLog(ctx, startTime, hll)
if err != nil {
a.logger.Error("error storing hyperloglog", "path", monthlyHLLPath, "error", err)
return hll
}
} else {
err = hll.UnmarshalBinary(data.Value)
if err != nil {
a.logger.Error("error unmarshaling hyperloglog", "path", monthlyHLLPath, "error", err)
return hll
}
}
return hll
}

// StoreHyperlogLog stores the hyperloglog (a sketch containing client IDs) for startTime (month) in storage
func (a *ActivityLog) StoreHyperlogLog(ctx context.Context, startTime time.Time, newHll *hyperloglog.Sketch) error {
monthlyHLLPath := fmt.Sprintf("%s%d", distinctClientsBasePath, startTime.Unix())
a.logger.Trace("storing hyperloglog ", "path", monthlyHLLPath)
marshalledHll, err := newHll.MarshalBinary()
if err != nil {
return err
}
err = a.view.Put(ctx, &logical.StorageEntry{
Key: monthlyHLLPath,
Value: marshalledHll,
})
if err != nil {
return err
}
return nil
}

// :force: forces a save of tokens/entities even if the in-memory log is empty
func (a *ActivityLog) saveCurrentSegmentInternal(ctx context.Context, force bool) error {
entityPath := fmt.Sprintf("%s%d/%d", activityEntityBasePath, a.currentSegment.startTimestamp, a.currentSegment.clientSequenceNumber)
Expand Down Expand Up @@ -1613,7 +1566,7 @@ func (a *ActivityLog) handleQuery(ctx context.Context, startTime, endTime time.T

distinctEntitiesResponse := totalEntities
if computePartial {
currentMonth, err := a.computeCurrentMonthForBillingPeriod(partialByMonth, startTime, endTime)
currentMonth, err := a.computeCurrentMonthForBillingPeriod(ctx, partialByMonth, startTime, endTime)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -2067,7 +2020,12 @@ func (a *ActivityLog) precomputedQueryWorker(ctx context.Context) error {
break
}

hyperloglog := a.CreateOrFetchHyperlogLog(ctx, startTime)
hyperloglog, err := a.CreateOrFetchHyperlogLog(ctx, startTime)
if err != nil {
// We were unable to create or fetch the hll, but we should still
// continue with our precomputation
a.logger.Warn("unable to create or fetch hyperloglog", "start time", startTime, "error", err)
}
err = a.WalkEntitySegments(ctx, startTime, hyperloglog, walkEntities)
if err != nil {
a.logger.Warn("failed to load previous segments", "error", err)
Expand Down
2 changes: 1 addition & 1 deletion vault/activity_log_test.go
Expand Up @@ -494,7 +494,7 @@ func TestActivityLog_StoreAndReadHyperloglog(t *testing.T) {
if err != nil {
t.Fatalf("error storing hyperloglog in storage: %v", err)
}
fetchedHll := a.CreateOrFetchHyperlogLog(ctx, currentMonth)
fetchedHll, err := a.CreateOrFetchHyperlogLog(ctx, currentMonth)
// check the distinct count stored from hll
if fetchedHll.Estimate() != 4 {
t.Fatalf("wrong number of distinct elements: expected: 5 actual: %v", fetchedHll.Estimate())
Expand Down
156 changes: 144 additions & 12 deletions vault/activity_log_util_common.go
@@ -1,12 +1,156 @@
package vault

import (
"context"
"errors"
"fmt"
"sort"
"time"

"github.com/axiomhq/hyperloglog"
"github.com/hashicorp/vault/helper/timeutil"
"github.com/hashicorp/vault/sdk/logical"
"github.com/hashicorp/vault/vault/activity"
)

type HLLGetter func(ctx context.Context, startTime time.Time) (*hyperloglog.Sketch, error)

// computeCurrentMonthForBillingPeriod computes the current month's data with respect
// to a billing period.
func (a *ActivityLog) computeCurrentMonthForBillingPeriod(ctx context.Context, byMonth map[int64]*processMonth, startTime time.Time, endTime time.Time) (*activity.MonthRecord, error) {
return a.computeCurrentMonthForBillingPeriodInternal(ctx, byMonth, a.CreateOrFetchHyperlogLog, startTime, endTime)
}

// CreateOrFetchHyperlogLog creates a new hyperlogLog for each startTime (month) if it does not exist in storage.
// hyperlogLog is used here to solve count-distinct problem i.e, to count the number of distinct clients
// In activity log, hyperloglog is a sketch containing clientID's in a given month
func (a *ActivityLog) CreateOrFetchHyperlogLog(ctx context.Context, startTime time.Time) (*hyperloglog.Sketch, error) {
monthlyHLLPath := fmt.Sprintf("%s%d", distinctClientsBasePath, startTime.Unix())
hll := hyperloglog.New()
data, err := a.view.Get(ctx, monthlyHLLPath)
if err != nil {
// If there is no hll, we should log the error, as having this fire multiple times
// is a sign that something is wrong with hll store/get. However, this is not a
// critical failure (in fact it is expected during the first month rotation after
// this code is deployed), so we will not throw an error.
a.logger.Warn("fetch of hyperloglog threw an error at path", monthlyHLLPath, "error", err)
}
if data == nil {
a.logger.Trace("creating hyperloglog ", "path", monthlyHLLPath)
err = a.StoreHyperlogLog(ctx, startTime, hll)
if err != nil {
return hll, fmt.Errorf("error storing hyperloglog at path %s: error %w", monthlyHLLPath, err)
}
} else {
err = hll.UnmarshalBinary(data.Value)
if err != nil {
return hll, fmt.Errorf("error unmarshaling hyperloglog at path %s: error %w", monthlyHLLPath, err)
}
}
return hll, nil
}

// StoreHyperlogLog stores the hyperloglog (a sketch containing client IDs) for startTime (month) in storage
func (a *ActivityLog) StoreHyperlogLog(ctx context.Context, startTime time.Time, newHll *hyperloglog.Sketch) error {
monthlyHLLPath := fmt.Sprintf("%s%d", distinctClientsBasePath, startTime.Unix())
a.logger.Trace("storing hyperloglog ", "path", monthlyHLLPath)
marshalledHll, err := newHll.MarshalBinary()
if err != nil {
return err
}
err = a.view.Put(ctx, &logical.StorageEntry{
Key: monthlyHLLPath,
Value: marshalledHll,
})
if err != nil {
return err
}
return nil
}

func (a *ActivityLog) computeCurrentMonthForBillingPeriodInternal(ctx context.Context, byMonth map[int64]*processMonth, hllGetFunc HLLGetter, startTime time.Time, endTime time.Time) (*activity.MonthRecord, error) {
// Fetch all hyperloglogs for months from startMonth to endMonth. If a month doesn't have an associated
// hll, warn and continue.

// hllMonthlyTimestamp is the start time of the month corresponding to which a hyperloglog of that month's
// client data is stored. The path at which the hyperloglog for a month is stored containes this timestamp.
hllMonthlyTimestamp := timeutil.StartOfMonth(startTime)
billingPeriodHLL := hyperloglog.New()
for hllMonthlyTimestamp.Before(timeutil.StartOfMonth(endTime)) || hllMonthlyTimestamp.Equal(timeutil.StartOfMonth(endTime)) {
monthSketch, err := hllGetFunc(ctx, hllMonthlyTimestamp)
// If there's an error with the hyperloglog fetch, we should still deduplicate on
// the hlls that we have so we will warn that we couldn't find a hll for the month
// and continue.
if err != nil {
a.logger.Warn("no hyperloglog associated with timestamp", "timestamp", hllMonthlyTimestamp)
hllMonthlyTimestamp = timeutil.StartOfNextMonth(hllMonthlyTimestamp)
continue
}
// Union the monthly hll into the billing period's hll
err = billingPeriodHLL.Merge(monthSketch)
if err != nil {
// In this case we can't afford to fail silently. Since this error indicates
// data corruption, we should not try to do any further deduplication
return nil, err
}
hllMonthlyTimestamp = timeutil.StartOfNextMonth(hllMonthlyTimestamp)
}

// Now we will add the clients for the current month to a copy of the billing period's hll to
// see how the cardinality grows.
billingPeriodHLLWithCurrentMonthEntityClients := billingPeriodHLL.Clone()
billingPeriodHLLWithCurrentMonthNonEntityClients := billingPeriodHLL.Clone()

// There's at most one month of data here. We should validate this assumption explicitly
if len(byMonth) > 1 {
return nil, errors.New(fmt.Sprintf("multiple months of data found in partial month's client count breakdowns: %+v\n", byMonth))
}

totalEntities := 0
totalNonEntities := 0
for _, month := range byMonth {

if month.NewClients == nil || month.NewClients.Counts == nil || month.Counts == nil {
return nil, errors.New("malformed current month used to calculate current month's activity")
}

// Note that the following calculations assume that all clients seen are currently in
// the NewClients section of byMonth. It is best to explicitly check this, just verify
// our assumptions about the passed in byMonth argument.
if len(month.Counts.Entities) != len(month.NewClients.Counts.Entities) ||
len(month.Counts.NonEntities) != len(month.NewClients.Counts.NonEntities) {
return nil, errors.New("current month clients cache assumes billing period")
}

// All the clients for the current month are in the newClients section, initially.
// We need to deduplicate these clients across the billing period by adding them
// into the billing period hyperloglogs.
entities := month.NewClients.Counts.Entities
nonEntities := month.NewClients.Counts.NonEntities
if entities != nil {
for entityID := range entities {
billingPeriodHLLWithCurrentMonthEntityClients.Insert([]byte(entityID))
totalEntities += 1
}
}
if nonEntities != nil {
for nonEntityID := range nonEntities {
billingPeriodHLLWithCurrentMonthNonEntityClients.Insert([]byte(nonEntityID))
totalNonEntities += 1
}
}
}
// The number of new entities for the current month is approximately the size of the hll with
// the current month's entities minus the size of the initial billing period hll.
currentMonthNewEntities := billingPeriodHLLWithCurrentMonthEntityClients.Estimate() - billingPeriodHLL.Estimate()
currentMonthNewNonEntities := billingPeriodHLLWithCurrentMonthNonEntityClients.Estimate() - billingPeriodHLL.Estimate()

return &activity.MonthRecord{
NewClients: &activity.NewClientRecord{Counts: &activity.CountsRecord{EntityClients: int(currentMonthNewEntities), NonEntityClients: int(currentMonthNewNonEntities)}},
Counts: &activity.CountsRecord{EntityClients: totalEntities, NonEntityClients: totalNonEntities},
}, nil
}

// sortALResponseNamespaces sorts the namespaces for activity log responses.
func (a *ActivityLog) sortALResponseNamespaces(byNamespaceResponse []*ResponseNamespace) {
sort.Slice(byNamespaceResponse, func(i, j int) bool {
Expand Down Expand Up @@ -106,15 +250,3 @@ func (a *ActivityLog) sortActivityLogMonthsResponse(months []*ResponseMonth) {
}
}
}

// TODO
// computeCurrentMonthForBillingPeriod computes the current month's data with respect
// to a billing period. This function is currently a stub with the bare minimum amount
// of data to get the pre-existing tests to pass. It will be filled out in a separate PR
// and this comment will be removed.
func (a *ActivityLog) computeCurrentMonthForBillingPeriod(byMonth map[int64]*processMonth, startTime time.Time, endTime time.Time) (*activity.MonthRecord, error) {
return &activity.MonthRecord{
NewClients: &activity.NewClientRecord{Counts: &activity.CountsRecord{EntityClients: 0, NonEntityClients: 0}},
Counts: &activity.CountsRecord{EntityClients: 0, NonEntityClients: 0},
}, nil
}
128 changes: 128 additions & 0 deletions vault/activity_log_util_common_test.go
@@ -0,0 +1,128 @@
package vault

import (
"context"
"fmt"
"testing"
"time"

"github.com/axiomhq/hyperloglog"
"github.com/hashicorp/vault/helper/timeutil"
)

func Test_ActivityLog_ComputeCurrentMonthForBillingPeriodInternal(t *testing.T) {
// populate the first month with clients 1-10
monthOneHLL := hyperloglog.New()
// populate the second month with clients 5-15
monthTwoHLL := hyperloglog.New()
// populate the third month with clients 10-20
monthThreeHLL := hyperloglog.New()

for i := 0; i < 20; i++ {
clientID := []byte(fmt.Sprintf("client_%d", i))
if i < 10 {
monthOneHLL.Insert(clientID)
}
if 5 <= i && i < 15 {
monthTwoHLL.Insert(clientID)
}
if 10 <= i && i < 20 {
monthThreeHLL.Insert(clientID)
}
}
mockHLLGetFunc := func(ctx context.Context, startTime time.Time) (*hyperloglog.Sketch, error) {
currMonthStart := timeutil.StartOfMonth(time.Now())
if startTime.Equal(timeutil.MonthsPreviousTo(3, currMonthStart)) {
return monthThreeHLL, nil
}
if startTime.Equal(timeutil.MonthsPreviousTo(2, currMonthStart)) {
return monthTwoHLL, nil
}
if startTime.Equal(timeutil.MonthsPreviousTo(1, currMonthStart)) {
return monthOneHLL, nil
}
return nil, fmt.Errorf("bad start time")
}

// Let's add 2 entities exclusive to month 1 (clients 0,1),
// 2 entities shared by month 1 and 2 (clients 5,6),
// 2 entities shared by month 2 and 3 (clients 10,11), and
// 2 entities exclusive to month 3 (15,16). Furthermore, we can add
// 3 new entities (clients 20,21, and 22).
entitiesStruct := make(map[string]struct{}, 0)
entitiesStruct["client_0"] = struct{}{}
entitiesStruct["client_1"] = struct{}{}
entitiesStruct["client_5"] = struct{}{}
entitiesStruct["client_6"] = struct{}{}
entitiesStruct["client_10"] = struct{}{}
entitiesStruct["client_11"] = struct{}{}
entitiesStruct["client_15"] = struct{}{}
entitiesStruct["client_16"] = struct{}{}
entitiesStruct["client_20"] = struct{}{}
entitiesStruct["client_21"] = struct{}{}
entitiesStruct["client_22"] = struct{}{}

// We will add 3 nonentity clients from month 1 (clients 2,3,4),
// 3 shared by months 1 and 2 (7,8,9),
// 3 shared by months 2 and 3 (12,13,14), and
// 3 exclusive to month 3 (17,18,19). We will also
// add 4 new nonentity clients.
nonEntitiesStruct := make(map[string]struct{}, 0)
nonEntitiesStruct["client_2"] = struct{}{}
nonEntitiesStruct["client_3"] = struct{}{}
nonEntitiesStruct["client_4"] = struct{}{}
nonEntitiesStruct["client_7"] = struct{}{}
nonEntitiesStruct["client_8"] = struct{}{}
nonEntitiesStruct["client_9"] = struct{}{}
nonEntitiesStruct["client_12"] = struct{}{}
nonEntitiesStruct["client_13"] = struct{}{}
nonEntitiesStruct["client_14"] = struct{}{}
nonEntitiesStruct["client_17"] = struct{}{}
nonEntitiesStruct["client_18"] = struct{}{}
nonEntitiesStruct["client_19"] = struct{}{}
nonEntitiesStruct["client_23"] = struct{}{}
nonEntitiesStruct["client_24"] = struct{}{}
nonEntitiesStruct["client_25"] = struct{}{}
nonEntitiesStruct["client_26"] = struct{}{}

counts := &processCounts{
Entities: entitiesStruct,
NonEntities: nonEntitiesStruct,
}

currentMonthClientsMap := make(map[int64]*processMonth, 1)
currentMonthClients := &processMonth{
Counts: counts,
NewClients: &processNewClients{Counts: counts},
}
// Technially I think currentMonthClientsMap should have the keys as
// unix timestamps, but for the purposes of the unit test it doesn't
// matter what the values actually are.
currentMonthClientsMap[0] = currentMonthClients

core, _, _ := TestCoreUnsealed(t)
a := core.activityLog

endTime := timeutil.StartOfMonth(time.Now())
startTime := timeutil.MonthsPreviousTo(3, endTime)

monthRecord, err := a.computeCurrentMonthForBillingPeriodInternal(context.Background(), currentMonthClientsMap, mockHLLGetFunc, startTime, endTime)
if err != nil {
t.Fatal(err)
}

// We should have 11 entity clients and 16 nonentity clients, and 3 new entity clients
// and 4 new nonentity clients
if monthRecord.Counts.EntityClients != 11 {
t.Fatalf("wrong number of entity clients. Expected 11, got %d", monthRecord.Counts.EntityClients)
}
if monthRecord.Counts.NonEntityClients != 16 {
t.Fatalf("wrong number of non entity clients. Expected 16, got %d", monthRecord.Counts.NonEntityClients)
}
if monthRecord.NewClients.Counts.EntityClients != 3 {
t.Fatalf("wrong number of new entity clients. Expected 3, got %d", monthRecord.NewClients.Counts.EntityClients)
}
if monthRecord.NewClients.Counts.NonEntityClients != 4 {
t.Fatalf("wrong number of new non entity clients. Expected 4, got %d", monthRecord.NewClients.Counts.NonEntityClients)
}
}