awslogs: Use batching type for ergonomics and correct counting
The previous bytes counter was moved out of scope and was not counting the
total number of bytes in the batch. This type encapsulates the counter
and the batch for correct counting and code ergonomics.

Signed-off-by: Jacob Vallejo <jakeev@amazon.com>
jahkeup committed Dec 13, 2017
1 parent 1cea9d3 commit ad14dbf
Showing 3 changed files with 188 additions and 51 deletions.
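
The gist of the change, as a minimal sketch (hypothetical names and limits, not the driver's actual code): when the slice of events and the running byte total live in separate variables, a refactor can move the counter out of scope so the total silently stops tracking the batch, which is the bug described above. Bundling both into one type with a guarded add method keeps them in sync:

package main

import "fmt"

// itemBatch bundles the buffered items with their running byte total
// so the two cannot drift apart (illustrative only).
type itemBatch struct {
	items []string
	bytes int
}

const (
	perItemOverhead = 26 // fixed per-item overhead, analogous to perEventBytes
	maxBatchBytes   = 64 // tiny limit, for demonstration only
)

// add appends item only if the batch stays within maxBatchBytes,
// updating the byte total in the same place the item is stored.
func (b *itemBatch) add(item string) bool {
	n := len(item) + perItemOverhead
	if b.bytes+n > maxBatchBytes {
		return false
	}
	b.items = append(b.items, item)
	b.bytes += n
	return true
}

func main() {
	b := &itemBatch{}
	for _, line := range []string{"foo", "bar", "baz"} {
		if !b.add(line) {
			fmt.Printf("flush %v (%d bytes)\n", b.items, b.bytes)
			*b = itemBatch{} // reset, then retry the same line
			b.add(line)
		}
	}
	fmt.Printf("final %v (%d bytes)\n", b.items, b.bytes)
}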
138 changes: 105 additions & 33 deletions daemon/logger/awslogs/cloudwatchlogs.go
@@ -95,6 +95,17 @@ func init() {
 	}
 }
 
+// eventBatch holds the events that are batched for submission and the
+// associated data about it.
+//
+// Warning: this type is not threadsafe and must not be used
+// concurrently. This type is expected to be consumed in a single
+// goroutine and never concurrently.
+type eventBatch struct {
+	batch []wrappedEvent
+	bytes int
+}
+
 // New creates an awslogs logger using the configuration passed in on the
 // context. Supported context configuration variables are awslogs-region,
 // awslogs-group, awslogs-stream, awslogs-create-group, awslogs-multiline-pattern
@@ -389,32 +400,32 @@ var newTicker = func(freq time.Duration) *time.Ticker {
 // Logs, the processEvents method is called. If a multiline pattern is not
 // configured, log events are submitted to the processEvents method immediately.
 func (l *logStream) collectBatch() {
-	timer := newTicker(batchPublishFrequency)
-	var events []wrappedEvent
+	ticker := newTicker(batchPublishFrequency)
 	var eventBuffer []byte
 	var eventBufferTimestamp int64
+	var batch = newEventBatch()
 	for {
 		select {
-		case t := <-timer.C:
+		case t := <-ticker.C:
 			// If event buffer is older than batch publish frequency flush the event buffer
 			if eventBufferTimestamp > 0 && len(eventBuffer) > 0 {
 				eventBufferAge := t.UnixNano()/int64(time.Millisecond) - eventBufferTimestamp
 				eventBufferExpired := eventBufferAge > int64(batchPublishFrequency)/int64(time.Millisecond)
 				eventBufferNegative := eventBufferAge < 0
 				if eventBufferExpired || eventBufferNegative {
-					events = l.processEvent(events, eventBuffer, eventBufferTimestamp)
+					l.processEvent(batch, eventBuffer, eventBufferTimestamp)
 					eventBuffer = eventBuffer[:0]
 				}
 			}
-			l.publishBatch(events)
-			events = events[:0]
+			l.publishBatch(batch)
+			batch.reset()
 		case msg, more := <-l.messages:
 			if !more {
 				// Flush event buffer and release resources
-				events = l.processEvent(events, eventBuffer, eventBufferTimestamp)
+				l.processEvent(batch, eventBuffer, eventBufferTimestamp)
 				eventBuffer = eventBuffer[:0]
-				l.publishBatch(events)
-				events = events[:0]
+				l.publishBatch(batch)
+				batch.reset()
 				return
 			}
 			if eventBufferTimestamp == 0 {
@@ -425,7 +436,7 @@ func (l *logStream) collectBatch() {
 			if l.multilinePattern.Match(unprocessedLine) || len(eventBuffer)+len(unprocessedLine) > maximumBytesPerEvent {
 				// This is a new log event or we will exceed max bytes per event
 				// so flush the current eventBuffer to events and reset timestamp
-				events = l.processEvent(events, eventBuffer, eventBufferTimestamp)
+				l.processEvent(batch, eventBuffer, eventBufferTimestamp)
 				eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond)
 				eventBuffer = eventBuffer[:0]
 			}
@@ -434,7 +445,7 @@ func (l *logStream) collectBatch() {
 			eventBuffer = append(eventBuffer, processedLine...)
 			logger.PutMessage(msg)
 		} else {
-			events = l.processEvent(events, unprocessedLine, msg.Timestamp.UnixNano()/int64(time.Millisecond))
+			l.processEvent(batch, unprocessedLine, msg.Timestamp.UnixNano()/int64(time.Millisecond))
 			logger.PutMessage(msg)
 		}
 	}
@@ -450,47 +461,41 @@ func (l *logStream) collectBatch() {
 // bytes per event (defined in maximumBytesPerEvent). There is a fixed per-event
 // byte overhead (defined in perEventBytes) which is accounted for in split- and
 // batch-calculations.
-func (l *logStream) processEvent(events []wrappedEvent, unprocessedLine []byte, timestamp int64) []wrappedEvent {
-	bytes := 0
+func (l *logStream) processEvent(batch *eventBatch, unprocessedLine []byte, timestamp int64) {
 	for len(unprocessedLine) > 0 {
 		// Split line length so it does not exceed the maximum
 		lineBytes := len(unprocessedLine)
 		if lineBytes > maximumBytesPerEvent {
 			lineBytes = maximumBytesPerEvent
 		}
 		line := unprocessedLine[:lineBytes]
-		unprocessedLine = unprocessedLine[lineBytes:]
-		if (len(events) >= maximumLogEventsPerPut) || (bytes+lineBytes+perEventBytes > maximumBytesPerPut) {
-			// Publish an existing batch if it's already over the maximum number of events or if adding this
-			// event would push it over the maximum number of total bytes.
-			l.publishBatch(events)
-			events = events[:0]
-			bytes = 0
-		}
-		events = append(events, wrappedEvent{
+
+		event := wrappedEvent{
 			inputLogEvent: &cloudwatchlogs.InputLogEvent{
 				Message:   aws.String(string(line)),
 				Timestamp: aws.Int64(timestamp),
 			},
-			insertOrder: len(events),
-		})
-		bytes += (lineBytes + perEventBytes)
+			insertOrder: batch.count(),
+		}
+
+		added := batch.add(event, lineBytes)
+		if added {
+			unprocessedLine = unprocessedLine[lineBytes:]
+		} else {
+			l.publishBatch(batch)
+			batch.reset()
+		}
 	}
-	return events
 }
 
 // publishBatch calls PutLogEvents for a given set of InputLogEvents,
 // accounting for sequencing requirements (each request must reference the
 // sequence token returned by the previous request).
-func (l *logStream) publishBatch(events []wrappedEvent) {
-	if len(events) == 0 {
+func (l *logStream) publishBatch(batch *eventBatch) {
+	if batch.isEmpty() {
 		return
 	}
-
-	// events in a batch must be sorted by timestamp
-	// see http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
-	sort.Sort(byTimestamp(events))
-	cwEvents := unwrapEvents(events)
+	cwEvents := unwrapEvents(batch.events())
 
 	nextSequenceToken, err := l.putLogEvents(cwEvents, l.sequenceToken)
 
@@ -615,3 +620,70 @@ func unwrapEvents(events []wrappedEvent) []*cloudwatchlogs.InputLogEvent {
 	}
 	return cwEvents
 }
+
+func newEventBatch() *eventBatch {
+	return &eventBatch{
+		batch: make([]wrappedEvent, 0),
+		bytes: 0,
+	}
+}
+
+// events returns a slice of wrappedEvents sorted in order of their
+// timestamps and then by their insertion order (see `byTimestamp`).
+//
+// Warning: this method is not threadsafe and must not be used
+// concurrently.
+func (b *eventBatch) events() []wrappedEvent {
+	sort.Sort(byTimestamp(b.batch))
+	return b.batch
+}
+
+// add adds an event to the batch of events, accounting for the
+// necessary overhead for an event to be logged. It returns false
+// if the event cannot be added to the batch due to the service
+// limits.
+//
+// Warning: this method is not threadsafe and must not be used
+// concurrently.
+func (b *eventBatch) add(event wrappedEvent, size int) bool {
+	addBytes := size + perEventBytes
+
+	// verify we are still within service limits
+	switch {
+	case len(b.batch)+1 > maximumLogEventsPerPut:
+		return false
+	case b.bytes+addBytes > maximumBytesPerPut:
+		return false
+	}
+
+	b.bytes += addBytes
+	b.batch = append(b.batch, event)
+
+	return true
+}
+
+// count is the number of batched events. Warning: this method
+// is not threadsafe and must not be used concurrently.
+func (b *eventBatch) count() int {
+	return len(b.batch)
+}
+
+// size is the total number of bytes that the batch represents.
+//
+// Warning: this method is not threadsafe and must not be used
+// concurrently.
+func (b *eventBatch) size() int {
+	return b.bytes
+}
+
+func (b *eventBatch) isEmpty() bool {
+	zeroEvents := b.count() == 0
+	zeroSize := b.size() == 0
+	return zeroEvents && zeroSize
+}
+
+// reset prepares the batch for reuse.
+func (b *eventBatch) reset() {
+	b.bytes = 0
+	b.batch = b.batch[:0]
+}
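
For orientation, here is how the new type is meant to be driven; a minimal sketch of a hypothetical caller (not part of the commit) that mirrors processEvent's publish-and-retry loop. Because processEvent pre-splits lines to at most maximumBytesPerEvent, a freshly reset batch always accepts the retried event, so the loop cannot spin forever:

// drain is a hypothetical caller: it feeds pre-split events into the
// batch and flushes whenever add reports that the batch is full.
func drain(l *logStream, events []wrappedEvent) {
	batch := newEventBatch()
	for i := 0; i < len(events); {
		size := len(*events[i].inputLogEvent.Message)
		if batch.add(events[i], size) {
			i++ // accepted; advance to the next event
		} else {
			l.publishBatch(batch) // full: flush the batch...
			batch.reset()         // ...and retry the same event
		}
	}
	l.publishBatch(batch) // flush whatever remains
	batch.reset()
}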
72 changes: 55 additions & 17 deletions daemon/logger/awslogs/cloudwatchlogs_test.go
@@ -49,6 +49,15 @@ func (l *logStream) logGenerator(lineCount int, multilineCount int) {
 	}
 }
 
+func testEventBatch(events []wrappedEvent) *eventBatch {
+	batch := newEventBatch()
+	for _, event := range events {
+		eventlen := len([]byte(*event.inputLogEvent.Message))
+		batch.add(event, eventlen)
+	}
+	return batch
+}
+
 func TestNewAWSLogsClientUserAgentHandler(t *testing.T) {
 	info := logger.Info{
 		Config: map[string]string{
@@ -212,7 +221,7 @@ func TestPublishBatchSuccess(t *testing.T) {
 		},
 	}
 
-	stream.publishBatch(events)
+	stream.publishBatch(testEventBatch(events))
 	if stream.sequenceToken == nil {
 		t.Fatal("Expected non-nil sequenceToken")
 	}
@@ -257,7 +266,7 @@ func TestPublishBatchError(t *testing.T) {
 		},
 	}
 
-	stream.publishBatch(events)
+	stream.publishBatch(testEventBatch(events))
 	if stream.sequenceToken == nil {
 		t.Fatal("Expected non-nil sequenceToken")
 	}
@@ -291,7 +300,7 @@ func TestPublishBatchInvalidSeqSuccess(t *testing.T) {
 		},
 	}
 
-	stream.publishBatch(events)
+	stream.publishBatch(testEventBatch(events))
 	if stream.sequenceToken == nil {
 		t.Fatal("Expected non-nil sequenceToken")
 	}
@@ -354,7 +363,7 @@ func TestPublishBatchAlreadyAccepted(t *testing.T) {
 		},
 	}
 
-	stream.publishBatch(events)
+	stream.publishBatch(testEventBatch(events))
 	if stream.sequenceToken == nil {
 		t.Fatal("Expected non-nil sequenceToken")
 	}
@@ -859,19 +868,23 @@ func TestCollectBatchMaxEvents(t *testing.T) {
 }
 
 func TestCollectBatchMaxTotalBytes(t *testing.T) {
-	mockClient := newMockClientBuffered(1)
+	expectedPuts := 2
+	mockClient := newMockClientBuffered(expectedPuts)
 	stream := &logStream{
 		client:        mockClient,
 		logGroupName:  groupName,
 		logStreamName: streamName,
 		sequenceToken: aws.String(sequenceToken),
 		messages:      make(chan *logger.Message),
 	}
-	mockClient.putLogEventsResult <- &putLogEventsResult{
-		successResult: &cloudwatchlogs.PutLogEventsOutput{
-			NextSequenceToken: aws.String(nextSequenceToken),
-		},
+	for i := 0; i < expectedPuts; i++ {
+		mockClient.putLogEventsResult <- &putLogEventsResult{
+			successResult: &cloudwatchlogs.PutLogEventsOutput{
+				NextSequenceToken: aws.String(nextSequenceToken),
+			},
+		}
 	}
+
 	var ticks = make(chan time.Time)
 	newTicker = func(_ time.Duration) *time.Ticker {
 		return &time.Ticker{
@@ -881,32 +894,57 @@ func TestCollectBatchMaxTotalBytes(t *testing.T) {
 
 	go stream.collectBatch()
 
-	longline := strings.Repeat("A", maximumBytesPerPut)
+	numPayloads := maximumBytesPerPut / (maximumBytesPerEvent + perEventBytes)
+	// maxline is the maximum line that could be submitted after
+	// accounting for its overhead.
+	maxline := strings.Repeat("A", maximumBytesPerPut-(perEventBytes*numPayloads))
+	// This will be split and batched up to the `maximumBytesPerPut'
+	// (+/- `maximumBytesPerEvent'). This /should/ be aligned, but
+	// should also tolerate an offset within that range.
 	stream.Log(&logger.Message{
-		Line:      []byte(longline + "B"),
+		Line:      []byte(maxline[:len(maxline)/2]),
 		Timestamp: time.Time{},
 	})
+	stream.Log(&logger.Message{
+		Line:      []byte(maxline[len(maxline)/2:]),
+		Timestamp: time.Time{},
+	})
+	stream.Log(&logger.Message{
+		Line:      []byte("B"),
+		Timestamp: time.Time{},
+	})
 
-	// no ticks
+	// no ticks, guarantee batch by size (and chan close)
 	stream.Close()
 
 	argument := <-mockClient.putLogEventsArgument
 	if argument == nil {
 		t.Fatal("Expected non-nil PutLogEventsInput")
 	}
-	bytes := 0
+
+	// Should total to the maximum allowed bytes.
+	eventBytes := 0
 	for _, event := range argument.LogEvents {
-		bytes += len(*event.Message)
+		eventBytes += len(*event.Message)
 	}
+	eventsOverhead := len(argument.LogEvents) * perEventBytes
+	payloadTotal := eventBytes + eventsOverhead
+	// lowestMaxBatch allows the payload to be offset if the messages
+	// don't lend themselves to align with the maximum event size.
+	lowestMaxBatch := maximumBytesPerPut - maximumBytesPerEvent
+
+	if payloadTotal > maximumBytesPerPut {
+		t.Errorf("Expected <= %d bytes but was %d", maximumBytesPerPut, payloadTotal)
+	}
-	if bytes > maximumBytesPerPut {
-		t.Errorf("Expected <= %d bytes but was %d", maximumBytesPerPut, bytes)
+	if payloadTotal < lowestMaxBatch {
+		t.Errorf("Expected batch to be no less than %d but was %d", lowestMaxBatch, payloadTotal)
 	}
 
 	argument = <-mockClient.putLogEventsArgument
 	if len(argument.LogEvents) != 1 {
 		t.Errorf("Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents))
 	}
-	message := *argument.LogEvents[0].Message
+	message := *argument.LogEvents[len(argument.LogEvents)-1].Message
 	if message[len(message)-1:] != "B" {
 		t.Errorf("Expected message to be %s but was %s", "B", message[len(message)-1:])
 	}
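
For a sense of scale, here is the test's arithmetic worked through with the driver's service-limit constants (assuming the values defined in cloudwatchlogs.go: perEventBytes = 26, maximumBytesPerPut = 1048576, maximumBytesPerEvent = 262144):

package main

import "fmt"

// Service-limit constants as assumed from cloudwatchlogs.go.
const (
	perEventBytes        = 26
	maximumBytesPerPut   = 1048576
	maximumBytesPerEvent = 262144
)

func main() {
	numPayloads := maximumBytesPerPut / (maximumBytesPerEvent + perEventBytes)
	fmt.Println(numPayloads)                                    // 3
	fmt.Println(maximumBytesPerPut - perEventBytes*numPayloads) // 1048498 == len(maxline)
	fmt.Println(maximumBytesPerPut - maximumBytesPerEvent)      // 786432 == lowestMaxBatch
}

Under those assumed constants, the first PutLogEvents payload (message bytes plus per-event overhead) must land between 786432 and 1048576 bytes, and the second put carries the remainder, whose final event ends in "B".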
