Fix awslogs batch size calculation #35726

Merged (1 commit, Dec 19, 2017)
138 changes: 105 additions & 33 deletions daemon/logger/awslogs/cloudwatchlogs.go
@@ -95,6 +95,17 @@ func init() {
 	}
 }
 
+// eventBatch holds the events that are batched for submission and the
+// associated data about it.
+//
+// Warning: this type is not threadsafe and must not be used
+// concurrently. This type is expected to be consumed in a single go
+// routine and never concurrently.
+type eventBatch struct {

[Review comment, Member] Can you move this down to be right above the methods for the type? Those start on line 636.

+	batch []wrappedEvent
+	bytes int
+}
+
 // New creates an awslogs logger using the configuration passed in on the
 // context. Supported context configuration variables are awslogs-region,
 // awslogs-group, awslogs-stream, awslogs-create-group, awslogs-multiline-pattern
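For context, the limits this type enforces are the CloudWatch Logs PutLogEvents service limits that the file's constants encode: at most 10,000 events and 1,048,576 bytes per request, with each event charged its message length plus 26 bytes of fixed overhead. A minimal sketch of that accounting (the constant names mirror the ones used in this file; the fits helper itself is hypothetical):

package main

import "fmt"

const (
	perEventBytes          = 26
	maximumBytesPerPut     = 1048576
	maximumLogEventsPerPut = 10000
)

// fits reports whether one more event with a payload of `size` bytes stays
// within the service limits, given a batch already holding `count` events
// totalling `used` bytes (payloads plus per-event overhead).
func fits(count, used, size int) bool {
	return count+1 <= maximumLogEventsPerPut &&
		used+size+perEventBytes <= maximumBytesPerPut
}

func main() {
	fmt.Println(fits(0, 0, 1048550)) // true: 1048550 + 26 == 1048576 exactly
	fmt.Println(fits(0, 0, 1048551)) // false: one byte over the put limit
	fmt.Println(fits(10000, 0, 1))   // false: the event-count limit is reached
}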
@@ -389,32 +400,32 @@ var newTicker = func(freq time.Duration) *time.Ticker {
 // Logs, the processEvents method is called. If a multiline pattern is not
 // configured, log events are submitted to the processEvents method immediately.
 func (l *logStream) collectBatch() {
-	timer := newTicker(batchPublishFrequency)
-	var events []wrappedEvent
+	ticker := newTicker(batchPublishFrequency)
 	var eventBuffer []byte
 	var eventBufferTimestamp int64
+	var batch = newEventBatch()
 	for {
 		select {
-		case t := <-timer.C:
+		case t := <-ticker.C:
 			// If event buffer is older than batch publish frequency flush the event buffer
 			if eventBufferTimestamp > 0 && len(eventBuffer) > 0 {
 				eventBufferAge := t.UnixNano()/int64(time.Millisecond) - eventBufferTimestamp
 				eventBufferExpired := eventBufferAge > int64(batchPublishFrequency)/int64(time.Millisecond)
 				eventBufferNegative := eventBufferAge < 0
 				if eventBufferExpired || eventBufferNegative {
-					events = l.processEvent(events, eventBuffer, eventBufferTimestamp)
+					l.processEvent(batch, eventBuffer, eventBufferTimestamp)
 					eventBuffer = eventBuffer[:0]
 				}
 			}
-			l.publishBatch(events)
-			events = events[:0]
+			l.publishBatch(batch)
+			batch.reset()
 		case msg, more := <-l.messages:
 			if !more {
 				// Flush event buffer and release resources
-				events = l.processEvent(events, eventBuffer, eventBufferTimestamp)
+				l.processEvent(batch, eventBuffer, eventBufferTimestamp)
 				eventBuffer = eventBuffer[:0]
-				l.publishBatch(events)
-				events = events[:0]
+				l.publishBatch(batch)
+				batch.reset()
 				return
 			}
 			if eventBufferTimestamp == 0 {
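A quick illustration of the flush-age check above, with the nanosecond-to-millisecond conversions spelled out (the timestamp values are hypothetical; batchPublishFrequency mirrors the 5-second constant in this file):

package main

import (
	"fmt"
	"time"
)

const batchPublishFrequency = 5 * time.Second

func main() {
	// A buffer stamped 6 seconds before the tick fires.
	eventBufferTimestamp := int64(1513641600000) // milliseconds since the epoch
	t := time.Unix(0, (eventBufferTimestamp+6000)*int64(time.Millisecond))

	eventBufferAge := t.UnixNano()/int64(time.Millisecond) - eventBufferTimestamp
	eventBufferExpired := eventBufferAge > int64(batchPublishFrequency)/int64(time.Millisecond)
	fmt.Println(eventBufferAge, eventBufferExpired) // 6000 true: older than the 5s cadence, so it is flushed
}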
@@ -425,7 +436,7 @@ func (l *logStream) collectBatch() {
 			if l.multilinePattern.Match(unprocessedLine) || len(eventBuffer)+len(unprocessedLine) > maximumBytesPerEvent {
 				// This is a new log event or we will exceed max bytes per event
 				// so flush the current eventBuffer to events and reset timestamp
-				events = l.processEvent(events, eventBuffer, eventBufferTimestamp)
+				l.processEvent(batch, eventBuffer, eventBufferTimestamp)
 				eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond)
 				eventBuffer = eventBuffer[:0]
 			}
@@ -434,7 +445,7 @@ func (l *logStream) collectBatch() {
 			eventBuffer = append(eventBuffer, processedLine...)
 			logger.PutMessage(msg)
 		} else {
-			events = l.processEvent(events, unprocessedLine, msg.Timestamp.UnixNano()/int64(time.Millisecond))
+			l.processEvent(batch, unprocessedLine, msg.Timestamp.UnixNano()/int64(time.Millisecond))
 			logger.PutMessage(msg)
 		}
 	}
@@ -450,47 +461,41 @@ func (l *logStream) collectBatch() {
 // bytes per event (defined in maximumBytesPerEvent). There is a fixed per-event
 // byte overhead (defined in perEventBytes) which is accounted for in split- and
 // batch-calculations.
-func (l *logStream) processEvent(events []wrappedEvent, unprocessedLine []byte, timestamp int64) []wrappedEvent {
-	bytes := 0
+func (l *logStream) processEvent(batch *eventBatch, unprocessedLine []byte, timestamp int64) {
 	for len(unprocessedLine) > 0 {
 		// Split line length so it does not exceed the maximum
 		lineBytes := len(unprocessedLine)
 		if lineBytes > maximumBytesPerEvent {
 			lineBytes = maximumBytesPerEvent
 		}
 		line := unprocessedLine[:lineBytes]
-		unprocessedLine = unprocessedLine[lineBytes:]
-		if (len(events) >= maximumLogEventsPerPut) || (bytes+lineBytes+perEventBytes > maximumBytesPerPut) {
-			// Publish an existing batch if it's already over the maximum number of events or if adding this
-			// event would push it over the maximum number of total bytes.
-			l.publishBatch(events)
-			events = events[:0]
-			bytes = 0
-		}
-		events = append(events, wrappedEvent{
+
+		event := wrappedEvent{

[Review comment, Member] Wrapping and unwrapping could be done in batch.add() / batch.events(), as they're really related to how the batch stores the events (i.e. just for preserving the events' order).

 			inputLogEvent: &cloudwatchlogs.InputLogEvent{
 				Message:   aws.String(string(line)),
 				Timestamp: aws.Int64(timestamp),
 			},
-			insertOrder: len(events),
-		})
-		bytes += (lineBytes + perEventBytes)
+			insertOrder: batch.count(),
+		}
+
+		added := batch.add(event, lineBytes)
+		if added {

[Review comment, Member] Nit: this could be (no need to change):

	if ok := batch.add(event, lineBytes); ok {

+			unprocessedLine = unprocessedLine[lineBytes:]
+		} else {
+			l.publishBatch(batch)
+			batch.reset()
+		}
 	}
-	return events
 }
 
 // publishBatch calls PutLogEvents for a given set of InputLogEvents,
 // accounting for sequencing requirements (each request must reference the
 // sequence token returned by the previous request).
-func (l *logStream) publishBatch(events []wrappedEvent) {
-	if len(events) == 0 {
+func (l *logStream) publishBatch(batch *eventBatch) {
+	if batch.isEmpty() {
 		return
 	}
-
-	// events in a batch must be sorted by timestamp
-	// see http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
-	sort.Sort(byTimestamp(events))
-	cwEvents := unwrapEvents(events)
+	cwEvents := unwrapEvents(batch.events())
 
 	nextSequenceToken, err := l.putLogEvents(cwEvents, l.sequenceToken)
 
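To make the split arithmetic concrete: assuming maximumBytesPerEvent is defined as 262144 - perEventBytes (262,118 usable payload bytes per event, as the file's AWS documentation links suggest), a single 600,000-byte line becomes three events of 262,118, 262,118, and 75,764 payload bytes, consuming 600,078 bytes of batch quota in total. A standalone sketch of just the chunking loop:

package main

import "fmt"

const (
	perEventBytes        = 26
	maximumBytesPerEvent = 262144 - perEventBytes // assumed from this file's constants
)

func main() {
	line := 600000 // one oversized 600,000-byte log line
	for line > 0 {
		n := line
		if n > maximumBytesPerEvent {
			n = maximumBytesPerEvent
		}
		fmt.Printf("event: %d payload bytes, %d bytes of batch quota\n", n, n+perEventBytes)
		line -= n
	}
	// Prints three events: 262118, 262118, and 75764 payload bytes.
}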
@@ -615,3 +620,70 @@ func unwrapEvents(events []wrappedEvent) []*cloudwatchlogs.InputLogEvent {
 	}
 	return cwEvents
 }
+
+func newEventBatch() *eventBatch {

[Review comment, Member] Perhaps in a future rewrite we could pass in maximumLogEventsPerPut and maximumBytesPerPut (they feel like a property of the eventBatch):

	func newEventBatch(maxEvents uint, maxBytes uint) *eventBatch {

+	return &eventBatch{
+		batch: make([]wrappedEvent, 0),
+		bytes: 0,
+	}
+}
+
+// events returns a slice of wrappedEvents sorted in order of their
+// timestamps and then by their insertion order (see `byTimestamp`).
+//
+// Warning: this method is not threadsafe and must not be used
+// concurrently.
+func (b *eventBatch) events() []wrappedEvent {
+	sort.Sort(byTimestamp(b.batch))
+	return b.batch
+}
+
+// add adds an event to the batch of events accounting for the
+// necessary overhead for an event to be logged. It returns false
+// if the event cannot be added to the batch due to service limits.
+//
+// Warning: this method is not threadsafe and must not be used
+// concurrently.
+func (b *eventBatch) add(event wrappedEvent, size int) bool {
+	addBytes := size + perEventBytes
+
+	// verify we are still within service limits
+	switch {
+	case len(b.batch)+1 > maximumLogEventsPerPut:
+		return false
+	case b.bytes+addBytes > maximumBytesPerPut:
+		return false
+	}
+
+	b.bytes += addBytes
+	b.batch = append(b.batch, event)
+
+	return true
+}
+
+// count is the number of batched events. Warning: this method
+// is not threadsafe and must not be used concurrently.
+func (b *eventBatch) count() int {
+	return len(b.batch)
+}
+
+// size is the total number of bytes that the batch represents.
+//
+// Warning: this method is not threadsafe and must not be used
+// concurrently.
+func (b *eventBatch) size() int {
+	return b.bytes
+}
+
+func (b *eventBatch) isEmpty() bool {
+	zeroEvents := b.count() == 0

[Review comment, Member] nit: these variables are a bit redundant:

	return b.count() == 0 && b.size() == 0

+	zeroSize := b.size() == 0
+	return zeroEvents && zeroSize
+}
+
+// reset prepares the batch for reuse.
+func (b *eventBatch) reset() {
+	b.bytes = 0
+	b.batch = b.batch[:0]
+}
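One possible shape of the constructor change suggested in the review above: carry the service limits on the batch itself rather than reading package-level constants. The maxEvents/maxBytes fields and the stand-in types here are hypothetical, not part of the merged PR:

// Sketch of a parameterized eventBatch, per the review suggestion.
package awslogs

type wrappedEvent struct{} // stand-in for the real type in this package

const perEventBytes = 26 // fixed PutLogEvents overhead per event

type eventBatch struct {
	batch     []wrappedEvent
	bytes     int
	maxEvents int
	maxBytes  int
}

func newEventBatch(maxEvents, maxBytes int) *eventBatch {
	return &eventBatch{maxEvents: maxEvents, maxBytes: maxBytes}
}

// add behaves like the merged version, but checks the instance's own limits.
func (b *eventBatch) add(event wrappedEvent, size int) bool {
	addBytes := size + perEventBytes
	if len(b.batch)+1 > b.maxEvents || b.bytes+addBytes > b.maxBytes {
		return false
	}
	b.bytes += addBytes
	b.batch = append(b.batch, event)
	return true
}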
72 changes: 55 additions & 17 deletions daemon/logger/awslogs/cloudwatchlogs_test.go
@@ -49,6 +49,15 @@ func (l *logStream) logGenerator(lineCount int, multilineCount int) {
 	}
 }
 
+func testEventBatch(events []wrappedEvent) *eventBatch {
+	batch := newEventBatch()
+	for _, event := range events {
+		eventlen := len([]byte(*event.inputLogEvent.Message))
+		batch.add(event, eventlen)
+	}
+	return batch
+}
+
 func TestNewAWSLogsClientUserAgentHandler(t *testing.T) {
 	info := logger.Info{
 		Config: map[string]string{
Expand Down Expand Up @@ -212,7 +221,7 @@ func TestPublishBatchSuccess(t *testing.T) {
},
}

stream.publishBatch(events)
stream.publishBatch(testEventBatch(events))
if stream.sequenceToken == nil {
t.Fatal("Expected non-nil sequenceToken")
}
Expand Down Expand Up @@ -257,7 +266,7 @@ func TestPublishBatchError(t *testing.T) {
},
}

stream.publishBatch(events)
stream.publishBatch(testEventBatch(events))
if stream.sequenceToken == nil {
t.Fatal("Expected non-nil sequenceToken")
}
Expand Down Expand Up @@ -291,7 +300,7 @@ func TestPublishBatchInvalidSeqSuccess(t *testing.T) {
},
}

stream.publishBatch(events)
stream.publishBatch(testEventBatch(events))
if stream.sequenceToken == nil {
t.Fatal("Expected non-nil sequenceToken")
}
Expand Down Expand Up @@ -354,7 +363,7 @@ func TestPublishBatchAlreadyAccepted(t *testing.T) {
},
}

stream.publishBatch(events)
stream.publishBatch(testEventBatch(events))
if stream.sequenceToken == nil {
t.Fatal("Expected non-nil sequenceToken")
}
Expand Down Expand Up @@ -859,19 +868,23 @@ func TestCollectBatchMaxEvents(t *testing.T) {
}

func TestCollectBatchMaxTotalBytes(t *testing.T) {
mockClient := newMockClientBuffered(1)
expectedPuts := 2
mockClient := newMockClientBuffered(expectedPuts)
stream := &logStream{
client: mockClient,
logGroupName: groupName,
logStreamName: streamName,
sequenceToken: aws.String(sequenceToken),
messages: make(chan *logger.Message),
}
mockClient.putLogEventsResult <- &putLogEventsResult{
successResult: &cloudwatchlogs.PutLogEventsOutput{
NextSequenceToken: aws.String(nextSequenceToken),
},
for i := 0; i < expectedPuts; i++ {
mockClient.putLogEventsResult <- &putLogEventsResult{
successResult: &cloudwatchlogs.PutLogEventsOutput{
NextSequenceToken: aws.String(nextSequenceToken),
},
}
}

var ticks = make(chan time.Time)
newTicker = func(_ time.Duration) *time.Ticker {
return &time.Ticker{
Expand All @@ -881,32 +894,57 @@ func TestCollectBatchMaxTotalBytes(t *testing.T) {

go stream.collectBatch()

longline := strings.Repeat("A", maximumBytesPerPut)
numPayloads := maximumBytesPerPut / (maximumBytesPerEvent + perEventBytes)
// maxline is the maximum line that could be submitted after
// accounting for its overhead.
maxline := strings.Repeat("A", maximumBytesPerPut-(perEventBytes*numPayloads))
// This will be split and batched up to the `maximumBytesPerPut'
// (+/- `maximumBytesPerEvent'). This /should/ be aligned, but
// should also tolerate an offset within that range.
stream.Log(&logger.Message{
Line: []byte(longline + "B"),
Line: []byte(maxline[:len(maxline)/2]),
Timestamp: time.Time{},
})
stream.Log(&logger.Message{
Line: []byte(maxline[len(maxline)/2:]),
Timestamp: time.Time{},
})
stream.Log(&logger.Message{
Line: []byte("B"),
Timestamp: time.Time{},
})

// no ticks
// no ticks, guarantee batch by size (and chan close)
stream.Close()

argument := <-mockClient.putLogEventsArgument
if argument == nil {
t.Fatal("Expected non-nil PutLogEventsInput")
}
bytes := 0

// Should total to the maximum allowed bytes.
eventBytes := 0
for _, event := range argument.LogEvents {
bytes += len(*event.Message)
eventBytes += len(*event.Message)
}
eventsOverhead := len(argument.LogEvents) * perEventBytes
payloadTotal := eventBytes + eventsOverhead
// lowestMaxBatch allows the payload to be offset if the messages
// don't lend themselves to align with the maximum event size.
lowestMaxBatch := maximumBytesPerPut - maximumBytesPerEvent

if payloadTotal > maximumBytesPerPut {
t.Errorf("Expected <= %d bytes but was %d", maximumBytesPerPut, payloadTotal)
}
if bytes > maximumBytesPerPut {
t.Errorf("Expected <= %d bytes but was %d", maximumBytesPerPut, bytes)
if payloadTotal < lowestMaxBatch {
t.Errorf("Batch to be no less than %d but was %d", lowestMaxBatch, payloadTotal)
}

argument = <-mockClient.putLogEventsArgument
if len(argument.LogEvents) != 1 {
t.Errorf("Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents))
}
message := *argument.LogEvents[0].Message
message := *argument.LogEvents[len(argument.LogEvents)-1].Message
if message[len(message)-1:] != "B" {
t.Errorf("Expected message to be %s but was %s", "B", message[len(message)-1:])
}
Expand Down
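The sizing in this test works out exactly, assuming the file's constants (maximumBytesPerPut = 1048576, perEventBytes = 26, and maximumBytesPerEvent = 262144 - perEventBytes = 262118): numPayloads is 1048576 / 262144 = 4, so maxline is 1048576 - 4*26 = 1048472 bytes, which splits into exactly four maximum-size events whose payloads plus overhead fill the first put to the byte; the trailing "B" then arrives alone in the second put. A standalone check of that arithmetic:

package main

import "fmt"

const (
	perEventBytes        = 26
	maximumBytesPerPut   = 1048576
	maximumBytesPerEvent = 262144 - perEventBytes // assumed from this file's constants
)

func main() {
	numPayloads := maximumBytesPerPut / (maximumBytesPerEvent + perEventBytes)
	maxline := maximumBytesPerPut - perEventBytes*numPayloads
	events := (maxline + maximumBytesPerEvent - 1) / maximumBytesPerEvent // ceiling division
	fmt.Println(numPayloads, maxline, maxline+events*perEventBytes)
	// Output: 4 1048472 1048576 -- the batch quota is consumed exactly.
}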