Skip to content

Commit

Permalink
Compilation fix
Browse files Browse the repository at this point in the history
  • Loading branch information
nbali committed Aug 30, 2022
1 parent 2eacbd4 commit ff6b5a0
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ public class GroupIntoBatches<K, InputT>
@AutoValue
public abstract static class BatchingParams<InputT> implements Serializable {
public static <InputT> BatchingParams<InputT> createDefault() {
return new AutoValue_GroupIntoBatches_BatchingParams(Long.MAX_VALUE, Long.MAX_VALUE, null, Duration.ZERO);
return new AutoValue_GroupIntoBatches_BatchingParams(
Long.MAX_VALUE, Long.MAX_VALUE, null, Duration.ZERO);
}

public static <InputT> BatchingParams<InputT> create(
Expand Down Expand Up @@ -163,7 +164,7 @@ public SerializableFunction<InputT, Long> getWeigher(Coder<InputT> valueCoder) {
return weigher;
}
}

private final BatchingParams<InputT> params;
private static final UUID workerUuid = UUID.randomUUID();

Expand All @@ -174,8 +175,7 @@ private GroupIntoBatches(BatchingParams<InputT> params) {
/** Aim to create batches each with the specified element count. */
public static <K, InputT> GroupIntoBatches<K, InputT> ofSize(long batchSize) {
Preconditions.checkState(batchSize < Long.MAX_VALUE);
return new GroupIntoBatches<>(BatchingParams.createDefault())
.withSize(batchSize);
return new GroupIntoBatches<K, InputT>(BatchingParams.createDefault()).withSize(batchSize);
}

/**
Expand All @@ -189,7 +189,7 @@ public static <K, InputT> GroupIntoBatches<K, InputT> ofSize(long batchSize) {
* {@link #ofByteSize(long, SerializableFunction)} to specify code to calculate the byte size.
*/
public static <K, InputT> GroupIntoBatches<K, InputT> ofByteSize(long batchSizeBytes) {
return new GroupIntoBatches<>(BatchingParams.createDefault())
return new GroupIntoBatches<K, InputT>(BatchingParams.createDefault())
.withByteSize(batchSizeBytes);
}

Expand All @@ -199,7 +199,7 @@ public static <K, InputT> GroupIntoBatches<K, InputT> ofByteSize(long batchSizeB
*/
public static <K, InputT> GroupIntoBatches<K, InputT> ofByteSize(
long batchSizeBytes, SerializableFunction<InputT, Long> getElementByteSize) {
return new GroupIntoBatches<>(BatchingParams.createDefault())
return new GroupIntoBatches<K, InputT>(BatchingParams.createDefault())
.withByteSize(batchSizeBytes, getElementByteSize);
}

Expand All @@ -208,22 +208,18 @@ public BatchingParams<InputT> getBatchingParams() {
return params;
}

/**
* @see GroupIntoBatches#ofSize(long)
*/
/** @see #ofSize(long) */
public GroupIntoBatches<K, InputT> withSize(long batchSize) {
Preconditions.checkState(batchSize < Long.MAX_VALUE);
return new GroupIntoBatches<>(
BatchingParams.create(
params.getBatchSize(),
batchSizeBytes,
batchSize,
params.getBatchSizeBytes(),
params.getElementByteSize(),
params.getMaxBufferingDuration()));
}

/**
* @see GroupIntoBatches#ofByteSize(long)
*/
/** @see #ofByteSize(long) */
public GroupIntoBatches<K, InputT> withByteSize(long batchSizeBytes) {
Preconditions.checkState(batchSizeBytes < Long.MAX_VALUE);
return new GroupIntoBatches<>(
Expand All @@ -234,9 +230,7 @@ public GroupIntoBatches<K, InputT> withByteSize(long batchSizeBytes) {
params.getMaxBufferingDuration()));
}

/**
* @see GroupIntoBatches#ofByteSize(long, SerializableFunction)
*/
/** @see #ofByteSize(long, SerializableFunction) */
public GroupIntoBatches<K, InputT> withByteSize(
long batchSizeBytes, SerializableFunction<InputT, Long> getElementByteSize) {
Preconditions.checkState(batchSizeBytes < Long.MAX_VALUE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class BatchLoads<DestinationT, ElementT>
static final int FILE_TRIGGERING_RECORD_COUNT = 500000;
// If user triggering is supplied, we will trigger the file write after this many bytes are
// written.
static final int FILE_TRIGGERING_BYTE_COUNT = 100 * (1L << 20) // 100MiB
static final long FILE_TRIGGERING_BYTE_COUNT = 100 * (1L << 20); // 100MiB

// If using auto-sharding for unbounded data, we batch the records before triggering file write
// to avoid generating too many small files.
Expand Down

0 comments on commit ff6b5a0

Please sign in to comment.