Skip to content

Commit

Permalink
backport of commit 5002489
Browse files Browse the repository at this point in the history
  • Loading branch information
miagilepner authored and mpalmi committed Jun 14, 2023
1 parent 7606c7a commit 3cc42d4
Show file tree
Hide file tree
Showing 2 changed files with 221 additions and 115 deletions.
75 changes: 75 additions & 0 deletions vault/logical_system_activity_write_testonly.go
Expand Up @@ -8,6 +8,7 @@ package vault
import (
"context"
"fmt"
"io"
"sync"
"time"

Expand Down Expand Up @@ -315,7 +316,23 @@ func (m *multipleMonthsActivityClients) addRepeatedClients(monthsAgo int32, c *g
func (m *multipleMonthsActivityClients) write(ctx context.Context, opts map[generation.WriteOptions]struct{}, activityLog *ActivityLog) ([]string, error) {
now := timeutil.StartOfMonth(time.Now().UTC())
paths := []string{}

_, writePQ := opts[generation.WriteOptions_WRITE_PRECOMPUTED_QUERIES]
_, writeDistinctClients := opts[generation.WriteOptions_WRITE_DISTINCT_CLIENTS]

pqOpts := pqOptions{}
if writePQ || writeDistinctClients {
pqOpts.byNamespace = make(map[string]*processByNamespace)
pqOpts.byMonth = make(map[int64]*processMonth)
pqOpts.activePeriodEnd = m.latestTimestamp(now)
pqOpts.endTime = timeutil.EndOfMonth(pqOpts.activePeriodEnd)
pqOpts.activePeriodStart = m.earliestTimestamp(now)
}

for i, month := range m.months {
if month.generationParameters == nil {
continue
}
var timestamp time.Time
if i > 0 {
timestamp = timeutil.StartOfMonth(timeutil.MonthsPreviousTo(i, now))
Expand Down Expand Up @@ -344,6 +361,14 @@ func (m *multipleMonthsActivityClients) write(ctx context.Context, opts map[gene
paths = append(paths, entityPath)
}
}

if writePQ || writeDistinctClients {
reader := newProtoSegmentReader(segments)
err = activityLog.segmentToPrecomputedQuery(ctx, timestamp, reader, pqOpts)
if err != nil {
return nil, err
}
}
}
wg := sync.WaitGroup{}
err := activityLog.refreshFromStoredLog(ctx, &wg, now)
Expand All @@ -353,6 +378,25 @@ func (m *multipleMonthsActivityClients) write(ctx context.Context, opts map[gene
return paths, nil
}

func (m *multipleMonthsActivityClients) latestTimestamp(now time.Time) time.Time {
for i, month := range m.months {
if month.generationParameters != nil {
return timeutil.StartOfMonth(timeutil.MonthsPreviousTo(i, now))
}
}
return time.Time{}
}

func (m *multipleMonthsActivityClients) earliestTimestamp(now time.Time) time.Time {
for i := len(m.months) - 1; i >= 0; i-- {
month := m.months[i]
if month.generationParameters != nil {
return timeutil.StartOfMonth(timeutil.MonthsPreviousTo(i, now))
}
}
return time.Time{}
}

func newMultipleMonthsActivityClients(numberOfMonths int) *multipleMonthsActivityClients {
m := &multipleMonthsActivityClients{
months: make([]*singleMonthActivityClients, numberOfMonths),
Expand All @@ -364,3 +408,34 @@ func newMultipleMonthsActivityClients(numberOfMonths int) *multipleMonthsActivit
}
return m
}

func newProtoSegmentReader(segments map[int][]*activity.EntityRecord) SegmentReader {
allRecords := make([][]*activity.EntityRecord, 0, len(segments))
for _, records := range segments {
if segments == nil {
continue
}
allRecords = append(allRecords, records)
}
return &sliceSegmentReader{
records: allRecords,
}
}

type sliceSegmentReader struct {
records [][]*activity.EntityRecord
i int
}

func (p *sliceSegmentReader) ReadToken(ctx context.Context) (*activity.TokenCount, error) {
return nil, io.EOF
}

func (p *sliceSegmentReader) ReadEntity(ctx context.Context) (*activity.EntityActivityLog, error) {
if p.i == len(p.records) {
return nil, io.EOF
}
record := p.records[p.i]
p.i++
return &activity.EntityActivityLog{Clients: record}, nil
}
261 changes: 146 additions & 115 deletions vault/logical_system_activity_write_testonly_test.go
Expand Up @@ -9,8 +9,10 @@ import (
"context"
"sort"
"testing"
"time"

"github.com/hashicorp/vault/helper/namespace"
"github.com/hashicorp/vault/helper/timeutil"
"github.com/hashicorp/vault/sdk/logical"
"github.com/hashicorp/vault/vault/activity"
"github.com/hashicorp/vault/vault/activity/generation"
Expand Down Expand Up @@ -441,136 +443,165 @@ func Test_singleMonthActivityClients_populateSegments(t *testing.T) {
}
}

// Test_multipleMonthsActivityClients_write_entities writes 4 months of data
// splitting some months across segments and using empty segments and skipped
// segments. Entities are written and then storage is queried. The test verifies
// that the correct timestamps are present in the activity log and that the correct
// segment numbers for each month contain the correct number of clients
func Test_multipleMonthsActivityClients_write_entities(t *testing.T) {
// Test_handleActivityWriteData writes 4 months of data splitting some months
// across segments and using empty segments and skipped segments. Entities and
// precomputed queries are written. written and then storage is queried. The
// test verifies that the correct timestamps are present in the activity log and
// that the correct segment numbers for each month contain the correct number of
// clients
func Test_handleActivityWriteData(t *testing.T) {
index5 := int32(5)
index4 := int32(4)
data := &generation.ActivityLogMockInput{
Write: []generation.WriteOptions{
generation.WriteOptions_WRITE_ENTITIES,
data := []*generation.Data{
{
// segments: 0:[x,y], 1:[z]
Month: &generation.Data_MonthsAgo{MonthsAgo: 3},
Clients: &generation.Data_All{All: &generation.Clients{Clients: []*generation.Client{{Count: 3}}}},
NumSegments: 2,
},
Data: []*generation.Data{
{
// segments: 0:[x,y], 1:[z]
Month: &generation.Data_MonthsAgo{MonthsAgo: 3},
Clients: &generation.Data_All{All: &generation.Clients{Clients: []*generation.Client{{Count: 3}}}},
NumSegments: 2,
},
{
// segments: 1:[a,b,c], 2:[d,e]
Month: &generation.Data_MonthsAgo{MonthsAgo: 2},
Clients: &generation.Data_All{All: &generation.Clients{Clients: []*generation.Client{{Count: 5}}}},
NumSegments: 3,
SkipSegmentIndexes: []int32{0},
{
// segments: 1:[a,b,c], 2:[d,e]
Month: &generation.Data_MonthsAgo{MonthsAgo: 2},
Clients: &generation.Data_All{All: &generation.Clients{Clients: []*generation.Client{{Count: 5}}}},
NumSegments: 3,
SkipSegmentIndexes: []int32{0},
},
{
// segments: 5:[f,g]
Month: &generation.Data_MonthsAgo{MonthsAgo: 1},
Clients: &generation.Data_Segments{
Segments: &generation.Segments{Segments: []*generation.Segment{{
SegmentIndex: &index5,
Clients: &generation.Clients{Clients: []*generation.Client{{Count: 2}}},
}}},
},
{
// segments: 5:[f,g]
Month: &generation.Data_MonthsAgo{MonthsAgo: 1},
Clients: &generation.Data_Segments{
Segments: &generation.Segments{Segments: []*generation.Segment{{
},
{
// segments: 1:[], 2:[], 4:[n], 5:[o]
Month: &generation.Data_CurrentMonth{},
EmptySegmentIndexes: []int32{1, 2},
Clients: &generation.Data_Segments{
Segments: &generation.Segments{Segments: []*generation.Segment{
{
SegmentIndex: &index5,
Clients: &generation.Clients{Clients: []*generation.Client{{Count: 2}}},
}}},
},
},
{
// segments: 1:[], 2:[], 4:[n], 5:[o]
Month: &generation.Data_CurrentMonth{},
EmptySegmentIndexes: []int32{1, 2},
Clients: &generation.Data_Segments{
Segments: &generation.Segments{Segments: []*generation.Segment{
{
SegmentIndex: &index5,
Clients: &generation.Clients{Clients: []*generation.Client{{Count: 1}}},
},
{
SegmentIndex: &index4,
Clients: &generation.Clients{Clients: []*generation.Client{{Count: 1}}},
},
}},
},
Clients: &generation.Clients{Clients: []*generation.Client{{Count: 1}}},
},
{
SegmentIndex: &index4,
Clients: &generation.Clients{Clients: []*generation.Client{{Count: 1}}},
},
}},
},
},
}

core, _, _ := TestCoreUnsealed(t)
marshaled, err := protojson.Marshal(data)
require.NoError(t, err)
req := logical.TestRequest(t, logical.CreateOperation, "internal/counters/activity/write")
req.Data = map[string]interface{}{"input": string(marshaled)}
resp, err := core.systemBackend.HandleRequest(namespace.RootContext(nil), req)
require.NoError(t, err)
paths := resp.Data["paths"].([]string)
require.Len(t, paths, 9)
t.Run("write entitites", func(t *testing.T) {
core, _, _ := TestCoreUnsealed(t)
marshaled, err := protojson.Marshal(&generation.ActivityLogMockInput{
Data: data,
Write: []generation.WriteOptions{generation.WriteOptions_WRITE_ENTITIES},
})
require.NoError(t, err)
req := logical.TestRequest(t, logical.CreateOperation, "internal/counters/activity/write")
req.Data = map[string]interface{}{"input": string(marshaled)}
resp, err := core.systemBackend.HandleRequest(namespace.RootContext(nil), req)
require.NoError(t, err)
paths := resp.Data["paths"].([]string)
require.Len(t, paths, 9)

times, err := core.activityLog.availableLogs(context.Background())
require.NoError(t, err)
require.Len(t, times, 4)
times, err := core.activityLog.availableLogs(context.Background())
require.NoError(t, err)
require.Len(t, times, 4)

sortPaths := func(monthPaths []string) {
sort.Slice(monthPaths, func(i, j int) bool {
iVal, _ := parseSegmentNumberFromPath(monthPaths[i])
jVal, _ := parseSegmentNumberFromPath(monthPaths[j])
return iVal < jVal
})
}
sortPaths := func(monthPaths []string) {
sort.Slice(monthPaths, func(i, j int) bool {
iVal, _ := parseSegmentNumberFromPath(monthPaths[i])
jVal, _ := parseSegmentNumberFromPath(monthPaths[j])
return iVal < jVal
})
}

month0Paths := paths[0:4]
month1Paths := paths[4:5]
month2Paths := paths[5:7]
month3Paths := paths[7:9]
sortPaths(month0Paths)
sortPaths(month1Paths)
sortPaths(month2Paths)
sortPaths(month3Paths)
entities := func(paths []string) map[int][]*activity.EntityRecord {
segments := make(map[int][]*activity.EntityRecord)
for _, path := range paths {
segmentNum, _ := parseSegmentNumberFromPath(path)
entry, err := core.activityLog.view.Get(context.Background(), path)
require.NoError(t, err)
if entry == nil {
segments[segmentNum] = []*activity.EntityRecord{}
continue
month0Paths := paths[0:4]
month1Paths := paths[4:5]
month2Paths := paths[5:7]
month3Paths := paths[7:9]
sortPaths(month0Paths)
sortPaths(month1Paths)
sortPaths(month2Paths)
sortPaths(month3Paths)
entities := func(paths []string) map[int][]*activity.EntityRecord {
segments := make(map[int][]*activity.EntityRecord)
for _, path := range paths {
segmentNum, _ := parseSegmentNumberFromPath(path)
entry, err := core.activityLog.view.Get(context.Background(), path)
require.NoError(t, err)
if entry == nil {
segments[segmentNum] = []*activity.EntityRecord{}
continue
}
activities := &activity.EntityActivityLog{}
err = proto.Unmarshal(entry.Value, activities)
require.NoError(t, err)
segments[segmentNum] = activities.Clients
}
activities := &activity.EntityActivityLog{}
err = proto.Unmarshal(entry.Value, activities)
require.NoError(t, err)
segments[segmentNum] = activities.Clients
return segments
}
return segments
}
month0Entities := entities(month0Paths)
require.Len(t, month0Entities, 4)
require.Contains(t, month0Entities, 1)
require.Contains(t, month0Entities, 2)
require.Contains(t, month0Entities, 4)
require.Contains(t, month0Entities, 5)
require.Len(t, month0Entities[1], 0)
require.Len(t, month0Entities[2], 0)
require.Len(t, month0Entities[4], 1)
require.Len(t, month0Entities[5], 1)
month0Entities := entities(month0Paths)
require.Len(t, month0Entities, 4)
require.Contains(t, month0Entities, 1)
require.Contains(t, month0Entities, 2)
require.Contains(t, month0Entities, 4)
require.Contains(t, month0Entities, 5)
require.Len(t, month0Entities[1], 0)
require.Len(t, month0Entities[2], 0)
require.Len(t, month0Entities[4], 1)
require.Len(t, month0Entities[5], 1)

month1Entities := entities(month1Paths)
require.Len(t, month1Entities, 1)
require.Contains(t, month1Entities, 5)
require.Len(t, month1Entities[5], 2)
month1Entities := entities(month1Paths)
require.Len(t, month1Entities, 1)
require.Contains(t, month1Entities, 5)
require.Len(t, month1Entities[5], 2)

month2Entities := entities(month2Paths)
require.Len(t, month2Entities, 2)
require.Contains(t, month2Entities, 1)
require.Contains(t, month2Entities, 2)
require.Len(t, month2Entities[1], 3)
require.Len(t, month2Entities[2], 2)

month3Entities := entities(month3Paths)
require.Len(t, month3Entities, 2)
require.Contains(t, month3Entities, 0)
require.Contains(t, month3Entities, 1)
require.Len(t, month3Entities[0], 2)
require.Len(t, month3Entities[1], 1)
})
t.Run("write precomputed queries", func(t *testing.T) {
core, _, _ := TestCoreUnsealed(t)
marshaled, err := protojson.Marshal(&generation.ActivityLogMockInput{
Data: data,
Write: []generation.WriteOptions{generation.WriteOptions_WRITE_PRECOMPUTED_QUERIES},
})
require.NoError(t, err)
req := logical.TestRequest(t, logical.CreateOperation, "internal/counters/activity/write")
req.Data = map[string]interface{}{"input": string(marshaled)}
_, err = core.systemBackend.HandleRequest(namespace.RootContext(nil), req)
require.NoError(t, err)

month2Entities := entities(month2Paths)
require.Len(t, month2Entities, 2)
require.Contains(t, month2Entities, 1)
require.Contains(t, month2Entities, 2)
require.Len(t, month2Entities[1], 3)
require.Len(t, month2Entities[2], 2)
queries, err := core.activityLog.queryStore.QueriesAvailable(context.Background())
require.NoError(t, err)
require.True(t, queries)

month3Entities := entities(month3Paths)
require.Len(t, month3Entities, 2)
require.Contains(t, month3Entities, 0)
require.Contains(t, month3Entities, 1)
require.Len(t, month3Entities[0], 2)
require.Len(t, month3Entities[1], 1)
now := time.Now().UTC()
start := timeutil.StartOfMonth(timeutil.MonthsPreviousTo(3, now))
end := timeutil.EndOfMonth(now)
pq, err := core.activityLog.queryStore.Get(context.Background(), start, end)
require.NoError(t, err)
require.NotNil(t, pq)
require.Equal(t, end, pq.EndTime)
require.Equal(t, start, pq.StartTime)
require.Len(t, pq.Namespaces, 1)
require.Equal(t, uint64(12), pq.Namespaces[0].Entities)
require.Len(t, pq.Months, 4)
})
}

0 comments on commit 3cc42d4

Please sign in to comment.