Fix for #22951 (#22953)

Closed · wants to merge 15 commits
Changes from 3 commits
@@ -117,6 +117,11 @@ public class GroupIntoBatches<K, InputT>
*/
@AutoValue
public abstract static class BatchingParams<InputT> implements Serializable {
lukecwik (Member):
It looks like you're adding support for GroupIntoBatches to limit on count and byte size at the same time.

Can you add tests that cover this new scenario (see the sketch after this list) to:

  • GroupIntoBatchesTest
  • GroupIntoBatchesTranslationTest
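
For illustration only, a rough sketch of what such a test might look like (hypothetical: the test name, limits, and explicit weigher are invented here; pipeline is assumed to be a TestPipeline @Rule and assertTrue is JUnit's):

// Hypothetical test sketch, not the PR's actual test code. The byte-bound
// assertion assumes the flush-before-overflow behavior discussed later in
// this thread.
@Test
public void testBatchingOnElementCountAndByteSizeTogether() {
  PCollection<KV<String, Iterable<String>>> batches =
      pipeline
          .apply(Create.of(KV.of("k", "aaaa"), KV.of("k", "bbbb"), KV.of("k", "cccc")))
          .apply(
              GroupIntoBatches.<String, String>ofSize(5)
                  .withByteSize(10, (String s) -> (long) s.length()));
  PAssert.that(batches)
      .satisfies(
          iterable -> {
            for (KV<String, Iterable<String>> batch : iterable) {
              long count = 0;
              long bytes = 0;
              for (String s : batch.getValue()) {
                count++;
                bytes += s.length();
              }
              assertTrue(count <= 5);
              // A single element larger than the byte limit may still be
              // emitted as its own one-element batch.
              assertTrue(count == 1 || bytes <= 10);
            }
            return null;
          });
  pipeline.run();
}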

+  public static <InputT> BatchingParams<InputT> createDefault() {
+    return new AutoValue_GroupIntoBatches_BatchingParams(
+        Long.MAX_VALUE, Long.MAX_VALUE, null, Duration.ZERO);
+  }
+
  public static <InputT> BatchingParams<InputT> create(
      long batchSize,
      long batchSizeBytes,
@@ -170,8 +175,7 @@ private GroupIntoBatches(BatchingParams<InputT> params) {
  /** Aim to create batches each with the specified element count. */
  public static <K, InputT> GroupIntoBatches<K, InputT> ofSize(long batchSize) {
    Preconditions.checkState(batchSize < Long.MAX_VALUE);
-    return new GroupIntoBatches<>(
-        BatchingParams.create(batchSize, Long.MAX_VALUE, null, Duration.ZERO));
+    return new GroupIntoBatches<K, InputT>(BatchingParams.createDefault()).withSize(batchSize);
  }

/**
@@ -185,9 +189,8 @@ public static <K, InputT> GroupIntoBatches<K, InputT> ofSize(long batchSize) {
   * {@link #ofByteSize(long, SerializableFunction)} to specify code to calculate the byte size.
   */
  public static <K, InputT> GroupIntoBatches<K, InputT> ofByteSize(long batchSizeBytes) {
-    Preconditions.checkState(batchSizeBytes < Long.MAX_VALUE);
-    return new GroupIntoBatches<>(
-        BatchingParams.create(Long.MAX_VALUE, batchSizeBytes, null, Duration.ZERO));
+    return new GroupIntoBatches<K, InputT>(BatchingParams.createDefault())
+        .withByteSize(batchSizeBytes);
  }

/**
@@ -196,16 +199,49 @@ public static <K, InputT> GroupIntoBatches<K, InputT> ofByteSize(
*/
  public static <K, InputT> GroupIntoBatches<K, InputT> ofByteSize(
      long batchSizeBytes, SerializableFunction<InputT, Long> getElementByteSize) {
-    Preconditions.checkState(batchSizeBytes < Long.MAX_VALUE);
-    return new GroupIntoBatches<>(
-        BatchingParams.create(Long.MAX_VALUE, batchSizeBytes, getElementByteSize, Duration.ZERO));
+    return new GroupIntoBatches<K, InputT>(BatchingParams.createDefault())
+        .withByteSize(batchSizeBytes, getElementByteSize);
  }

  /** Returns user supplied parameters for batching. */
  public BatchingParams<InputT> getBatchingParams() {
    return params;
  }

+  /** @see #ofSize(long) */
+  public GroupIntoBatches<K, InputT> withSize(long batchSize) {
+    Preconditions.checkState(batchSize < Long.MAX_VALUE);
+    return new GroupIntoBatches<>(
+        BatchingParams.create(
+            batchSize,
+            params.getBatchSizeBytes(),
+            params.getElementByteSize(),
+            params.getMaxBufferingDuration()));
+  }
+
+  /** @see #ofByteSize(long) */
+  public GroupIntoBatches<K, InputT> withByteSize(long batchSizeBytes) {
+    Preconditions.checkState(batchSizeBytes < Long.MAX_VALUE);
+    return new GroupIntoBatches<>(
+        BatchingParams.create(
+            params.getBatchSize(),
+            batchSizeBytes,
+            params.getElementByteSize(),
+            params.getMaxBufferingDuration()));
+  }
+
+  /** @see #ofByteSize(long, SerializableFunction) */
+  public GroupIntoBatches<K, InputT> withByteSize(
+      long batchSizeBytes, SerializableFunction<InputT, Long> getElementByteSize) {
+    Preconditions.checkState(batchSizeBytes < Long.MAX_VALUE);
+    return new GroupIntoBatches<>(
+        BatchingParams.create(
+            params.getBatchSize(),
+            batchSizeBytes,
+            getElementByteSize,
+            params.getMaxBufferingDuration()));
+  }

  /**
   * Sets a time limit (in processing time) on how long an incomplete batch of elements is allowed
   * to be buffered. Once a batch is flushed to output, the timer is reset. The provided limit must
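As an aside, the factory methods and the new with* methods compose, so both limits and a buffering duration can be set on one transform. A small illustrative usage (not from the PR; the element type, limits, and weigher are invented, and Duration is Joda-Time's as used by Beam):

// Illustrative only: a batch closes on whichever limit trips first per key:
// 500 elements, 1 MiB as measured by the supplied weigher, or one minute of buffering.
PCollection<KV<String, Iterable<String>>> batched =
    input.apply(
        GroupIntoBatches.<String, String>ofSize(500)
            .withByteSize(1L << 20, s -> (long) s.length())
            .withMaxBufferingDuration(Duration.standardMinutes(1)));
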
@@ -113,6 +113,9 @@ class BatchLoads<DestinationT, ElementT>
  // If user triggering is supplied, we will trigger the file write after this many records are
  // written.
  static final int FILE_TRIGGERING_RECORD_COUNT = 500000;
+  // If user triggering is supplied, we will trigger the file write after this many bytes are
+  // written.
+  static final long FILE_TRIGGERING_BYTE_COUNT = 100 * (1L << 20); // 100MiB
lukecwik (Member):
It looks like we already have a memory limit for writing: 20 parallel writers with 64 MB buffers. Should we limit this triggering to 64 MB as well so that it fits in one chunk?

CC: @reuvenlax, any suggestions here?

Contributor Author:

@lukecwik Having the same limit as the buffer actually makes sense to me, but can you point me to where I might find that limit? I can see it in the comments for DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE, but instead of hardcoding 64 MB here as well, I would rather reference the original limit directly.

Contributor Author:

@lukecwik
On second thought, I think there is a problem with using this 64 MB default. We only flush the batch inside GroupIntoBatches once storedBatchSizeBytes is greater than or equal to the limit. So if we make the limit 64 MB, more than likely we will flush just a bit more than 64 MB, and the batch won't fit into the 64 MB buffer.

So either the triggering byte count should be some percentage smaller than the 64 MB default, or GroupIntoBatches has to be modified so that if the current element would push the batch over the byte size limit, the batch is fired without that element being added to it first. The second seems like the better solution, but I assume doing the storedBatchSizeBytes.read() sooner would have a performance impact.

lukecwik (Member):

Sorry, I didn't see this comment, but I agree that we should change GroupIntoBatches to ensure that if adding an element would take the batch over the limit, we flush the batch first.

Pseudo-code would be like:

byteSize = measure(obj)
if (byteSize >= byteSizeLimit) {
  // obj alone reaches the limit: emit it as a single-element batch
  output obj as single element batch
  continue
}
if (byteSize + previousNumBytes > byteSizeLimit) {
  // flush the buffer first, then start the next batch with obj
  output all buffered elements as batch
  add obj to the (now empty) batch
} else {
  add obj to batch
}
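
A more concrete rendering of that idea in plain Java (a sketch only, not Beam's actual state-backed implementation; the buffer and running byte count stand in for the transform's bag state and storedBatchSizeBytes):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.ToLongFunction;

/** Sketch of the flush-before-overflow policy from the pseudo-code above. */
class ByteLimitedBatcher<T> {
  private final long byteSizeLimit;
  private final ToLongFunction<T> weigher; // stand-in for getElementByteSize
  private final List<T> buffer = new ArrayList<>();
  private long bufferedBytes = 0;

  ByteLimitedBatcher(long byteSizeLimit, ToLongFunction<T> weigher) {
    this.byteSizeLimit = byteSizeLimit;
    this.weigher = weigher;
  }

  /** Adds an element and returns any batches completed as a result. */
  List<List<T>> add(T element) {
    List<List<T>> completed = new ArrayList<>();
    long byteSize = weigher.applyAsLong(element);
    if (byteSize >= byteSizeLimit) {
      // An element that alone reaches the limit is emitted as its own batch.
      completed.add(Collections.singletonList(element));
      return completed;
    }
    if (bufferedBytes + byteSize > byteSizeLimit) {
      // Adding the element would overflow the limit, so flush the buffer
      // first; the element then starts the next batch.
      completed.add(new ArrayList<>(buffer));
      buffer.clear();
      bufferedBytes = 0;
    }
    buffer.add(element);
    bufferedBytes += byteSize;
    return completed;
  }
}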

Contributor Author:

I used a different algorithm, but IMO it stays close to the original concept of the transform now.


  // If using auto-sharding for unbounded data, we batch the records before triggering file write
  // to avoid generating too many small files.

@@ -647,6 +650,7 @@ PCollection<WriteBundlesToFiles.Result<DestinationT>> writeDynamicallyShardedFil
    return input
        .apply(
            GroupIntoBatches.<DestinationT, ElementT>ofSize(FILE_TRIGGERING_RECORD_COUNT)
+                .withByteSize(FILE_TRIGGERING_BYTE_COUNT)
                .withMaxBufferingDuration(maxBufferingDuration)
                .withShardedKey())
        .setCoder(