-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix for #22951 #22953
Fix for #22951 #22953
Changes from all commits
2eacbd4
ff6b5a0
5f79ed6
bcd4ba9
040b744
b1b732c
70a92b6
6abe4cd
3e53b0f
f00c85d
5840e55
6d016f1
9f874f7
5a3ee7f
fa1bd88
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -117,6 +117,11 @@ public class GroupIntoBatches<K, InputT> | |
*/ | ||
@AutoValue | ||
public abstract static class BatchingParams<InputT> implements Serializable { | ||
private static <InputT> BatchingParams<InputT> createDefault() { | ||
return new AutoValue_GroupIntoBatches_BatchingParams( | ||
Long.MAX_VALUE, Long.MAX_VALUE, null, Duration.ZERO); | ||
} | ||
|
||
public static <InputT> BatchingParams<InputT> create( | ||
long batchSize, | ||
long batchSizeBytes, | ||
|
@@ -170,8 +175,7 @@ private GroupIntoBatches(BatchingParams<InputT> params) { | |
/** Aim to create batches each with the specified element count. */ | ||
public static <K, InputT> GroupIntoBatches<K, InputT> ofSize(long batchSize) { | ||
Preconditions.checkState(batchSize < Long.MAX_VALUE); | ||
return new GroupIntoBatches<>( | ||
BatchingParams.create(batchSize, Long.MAX_VALUE, null, Duration.ZERO)); | ||
return new GroupIntoBatches<K, InputT>(BatchingParams.createDefault()).withSize(batchSize); | ||
} | ||
|
||
/** | ||
|
@@ -185,9 +189,8 @@ public static <K, InputT> GroupIntoBatches<K, InputT> ofSize(long batchSize) { | |
* {@link #ofByteSize(long, SerializableFunction)} to specify code to calculate the byte size. | ||
*/ | ||
public static <K, InputT> GroupIntoBatches<K, InputT> ofByteSize(long batchSizeBytes) { | ||
Preconditions.checkState(batchSizeBytes < Long.MAX_VALUE); | ||
return new GroupIntoBatches<>( | ||
BatchingParams.create(Long.MAX_VALUE, batchSizeBytes, null, Duration.ZERO)); | ||
return new GroupIntoBatches<K, InputT>(BatchingParams.createDefault()) | ||
.withByteSize(batchSizeBytes); | ||
} | ||
|
||
/** | ||
|
@@ -196,16 +199,54 @@ public static <K, InputT> GroupIntoBatches<K, InputT> ofByteSize(long batchSizeB | |
*/ | ||
public static <K, InputT> GroupIntoBatches<K, InputT> ofByteSize( | ||
long batchSizeBytes, SerializableFunction<InputT, Long> getElementByteSize) { | ||
Preconditions.checkState(batchSizeBytes < Long.MAX_VALUE); | ||
return new GroupIntoBatches<>( | ||
BatchingParams.create(Long.MAX_VALUE, batchSizeBytes, getElementByteSize, Duration.ZERO)); | ||
return new GroupIntoBatches<K, InputT>(BatchingParams.createDefault()) | ||
.withByteSize(batchSizeBytes, getElementByteSize); | ||
} | ||
|
||
/** Returns user supplied parameters for batching. */ | ||
public BatchingParams<InputT> getBatchingParams() { | ||
return params; | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return super.toString() + ", params=" + params; | ||
} | ||
|
||
/** @see #ofSize(long) */ | ||
public GroupIntoBatches<K, InputT> withSize(long batchSize) { | ||
Preconditions.checkState(batchSize < Long.MAX_VALUE); | ||
return new GroupIntoBatches<>( | ||
BatchingParams.create( | ||
batchSize, | ||
params.getBatchSizeBytes(), | ||
params.getElementByteSize(), | ||
params.getMaxBufferingDuration())); | ||
} | ||
|
||
/** @see #ofByteSize(long) */ | ||
public GroupIntoBatches<K, InputT> withByteSize(long batchSizeBytes) { | ||
Preconditions.checkState(batchSizeBytes < Long.MAX_VALUE); | ||
return new GroupIntoBatches<>( | ||
BatchingParams.create( | ||
params.getBatchSize(), | ||
batchSizeBytes, | ||
params.getElementByteSize(), | ||
params.getMaxBufferingDuration())); | ||
} | ||
|
||
/** @see #ofByteSize(long, SerializableFunction) */ | ||
public GroupIntoBatches<K, InputT> withByteSize( | ||
long batchSizeBytes, SerializableFunction<InputT, Long> getElementByteSize) { | ||
Preconditions.checkState(batchSizeBytes < Long.MAX_VALUE); | ||
return new GroupIntoBatches<>( | ||
BatchingParams.create( | ||
params.getBatchSize(), | ||
batchSizeBytes, | ||
getElementByteSize, | ||
params.getMaxBufferingDuration())); | ||
} | ||
|
||
/** | ||
* Sets a time limit (in processing time) on how long an incomplete batch of elements is allowed | ||
* to be buffered. Once a batch is flushed to output, the timer is reset. The provided limit must | ||
|
@@ -423,20 +464,56 @@ public void processElement( | |
@Timestamp Instant elementTs, | ||
BoundedWindow window, | ||
OutputReceiver<KV<K, Iterable<InputT>>> receiver) { | ||
|
||
final boolean shouldCareAboutWeight = weigher != null && batchSizeBytes != Long.MAX_VALUE; | ||
final boolean shouldCareAboutMaxBufferingDuration = | ||
maxBufferingDuration.isLongerThan(Duration.ZERO); | ||
|
||
if (shouldCareAboutWeight) { | ||
storedBatchSizeBytes.readLater(); | ||
} | ||
storedBatchSize.readLater(); | ||
if (shouldCareAboutMaxBufferingDuration) { | ||
minBufferedTs.readLater(); | ||
} | ||
|
||
LOG.debug("*** BATCH *** Add element for window {} ", window); | ||
if (shouldCareAboutWeight) { | ||
final long elementWeight = weigher.apply(element.getValue()); | ||
if (elementWeight + storedBatchSizeBytes.read() > batchSizeBytes) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This defeats the readLater optimization, since you're eagerly reading the value here (meaning also there's no point in the below readLater). You should add readLaters for minBufferedTs (if needed) and storedBatchSize earlier in the function. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. TBH I wasn't sure how. So to sum things up, does that mean that a value returned by a |
||
// Firing by count and size limits behave differently. | ||
// | ||
// We always increase the count by one, so we will fire at the exact limit without any | ||
// overflow. | ||
// before the addition: x < limit | ||
// after the addition: x < limit OR x == limit(triggered) | ||
// | ||
// Meanwhile when we increase the batched byte size we do it with a nondeterministic | ||
// amount, so if we only check the limit after we already increased the batch size it | ||
// could mean we fire over the limit. | ||
// before the addition: x < limit | ||
// after the addition: x < limit OR x == limit(triggered) OR x > limit(triggered) | ||
// We shouldn't trigger a batch with a bigger size than the configured limit, so | ||
// we fire it early if necessary. | ||
LOG.debug("*** EARLY FIRE OF BATCH *** for window {}", window.toString()); | ||
flushBatch( | ||
receiver, | ||
element.getKey(), | ||
batch, | ||
storedBatchSize, | ||
storedBatchSizeBytes, | ||
timerTs, | ||
minBufferedTs); | ||
bufferingTimer.clear(); | ||
} | ||
storedBatchSizeBytes.add(elementWeight); | ||
} | ||
batch.add(element.getValue()); | ||
// Blind add is supported with combiningState | ||
storedBatchSize.add(1L); | ||
if (weigher != null) { | ||
storedBatchSizeBytes.add(weigher.apply(element.getValue())); | ||
storedBatchSizeBytes.readLater(); | ||
} | ||
|
||
long num; | ||
if (maxBufferingDuration.isLongerThan(Duration.ZERO)) { | ||
minBufferedTs.readLater(); | ||
num = storedBatchSize.read(); | ||
|
||
final long num = storedBatchSize.read(); | ||
if (shouldCareAboutMaxBufferingDuration) { | ||
long oldOutputTs = | ||
MoreObjects.firstNonNull( | ||
minBufferedTs.read(), BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()); | ||
|
@@ -455,15 +532,14 @@ public void processElement( | |
.set(Instant.ofEpochMilli(targetTs)); | ||
} | ||
} | ||
num = storedBatchSize.read(); | ||
|
||
if (num % prefetchFrequency == 0) { | ||
// Prefetch data and modify batch state (readLater() modifies this) | ||
batch.readLater(); | ||
} | ||
|
||
if (num >= batchSize | ||
|| (batchSizeBytes != Long.MAX_VALUE && storedBatchSizeBytes.read() >= batchSizeBytes)) { | ||
|| (shouldCareAboutWeight && storedBatchSizeBytes.read() >= batchSizeBytes)) { | ||
LOG.debug("*** END OF BATCH *** for window {}", window.toString()); | ||
flushBatch( | ||
receiver, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like you're adding support for GroupIntoBatches to limit on count and byte size at the same time.
Can you add tests that cover this new scenario to:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
see
040b744
(#22953) and b1b732c
(#22953)