
Merge pull request #35726 from jahkeup/awslogs-batching
Fix awslogs batch size calculation
thaJeztah committed Dec 19, 2017
2 parents 602bce1 + ad14dbf commit c8f7f44
Showing 3 changed files with 188 additions and 51 deletions.
138 changes: 105 additions & 33 deletions daemon/logger/awslogs/cloudwatchlogs.go
@@ -95,6 +95,17 @@ func init() {
}
}

// eventBatch holds the events that are batched for submission and the
// associated data about it.
//
// Warning: this type is not threadsafe and must not be used
// concurrently. This type is expected to be consumed in a single go
// routine and never concurrently.
type eventBatch struct {
batch []wrappedEvent
bytes int
}

// New creates an awslogs logger using the configuration passed in on the
// context. Supported context configuration variables are awslogs-region,
// awslogs-group, awslogs-stream, awslogs-create-group, awslogs-multiline-pattern
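
The new eventBatch centralizes the byte and event-count bookkeeping that was previously recomputed inline by processEvent. Below is a minimal standalone sketch of the single-goroutine lifecycle it is built for: accumulate with add, then publish and reset once add reports the batch is full. The limit values are assumptions for illustration; the driver defines its own constants.

package main

import "fmt"

// Assumed PutLogEvents limits for this sketch; the driver defines its own
// constants (perEventBytes, maximumBytesPerPut, maximumLogEventsPerPut).
const (
	perEventBytes          = 26
	maximumBytesPerPut     = 1048576
	maximumLogEventsPerPut = 10000
)

// batch mirrors eventBatch's accounting: payload bytes plus a fixed
// per-event overhead, checked against the service limits before an
// event is accepted.
type batch struct {
	events []string
	bytes  int
}

func (b *batch) add(msg string) bool {
	addBytes := len(msg) + perEventBytes
	if len(b.events)+1 > maximumLogEventsPerPut || b.bytes+addBytes > maximumBytesPerPut {
		return false
	}
	b.bytes += addBytes
	b.events = append(b.events, msg)
	return true
}

func (b *batch) reset() {
	b.events = b.events[:0]
	b.bytes = 0
}

func main() {
	var b batch
	puts := 0
	for i := 0; i < 25000; i++ {
		if !b.add("an example log line") { // full: publish, reset, retry the event
			puts++
			b.reset()
			b.add("an example log line")
		}
	}
	if len(b.events) > 0 {
		puts++ // final partial batch
	}
	fmt.Println("PutLogEvents calls:", puts) // 3
}
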
@@ -389,32 +400,32 @@ var newTicker = func(freq time.Duration) *time.Ticker {
// Logs, the processEvents method is called. If a multiline pattern is not
// configured, log events are submitted to the processEvents method immediately.
func (l *logStream) collectBatch() {
timer := newTicker(batchPublishFrequency)
var events []wrappedEvent
ticker := newTicker(batchPublishFrequency)
var eventBuffer []byte
var eventBufferTimestamp int64
var batch = newEventBatch()
for {
select {
case t := <-timer.C:
case t := <-ticker.C:
// If event buffer is older than batch publish frequency flush the event buffer
if eventBufferTimestamp > 0 && len(eventBuffer) > 0 {
eventBufferAge := t.UnixNano()/int64(time.Millisecond) - eventBufferTimestamp
eventBufferExpired := eventBufferAge > int64(batchPublishFrequency)/int64(time.Millisecond)
eventBufferNegative := eventBufferAge < 0
if eventBufferExpired || eventBufferNegative {
events = l.processEvent(events, eventBuffer, eventBufferTimestamp)
l.processEvent(batch, eventBuffer, eventBufferTimestamp)
eventBuffer = eventBuffer[:0]
}
}
l.publishBatch(events)
events = events[:0]
l.publishBatch(batch)
batch.reset()
case msg, more := <-l.messages:
if !more {
// Flush event buffer and release resources
events = l.processEvent(events, eventBuffer, eventBufferTimestamp)
l.processEvent(batch, eventBuffer, eventBufferTimestamp)
eventBuffer = eventBuffer[:0]
l.publishBatch(events)
events = events[:0]
l.publishBatch(batch)
batch.reset()
return
}
if eventBufferTimestamp == 0 {
@@ -425,7 +436,7 @@ func (l *logStream) collectBatch() {
if l.multilinePattern.Match(unprocessedLine) || len(eventBuffer)+len(unprocessedLine) > maximumBytesPerEvent {
// This is a new log event or we will exceed max bytes per event
// so flush the current eventBuffer to events and reset timestamp
events = l.processEvent(events, eventBuffer, eventBufferTimestamp)
l.processEvent(batch, eventBuffer, eventBufferTimestamp)
eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond)
eventBuffer = eventBuffer[:0]
}
@@ -434,7 +445,7 @@ func (l *logStream) collectBatch() {
eventBuffer = append(eventBuffer, processedLine...)
logger.PutMessage(msg)
} else {
events = l.processEvent(events, unprocessedLine, msg.Timestamp.UnixNano()/int64(time.Millisecond))
l.processEvent(batch, unprocessedLine, msg.Timestamp.UnixNano()/int64(time.Millisecond))
logger.PutMessage(msg)
}
}
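
The ticker branch above flushes the multiline event buffer once it is older than the publish frequency, with both sides of the comparison expressed in milliseconds. A standalone sketch of that age check, using an assumed value for batchPublishFrequency:

package main

import (
	"fmt"
	"time"
)

// batchPublishFrequency is an assumed value here; the driver defines the
// real one alongside its other constants.
const batchPublishFrequency = 5 * time.Second

// expired reproduces the age check from collectBatch: both sides of the
// comparison are in milliseconds, and a negative age (out-of-order or
// skewed timestamps) also forces a flush.
func expired(now time.Time, eventBufferTimestamp int64) bool {
	age := now.UnixNano()/int64(time.Millisecond) - eventBufferTimestamp
	return age > int64(batchPublishFrequency)/int64(time.Millisecond) || age < 0
}

func main() {
	start := time.Now()
	ts := start.UnixNano() / int64(time.Millisecond)
	fmt.Println(expired(start.Add(2*time.Second), ts))  // false: buffer still fresh
	fmt.Println(expired(start.Add(6*time.Second), ts))  // true: older than the publish frequency
	fmt.Println(expired(start.Add(-1*time.Second), ts)) // true: negative age
}
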
@@ -450,47 +461,41 @@ func (l *logStream) collectBatch() {
// bytes per event (defined in maximumBytesPerEvent). There is a fixed per-event
// byte overhead (defined in perEventBytes) which is accounted for in split- and
// batch-calculations.
func (l *logStream) processEvent(events []wrappedEvent, unprocessedLine []byte, timestamp int64) []wrappedEvent {
bytes := 0
func (l *logStream) processEvent(batch *eventBatch, unprocessedLine []byte, timestamp int64) {
for len(unprocessedLine) > 0 {
// Split line length so it does not exceed the maximum
lineBytes := len(unprocessedLine)
if lineBytes > maximumBytesPerEvent {
lineBytes = maximumBytesPerEvent
}
line := unprocessedLine[:lineBytes]
unprocessedLine = unprocessedLine[lineBytes:]
if (len(events) >= maximumLogEventsPerPut) || (bytes+lineBytes+perEventBytes > maximumBytesPerPut) {
// Publish an existing batch if it's already over the maximum number of events or if adding this
// event would push it over the maximum number of total bytes.
l.publishBatch(events)
events = events[:0]
bytes = 0
}
events = append(events, wrappedEvent{

event := wrappedEvent{
inputLogEvent: &cloudwatchlogs.InputLogEvent{
Message: aws.String(string(line)),
Timestamp: aws.Int64(timestamp),
},
insertOrder: len(events),
})
bytes += (lineBytes + perEventBytes)
insertOrder: batch.count(),
}

added := batch.add(event, lineBytes)
if added {
unprocessedLine = unprocessedLine[lineBytes:]
} else {
l.publishBatch(batch)
batch.reset()
}
}
return events
}

// publishBatch calls PutLogEvents for a given set of InputLogEvents,
// accounting for sequencing requirements (each request must reference the
// sequence token returned by the previous request).
func (l *logStream) publishBatch(events []wrappedEvent) {
if len(events) == 0 {
func (l *logStream) publishBatch(batch *eventBatch) {
if batch.isEmpty() {
return
}

// events in a batch must be sorted by timestamp
// see http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
sort.Sort(byTimestamp(events))
cwEvents := unwrapEvents(events)
cwEvents := unwrapEvents(batch.events())

nextSequenceToken, err := l.putLogEvents(cwEvents, l.sequenceToken)

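With this change, processEvent only splits lines into event-sized chunks and defers the limit checks to eventBatch.add. A sketch of the splitting step, with assumed values for the per-event constants:

package main

import "fmt"

// Assumed limits: a 256 KiB event ceiling minus the fixed per-event overhead,
// mirroring how the driver derives maximumBytesPerEvent.
const (
	perEventBytes        = 26
	maximumBytesPerEvent = 262144 - perEventBytes
)

// split mirrors the chunking loop in processEvent: take at most
// maximumBytesPerEvent bytes per event, and report what each chunk costs
// against the batch (payload plus fixed overhead).
func split(line []byte) (chunks [][]byte, batchCost int) {
	for len(line) > 0 {
		n := len(line)
		if n > maximumBytesPerEvent {
			n = maximumBytesPerEvent
		}
		chunks = append(chunks, line[:n])
		batchCost += n + perEventBytes
		line = line[n:]
	}
	return chunks, batchCost
}

func main() {
	line := make([]byte, 600000) // one oversized log line
	chunks, cost := split(line)
	fmt.Println(len(chunks), cost) // 3 events, 600078 bytes charged to the batch
}
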
@@ -615,3 +620,70 @@ func unwrapEvents(events []wrappedEvent) []*cloudwatchlogs.InputLogEvent {
}
return cwEvents
}

func newEventBatch() *eventBatch {
return &eventBatch{
batch: make([]wrappedEvent, 0),
bytes: 0,
}
}

// events returns a slice of wrappedEvents sorted in order of their
// timestamps and then by their insertion order (see `byTimestamp`).
//
// Warning: this method is not threadsafe and must not be used
// concurrently.
func (b *eventBatch) events() []wrappedEvent {
sort.Sort(byTimestamp(b.batch))
return b.batch
}

// add adds an event to the batch of events accounting for the
// necessary overhead for an event to be logged. An error will be
// returned if the event cannot be added to the batch due to service
// limits.
//
// Warning: this method is not threadsafe and must not be used
// concurrently.
func (b *eventBatch) add(event wrappedEvent, size int) bool {
addBytes := size + perEventBytes

// verify we are still within service limits
switch {
case len(b.batch)+1 > maximumLogEventsPerPut:
return false
case b.bytes+addBytes > maximumBytesPerPut:
return false
}

b.bytes += addBytes
b.batch = append(b.batch, event)

return true
}

// count is the number of batched events. Warning: this method
// is not threadsafe and must not be used concurrently.
func (b *eventBatch) count() int {
return len(b.batch)
}

// size is the total number of bytes that the batch represents.
//
// Warning: this method is not threadsafe and must not be used
// concurrently.
func (b *eventBatch) size() int {
return b.bytes
}

func (b *eventBatch) isEmpty() bool {
zeroEvents := b.count() == 0
zeroSize := b.size() == 0
return zeroEvents && zeroSize
}

// reset prepares the batch for reuse.
func (b *eventBatch) reset() {
b.bytes = 0
b.batch = b.batch[:0]
}
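
The events method defers the timestamp sort to publication time, with insertion order as the tie-breaker. A standalone sketch of that ordering requirement (PutLogEvents expects events sorted by timestamp):

package main

import (
	"fmt"
	"sort"
)

// event stands in for wrappedEvent: a timestamp in milliseconds plus the
// order in which it was appended to the batch.
type event struct {
	timestampMs int64
	insertOrder int
	message     string
}

func main() {
	// Events arrive out of timestamp order; two of them share a timestamp.
	events := []event{
		{timestampMs: 200, insertOrder: 0, message: "b"},
		{timestampMs: 100, insertOrder: 1, message: "a"},
		{timestampMs: 200, insertOrder: 2, message: "c"},
	}

	// Sort by timestamp; ties keep their insertion order, matching the
	// behaviour described for byTimestamp above.
	sort.SliceStable(events, func(i, j int) bool {
		if events[i].timestampMs == events[j].timestampMs {
			return events[i].insertOrder < events[j].insertOrder
		}
		return events[i].timestampMs < events[j].timestampMs
	})

	for _, e := range events {
		fmt.Println(e.timestampMs, e.message) // 100 a, 200 b, 200 c
	}
}
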
72 changes: 55 additions & 17 deletions daemon/logger/awslogs/cloudwatchlogs_test.go
@@ -49,6 +49,15 @@ func (l *logStream) logGenerator(lineCount int, multilineCount int) {
}
}

func testEventBatch(events []wrappedEvent) *eventBatch {
batch := newEventBatch()
for _, event := range events {
eventlen := len([]byte(*event.inputLogEvent.Message))
batch.add(event, eventlen)
}
return batch
}

func TestNewAWSLogsClientUserAgentHandler(t *testing.T) {
info := logger.Info{
Config: map[string]string{
@@ -212,7 +221,7 @@ func TestPublishBatchSuccess(t *testing.T) {
},
}

stream.publishBatch(events)
stream.publishBatch(testEventBatch(events))
if stream.sequenceToken == nil {
t.Fatal("Expected non-nil sequenceToken")
}
@@ -257,7 +266,7 @@ func TestPublishBatchError(t *testing.T) {
},
}

stream.publishBatch(events)
stream.publishBatch(testEventBatch(events))
if stream.sequenceToken == nil {
t.Fatal("Expected non-nil sequenceToken")
}
@@ -291,7 +300,7 @@ func TestPublishBatchInvalidSeqSuccess(t *testing.T) {
},
}

stream.publishBatch(events)
stream.publishBatch(testEventBatch(events))
if stream.sequenceToken == nil {
t.Fatal("Expected non-nil sequenceToken")
}
@@ -354,7 +363,7 @@ func TestPublishBatchAlreadyAccepted(t *testing.T) {
},
}

stream.publishBatch(events)
stream.publishBatch(testEventBatch(events))
if stream.sequenceToken == nil {
t.Fatal("Expected non-nil sequenceToken")
}
@@ -859,19 +868,23 @@ func TestCollectBatchMaxEvents(t *testing.T) {
}

func TestCollectBatchMaxTotalBytes(t *testing.T) {
mockClient := newMockClientBuffered(1)
expectedPuts := 2
mockClient := newMockClientBuffered(expectedPuts)
stream := &logStream{
client: mockClient,
logGroupName: groupName,
logStreamName: streamName,
sequenceToken: aws.String(sequenceToken),
messages: make(chan *logger.Message),
}
mockClient.putLogEventsResult <- &putLogEventsResult{
successResult: &cloudwatchlogs.PutLogEventsOutput{
NextSequenceToken: aws.String(nextSequenceToken),
},
for i := 0; i < expectedPuts; i++ {
mockClient.putLogEventsResult <- &putLogEventsResult{
successResult: &cloudwatchlogs.PutLogEventsOutput{
NextSequenceToken: aws.String(nextSequenceToken),
},
}
}

var ticks = make(chan time.Time)
newTicker = func(_ time.Duration) *time.Ticker {
return &time.Ticker{
@@ -881,32 +894,57 @@

go stream.collectBatch()

longline := strings.Repeat("A", maximumBytesPerPut)
numPayloads := maximumBytesPerPut / (maximumBytesPerEvent + perEventBytes)
// maxline is the maximum line that could be submitted after
// accounting for its overhead.
maxline := strings.Repeat("A", maximumBytesPerPut-(perEventBytes*numPayloads))
// This will be split and batched up to the `maximumBytesPerPut'
// (+/- `maximumBytesPerEvent'). This /should/ be aligned, but
// should also tolerate an offset within that range.
stream.Log(&logger.Message{
Line: []byte(longline + "B"),
Line: []byte(maxline[:len(maxline)/2]),
Timestamp: time.Time{},
})
stream.Log(&logger.Message{
Line: []byte(maxline[len(maxline)/2:]),
Timestamp: time.Time{},
})
stream.Log(&logger.Message{
Line: []byte("B"),
Timestamp: time.Time{},
})

// no ticks
// no ticks, guarantee batch by size (and chan close)
stream.Close()

argument := <-mockClient.putLogEventsArgument
if argument == nil {
t.Fatal("Expected non-nil PutLogEventsInput")
}
bytes := 0

// Should total to the maximum allowed bytes.
eventBytes := 0
for _, event := range argument.LogEvents {
bytes += len(*event.Message)
eventBytes += len(*event.Message)
}
eventsOverhead := len(argument.LogEvents) * perEventBytes
payloadTotal := eventBytes + eventsOverhead
// lowestMaxBatch allows the payload to be offset if the messages
// don't lend themselves to align with the maximum event size.
lowestMaxBatch := maximumBytesPerPut - maximumBytesPerEvent

if payloadTotal > maximumBytesPerPut {
t.Errorf("Expected <= %d bytes but was %d", maximumBytesPerPut, payloadTotal)
}
if bytes > maximumBytesPerPut {
t.Errorf("Expected <= %d bytes but was %d", maximumBytesPerPut, bytes)
if payloadTotal < lowestMaxBatch {
t.Errorf("Batch to be no less than %d but was %d", lowestMaxBatch, payloadTotal)
}

argument = <-mockClient.putLogEventsArgument
if len(argument.LogEvents) != 1 {
t.Errorf("Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents))
}
message := *argument.LogEvents[0].Message
message := *argument.LogEvents[len(argument.LogEvents)-1].Message
if message[len(message)-1:] != "B" {
t.Errorf("Expected message to be %s but was %s", "B", message[len(message)-1:])
}

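The maxline arithmetic in TestCollectBatchMaxTotalBytes is easier to follow with concrete numbers plugged in. A sketch using assumed values for the driver's constants:

package main

import "fmt"

// Assumed values for the driver's constants.
const (
	perEventBytes        = 26
	maximumBytesPerPut   = 1048576
	maximumBytesPerEvent = 262144 - perEventBytes
)

func main() {
	// How many maximum-sized events fit in one PutLogEvents payload once
	// the per-event overhead is included.
	numPayloads := maximumBytesPerPut / (maximumBytesPerEvent + perEventBytes)

	// The longest line that can be submitted in a single put after
	// reserving the overhead for each of those events.
	maxline := maximumBytesPerPut - perEventBytes*numPayloads

	fmt.Println(numPayloads)                                           // 4
	fmt.Println(maxline)                                               // 1048472 bytes of message payload
	fmt.Println(maxline+numPayloads*perEventBytes == maximumBytesPerPut) // true: payload plus overhead fills the put exactly
}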