From ad14dbf1346742f0607d7c28a8ef3d4064f5f9fd Mon Sep 17 00:00:00 2001 From: Jacob Vallejo Date: Thu, 30 Nov 2017 16:17:17 -0800 Subject: [PATCH] awslogs: Use batching type for ergonomics and correct counting The previous bytes counter was moved out of scope was not counting the total number of bytes in the batch. This type encapsulates the counter and the batch for consideration and code ergonomics. Signed-off-by: Jacob Vallejo --- daemon/logger/awslogs/cloudwatchlogs.go | 138 +++++++++++++----- daemon/logger/awslogs/cloudwatchlogs_test.go | 72 ++++++--- .../logger/awslogs/cwlogsiface_mock_test.go | 29 +++- 3 files changed, 188 insertions(+), 51 deletions(-) diff --git a/daemon/logger/awslogs/cloudwatchlogs.go b/daemon/logger/awslogs/cloudwatchlogs.go index 4ea942071d2c5..25dd2152f0297 100644 --- a/daemon/logger/awslogs/cloudwatchlogs.go +++ b/daemon/logger/awslogs/cloudwatchlogs.go @@ -95,6 +95,17 @@ func init() { } } +// eventBatch holds the events that are batched for submission and the +// associated data about it. +// +// Warning: this type is not threadsafe and must not be used +// concurrently. This type is expected to be consumed in a single go +// routine and never concurrently. +type eventBatch struct { + batch []wrappedEvent + bytes int +} + // New creates an awslogs logger using the configuration passed in on the // context. Supported context configuration variables are awslogs-region, // awslogs-group, awslogs-stream, awslogs-create-group, awslogs-multiline-pattern @@ -389,32 +400,32 @@ var newTicker = func(freq time.Duration) *time.Ticker { // Logs, the processEvents method is called. If a multiline pattern is not // configured, log events are submitted to the processEvents method immediately. func (l *logStream) collectBatch() { - timer := newTicker(batchPublishFrequency) - var events []wrappedEvent + ticker := newTicker(batchPublishFrequency) var eventBuffer []byte var eventBufferTimestamp int64 + var batch = newEventBatch() for { select { - case t := <-timer.C: + case t := <-ticker.C: // If event buffer is older than batch publish frequency flush the event buffer if eventBufferTimestamp > 0 && len(eventBuffer) > 0 { eventBufferAge := t.UnixNano()/int64(time.Millisecond) - eventBufferTimestamp eventBufferExpired := eventBufferAge > int64(batchPublishFrequency)/int64(time.Millisecond) eventBufferNegative := eventBufferAge < 0 if eventBufferExpired || eventBufferNegative { - events = l.processEvent(events, eventBuffer, eventBufferTimestamp) + l.processEvent(batch, eventBuffer, eventBufferTimestamp) eventBuffer = eventBuffer[:0] } } - l.publishBatch(events) - events = events[:0] + l.publishBatch(batch) + batch.reset() case msg, more := <-l.messages: if !more { // Flush event buffer and release resources - events = l.processEvent(events, eventBuffer, eventBufferTimestamp) + l.processEvent(batch, eventBuffer, eventBufferTimestamp) eventBuffer = eventBuffer[:0] - l.publishBatch(events) - events = events[:0] + l.publishBatch(batch) + batch.reset() return } if eventBufferTimestamp == 0 { @@ -425,7 +436,7 @@ func (l *logStream) collectBatch() { if l.multilinePattern.Match(unprocessedLine) || len(eventBuffer)+len(unprocessedLine) > maximumBytesPerEvent { // This is a new log event or we will exceed max bytes per event // so flush the current eventBuffer to events and reset timestamp - events = l.processEvent(events, eventBuffer, eventBufferTimestamp) + l.processEvent(batch, eventBuffer, eventBufferTimestamp) eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond) eventBuffer = eventBuffer[:0] } @@ -434,7 +445,7 @@ func (l *logStream) collectBatch() { eventBuffer = append(eventBuffer, processedLine...) logger.PutMessage(msg) } else { - events = l.processEvent(events, unprocessedLine, msg.Timestamp.UnixNano()/int64(time.Millisecond)) + l.processEvent(batch, unprocessedLine, msg.Timestamp.UnixNano()/int64(time.Millisecond)) logger.PutMessage(msg) } } @@ -450,8 +461,7 @@ func (l *logStream) collectBatch() { // bytes per event (defined in maximumBytesPerEvent). There is a fixed per-event // byte overhead (defined in perEventBytes) which is accounted for in split- and // batch-calculations. -func (l *logStream) processEvent(events []wrappedEvent, unprocessedLine []byte, timestamp int64) []wrappedEvent { - bytes := 0 +func (l *logStream) processEvent(batch *eventBatch, unprocessedLine []byte, timestamp int64) { for len(unprocessedLine) > 0 { // Split line length so it does not exceed the maximum lineBytes := len(unprocessedLine) @@ -459,38 +469,33 @@ func (l *logStream) processEvent(events []wrappedEvent, unprocessedLine []byte, lineBytes = maximumBytesPerEvent } line := unprocessedLine[:lineBytes] - unprocessedLine = unprocessedLine[lineBytes:] - if (len(events) >= maximumLogEventsPerPut) || (bytes+lineBytes+perEventBytes > maximumBytesPerPut) { - // Publish an existing batch if it's already over the maximum number of events or if adding this - // event would push it over the maximum number of total bytes. - l.publishBatch(events) - events = events[:0] - bytes = 0 - } - events = append(events, wrappedEvent{ + + event := wrappedEvent{ inputLogEvent: &cloudwatchlogs.InputLogEvent{ Message: aws.String(string(line)), Timestamp: aws.Int64(timestamp), }, - insertOrder: len(events), - }) - bytes += (lineBytes + perEventBytes) + insertOrder: batch.count(), + } + + added := batch.add(event, lineBytes) + if added { + unprocessedLine = unprocessedLine[lineBytes:] + } else { + l.publishBatch(batch) + batch.reset() + } } - return events } // publishBatch calls PutLogEvents for a given set of InputLogEvents, // accounting for sequencing requirements (each request must reference the // sequence token returned by the previous request). -func (l *logStream) publishBatch(events []wrappedEvent) { - if len(events) == 0 { +func (l *logStream) publishBatch(batch *eventBatch) { + if batch.isEmpty() { return } - - // events in a batch must be sorted by timestamp - // see http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html - sort.Sort(byTimestamp(events)) - cwEvents := unwrapEvents(events) + cwEvents := unwrapEvents(batch.events()) nextSequenceToken, err := l.putLogEvents(cwEvents, l.sequenceToken) @@ -615,3 +620,70 @@ func unwrapEvents(events []wrappedEvent) []*cloudwatchlogs.InputLogEvent { } return cwEvents } + +func newEventBatch() *eventBatch { + return &eventBatch{ + batch: make([]wrappedEvent, 0), + bytes: 0, + } +} + +// events returns a slice of wrappedEvents sorted in order of their +// timestamps and then by their insertion order (see `byTimestamp`). +// +// Warning: this method is not threadsafe and must not be used +// concurrently. +func (b *eventBatch) events() []wrappedEvent { + sort.Sort(byTimestamp(b.batch)) + return b.batch +} + +// add adds an event to the batch of events accounting for the +// necessary overhead for an event to be logged. An error will be +// returned if the event cannot be added to the batch due to service +// limits. +// +// Warning: this method is not threadsafe and must not be used +// concurrently. +func (b *eventBatch) add(event wrappedEvent, size int) bool { + addBytes := size + perEventBytes + + // verify we are still within service limits + switch { + case len(b.batch)+1 > maximumLogEventsPerPut: + return false + case b.bytes+addBytes > maximumBytesPerPut: + return false + } + + b.bytes += addBytes + b.batch = append(b.batch, event) + + return true +} + +// count is the number of batched events. Warning: this method +// is not threadsafe and must not be used concurrently. +func (b *eventBatch) count() int { + return len(b.batch) +} + +// size is the total number of bytes that the batch represents. +// +// Warning: this method is not threadsafe and must not be used +// concurrently. +func (b *eventBatch) size() int { + return b.bytes +} + +func (b *eventBatch) isEmpty() bool { + zeroEvents := b.count() == 0 + zeroSize := b.size() == 0 + return zeroEvents && zeroSize +} + +// reset prepares the batch for reuse. +func (b *eventBatch) reset() { + b.bytes = 0 + b.batch = b.batch[:0] +} diff --git a/daemon/logger/awslogs/cloudwatchlogs_test.go b/daemon/logger/awslogs/cloudwatchlogs_test.go index 7ebc5dede2a55..67ea4747674c5 100644 --- a/daemon/logger/awslogs/cloudwatchlogs_test.go +++ b/daemon/logger/awslogs/cloudwatchlogs_test.go @@ -49,6 +49,15 @@ func (l *logStream) logGenerator(lineCount int, multilineCount int) { } } +func testEventBatch(events []wrappedEvent) *eventBatch { + batch := newEventBatch() + for _, event := range events { + eventlen := len([]byte(*event.inputLogEvent.Message)) + batch.add(event, eventlen) + } + return batch +} + func TestNewAWSLogsClientUserAgentHandler(t *testing.T) { info := logger.Info{ Config: map[string]string{ @@ -212,7 +221,7 @@ func TestPublishBatchSuccess(t *testing.T) { }, } - stream.publishBatch(events) + stream.publishBatch(testEventBatch(events)) if stream.sequenceToken == nil { t.Fatal("Expected non-nil sequenceToken") } @@ -257,7 +266,7 @@ func TestPublishBatchError(t *testing.T) { }, } - stream.publishBatch(events) + stream.publishBatch(testEventBatch(events)) if stream.sequenceToken == nil { t.Fatal("Expected non-nil sequenceToken") } @@ -291,7 +300,7 @@ func TestPublishBatchInvalidSeqSuccess(t *testing.T) { }, } - stream.publishBatch(events) + stream.publishBatch(testEventBatch(events)) if stream.sequenceToken == nil { t.Fatal("Expected non-nil sequenceToken") } @@ -354,7 +363,7 @@ func TestPublishBatchAlreadyAccepted(t *testing.T) { }, } - stream.publishBatch(events) + stream.publishBatch(testEventBatch(events)) if stream.sequenceToken == nil { t.Fatal("Expected non-nil sequenceToken") } @@ -859,7 +868,8 @@ func TestCollectBatchMaxEvents(t *testing.T) { } func TestCollectBatchMaxTotalBytes(t *testing.T) { - mockClient := newMockClientBuffered(1) + expectedPuts := 2 + mockClient := newMockClientBuffered(expectedPuts) stream := &logStream{ client: mockClient, logGroupName: groupName, @@ -867,11 +877,14 @@ func TestCollectBatchMaxTotalBytes(t *testing.T) { sequenceToken: aws.String(sequenceToken), messages: make(chan *logger.Message), } - mockClient.putLogEventsResult <- &putLogEventsResult{ - successResult: &cloudwatchlogs.PutLogEventsOutput{ - NextSequenceToken: aws.String(nextSequenceToken), - }, + for i := 0; i < expectedPuts; i++ { + mockClient.putLogEventsResult <- &putLogEventsResult{ + successResult: &cloudwatchlogs.PutLogEventsOutput{ + NextSequenceToken: aws.String(nextSequenceToken), + }, + } } + var ticks = make(chan time.Time) newTicker = func(_ time.Duration) *time.Ticker { return &time.Ticker{ @@ -881,32 +894,57 @@ func TestCollectBatchMaxTotalBytes(t *testing.T) { go stream.collectBatch() - longline := strings.Repeat("A", maximumBytesPerPut) + numPayloads := maximumBytesPerPut / (maximumBytesPerEvent + perEventBytes) + // maxline is the maximum line that could be submitted after + // accounting for its overhead. + maxline := strings.Repeat("A", maximumBytesPerPut-(perEventBytes*numPayloads)) + // This will be split and batched up to the `maximumBytesPerPut' + // (+/- `maximumBytesPerEvent'). This /should/ be aligned, but + // should also tolerate an offset within that range. stream.Log(&logger.Message{ - Line: []byte(longline + "B"), + Line: []byte(maxline[:len(maxline)/2]), + Timestamp: time.Time{}, + }) + stream.Log(&logger.Message{ + Line: []byte(maxline[len(maxline)/2:]), + Timestamp: time.Time{}, + }) + stream.Log(&logger.Message{ + Line: []byte("B"), Timestamp: time.Time{}, }) - // no ticks + // no ticks, guarantee batch by size (and chan close) stream.Close() argument := <-mockClient.putLogEventsArgument if argument == nil { t.Fatal("Expected non-nil PutLogEventsInput") } - bytes := 0 + + // Should total to the maximum allowed bytes. + eventBytes := 0 for _, event := range argument.LogEvents { - bytes += len(*event.Message) + eventBytes += len(*event.Message) + } + eventsOverhead := len(argument.LogEvents) * perEventBytes + payloadTotal := eventBytes + eventsOverhead + // lowestMaxBatch allows the payload to be offset if the messages + // don't lend themselves to align with the maximum event size. + lowestMaxBatch := maximumBytesPerPut - maximumBytesPerEvent + + if payloadTotal > maximumBytesPerPut { + t.Errorf("Expected <= %d bytes but was %d", maximumBytesPerPut, payloadTotal) } - if bytes > maximumBytesPerPut { - t.Errorf("Expected <= %d bytes but was %d", maximumBytesPerPut, bytes) + if payloadTotal < lowestMaxBatch { + t.Errorf("Batch to be no less than %d but was %d", lowestMaxBatch, payloadTotal) } argument = <-mockClient.putLogEventsArgument if len(argument.LogEvents) != 1 { t.Errorf("Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents)) } - message := *argument.LogEvents[0].Message + message := *argument.LogEvents[len(argument.LogEvents)-1].Message if message[len(message)-1:] != "B" { t.Errorf("Expected message to be %s but was %s", "B", message[len(message)-1:]) } diff --git a/daemon/logger/awslogs/cwlogsiface_mock_test.go b/daemon/logger/awslogs/cwlogsiface_mock_test.go index 82bb34b0a6ea6..d0a2ebaca4e9c 100644 --- a/daemon/logger/awslogs/cwlogsiface_mock_test.go +++ b/daemon/logger/awslogs/cwlogsiface_mock_test.go @@ -1,6 +1,10 @@ package awslogs -import "github.com/aws/aws-sdk-go/service/cloudwatchlogs" +import ( + "fmt" + + "github.com/aws/aws-sdk-go/service/cloudwatchlogs" +) type mockcwlogsclient struct { createLogGroupArgument chan *cloudwatchlogs.CreateLogGroupInput @@ -67,7 +71,30 @@ func (m *mockcwlogsclient) PutLogEvents(input *cloudwatchlogs.PutLogEventsInput) LogGroupName: input.LogGroupName, LogStreamName: input.LogStreamName, } + + // Intended mock output output := <-m.putLogEventsResult + + // Checked enforced limits in mock + totalBytes := 0 + for _, evt := range events { + if evt.Message == nil { + continue + } + eventBytes := len([]byte(*evt.Message)) + if eventBytes > maximumBytesPerEvent { + // exceeded per event message size limits + return nil, fmt.Errorf("maximum bytes per event exceeded: Event too large %d, max allowed: %d", eventBytes, maximumBytesPerEvent) + } + // total event bytes including overhead + totalBytes += eventBytes + perEventBytes + } + + if totalBytes > maximumBytesPerPut { + // exceeded per put maximum size limit + return nil, fmt.Errorf("maximum bytes per put exceeded: Upload too large %d, max allowed: %d", totalBytes, maximumBytesPerPut) + } + return output.successResult, output.errorResult }