Fix for #22951 (#22953)

Closed · wants to merge 15 commits · Changes from 1 commit
@@ -59,8 +59,6 @@
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Streams;
-import org.hamcrest.MatcherAssert;
-import org.hamcrest.Matchers;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Rule;
@@ -653,6 +651,7 @@ public void processElement(ProcessContext c, BoundedWindow window) {
       ValidatesRunner.class,
       NeedsRunner.class,
       UsesTimersInParDo.class,
+      UsesTestStream.class,
       UsesStatefulParDo.class,
       UsesOnWindowExpiration.class
     })
@@ -675,9 +674,26 @@ public void testMultipleLimitsAtOnceInGlobalWindowBatchSizeCountAndBatchSizeByte
             .stream()
             .map(s -> KV.of("key", s))
             .collect(Collectors.toList());
+
+    // to ensure ordered firing

[Review — Member] Suggested change:
-    // to ensure ordered firing
+    // to ensure ordered processing


+    TestStream.Builder<KV<String, String>> streamBuilder =
+        TestStream.create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
+            .advanceWatermarkTo(Instant.EPOCH);
+
+    long offset = 0L;
+    for (KV<String, String> kv : dataToUse) {

[Review — Member] We should advance the watermark on each element to ensure that it is processed in order. If we advance the watermark only at the end, then all the elements can be processed in parallel and there is no guarantee that they will be processed in the order in which they were added.

[Reply — Contributor Author] Won't the different/increasing timestamps already guarantee that?

[Reply — Member, @lukecwik, Nov 30, 2022] You're right, each addElements call is its own batch that needs to be processed.


+      streamBuilder =
+          streamBuilder.addElements(
+              TimestampedValue.of(kv, Instant.EPOCH.plus(Duration.standardSeconds(offset))));
+      offset++;
+    }
+
+    // fire them all at once
+    TestStream<KV<String, String>> stream = streamBuilder.advanceWatermarkToInfinity();

[Review — Contributor Author, @nbali, Sep 7, 2022] @lukecwik is there any other (so not TestStream+TimestampedValue), simpler way to guarantee the order of the elements that I missed?

[Reply — Member] Using TestStream is a way to ensure that the output is produced and processed in a specific order, guaranteeing exact output conditions. It makes it easier to write pipeline-level integration tests for exact scenarios.

As the other tests have done, the other option is to use PAssert with a custom matcher that passes for any valid combination of outputs. It is difficult to have a meaningful test for cases where runner-determined re-ordering can produce lots of different combinations of valid output. Typically one just writes a check to make sure that certain properties are satisfied. For GroupIntoBatches with both limits, you would ensure that if you take out the largest element from a batch then the byte size is less than the limit, and also that the batch element count is never greater than the specified element count.

These properties would ensure that GroupIntoBatches was honoring the contract, but a naive implementation could choose to group inefficiently (e.g. each batch is one element) and still be valid.

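The property check described in the comment above can be sketched in plain Java, independent of Beam. The class name, limits, and byte-size function below are hypothetical stand-ins for the test's `BATCH_SIZE`/`BATCH_SIZE_BYTES`, not Beam code:

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

public class BatchContractCheck {
  // Hypothetical stand-ins for the test's BATCH_SIZE / BATCH_SIZE_BYTES limits.
  static final int MAX_ELEMENTS = 5;
  static final long MAX_BYTES = 25;

  static long bytesOf(String s) {
    return s.getBytes(StandardCharsets.UTF_8).length;
  }

  /**
   * A batch honors the contract if its element count is within the limit and,
   * after removing its largest element, its byte size is below the byte limit.
   */
  static boolean honorsContract(List<String> batch) {
    if (batch.size() > MAX_ELEMENTS) {
      return false;
    }
    long total = batch.stream().mapToLong(BatchContractCheck::bytesOf).sum();
    long largest = batch.stream().mapToLong(BatchContractCheck::bytesOf).max().orElse(0);
    return total - largest < MAX_BYTES;
  }

  public static void main(String[] args) {
    // A batch within both limits.
    System.out.println(honorsContract(List.of("a-1", "a-2", "a-3"))); // true
    // A batch exceeding the element-count limit.
    System.out.println(honorsContract(List.of("a", "b", "c", "d", "e", "f"))); // false
  }
}
```

A matcher like this accepts any runner-determined grouping, at the cost of also accepting inefficient but valid groupings, which is exactly the trade-off the comment describes.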


     PCollection<KV<String, Iterable<String>>> collection =
         pipeline
-            .apply("Input data", Create.of(dataToUse))
+            .apply("Input data", stream)
             .apply(
                 GroupIntoBatches.<String, String>ofSize(BATCH_SIZE).withByteSize(BATCH_SIZE_BYTES))
             // set output coder
@@ -693,7 +709,7 @@ private void assertExpectedBatchPrefix(
             Streams.stream(element.getValue())
                 .map(s -> Iterables.get(Splitter.on('-').split(s), 0))
                 .collect(Collectors.toSet());
-    assertEquals("Found invalid batching: " + batchPrefixes, 1, batchPrefixes.size());
+    assertEquals("Found invalid batching: " + listToCheck, 1, batchPrefixes.size());
   }
 }
}

@@ -702,16 +718,16 @@ public Void apply(Iterable<KV<String, Iterable<String>>> input) {
             assertTrue(checkBatchSizes(input));
             assertTrue(checkBatchByteSizes(input));
             assertExpectedBatchPrefix(input);
+            assertEquals(
+                Lists.newArrayList(3, 5, 1),
+                Streams.stream(input)
+                    .map(KV::getValue)
+                    .map(Iterables::size)
+                    .collect(Collectors.toList()));
             return null;
           }
         });

-    PAssert.thatSingleton("Incorrect batching", collection.apply("Count", Count.globally()))
-        .satisfies(
-            numberOfBatches -> {
-              MatcherAssert.assertThat(numberOfBatches, Matchers.equalTo(3L));
-              return null;
-            });
     pipeline.run();
   }
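For context on the exact batch sizes [3, 5, 1] asserted above: with ordered input, a batcher that honors both limits flushes a batch as soon as it reaches the element-count limit or its accumulated byte size reaches the byte limit. A hedged plain-Java sketch of that greedy behavior (the class name, data, and limits are hypothetical stand-ins, not Beam's implementation):

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class GreedyBatcher {
  /**
   * Plain-Java analogue of GroupIntoBatches.ofSize(maxCount).withByteSize(maxBytes):
   * a batch is emitted as soon as it reaches maxCount elements, or as soon as its
   * accumulated byte size reaches maxBytes. A batch can therefore exceed maxBytes
   * only by its last-added element, which is what the "remove the largest element
   * and the byte size is below the limit" property from the review thread checks.
   */
  static List<List<String>> batch(List<String> input, int maxCount, long maxBytes) {
    List<List<String>> batches = new ArrayList<>();
    List<String> current = new ArrayList<>();
    long currentBytes = 0;
    for (String s : input) {
      current.add(s);
      currentBytes += s.getBytes(StandardCharsets.UTF_8).length;
      if (current.size() >= maxCount || currentBytes >= maxBytes) {
        batches.add(current);
        current = new ArrayList<>();
        currentBytes = 0;
      }
    }
    if (!current.isEmpty()) {
      batches.add(current); // flush the trailing partial batch
    }
    return batches;
  }

  public static void main(String[] args) {
    // Hypothetical data and limits, chosen only to show both flush conditions:
    // the first batch closes on the byte limit, the second on the count limit.
    System.out.println(batch(List.of("aaaaaaaa", "bb", "cc", "dd", "ee", "ff", "gg"), 5, 10));
  }
}
```

This also illustrates why the test needs deterministic input order: the batch boundaries depend entirely on the order in which elements arrive, which is what the TestStream with per-element timestamps pins down.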

Expand Down