
Fix for #22951 #22953

Closed · wants to merge 15 commits into from

Conversation

@nbali (Contributor) commented Aug 30, 2022

Closes #22951


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • [ ] Update CHANGES.md with noteworthy changes.
  • [ ] If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make the review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI.

@nbali (Contributor, Author) commented Aug 30, 2022

R: @lukecwik

@github-actions (bot)

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control

@nbali (Contributor, Author) commented Aug 30, 2022

FYI, more than likely it requires spotless; I used github.dev for it.

@nbali (Contributor, Author) commented Aug 30, 2022

Irrelevant test failure with "Java Tests / Java Wordcount Direct Runner (windows-latest) (pull_request)".

@nbali (Contributor, Author) commented Aug 31, 2022

Run Java PreCommit

1 similar comment
@lukecwik (Member) commented Sep 1, 2022

Run Java PreCommit

@@ -113,6 +113,9 @@
// If user triggering is supplied, we will trigger the file write after this many records are
// written.
static final int FILE_TRIGGERING_RECORD_COUNT = 500000;
// If user triggering is supplied, we will trigger the file write after this many bytes are
// written.
static final long FILE_TRIGGERING_BYTE_COUNT = 100 * (1L << 20); // 100MiB
lukecwik (Member):

It looks like we already have a memory limit for writing of 20 parallel writers with 64MB buffers. Should we limit this triggering to 64MB as well so that it fits in one chunk?

CC: @reuvenlax Any suggestions here?

nbali (Contributor, Author):

@lukecwik Having the same limit as the buffer actually makes sense to me, but can you direct me towards where I might find that limit? I can see it in the comments for DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE, but instead of hardcoding 64MB here as well, I would rather reference the original limit directly.

nbali (Contributor, Author):

@lukecwik
On second thought, I think there is a problem with using this 64MB default. We only flush the batch inside GroupIntoBatches once storedBatchSizeBytes is greater than or equal to the limit. So if we make the limit 64MB, more than likely we will flush just a bit more than 64MB, and then we won't fit into the 64MB buffer.

So either the triggering byte count should be x% smaller than the 64MB default, or GroupIntoBatches has to be modified so that if the current element would make the batch go over the byte size limit, it fires the batch without first adding that element. The second seems like a better solution, but I assume doing the storedBatchSizeBytes.read() sooner would have a performance impact.

lukecwik (Member):

Sorry, I didn't see this comment, but I agree that we should change GroupIntoBatches to ensure that if adding an element would make the batch go over the limit, we flush the batch first.

Pseudo-code would be like:

byteSize = measure(obj)
if (byteSize >= byteSizeLimit) {
  output obj as a single-element batch
  continue
}
if (byteSize + previousNumBytes > byteSizeLimit) {
  output all buffered elements as a batch, then start a new batch with obj
} else {
  add obj to the current batch
}
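For concreteness, a minimal runnable Java sketch of this flush-before-add policy (the class and all names here are illustrative, not the actual GroupIntoBatches internals):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.ToLongFunction;

final class ByteLimitedBatcher<T> {
  private final long byteSizeLimit;
  private final ToLongFunction<T> weigher;
  private final Consumer<List<T>> emit;
  private final List<T> batch = new ArrayList<>();
  private long bufferedBytes;

  ByteLimitedBatcher(long byteSizeLimit, ToLongFunction<T> weigher, Consumer<List<T>> emit) {
    this.byteSizeLimit = byteSizeLimit;
    this.weigher = weigher;
    this.emit = emit;
  }

  void add(T element) {
    long byteSize = weigher.applyAsLong(element);
    if (byteSize >= byteSizeLimit) {
      // An oversized element is emitted on its own as a single-element batch.
      emit.accept(Collections.singletonList(element));
      return;
    }
    if (bufferedBytes + byteSize > byteSizeLimit) {
      // Flush first, so the new element can never push a batch over the limit.
      flush();
    }
    batch.add(element);
    bufferedBytes += byteSize;
  }

  void flush() {
    if (!batch.isEmpty()) {
      emit.accept(new ArrayList<>(batch));
      batch.clear();
      bufferedBytes = 0;
    }
  }
}

With this policy every emitted batch stays at or under the byte limit, except for single-element batches holding an element that is itself over the limit.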

nbali (Contributor, Author):

I used a different algorithm, but IMO it stays close to the original concept of the transform now.

@@ -117,6 +117,11 @@
*/
@AutoValue
public abstract static class BatchingParams<InputT> implements Serializable {
lukecwik (Member):

It looks like you're adding support for GroupIntoBatches to limit on count and byte size at the same time.

Can you add tests that cover this new scenario to:

  • GroupIntoBatchesTest
  • GroupIntoBatchesTranslationTest
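As a starting point, a combined-limits test could look roughly like the sketch below. The withByteSize(...) chaining is an assumption about the API shape this PR introduces, and the sketch reuses the TestPipeline rule and static imports already present in GroupIntoBatchesTest:

@Test
@Category(NeedsRunner.class)
public void testCountAndByteSizeLimitsTogether() {
  PCollection<KV<String, Iterable<String>>> batches =
      pipeline
          .apply(Create.of(KV.of("k", "aaaa"), KV.of("k", "bbbb"), KV.of("k", "cc")))
          // hypothetical combined-limit API: at most 2 elements and 6 bytes per batch
          .apply(GroupIntoBatches.<String, String>ofSize(2).withByteSize(6L));
  PAssert.that(batches)
      .satisfies(
          input -> {
            for (KV<String, Iterable<String>> batch : input) {
              // The count limit must hold for every batch; the byte limit is
              // checked analogously with a weigher over the values.
              assertTrue(Iterables.size(batch.getValue()) <= 2);
            }
            return null;
          });
  pipeline.run();
}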


@codecov (bot) commented Sep 6, 2022

Codecov Report

Merging #22953 (b2a6f46) into master (1dd2ccd) will increase coverage by 0.14%.
The diff coverage is n/a.

❗ Current head b2a6f46 differs from pull request most recent head fa1bd88. Consider uploading reports for the commit fa1bd88 to get more accurate results

@@            Coverage Diff             @@
##           master   #22953      +/-   ##
==========================================
+ Coverage   73.45%   73.59%   +0.14%     
==========================================
  Files         714      716       +2     
  Lines       96497    95282    -1215     
==========================================
- Hits        70886    70127     -759     
+ Misses      24289    23859     -430     
+ Partials     1322     1296      -26     
Flag Coverage Δ
go 50.94% <0.00%> (-0.52%) ⬇️
python 83.42% <0.00%> (+0.26%) ⬆️

Flags with carried forward coverage won't be shown.

Impacted Files Coverage Δ
sdks/go/pkg/beam/core/metrics/dumper.go 0.00% <0.00%> (-53.97%) ⬇️
sdks/go/pkg/beam/core/metrics/store.go 10.63% <0.00%> (-25.54%) ⬇️
sdks/go/pkg/beam/runners/direct/gbk.go 72.35% <0.00%> (-11.24%) ⬇️
sdks/go/pkg/beam/core/metrics/metrics.go 49.36% <0.00%> (-10.93%) ⬇️
sdks/go/pkg/beam/register/emitter.go 47.69% <0.00%> (-6.37%) ⬇️
sdks/go/pkg/beam/options/jobopts/options.go 87.27% <0.00%> (-5.91%) ⬇️
sdks/go/pkg/beam/register/iter.go 67.21% <0.00%> (-5.76%) ⬇️
...ython/apache_beam/io/gcp/experimental/spannerio.py 82.15% <0.00%> (-4.79%) ⬇️
.../python/apache_beam/testing/test_stream_service.py 88.09% <0.00%> (-4.77%) ⬇️
...ks/python/apache_beam/runners/worker/statecache.py 89.69% <0.00%> (-4.37%) ⬇️
... and 153 more



private boolean checkBatchSizes(Iterable<KV<String, Iterable<String>>> listToCheck) {
  for (KV<String, Iterable<String>> element : listToCheck) {
    if (Iterables.size(element.getValue()) != BATCH_SIZE) {
nbali (Contributor, Author):

I did notice that it was != and not > here, but the test is still valid with > (we have 10 elements and a batch size of 5, so a batch can't be anything but 5, and we check the batch count at the end with EVEN_NUM_ELEMENTS / BATCH_SIZE).
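For reference, the relaxed version of the check reads roughly:

private boolean checkBatchSizes(Iterable<KV<String, Iterable<String>>> listToCheck) {
  for (KV<String, Iterable<String>> element : listToCheck) {
    // Batches may come up short (e.g. a final partial flush), but must never
    // exceed the configured size.
    if (Iterables.size(element.getValue()) > BATCH_SIZE) {
      return false;
    }
  }
  return true;
}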

lukecwik (Member):

I would say that this previous test was too strict and your update makes sense. GroupIntoBatches ensures that the batches aren't bigger than BATCH_SIZE elements.

Unfortunately I think the GroupIntoBatches specification is too loose, since it uses words like "aim to create batches". It would be great if we could make it a strict guarantee, for example that batches will never be bigger than the element count, or never bigger than the byte size (except for the case where a single element is bigger than the byte size, in which case it shows up in its own group). I wouldn't try to solve this here, but it would make sense to have a bug and/or follow-up PR to make this explicit.

nbali (Contributor, Author):

Actually, I think I did solve that. I mean, apart from the inaccuracy of the weigher.

… introduced test in GroupIntoBatchesTest
}

// fire them all at once
TestStream<KV<String, String>> stream = streamBuilder.advanceWatermarkToInfinity();
nbali (Contributor, Author) commented Sep 7, 2022:

@lukecwik is there any other, simpler way (i.e. not TestStream + TimestampedValue) to guarantee the order of the elements that I missed?

lukecwik (Member):

Using TestStream is a way to ensure that the output is produced and processed in a specific order, ensuring exact output conditions. It makes it easier to write pipeline-level integration tests for exact scenarios.

As the other tests have done, the other option is to use PAssert with a custom matcher that passes for any valid combination of outputs. It is difficult to write a meaningful test when runner-determined reordering can produce many different valid output combinations; typically one just checks that certain properties are satisfied. For GroupIntoBatches with both limits, you would ensure that if you take out the largest element from a group, the remaining byte size is less than the byte limit, and also that the element count of each batch never exceeds the specified count.

These properties would ensure that GroupIntoBatches was honoring the contract, but a naive implementation could still choose to group inefficiently (e.g. each batch is one element) and remain valid.
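Sketched as a helper for such a property-based matcher (the method name and the String-length stand-in weigher are illustrative; assumes JUnit's assertTrue is statically imported):

private static void checkBatchContract(
    Iterable<KV<String, Iterable<String>>> batches, long maxCount, long maxBytes) {
  for (KV<String, Iterable<String>> batch : batches) {
    long count = 0;
    long totalBytes = 0;
    long largest = 0;
    for (String value : batch.getValue()) {
      long weight = value.length(); // stand-in for the configured weigher
      count++;
      totalBytes += weight;
      largest = Math.max(largest, weight);
    }
    // The element count must never exceed the configured limit.
    assertTrue("too many elements in batch", count <= maxCount);
    // With the largest element removed, the rest must fit under the byte limit.
    assertTrue("batch too large even without its largest element", totalBytes - largest < maxBytes);
  }
}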

@nbali (Contributor, Author) commented Sep 8, 2022

Run Java PreCommit

2 similar comments
@nbali (Contributor, Author) commented Sep 9, 2022

Run Java PreCommit

@nbali (Contributor, Author) commented Sep 9, 2022

Run Java PreCommit


@nbali (Contributor, Author) commented Sep 12, 2022

@lukecwik Can I use the same PipelineOptions there (in WriteFiles) as well? Does it use the same network layer?

@lukecwik (Member) left a comment:

Sorry for the long wait.


@@ -267,20 +225,9 @@ public void testWithShardedKeyInGlobalWindow() {
PAssert.that("Incorrect batch size in one or more elements", collection)
lukecwik (Member):

We should move the comment just above into checkBatchSizes:

    // Since with default sharding, the number of subshards of a key is nondeterministic, create
    // a large number of input elements and a small batch size and check there is no batch larger
    // than the specified size.

nbali (Contributor, Author):

Erhm, isn't this comment only valid for .withShardedKey()?

})
public void testMultipleLimitsAtOnceInGlobalWindowBatchSizeCountAndBatchSizeByteSize() {
// with using only one of the limits the result would be only 2 batches,
// if we have 3 both limit works
lukecwik (Member):

Suggested change:
- // if we have 3 both limit works
+ // if we have 3 both limits are exercised


.map(s -> KV.of("key", s))
.collect(Collectors.toList());

// to ensure ordered firing
lukecwik (Member):

Suggested change:
- // to ensure ordered firing
+ // to ensure ordered processing


.advanceWatermarkTo(Instant.EPOCH);

long offset = 0L;
for (KV<String, String> kv : dataToUse) {
lukecwik (Member):

We should advance the watermark on each element to ensure that it is processed in order. If we advance the watermark only at the end then all the elements can be processed in parallel and there is no guarantee that the elements will be processed in the order that they were added.

nbali (Contributor, Author):

Won't the different/increasing timestamps already guarantee that?

lukecwik (Member) commented Nov 30, 2022:

You're right, each addElements call is its own batch that needs to be processed.
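Condensed, the pattern the test already uses is therefore sufficient (coders, dataToUse, and the joda-time imports are assumed from the surrounding test class):

TestStream.Builder<KV<String, String>> streamBuilder =
    TestStream.create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
        .advanceWatermarkTo(Instant.EPOCH);
long offset = 0L;
for (KV<String, String> kv : dataToUse) {
  // Each addElements call forms its own event-time batch, so strictly
  // increasing timestamps are enough to fix the processing order.
  streamBuilder =
      streamBuilder.addElements(
          TimestampedValue.of(kv, Instant.EPOCH.plus(Duration.millis(offset++))));
}
// fire them all at once
TestStream<KV<String, String>> stream = streamBuilder.advanceWatermarkToInfinity();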

@lukecwik (Member) commented Oct 4, 2022

> @lukecwik Can I use the same PipelineOptions there (in WriteFiles) as well? Does it use the same network layer?

Sort of, the issue is that the person might be writing to a different file system that isn't GCS. If you had a way to check the filesystem then you could apply the GCS limit. On the other hand it might make sense to use it anyways.

@lukecwik (Member) commented Oct 4, 2022

It would be great if we could update GroupIntoBatches to honor the byte size limit for batches with more than one element and output batches of exactly one element if that one is too big.

This could be done by changing the logic within GroupIntoBatches to measure the size first and only add the element to the batch if doing so would not push the batch over the limit. Some pseudo-code:

byteSize = measure(obj)
if (byteSize >= byteSizeLimit) {
  output obj as a single-element batch
  continue
}
if (byteSize + previousNumBytes > byteSizeLimit) {
  output all buffered elements as a batch, then start a new batch with obj
} else {
  add obj to the current batch
}

We could update the javadoc contract to be stricter as well with this change, and it would solve the problem of GroupIntoBatches causing GCS buffer overflow, since batches would stay under 64 MiB unless there is a single large element.

@nbali (Contributor, Author) commented Nov 15, 2022

> @lukecwik Can I use the same PipelineOptions there (in WriteFiles) as well? Does it use the same network layer?
>
> Sort of, the issue is that the person might be writing to a different file system that isn't GCS. If you had a way to check the filesystem then you could apply the GCS limit. On the other hand it might make sense to use it anyways.

The class wasn't even available as a dependency (for a good reason), so I just hardcoded 64MB there.

@@ -424,13 +465,40 @@ public void processElement(
    BoundedWindow window,
    OutputReceiver<KV<K, Iterable<InputT>>> receiver) {
  LOG.debug("*** BATCH *** Add element for window {} ", window);
  if (shouldCareAboutWeight()) {
    final long elementWeight = weigher.apply(element.getValue());
    if (elementWeight + storedBatchSizeBytes.read() > batchSizeBytes) {
(Contributor):

This defeats the readLater optimization, since you're eagerly reading the value here (meaning there's also no point in the readLater below). You should add readLaters for minBufferedTs (if needed) and storedBatchSize earlier in the function.

nbali (Contributor, Author):

TBH I wasn't sure how the read()/readLater() implementation works, e.g. if we read it once, will it be cached for the whole duration or fetched again. But I assumed readLater(), as every prefetching method should be, is already optimized to be a noop for already-present values, so the worst case is an unnecessary noop call.

So to sum up, does that mean a value returned by a read() call will always be available from that point forward? Anyway, I modified the PR/code to reflect this.
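A condensed sketch of the prefetch pattern being suggested; the state id and types here are illustrative, simplified from the actual GroupIntoBatches DoFn:

import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

class WeightAwareBatchFn extends DoFn<KV<String, String>, KV<String, Iterable<String>>> {
  private static final long BATCH_SIZE_BYTES = 64L << 20; // 64MiB

  @StateId("storedBatchSizeBytes")
  private final StateSpec<ValueState<Long>> storedBatchSizeBytesSpec = StateSpecs.value();

  @ProcessElement
  public void processElement(
      ProcessContext c,
      @StateId("storedBatchSizeBytes") ValueState<Long> storedBatchSizeBytes) {
    // Prefetch up front: the read() below is then served from the local cache
    // instead of blocking on a round trip to the state backend.
    storedBatchSizeBytes.readLater();

    long elementWeight = c.element().getValue().length(); // stand-in weigher
    Long stored = storedBatchSizeBytes.read();
    long buffered = stored == null ? 0L : stored;
    if (elementWeight + buffered > BATCH_SIZE_BYTES) {
      // ... flush the buffered batch here before adding the element ...
      buffered = 0L;
    }
    storedBatchSizeBytes.write(buffered + elementWeight);
  }
}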

@nbali (Contributor, Author) commented Nov 18, 2022

Waiting for #20819

@nbali (Contributor, Author) commented Nov 23, 2022

Run Java PreCommit

1 similar comment
@nbali (Contributor, Author) commented Nov 29, 2022

Run Java PreCommit

@nbali (Contributor, Author) commented Nov 29, 2022

Run Java_Examples_Dataflow PreCommit

@lukecwik (Member):

Run Dataflow ValidatesRunner

@lukecwik (Member):

Run Dataflow Streaming ValidatesRunner

@lukecwik (Member):

Run Java Dataflow V2 ValidatesRunner

@lukecwik (Member):

Run Java Dataflow V2 ValidatesRunner Streaming

@lukecwik (Member) commented Dec 1, 2022

Run Java PreCommit

1 similar comment
@lukecwik (Member) commented Dec 1, 2022

Run Java PreCommit

@lukecwik (Member) commented Dec 1, 2022

Note that I cloned this PR, added this patch, and opened a new PR. If the Dataflow tests there pass, I intend to merge it and close this one.

@lukecwik (Member) commented Dec 2, 2022

#24463, containing this plus a fix, was merged.

@lukecwik closed this Dec 2, 2022
Successfully merging this pull request may close these issues:

  • [Bug]: KafkaIO could fail with BigQueryIO.Write.withAutoSharding()