Fix awslogs batch size calculation #35726
Conversation
ping @anusha-ragunathan PTAL
Force-pushed 7791ffe to 5ad88a9
I'm investigating these errors now; I didn't have these same errors when running tests locally.
There's a race condition between the mock and the tests. I'm going to continue to narrow it down and fix it.
Force-pushed 5ad88a9 to 57874ae
Looks like Jenkins failed to get the janky and powerpc tests up and running. Can we restart those tests?
Restarted 👍
@jahkeup if you need to make additional changes, can you remove the `fixes` reference?
Force-pushed 57874ae to 9e92e03
@thaJeztah no problem! I've removed the 'fixes' reference and made a mental note not to include that in the future 👍 @anusha-ragunathan PTAL, I tested the latest set of changes locally with ~500 rounds to see if the test flaked out; this seems stable now and is passing tests.
func (b *eventBatch) Events() []wrappedEvent {
	// events in a batch must be sorted by timestamp
	// see http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
	sort.Sort(byTimestamp(b.events))
Isn't this undoing 443f251 (#24814)? /cc @samuelkarp
This is respecting that: the code that stably sorts by timestamp and then by insertion order is implemented in the sort for this type (https://github.com/jahkeup/moby/blob/b0046c3e68579640b5b08a4cb8b754924fbc2a30/daemon/logger/awslogs/cloudwatchlogs.go#L631).
The insertion order is recorded as the wrappedEvent is crafted.
Ah, thanks! Sorry, it wasn't directly clear 😊 Do you think we need to change `byTimestamp` to something else? Alternatively, move the description of the sorting to the GoDoc of the `Events()` method, and describe that it sorts by timestamp but preserves `insertOrder` if timestamps are equal.
I'll update the GoDoc for this method to clarify the action and results of retrieving the events from the batch.
The `byTimestamp` type could probably be better named for its usages, but I can't think of a more apt name without stringing together many words. Do you have any suggestions for a different name? Or do you think the GoDoc statement could be made clearer without a name change?
// necessary overhead for an event to be logged.
func (b *eventBatch) Add(event wrappedEvent, size int) {
	b.bytes = b.bytes + size + perEventBytes
	b.events = append(b.events, event)
Looks thread unsafe.
Same case with `Count` and `Size`.
Indeed. This type isn't intended for concurrent use and isn't accessed concurrently as it stands, hence the unsafe operations. Do you have suggestions to improve this given the current implementation of its consumer? My goal here was to keep the type small and simple for this use case, but I am happy to add some mechanisms for safety.
May need a mention in the function description; also, given that it's only used in this file, we should probably un-export these methods.
Sounds good. This type is only used here, so I'll unexport the methods. I'll also include a statement about the thread-unsafe nature in each method's GoDoc line to make sure future developers understand this design assumption.
@@ -460,37 +411,30 @@ func (l *logStream) processEvent(events []wrappedEvent, unprocessedLine []byte,
	}
	line := unprocessedLine[:lineBytes]
	unprocessedLine = unprocessedLine[lineBytes:]
-	if (len(events) >= maximumLogEventsPerPut) || (bytes+lineBytes+perEventBytes > maximumBytesPerPut) {
+	if (batch.Count() >= maximumLogEventsPerPut) || (batch.Size()+lineBytes+perEventBytes > maximumBytesPerPut) {
Just thinking out loud here; wondering if we could make `batch.Add()` reject the event if the Size or Count limit was reached, something like (dummy code):

err := batch.Add(event)
switch err := err.(type) {
case <maximum size reached>, <maximum logs reached>:
    l.publishBatch(batch)
case nil:
    unprocessedLine = unprocessedLine[lineBytes:]
default:
    // something bad happened :D
}
That would certainly clean up some of these checked `Add()`s! I like the indication of the error and the codification of the limit check in the batch itself; let me whip something up.
func (l *logStream) publishBatch(events []wrappedEvent) {
	if len(events) == 0 {
func (l *logStream) publishBatch(batch *eventBatch) {
	if batch.Count() == 0 {
What if `batch.Count` is 0, but `batch.bytes` is non-zero? Things could get out of whack. Probably better to `Reset` before return anyways.
I'm not wild about how you've reorganized the code here. Previously, the code was intended to be ordered such that caller was above callee, or most-abstract to most-concrete. Now you have concrete implementation details (like the `eventBatch` type) placed above the higher-level logic.
} | ||
|
||
switch batch.add(event, lineBytes) { | ||
case errBatchCountTooLarge, errBatchSizeTooLarge: |
Both of these errors result in the exact same code being run. Why not just return `bool` so you can do:

if ok := batch.add(event, lineBytes); ok {
    unprocessedLine = unprocessedLine[lineBytes:]
} else {
    l.publishBatch(batch)
}
There's certainly simplicity in the `bool` approach, but I think that having the result codified and explicitly handled reads more clearly. I prefer to know that there's a reason behind not adding the event, not just whether it was added, and the overhead to do so is negligible.
You're not recording that anywhere, though. When I was reading the code, the different errors confused me because I didn't know how they were handled differently. Since they're in fact handled the same way and we don't ever do anything with the reason, it seems like an unnecessary distinction.
You're right, either way we'll be handling it and arriving at the same result. This is a YAGNI situation; I'll drop this.
// Reset occurs regardless of error. The payload itself may be
// causing issues and would otherwise prevent more events from being
// sent.
batch.Reset()
Should `batch.Reset()` happen here or in `processEvent` after the call to `publishBatch`? My vote would probably be for the latter since `processEvent` is what owns the `eventBatch`.
Lifting the call up as you suggest would make it clearer that we're reusing the object. Note that the batch is owned by `collectBatch`, where additional call sites would exist. This change would mean that each `publishBatch()` must be followed by `batch.Reset()`, otherwise a duplicate batch of events will be put into CloudWatch. It's simpler to leave it in `publishBatch`, but this hides its side effect; I'm going to add the additional calls to `batch.Reset()` unless you think that doesn't make sense or you had something else in mind.
A different model (though I haven't thought about it enough to know whether I like the idea) is to make a new method `batch.Publish()` that is responsible for both publishing and resetting the internal buffer.
I had a similar thought, but I dismissed it on the grounds of keeping the submission of the events in the log driver and out of the `eventBatch` container type. I'd rather have the submission responsibility on the log driver for now.
@jahkeup : Can you look into these test failures? Do they happen when you run integration tests locally?
@jahkeup : This patch has code rearrangement that's not necessary, as mentioned in #35726 (comment). Can you update the change such that only the code relevant to the fix is updated? We want to keep the code changes minimal, if and when possible. This also becomes important for considerations for backporting/crossporting to other releases.
@anusha-ragunathan yes, I think that's prudent, and what I had in mind also. I will work with @samuelkarp offline to clean up and make some other improvements to this package. I'm still trying to repro the above failures in the integration tests. I do have some failures, but they're different tests from the ones observed on Jenkins, and they seem to be happening irregularly. I'll try rebasing to take in any test fixes to see if it makes a difference.
The teardown test itself sometimes fails, so it's not necessarily related (haven't checked the full output).
The previous bytes counter was moved out of scope and was not counting the total number of bytes in the batch. This type encapsulates the counter and the batch for correctness and code ergonomics. Signed-off-by: Jacob Vallejo <jakeev@amazon.com>
Force-pushed 0b4df70 to ad14dbf
@anusha-ragunathan PTAL, I've updated the code and the tests are now passing.
LGTM
// Warning: this type is not threadsafe and must not be used
// concurrently. This type is expected to be consumed in a single go
// routine and never concurrently.
type eventBatch struct {
Can you move this down to be right above the methods for the type? Those start on line 636.
LGTM
LGTM
left some suggestions for a follow up, but no showstoppers
} | ||
|
||
added := batch.add(event, lineBytes) | ||
if added { |
Nit; this could be (no need to change):
if ok := batch.add(event, lineBytes); ok {
@@ -615,3 +620,70 @@ func unwrapEvents(events []wrappedEvent) []*cloudwatchlogs.InputLogEvent {
	}
	return cwEvents
}

func newEventBatch() *eventBatch {
Perhaps in a future rewrite we could pass in `maximumLogEventsPerPut` and `maximumBytesPerPut` (they feel like a property of the `eventBatch`):
func newEventBatch(maxEvents uint, maxBytes uint) *eventBatch {
} | ||
|
||
func (b *eventBatch) isEmpty() bool { | ||
zeroEvents := b.count() == 0 |
nit: these variables are a bit redundant:
return b.count() == 0 && b.size() == 0
} | ||
events = append(events, wrappedEvent{ | ||
|
||
event := wrappedEvent{ |
Wrapping and unwrapping could be done in `batch.add()` / `batch.events()`, as they're really related to how the batch stores the events (i.e. just for preserving the event order).
Fixes #35725
- What I did
- How I did it
The added type holds the batching mechanism and the counter used for determining whether or not to submit a batch of log events.
- How to verify it
The change can be verified by using the steps from #35725 to reproduce the issue and then comparing it to a build from this changeset. The appropriate test and test mock have been updated to validate this behavior.
- Description for the changelog
awslogs: fix batch size calculation for large logs
- A picture of a cute animal (not mandatory but encouraged)
cc @samuelkarp @adnxn