Skip to content

Commit

Permalink
Fixing State.readLater() usage + Unit test fix to reflect changed exp…
Browse files Browse the repository at this point in the history
…ectations
  • Loading branch information
nbali committed Nov 16, 2022
1 parent 5840e55 commit 1e1770b
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -464,8 +464,21 @@ public void processElement(
@Timestamp Instant elementTs,
BoundedWindow window,
OutputReceiver<KV<K, Iterable<InputT>>> receiver) {

final boolean shouldCareAboutWeight = weigher != null && batchSizeBytes != Long.MAX_VALUE;
final boolean shouldCareAboutMaxBufferingDuration =
maxBufferingDuration.isLongerThan(Duration.ZERO);

if (shouldCareAboutWeight) {
storedBatchSizeBytes.readLater();
}
storedBatchSize.readLater();
if (shouldCareAboutMaxBufferingDuration) {
minBufferedTs.readLater();
}

LOG.debug("*** BATCH *** Add element for window {} ", window);
if (shouldCareAboutWeight()) {
if (shouldCareAboutWeight) {
final long elementWeight = weigher.apply(element.getValue());
if (elementWeight + storedBatchSizeBytes.read() > batchSizeBytes) {
// Firing by count and size limits behave differently.
Expand Down Expand Up @@ -494,17 +507,13 @@ public void processElement(
bufferingTimer.clear();
}
storedBatchSizeBytes.add(elementWeight);
storedBatchSizeBytes.readLater();
}
batch.add(element.getValue());
// Blind add is supported with combiningState
storedBatchSize.add(1L);

long num;
if (maxBufferingDuration.isLongerThan(Duration.ZERO)) {
minBufferedTs.readLater();
num = storedBatchSize.read();

final long num = storedBatchSize.read();
if (shouldCareAboutMaxBufferingDuration) {
long oldOutputTs =
MoreObjects.firstNonNull(
minBufferedTs.read(), BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
Expand All @@ -523,15 +532,14 @@ public void processElement(
.set(Instant.ofEpochMilli(targetTs));
}
}
num = storedBatchSize.read();

if (num % prefetchFrequency == 0) {
// Prefetch data and modify batch state (readLater() modifies this)
batch.readLater();
}

if (num >= batchSize
|| (shouldCareAboutWeight() && storedBatchSizeBytes.read() >= batchSizeBytes)) {
|| (shouldCareAboutWeight && storedBatchSizeBytes.read() >= batchSizeBytes)) {
LOG.debug("*** END OF BATCH *** for window {}", window.toString());
flushBatch(
receiver,
Expand All @@ -545,10 +553,6 @@ public void processElement(
}
}

private boolean shouldCareAboutWeight() {
return weigher != null && batchSizeBytes != Long.MAX_VALUE;
}

@OnTimer(END_OF_BUFFERING_ID)
public void onBufferingTimer(
OutputReceiver<KV<K, Iterable<InputT>>> receiver,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public Void apply(Iterable<KV<String, Iterable<String>>> input) {
}
});
PAssert.thatSingleton("Incorrect collection size", collection.apply("Count", Count.globally()))
.isEqualTo(3L);
.isEqualTo(4L);
pipeline.run();
}

Expand Down Expand Up @@ -196,7 +196,7 @@ public Void apply(Iterable<KV<String, Iterable<String>>> input) {
}
});
PAssert.thatSingleton("Incorrect collection size", collection.apply("Count", Count.globally()))
.isEqualTo(5L);
.isEqualTo(9L);
pipeline.run();
}

Expand Down

0 comments on commit 1e1770b

Please sign in to comment.