diff --git a/daemon/logger/awslogs/cloudwatchlogs.go b/daemon/logger/awslogs/cloudwatchlogs.go
index 4ea942071d2c5..25dd2152f0297 100644
--- a/daemon/logger/awslogs/cloudwatchlogs.go
+++ b/daemon/logger/awslogs/cloudwatchlogs.go
@@ -95,6 +95,17 @@ func init() {
     }
 }
 
+// eventBatch holds the events that are batched for submission and the
+// associated data about them.
+//
+// Warning: this type is not threadsafe and must not be used
+// concurrently; it is expected to be consumed by a single
+// goroutine.
+type eventBatch struct {
+    batch []wrappedEvent
+    bytes int
+}
+
 // New creates an awslogs logger using the configuration passed in on the
 // context. Supported context configuration variables are awslogs-region,
 // awslogs-group, awslogs-stream, awslogs-create-group, awslogs-multiline-pattern
@@ -389,32 +400,32 @@ var newTicker = func(freq time.Duration) *time.Ticker {
 // Logs, the processEvents method is called. If a multiline pattern is not
 // configured, log events are submitted to the processEvents method immediately.
 func (l *logStream) collectBatch() {
-    timer := newTicker(batchPublishFrequency)
-    var events []wrappedEvent
+    ticker := newTicker(batchPublishFrequency)
     var eventBuffer []byte
     var eventBufferTimestamp int64
+    var batch = newEventBatch()
     for {
         select {
-        case t := <-timer.C:
+        case t := <-ticker.C:
             // If event buffer is older than batch publish frequency flush the event buffer
             if eventBufferTimestamp > 0 && len(eventBuffer) > 0 {
                 eventBufferAge := t.UnixNano()/int64(time.Millisecond) - eventBufferTimestamp
                 eventBufferExpired := eventBufferAge > int64(batchPublishFrequency)/int64(time.Millisecond)
                 eventBufferNegative := eventBufferAge < 0
                 if eventBufferExpired || eventBufferNegative {
-                    events = l.processEvent(events, eventBuffer, eventBufferTimestamp)
+                    l.processEvent(batch, eventBuffer, eventBufferTimestamp)
                     eventBuffer = eventBuffer[:0]
                 }
             }
-            l.publishBatch(events)
-            events = events[:0]
+            l.publishBatch(batch)
+            batch.reset()
         case msg, more := <-l.messages:
             if !more {
                 // Flush event buffer and release resources
-                events = l.processEvent(events, eventBuffer, eventBufferTimestamp)
+                l.processEvent(batch, eventBuffer, eventBufferTimestamp)
                 eventBuffer = eventBuffer[:0]
-                l.publishBatch(events)
-                events = events[:0]
+                l.publishBatch(batch)
+                batch.reset()
                 return
             }
             if eventBufferTimestamp == 0 {
@@ -425,7 +436,7 @@ func (l *logStream) collectBatch() {
             if l.multilinePattern.Match(unprocessedLine) || len(eventBuffer)+len(unprocessedLine) > maximumBytesPerEvent {
                 // This is a new log event or we will exceed max bytes per event
                 // so flush the current eventBuffer to events and reset timestamp
-                events = l.processEvent(events, eventBuffer, eventBufferTimestamp)
+                l.processEvent(batch, eventBuffer, eventBufferTimestamp)
                 eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond)
                 eventBuffer = eventBuffer[:0]
             }
@@ -434,7 +445,7 @@ func (l *logStream) collectBatch() {
             eventBuffer = append(eventBuffer, processedLine...)
             logger.PutMessage(msg)
         } else {
-            events = l.processEvent(events, unprocessedLine, msg.Timestamp.UnixNano()/int64(time.Millisecond))
+            l.processEvent(batch, unprocessedLine, msg.Timestamp.UnixNano()/int64(time.Millisecond))
             logger.PutMessage(msg)
         }
     }
@@ -450,8 +461,7 @@ func (l *logStream) collectBatch() {
 // bytes per event (defined in maximumBytesPerEvent). There is a fixed per-event
 // byte overhead (defined in perEventBytes) which is accounted for in split- and
 // batch-calculations.
-func (l *logStream) processEvent(events []wrappedEvent, unprocessedLine []byte, timestamp int64) []wrappedEvent {
-    bytes := 0
+func (l *logStream) processEvent(batch *eventBatch, unprocessedLine []byte, timestamp int64) {
     for len(unprocessedLine) > 0 {
         // Split line length so it does not exceed the maximum
         lineBytes := len(unprocessedLine)
@@ -459,38 +469,33 @@ func (l *logStream) processEvent(events []wrappedEvent, unprocessedLine []byte,
         if lineBytes > maximumBytesPerEvent {
             lineBytes = maximumBytesPerEvent
         }
         line := unprocessedLine[:lineBytes]
-        unprocessedLine = unprocessedLine[lineBytes:]
-        if (len(events) >= maximumLogEventsPerPut) || (bytes+lineBytes+perEventBytes > maximumBytesPerPut) {
-            // Publish an existing batch if it's already over the maximum number of events or if adding this
-            // event would push it over the maximum number of total bytes.
-            l.publishBatch(events)
-            events = events[:0]
-            bytes = 0
-        }
-        events = append(events, wrappedEvent{
+
+        event := wrappedEvent{
             inputLogEvent: &cloudwatchlogs.InputLogEvent{
                 Message:   aws.String(string(line)),
                 Timestamp: aws.Int64(timestamp),
             },
-            insertOrder: len(events),
-        })
-        bytes += (lineBytes + perEventBytes)
+            insertOrder: batch.count(),
+        }
+
+        added := batch.add(event, lineBytes)
+        if added {
+            unprocessedLine = unprocessedLine[lineBytes:]
+        } else {
+            l.publishBatch(batch)
+            batch.reset()
+        }
     }
-    return events
 }
 
 // publishBatch calls PutLogEvents for a given set of InputLogEvents,
 // accounting for sequencing requirements (each request must reference the
 // sequence token returned by the previous request).
-func (l *logStream) publishBatch(events []wrappedEvent) {
-    if len(events) == 0 {
+func (l *logStream) publishBatch(batch *eventBatch) {
+    if batch.isEmpty() {
         return
     }
-
-    // events in a batch must be sorted by timestamp
-    // see http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
-    sort.Sort(byTimestamp(events))
-    cwEvents := unwrapEvents(events)
+    cwEvents := unwrapEvents(batch.events())
 
     nextSequenceToken, err := l.putLogEvents(cwEvents, l.sequenceToken)
 
@@ -615,3 +620,70 @@ func unwrapEvents(events []wrappedEvent) []*cloudwatchlogs.InputLogEvent {
     }
     return cwEvents
 }
+
+func newEventBatch() *eventBatch {
+    return &eventBatch{
+        batch: make([]wrappedEvent, 0),
+        bytes: 0,
+    }
+}
+
+// events returns a slice of wrappedEvents sorted in order of their
+// timestamps and then by their insertion order (see `byTimestamp`).
+//
+// Warning: this method is not threadsafe and must not be used
+// concurrently.
+func (b *eventBatch) events() []wrappedEvent {
+    sort.Sort(byTimestamp(b.batch))
+    return b.batch
+}
+
+// add adds an event to the batch of events, accounting for the
+// necessary overhead for an event to be logged. It returns false
+// if the event cannot be added to the batch due to service
+// limits.
+//
+// Warning: this method is not threadsafe and must not be used
+// concurrently.
+func (b *eventBatch) add(event wrappedEvent, size int) bool {
+    addBytes := size + perEventBytes
+
+    // verify we are still within service limits
+    switch {
+    case len(b.batch)+1 > maximumLogEventsPerPut:
+        return false
+    case b.bytes+addBytes > maximumBytesPerPut:
+        return false
+    }
+
+    b.bytes += addBytes
+    b.batch = append(b.batch, event)
+
+    return true
+}
+
+// count returns the number of batched events. Warning: this method
+// is not threadsafe and must not be used concurrently.
+func (b *eventBatch) count() int {
+    return len(b.batch)
+}
+
+// size returns the total number of bytes that the batch represents.
+//
+// Warning: this method is not threadsafe and must not be used
+// concurrently.
+func (b *eventBatch) size() int {
+    return b.bytes
+}
+
+func (b *eventBatch) isEmpty() bool {
+    zeroEvents := b.count() == 0
+    zeroSize := b.size() == 0
+    return zeroEvents && zeroSize
+}
+
+// reset prepares the batch for reuse.
+func (b *eventBatch) reset() {
+    b.bytes = 0
+    b.batch = b.batch[:0]
+}
diff --git a/daemon/logger/awslogs/cloudwatchlogs_test.go b/daemon/logger/awslogs/cloudwatchlogs_test.go
index 7ebc5dede2a55..67ea4747674c5 100644
--- a/daemon/logger/awslogs/cloudwatchlogs_test.go
+++ b/daemon/logger/awslogs/cloudwatchlogs_test.go
@@ -49,6 +49,15 @@ func (l *logStream) logGenerator(lineCount int, multilineCount int) {
     }
 }
 
+func testEventBatch(events []wrappedEvent) *eventBatch {
+    batch := newEventBatch()
+    for _, event := range events {
+        eventlen := len([]byte(*event.inputLogEvent.Message))
+        batch.add(event, eventlen)
+    }
+    return batch
+}
+
 func TestNewAWSLogsClientUserAgentHandler(t *testing.T) {
     info := logger.Info{
         Config: map[string]string{
@@ -212,7 +221,7 @@ func TestPublishBatchSuccess(t *testing.T) {
         },
     }
 
-    stream.publishBatch(events)
+    stream.publishBatch(testEventBatch(events))
     if stream.sequenceToken == nil {
         t.Fatal("Expected non-nil sequenceToken")
     }
@@ -257,7 +266,7 @@ func TestPublishBatchError(t *testing.T) {
         },
     }
 
-    stream.publishBatch(events)
+    stream.publishBatch(testEventBatch(events))
     if stream.sequenceToken == nil {
         t.Fatal("Expected non-nil sequenceToken")
     }
@@ -291,7 +300,7 @@ func TestPublishBatchInvalidSeqSuccess(t *testing.T) {
         },
     }
 
-    stream.publishBatch(events)
+    stream.publishBatch(testEventBatch(events))
     if stream.sequenceToken == nil {
         t.Fatal("Expected non-nil sequenceToken")
     }
@@ -354,7 +363,7 @@ func TestPublishBatchAlreadyAccepted(t *testing.T) {
         },
     }
 
-    stream.publishBatch(events)
+    stream.publishBatch(testEventBatch(events))
     if stream.sequenceToken == nil {
         t.Fatal("Expected non-nil sequenceToken")
     }
@@ -859,7 +868,8 @@ func TestCollectBatchMaxEvents(t *testing.T) {
 }
 
 func TestCollectBatchMaxTotalBytes(t *testing.T) {
-    mockClient := newMockClientBuffered(1)
+    expectedPuts := 2
+    mockClient := newMockClientBuffered(expectedPuts)
     stream := &logStream{
         client:       mockClient,
         logGroupName: groupName,
@@ -867,11 +877,14 @@ func TestCollectBatchMaxTotalBytes(t *testing.T) {
         sequenceToken: aws.String(sequenceToken),
         messages:      make(chan *logger.Message),
     }
-    mockClient.putLogEventsResult <- &putLogEventsResult{
-        successResult: &cloudwatchlogs.PutLogEventsOutput{
-            NextSequenceToken: aws.String(nextSequenceToken),
-        },
+    for i := 0; i < expectedPuts; i++ {
+        mockClient.putLogEventsResult <- &putLogEventsResult{
+            successResult: &cloudwatchlogs.PutLogEventsOutput{
+                NextSequenceToken: aws.String(nextSequenceToken),
+            },
+        }
     }
+
     var ticks = make(chan time.Time)
     newTicker = func(_ time.Duration) *time.Ticker {
         return &time.Ticker{
@@ -881,32 +894,57 @@ func TestCollectBatchMaxTotalBytes(t *testing.T) {
 
     go stream.collectBatch()
 
-    longline := strings.Repeat("A", maximumBytesPerPut)
+    numPayloads := maximumBytesPerPut / (maximumBytesPerEvent + perEventBytes)
+    // maxline is the longest line that could be submitted after
+    // accounting for its per-event overhead.
+    maxline := strings.Repeat("A", maximumBytesPerPut-(perEventBytes*numPayloads))
+    // This will be split and batched up to `maximumBytesPerPut`
+    // (+/- `maximumBytesPerEvent`). This should be aligned, but the test
+    // should also tolerate an offset within that range.
     stream.Log(&logger.Message{
-        Line:      []byte(longline + "B"),
+        Line:      []byte(maxline[:len(maxline)/2]),
+        Timestamp: time.Time{},
+    })
+    stream.Log(&logger.Message{
+        Line:      []byte(maxline[len(maxline)/2:]),
+        Timestamp: time.Time{},
+    })
+    stream.Log(&logger.Message{
+        Line:      []byte("B"),
         Timestamp: time.Time{},
     })
 
-    // no ticks
+    // no ticks; flushing is guaranteed by batch size (and channel close)
     stream.Close()
 
     argument := <-mockClient.putLogEventsArgument
     if argument == nil {
         t.Fatal("Expected non-nil PutLogEventsInput")
     }
-    bytes := 0
+
+    // The first batch should total close to the maximum allowed bytes.
+    eventBytes := 0
     for _, event := range argument.LogEvents {
-        bytes += len(*event.Message)
+        eventBytes += len(*event.Message)
+    }
+    eventsOverhead := len(argument.LogEvents) * perEventBytes
+    payloadTotal := eventBytes + eventsOverhead
+    // lowestMaxBatch tolerates a shortfall when the messages do not
+    // align exactly with the maximum event size.
+    lowestMaxBatch := maximumBytesPerPut - maximumBytesPerEvent
+
+    if payloadTotal > maximumBytesPerPut {
+        t.Errorf("Expected <= %d bytes but was %d", maximumBytesPerPut, payloadTotal)
     }
-    if bytes > maximumBytesPerPut {
-        t.Errorf("Expected <= %d bytes but was %d", maximumBytesPerPut, bytes)
+    if payloadTotal < lowestMaxBatch {
+        t.Errorf("Expected batch to be no less than %d but was %d", lowestMaxBatch, payloadTotal)
     }
 
     argument = <-mockClient.putLogEventsArgument
     if len(argument.LogEvents) != 1 {
         t.Errorf("Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents))
     }
-    message := *argument.LogEvents[0].Message
+    message := *argument.LogEvents[len(argument.LogEvents)-1].Message
     if message[len(message)-1:] != "B" {
         t.Errorf("Expected message to be %s but was %s", "B", message[len(message)-1:])
     }
diff --git a/daemon/logger/awslogs/cwlogsiface_mock_test.go b/daemon/logger/awslogs/cwlogsiface_mock_test.go
index 82bb34b0a6ea6..d0a2ebaca4e9c 100644
--- a/daemon/logger/awslogs/cwlogsiface_mock_test.go
+++ b/daemon/logger/awslogs/cwlogsiface_mock_test.go
@@ -1,6 +1,10 @@
 package awslogs
 
-import "github.com/aws/aws-sdk-go/service/cloudwatchlogs"
+import (
+    "fmt"
+
+    "github.com/aws/aws-sdk-go/service/cloudwatchlogs"
+)
 
 type mockcwlogsclient struct {
     createLogGroupArgument chan *cloudwatchlogs.CreateLogGroupInput
@@ -67,7 +71,30 @@ func (m *mockcwlogsclient) PutLogEvents(input *cloudwatchlogs.PutLogEventsInput)
         LogGroupName:  input.LogGroupName,
         LogStreamName: input.LogStreamName,
     }
+
+    // Intended mock output
     output := <-m.putLogEventsResult
+
+    // Check the enforced service limits in the mock
+    totalBytes := 0
+    for _, evt := range events {
+        if evt.Message == nil {
+            continue
+        }
+        eventBytes := len([]byte(*evt.Message))
+        if eventBytes > maximumBytesPerEvent {
+            // exceeded per event message size limits
+            return nil, fmt.Errorf("maximum bytes per event exceeded: event too large %d, max allowed: %d", eventBytes, maximumBytesPerEvent)
+        }
+        // total event bytes including overhead
+        totalBytes += eventBytes + perEventBytes
+    }
+
+    if totalBytes > maximumBytesPerPut {
+        // exceeded per put maximum size limit
+        return nil, fmt.Errorf("maximum bytes per put exceeded: upload too large %d, max allowed: %d", totalBytes, maximumBytesPerPut)
+    }
+
     return output.successResult, output.errorResult
 }
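
As a usage reference, the add/publish/reset cycle that eventBatch supports can be exercised in isolation. The program below is a minimal, self-contained sketch of the single-goroutine batching loop introduced in this diff; the limit constants, the event type, and the publish function are simplified stand-ins chosen for illustration, not the real awslogs definitions or CloudWatch service limits.

    package main

    import "fmt"

    // Simplified stand-in limits, chosen small so the example batches
    // visibly; the real driver uses the CloudWatch Logs service limits.
    const (
        perEventBytes   = 26
        maxEventsPerPut = 4
        maxBytesPerPut  = 120
    )

    type event struct{ message string }

    type batch struct {
        events []event
        bytes  int
    }

    // add mirrors eventBatch.add: it refuses the event, without
    // mutating the batch, when either limit would be exceeded.
    func (b *batch) add(e event, size int) bool {
        addBytes := size + perEventBytes
        if len(b.events)+1 > maxEventsPerPut || b.bytes+addBytes > maxBytesPerPut {
            return false
        }
        b.bytes += addBytes
        b.events = append(b.events, e)
        return true
    }

    // reset mirrors eventBatch.reset: it prepares the batch for reuse.
    func (b *batch) reset() {
        b.events = b.events[:0]
        b.bytes = 0
    }

    // publish is a stand-in for logStream.publishBatch.
    func publish(b *batch) {
        fmt.Printf("put %d events (%d bytes)\n", len(b.events), b.bytes)
    }

    func main() {
        b := &batch{}
        for i := 0; i < 10; i++ {
            e := event{message: "line"}
            // On a full batch, publish and reset, then retry the same
            // event: the cycle processEvent runs above. This terminates
            // only because every single event fits an empty batch, which
            // the caller guarantees by splitting lines beforehand.
            for !b.add(e, len(e.message)) {
                publish(b)
                b.reset()
            }
        }
        publish(b)
        b.reset()
    }

Note the design choice this models: add reports failure rather than publishing as a side effect, so the decision to publish (and the retry of the rejected event) stays in one place in the caller, which is what keeps eventBatch free of I/O and locking concerns.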