diff --git a/vault/logical_system_activity_write_testonly.go b/vault/logical_system_activity_write_testonly.go index 4186ca14d7c3d..1ddca629b7eb5 100644 --- a/vault/logical_system_activity_write_testonly.go +++ b/vault/logical_system_activity_write_testonly.go @@ -8,6 +8,7 @@ package vault import ( "context" "fmt" + "io" "sync" "time" @@ -315,7 +316,23 @@ func (m *multipleMonthsActivityClients) addRepeatedClients(monthsAgo int32, c *g func (m *multipleMonthsActivityClients) write(ctx context.Context, opts map[generation.WriteOptions]struct{}, activityLog *ActivityLog) ([]string, error) { now := timeutil.StartOfMonth(time.Now().UTC()) paths := []string{} + + _, writePQ := opts[generation.WriteOptions_WRITE_PRECOMPUTED_QUERIES] + _, writeDistinctClients := opts[generation.WriteOptions_WRITE_DISTINCT_CLIENTS] + + pqOpts := pqOptions{} + if writePQ || writeDistinctClients { + pqOpts.byNamespace = make(map[string]*processByNamespace) + pqOpts.byMonth = make(map[int64]*processMonth) + pqOpts.activePeriodEnd = m.latestTimestamp(now) + pqOpts.endTime = timeutil.EndOfMonth(pqOpts.activePeriodEnd) + pqOpts.activePeriodStart = m.earliestTimestamp(now) + } + for i, month := range m.months { + if month.generationParameters == nil { + continue + } var timestamp time.Time if i > 0 { timestamp = timeutil.StartOfMonth(timeutil.MonthsPreviousTo(i, now)) @@ -344,6 +361,14 @@ func (m *multipleMonthsActivityClients) write(ctx context.Context, opts map[gene paths = append(paths, entityPath) } } + + if writePQ || writeDistinctClients { + reader := newProtoSegmentReader(segments) + err = activityLog.segmentToPrecomputedQuery(ctx, timestamp, reader, pqOpts) + if err != nil { + return nil, err + } + } } wg := sync.WaitGroup{} err := activityLog.refreshFromStoredLog(ctx, &wg, now) @@ -353,6 +378,25 @@ func (m *multipleMonthsActivityClients) write(ctx context.Context, opts map[gene return paths, nil } +func (m *multipleMonthsActivityClients) latestTimestamp(now time.Time) time.Time { + for i, month := range m.months { + if month.generationParameters != nil { + return timeutil.StartOfMonth(timeutil.MonthsPreviousTo(i, now)) + } + } + return time.Time{} +} + +func (m *multipleMonthsActivityClients) earliestTimestamp(now time.Time) time.Time { + for i := len(m.months) - 1; i >= 0; i-- { + month := m.months[i] + if month.generationParameters != nil { + return timeutil.StartOfMonth(timeutil.MonthsPreviousTo(i, now)) + } + } + return time.Time{} +} + func newMultipleMonthsActivityClients(numberOfMonths int) *multipleMonthsActivityClients { m := &multipleMonthsActivityClients{ months: make([]*singleMonthActivityClients, numberOfMonths), @@ -364,3 +408,34 @@ func newMultipleMonthsActivityClients(numberOfMonths int) *multipleMonthsActivit } return m } + +func newProtoSegmentReader(segments map[int][]*activity.EntityRecord) SegmentReader { + allRecords := make([][]*activity.EntityRecord, 0, len(segments)) + for _, records := range segments { + if segments == nil { + continue + } + allRecords = append(allRecords, records) + } + return &sliceSegmentReader{ + records: allRecords, + } +} + +type sliceSegmentReader struct { + records [][]*activity.EntityRecord + i int +} + +func (p *sliceSegmentReader) ReadToken(ctx context.Context) (*activity.TokenCount, error) { + return nil, io.EOF +} + +func (p *sliceSegmentReader) ReadEntity(ctx context.Context) (*activity.EntityActivityLog, error) { + if p.i == len(p.records) { + return nil, io.EOF + } + record := p.records[p.i] + p.i++ + return &activity.EntityActivityLog{Clients: record}, nil +} diff --git a/vault/logical_system_activity_write_testonly_test.go b/vault/logical_system_activity_write_testonly_test.go index b9b1a939a8c5a..f104d82ad7004 100644 --- a/vault/logical_system_activity_write_testonly_test.go +++ b/vault/logical_system_activity_write_testonly_test.go @@ -9,8 +9,10 @@ import ( "context" "sort" "testing" + "time" "github.com/hashicorp/vault/helper/namespace" + "github.com/hashicorp/vault/helper/timeutil" "github.com/hashicorp/vault/sdk/logical" "github.com/hashicorp/vault/vault/activity" "github.com/hashicorp/vault/vault/activity/generation" @@ -441,136 +443,165 @@ func Test_singleMonthActivityClients_populateSegments(t *testing.T) { } } -// Test_multipleMonthsActivityClients_write_entities writes 4 months of data -// splitting some months across segments and using empty segments and skipped -// segments. Entities are written and then storage is queried. The test verifies -// that the correct timestamps are present in the activity log and that the correct -// segment numbers for each month contain the correct number of clients -func Test_multipleMonthsActivityClients_write_entities(t *testing.T) { +// Test_handleActivityWriteData writes 4 months of data splitting some months +// across segments and using empty segments and skipped segments. Entities and +// precomputed queries are written. written and then storage is queried. The +// test verifies that the correct timestamps are present in the activity log and +// that the correct segment numbers for each month contain the correct number of +// clients +func Test_handleActivityWriteData(t *testing.T) { index5 := int32(5) index4 := int32(4) - data := &generation.ActivityLogMockInput{ - Write: []generation.WriteOptions{ - generation.WriteOptions_WRITE_ENTITIES, + data := []*generation.Data{ + { + // segments: 0:[x,y], 1:[z] + Month: &generation.Data_MonthsAgo{MonthsAgo: 3}, + Clients: &generation.Data_All{All: &generation.Clients{Clients: []*generation.Client{{Count: 3}}}}, + NumSegments: 2, }, - Data: []*generation.Data{ - { - // segments: 0:[x,y], 1:[z] - Month: &generation.Data_MonthsAgo{MonthsAgo: 3}, - Clients: &generation.Data_All{All: &generation.Clients{Clients: []*generation.Client{{Count: 3}}}}, - NumSegments: 2, - }, - { - // segments: 1:[a,b,c], 2:[d,e] - Month: &generation.Data_MonthsAgo{MonthsAgo: 2}, - Clients: &generation.Data_All{All: &generation.Clients{Clients: []*generation.Client{{Count: 5}}}}, - NumSegments: 3, - SkipSegmentIndexes: []int32{0}, + { + // segments: 1:[a,b,c], 2:[d,e] + Month: &generation.Data_MonthsAgo{MonthsAgo: 2}, + Clients: &generation.Data_All{All: &generation.Clients{Clients: []*generation.Client{{Count: 5}}}}, + NumSegments: 3, + SkipSegmentIndexes: []int32{0}, + }, + { + // segments: 5:[f,g] + Month: &generation.Data_MonthsAgo{MonthsAgo: 1}, + Clients: &generation.Data_Segments{ + Segments: &generation.Segments{Segments: []*generation.Segment{{ + SegmentIndex: &index5, + Clients: &generation.Clients{Clients: []*generation.Client{{Count: 2}}}, + }}}, }, - { - // segments: 5:[f,g] - Month: &generation.Data_MonthsAgo{MonthsAgo: 1}, - Clients: &generation.Data_Segments{ - Segments: &generation.Segments{Segments: []*generation.Segment{{ + }, + { + // segments: 1:[], 2:[], 4:[n], 5:[o] + Month: &generation.Data_CurrentMonth{}, + EmptySegmentIndexes: []int32{1, 2}, + Clients: &generation.Data_Segments{ + Segments: &generation.Segments{Segments: []*generation.Segment{ + { SegmentIndex: &index5, - Clients: &generation.Clients{Clients: []*generation.Client{{Count: 2}}}, - }}}, - }, - }, - { - // segments: 1:[], 2:[], 4:[n], 5:[o] - Month: &generation.Data_CurrentMonth{}, - EmptySegmentIndexes: []int32{1, 2}, - Clients: &generation.Data_Segments{ - Segments: &generation.Segments{Segments: []*generation.Segment{ - { - SegmentIndex: &index5, - Clients: &generation.Clients{Clients: []*generation.Client{{Count: 1}}}, - }, - { - SegmentIndex: &index4, - Clients: &generation.Clients{Clients: []*generation.Client{{Count: 1}}}, - }, - }}, - }, + Clients: &generation.Clients{Clients: []*generation.Client{{Count: 1}}}, + }, + { + SegmentIndex: &index4, + Clients: &generation.Clients{Clients: []*generation.Client{{Count: 1}}}, + }, + }}, }, }, } - core, _, _ := TestCoreUnsealed(t) - marshaled, err := protojson.Marshal(data) - require.NoError(t, err) - req := logical.TestRequest(t, logical.CreateOperation, "internal/counters/activity/write") - req.Data = map[string]interface{}{"input": string(marshaled)} - resp, err := core.systemBackend.HandleRequest(namespace.RootContext(nil), req) - require.NoError(t, err) - paths := resp.Data["paths"].([]string) - require.Len(t, paths, 9) + t.Run("write entitites", func(t *testing.T) { + core, _, _ := TestCoreUnsealed(t) + marshaled, err := protojson.Marshal(&generation.ActivityLogMockInput{ + Data: data, + Write: []generation.WriteOptions{generation.WriteOptions_WRITE_ENTITIES}, + }) + require.NoError(t, err) + req := logical.TestRequest(t, logical.CreateOperation, "internal/counters/activity/write") + req.Data = map[string]interface{}{"input": string(marshaled)} + resp, err := core.systemBackend.HandleRequest(namespace.RootContext(nil), req) + require.NoError(t, err) + paths := resp.Data["paths"].([]string) + require.Len(t, paths, 9) - times, err := core.activityLog.availableLogs(context.Background()) - require.NoError(t, err) - require.Len(t, times, 4) + times, err := core.activityLog.availableLogs(context.Background()) + require.NoError(t, err) + require.Len(t, times, 4) - sortPaths := func(monthPaths []string) { - sort.Slice(monthPaths, func(i, j int) bool { - iVal, _ := parseSegmentNumberFromPath(monthPaths[i]) - jVal, _ := parseSegmentNumberFromPath(monthPaths[j]) - return iVal < jVal - }) - } + sortPaths := func(monthPaths []string) { + sort.Slice(monthPaths, func(i, j int) bool { + iVal, _ := parseSegmentNumberFromPath(monthPaths[i]) + jVal, _ := parseSegmentNumberFromPath(monthPaths[j]) + return iVal < jVal + }) + } - month0Paths := paths[0:4] - month1Paths := paths[4:5] - month2Paths := paths[5:7] - month3Paths := paths[7:9] - sortPaths(month0Paths) - sortPaths(month1Paths) - sortPaths(month2Paths) - sortPaths(month3Paths) - entities := func(paths []string) map[int][]*activity.EntityRecord { - segments := make(map[int][]*activity.EntityRecord) - for _, path := range paths { - segmentNum, _ := parseSegmentNumberFromPath(path) - entry, err := core.activityLog.view.Get(context.Background(), path) - require.NoError(t, err) - if entry == nil { - segments[segmentNum] = []*activity.EntityRecord{} - continue + month0Paths := paths[0:4] + month1Paths := paths[4:5] + month2Paths := paths[5:7] + month3Paths := paths[7:9] + sortPaths(month0Paths) + sortPaths(month1Paths) + sortPaths(month2Paths) + sortPaths(month3Paths) + entities := func(paths []string) map[int][]*activity.EntityRecord { + segments := make(map[int][]*activity.EntityRecord) + for _, path := range paths { + segmentNum, _ := parseSegmentNumberFromPath(path) + entry, err := core.activityLog.view.Get(context.Background(), path) + require.NoError(t, err) + if entry == nil { + segments[segmentNum] = []*activity.EntityRecord{} + continue + } + activities := &activity.EntityActivityLog{} + err = proto.Unmarshal(entry.Value, activities) + require.NoError(t, err) + segments[segmentNum] = activities.Clients } - activities := &activity.EntityActivityLog{} - err = proto.Unmarshal(entry.Value, activities) - require.NoError(t, err) - segments[segmentNum] = activities.Clients + return segments } - return segments - } - month0Entities := entities(month0Paths) - require.Len(t, month0Entities, 4) - require.Contains(t, month0Entities, 1) - require.Contains(t, month0Entities, 2) - require.Contains(t, month0Entities, 4) - require.Contains(t, month0Entities, 5) - require.Len(t, month0Entities[1], 0) - require.Len(t, month0Entities[2], 0) - require.Len(t, month0Entities[4], 1) - require.Len(t, month0Entities[5], 1) + month0Entities := entities(month0Paths) + require.Len(t, month0Entities, 4) + require.Contains(t, month0Entities, 1) + require.Contains(t, month0Entities, 2) + require.Contains(t, month0Entities, 4) + require.Contains(t, month0Entities, 5) + require.Len(t, month0Entities[1], 0) + require.Len(t, month0Entities[2], 0) + require.Len(t, month0Entities[4], 1) + require.Len(t, month0Entities[5], 1) - month1Entities := entities(month1Paths) - require.Len(t, month1Entities, 1) - require.Contains(t, month1Entities, 5) - require.Len(t, month1Entities[5], 2) + month1Entities := entities(month1Paths) + require.Len(t, month1Entities, 1) + require.Contains(t, month1Entities, 5) + require.Len(t, month1Entities[5], 2) + + month2Entities := entities(month2Paths) + require.Len(t, month2Entities, 2) + require.Contains(t, month2Entities, 1) + require.Contains(t, month2Entities, 2) + require.Len(t, month2Entities[1], 3) + require.Len(t, month2Entities[2], 2) + + month3Entities := entities(month3Paths) + require.Len(t, month3Entities, 2) + require.Contains(t, month3Entities, 0) + require.Contains(t, month3Entities, 1) + require.Len(t, month3Entities[0], 2) + require.Len(t, month3Entities[1], 1) + }) + t.Run("write precomputed queries", func(t *testing.T) { + core, _, _ := TestCoreUnsealed(t) + marshaled, err := protojson.Marshal(&generation.ActivityLogMockInput{ + Data: data, + Write: []generation.WriteOptions{generation.WriteOptions_WRITE_PRECOMPUTED_QUERIES}, + }) + require.NoError(t, err) + req := logical.TestRequest(t, logical.CreateOperation, "internal/counters/activity/write") + req.Data = map[string]interface{}{"input": string(marshaled)} + _, err = core.systemBackend.HandleRequest(namespace.RootContext(nil), req) + require.NoError(t, err) - month2Entities := entities(month2Paths) - require.Len(t, month2Entities, 2) - require.Contains(t, month2Entities, 1) - require.Contains(t, month2Entities, 2) - require.Len(t, month2Entities[1], 3) - require.Len(t, month2Entities[2], 2) + queries, err := core.activityLog.queryStore.QueriesAvailable(context.Background()) + require.NoError(t, err) + require.True(t, queries) - month3Entities := entities(month3Paths) - require.Len(t, month3Entities, 2) - require.Contains(t, month3Entities, 0) - require.Contains(t, month3Entities, 1) - require.Len(t, month3Entities[0], 2) - require.Len(t, month3Entities[1], 1) + now := time.Now().UTC() + start := timeutil.StartOfMonth(timeutil.MonthsPreviousTo(3, now)) + end := timeutil.EndOfMonth(now) + pq, err := core.activityLog.queryStore.Get(context.Background(), start, end) + require.NoError(t, err) + require.NotNil(t, pq) + require.Equal(t, end, pq.EndTime) + require.Equal(t, start, pq.StartTime) + require.Len(t, pq.Namespaces, 1) + require.Equal(t, uint64(12), pq.Namespaces[0].Entities) + require.Len(t, pq.Months, 4) + }) }