diff --git a/docs/ReleaseNotes.md b/docs/ReleaseNotes.md index e9cc0518b1..f9db896276 100644 --- a/docs/ReleaseNotes.md +++ b/docs/ReleaseNotes.md @@ -34,7 +34,7 @@ This release also updates downstream dependency versions. Most notably, the prot * **Feature** Feature 2 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN) * **Feature** Feature 3 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN) * **Feature** Feature 4 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN) -* **Feature** Feature 5 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN) +* **Feature** Cursors now can map updates to their continuations via the `.mapContinuation` method [(Issue #1663)](https://github.com/FoundationDB/fdb-record-layer/issues/1663) * **Breaking change** Change 1 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN) * **Breaking change** Change 2 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN) * **Breaking change** Change 3 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN) diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/RecordCursor.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/RecordCursor.java index bbe142730d..a541593641 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/RecordCursor.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/RecordCursor.java @@ -20,8 +20,8 @@ package com.apple.foundationdb.record; -import com.apple.foundationdb.annotation.SpotBugsSuppressWarnings; import com.apple.foundationdb.annotation.API; +import com.apple.foundationdb.annotation.SpotBugsSuppressWarnings; import com.apple.foundationdb.async.AsyncIterator; import com.apple.foundationdb.async.AsyncUtil; import com.apple.foundationdb.record.cursors.AsyncIteratorCursor; @@ -31,8 +31,8 @@ import com.apple.foundationdb.record.cursors.FutureCursor; import com.apple.foundationdb.record.cursors.IteratorCursor; import com.apple.foundationdb.record.cursors.ListCursor; -import com.apple.foundationdb.record.cursors.MapCursor; import com.apple.foundationdb.record.cursors.MapPipelinedCursor; +import com.apple.foundationdb.record.cursors.MapResultCursor; import com.apple.foundationdb.record.cursors.OrElseCursor; import com.apple.foundationdb.record.cursors.RowLimitedCursor; import com.apple.foundationdb.record.cursors.SkipCursor; @@ -327,12 +327,95 @@ default CompletableFuture> first() { /** * Get a new cursor by applying the given function to the records in this cursor. * @param func the function to apply - * @param the type of the record elements + * @param the type of the elements of the new cursor * @return a new cursor that applies the given function + * @see #mapResult(Function) if the continuation also needs to be mapped */ @Nonnull default RecordCursor map(@Nonnull Function func) { - return new MapCursor<>(this, func); + return mapResult(result -> result.map(func)); + } + + /** + * Get a new cursor by applying the given function to the results returned by this cursor. + * This allows the caller to change both the values and the continuations returned by + * this cursor. There are alternative methods available if only one or the other needs to + * be adjusted. + * + * @param func the function to apply + * @param the type of the elements of the new cursor + * @return a new cursor that applies the given function to each result of this cursor + * @see #map(Function) to change only the values returned by the cursor + * @see #mapContinuation(Function, ContinuationConvertor, byte[]) to change only the continuations returned by the cursor + */ + @API(API.Status.EXPERIMENTAL) + @Nonnull + default RecordCursor mapResult(@Nonnull Function, RecordCursorResult> func) { + return new MapResultCursor<>(this, func); + } + + /** + * Transformation to apply to a continuation. This interface requires that there by a two-way transformation, + * one ({@link #wrapContinuation(RecordCursorContinuation)}) that is applied to results from one cursor and + * another ({@link #unwrapContinuation(byte[])}) that is used to re-create the original cursor's continuation. + * + * @see #mapContinuation(Function, ContinuationConvertor, byte[]) + */ + @API(API.Status.EXPERIMENTAL) + interface ContinuationConvertor { + /** + * Extract a continuation to resume a new cursor. This method is used by + * {@link #mapContinuation(Function, ContinuationConvertor, byte[])} when constructing + * the new cursor to un-transform a continuation from {@link #wrapContinuation(RecordCursorContinuation)} + * to be handed to a copy of the original inner cursor. + * + * @param continuation a continuation returned from {@link #wrapContinuation(RecordCursorContinuation)} + * or {@code null} if a cursor is being restarted from the beginning + * @return a continuation to use to resume execution of a cursor + */ + @Nullable + byte[] unwrapContinuation(@Nullable byte[] continuation); + + /** + * Transform a continuation that came from an existing cursor. This method is used by + * {@link #mapContinuation(Function, ContinuationConvertor, byte[])} to transform the + * continuations from one cursor and attach them to a new cursor + * + * @param continuation the continuation from an existing cursor + * @return the new continuation to return + */ + RecordCursorContinuation wrapContinuation(@Nonnull RecordCursorContinuation continuation); + } + + /** + * Get a new cursor by applying a transformation to the continuation of each result. This function creates + * a new cursor by unwrapping the passed in {@code continuation} parameter (using the + * {@link ContinuationConvertor#unwrapContinuation(byte[]) unwrapContinuation} method on the supplied + * {@code convertor}) and passing that to the given {@code cursorFunction}. It then will modify + * the continuation of each result from that new cursor by applying the {@code convertor}'s + * {@link ContinuationConvertor#wrapContinuation(RecordCursorContinuation) wrapContinuation} + * method to each result. + * + *

+ * One common use case for this function is to allow the user to attach additional context to the + * to the continuation that might come from the cursor execution environment. So the final cursor's + * continuation might be a serialized data structure containing that context plus the original cursor's + * continuation. The {@link ContinuationConvertor#unwrapContinuation(byte[])} should then be able to + * extract the original cursor's continuation and use it to resume a copy of the original cursor. + *

+ * + * @param cursorFunction a function that constructs a cursor based on the unwrapped continuation + * @param convertor a convertor that can transform continuations + * @param continuation the continuation from a previous run (or {@code null} to start from the beginning) + * @param the type of elements in the cursor + * @return a cursor that has the values taken from the cursor produced by the {@code cursorFunction} but + * continuations that have been transformed by the {@code convertor} + */ + @API(API.Status.EXPERIMENTAL) + static RecordCursor mapContinuation(@Nonnull Function> cursorFunction, @Nonnull ContinuationConvertor convertor, @Nullable byte[] continuation) { + byte[] innerContinuation = convertor.unwrapContinuation(continuation); + return cursorFunction.apply(innerContinuation) + .mapResult(result -> result.withContinuation(convertor.wrapContinuation(result.getContinuation()))); } /** @@ -342,9 +425,9 @@ default RecordCursor map(@Nonnull Function func) { */ @Nonnull default RecordCursor mapEffect(@Nonnull Consumer consumer) { - return new MapCursor<>(this, record -> { - consumer.accept(record); - return record; + return map(rec -> { + consumer.accept(rec); + return rec; }); } @@ -355,9 +438,9 @@ default RecordCursor mapEffect(@Nonnull Consumer consumer) { */ @Nonnull default RecordCursor mapEffect(@Nonnull Runnable runnable) { - return new MapCursor<>(this, record -> { + return mapResult(result -> { runnable.run(); - return record; + return result; }); } diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/cursors/MapCursor.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/cursors/MapCursor.java index 45989227c5..be38ffb482 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/cursors/MapCursor.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/cursors/MapCursor.java @@ -35,8 +35,10 @@ * A cursor that applies a function to the elements of another cursor. * @param the type of elements of the source cursor * @param the type of elements of the cursor after applying the function + * @deprecated Use {@link RecordCursor#map} or {@link MapResultCursor} instead */ -@API(API.Status.MAINTAINED) +@API(API.Status.DEPRECATED) +@Deprecated public class MapCursor implements RecordCursor { @Nonnull private final RecordCursor inner; diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/cursors/MapResultCursor.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/cursors/MapResultCursor.java new file mode 100644 index 0000000000..c5e956afa6 --- /dev/null +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/cursors/MapResultCursor.java @@ -0,0 +1,96 @@ +/* + * MapResultCursor.java + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2015-2022 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.apple.foundationdb.record.cursors; + +import com.apple.foundationdb.annotation.API; +import com.apple.foundationdb.record.RecordCursor; +import com.apple.foundationdb.record.RecordCursorResult; +import com.apple.foundationdb.record.RecordCursorVisitor; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.function.Function; + +/** + * A cursor that applies a function to the elements of another cursor. + * @param the type of elements of the source cursor + * @param the type of elements of the cursor after applying the function + */ +@API(API.Status.MAINTAINED) +public class MapResultCursor implements RecordCursor { + @Nonnull + private final RecordCursor inner; + @Nonnull + private final Function, RecordCursorResult> func; + + @Nullable + private RecordCursorResult nextResult; + + /** + * Internal constructor. Adopters of the library should call {@link RecordCursor#mapResult(Function) inner.mapResult()} + * or one of its variants instead. + * + * @param inner the inner cursor to apply the function on + * @param func the function to apply + * @see RecordCursor#map(Function) + * @see RecordCursor#mapContinuation(Function, ContinuationConvertor, byte[]) + * @see RecordCursor#mapResult(Function) + */ + @API(API.Status.INTERNAL) + public MapResultCursor(@Nonnull RecordCursor inner, @Nonnull Function, RecordCursorResult> func) { + this.inner = inner; + this.func = func; + } + + @Nonnull + @Override + public CompletableFuture> onNext() { + if (nextResult != null && !nextResult.hasNext()) { + return CompletableFuture.completedFuture(nextResult); + } + return inner.onNext().thenApply(func) + .thenApply(result -> { + nextResult = result; + return result; + }); + } + + @Override + public void close() { + inner.close(); + } + + @Nonnull + @Override + public Executor getExecutor() { + return inner.getExecutor(); + } + + @Override + public boolean accept(@Nonnull final RecordCursorVisitor visitor) { + if (visitor.visitEnter(this)) { + inner.accept(visitor); + } + return visitor.visitLeave(this); + } +} diff --git a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/RecordCursorTest.java b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/RecordCursorTest.java index 031ae5c1e2..f548adb3b9 100644 --- a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/RecordCursorTest.java +++ b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/RecordCursorTest.java @@ -25,7 +25,7 @@ import com.apple.foundationdb.record.cursors.FilterCursor; import com.apple.foundationdb.record.cursors.FirableCursor; import com.apple.foundationdb.record.cursors.LazyCursor; -import com.apple.foundationdb.record.cursors.MapCursor; +import com.apple.foundationdb.record.cursors.MapResultCursor; import com.apple.foundationdb.record.cursors.RowLimitedCursor; import com.apple.foundationdb.record.cursors.SkipCursor; import com.apple.foundationdb.record.provider.foundationdb.FDBStoreTimer; @@ -33,6 +33,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; +import com.google.protobuf.ByteString; import org.apache.commons.lang3.tuple.Pair; import org.hamcrest.Matchers; import org.junit.jupiter.api.BeforeEach; @@ -88,7 +89,7 @@ public class RecordCursorTest { Timer timer; @BeforeEach - public void setup() throws Exception { + void setup() { timer = new Timer("RecordCursorTest"); } @@ -143,7 +144,7 @@ public boolean accept(@Nonnull RecordCursorVisitor visitor) { } @Test - public void mapPipelinedReuseTest() throws Exception { + void mapPipelinedReuseTest() { AsyncCountdown cursor = new AsyncCountdown(100); RecordCursor map = cursor.mapPipelined(i -> delayedFuture(i, 10), 10); assertEquals(IntStream.range(0, 100).mapToObj(i -> 100 - i).collect(Collectors.toList()), map.asList().join()); @@ -151,7 +152,7 @@ public void mapPipelinedReuseTest() throws Exception { } @Test - public void forEachAsyncTest() { + void forEachAsyncTest() { RecordCursor cursor = RecordCursor.fromList(Arrays.asList(1, 2, 3, 4, 5, 6, 7)); long start = System.currentTimeMillis(); cursor.forEachAsync(i -> MoreAsyncUtil.delayedFuture(10L, TimeUnit.MILLISECONDS), 2).join(); @@ -169,7 +170,7 @@ public void forEachAsyncTest() { }), 1).join(); end = System.currentTimeMillis(); assertEquals(integer.get(), 36); - assertThat(end - start, Matchers.greaterThanOrEqualTo(40L)); + assertThat(end - start, Matchers.greaterThanOrEqualTo(40L)); // It should be fine if they all complete immediately. integer.set(0); @@ -184,7 +185,7 @@ public void forEachAsyncTest() { } @Test - public void orElseTest() throws Exception { + void orElseTest() { List ints = Arrays.asList(1, 2, 3); BiFunction> elseZero = (executor, cont) -> RecordCursor.fromFuture(executor, CompletableFuture.completedFuture(0), cont); assertEquals(ints, RecordCursor.fromList(ints).asList().join()); @@ -196,7 +197,7 @@ public void orElseTest() throws Exception { //@Test @Slow // Will get either NPE or NoSuchElementException after a while. - public void orElseTimingErrorTest() throws Exception { + void orElseTimingErrorTest() { BiFunction> elseZero = (executor, cont) -> RecordCursor.fromFuture(executor, CompletableFuture.completedFuture(0), cont); for (int i = 0; i < 100000; i++) { RecordCursorIterator cursor = RecordCursor.orElse(cont -> RecordCursor.fromList(Collections.emptyList(), cont), elseZero, null).asIterator(); @@ -212,7 +213,7 @@ public void orElseTimingErrorTest() throws Exception { } @Test - public void asListWithContinuationTest() throws Exception { + void asListWithContinuationTest() throws Exception { final List ints = IntStream.range(0, 50).boxed().collect(Collectors.toList()); final AtomicReference> finalResult = new AtomicReference<>(); @@ -236,52 +237,52 @@ public void asListWithContinuationTest() throws Exception { } @Test - public void limitTest() { - List ints = Arrays.asList(1,2,3,4); + void limitTest() { + List ints = Arrays.asList(1, 2, 3, 4); // Make sure that if the limit is less than the size, we get the thing suppose. RecordCursor cursor = RecordCursor.fromList(ints).limitRowsTo(3); assertTrue(cursor instanceof RowLimitedCursor, "Setting limit should create a LimitCursor"); List newInts = cursor.asList().join(); - assertEquals(Arrays.asList(1,2,3), newInts); + assertEquals(Arrays.asList(1, 2, 3), newInts); // Make sure that if the limit is greater than the size, we get everything. cursor = RecordCursor.fromList(ints).limitRowsTo(5); assertTrue(cursor instanceof RowLimitedCursor, "Setting limit should create a LimitCursor"); newInts = cursor.asList().join(); - assertEquals(Arrays.asList(1,2,3,4), newInts); + assertEquals(Arrays.asList(1, 2, 3, 4), newInts); cursor = RecordCursor.fromList(ints).limitRowsTo(Integer.MAX_VALUE); assertFalse(cursor instanceof RowLimitedCursor, "Setting max limit shouldn't actually create a LimitCursor"); } @Test - public void skipTest() { - List ints = Arrays.asList(1,2,3,4); + void skipTest() { + List ints = Arrays.asList(1, 2, 3, 4); RecordCursor cursor = RecordCursor.fromList(ints).skip(2); assertTrue(cursor instanceof SkipCursor, "Setting skip should create a SkipCursor"); List newInts = cursor.asList().join(); - assertEquals(Arrays.asList(3,4), newInts); + assertEquals(Arrays.asList(3, 4), newInts); cursor = RecordCursor.fromList(ints).skip(0); assertFalse(cursor instanceof SkipCursor, "Setting skip 0 shouldn't actually create a SkipCursor"); } @Test - public void filterTest() { - List ints = Arrays.asList(1,2,3,4,5,6,7); + void filterTest() { + List ints = Arrays.asList(1, 2, 3, 4, 5, 6, 7); RecordCursor cursor = RecordCursor.fromList(ints).filter(i -> i % 2 == 0); assertTrue(cursor instanceof FilterCursor, "Creating a filter should create a filter cursor"); List newInts = cursor.asList().join(); - assertEquals(Arrays.asList(2,4,6), newInts); + assertEquals(Arrays.asList(2, 4, 6), newInts); cursor = RecordCursor.fromList(ints).filterAsync(i -> CompletableFuture.completedFuture(i % 2 != 0), 1); - assertTrue(cursor instanceof MapCursor, "Creating an async filter should create a map cursor"); + assertTrue(cursor instanceof MapResultCursor, "Creating an async filter should create a map cursor"); newInts = cursor.asList().join(); - assertEquals(Arrays.asList(1,3,5,7), newInts); + assertEquals(Arrays.asList(1, 3, 5, 7), newInts); - ints = Arrays.asList(1,2,3, null, 4, 5, 6, 7); + ints = Arrays.asList(1, 2, 3, null, 4, 5, 6, 7); cursor = RecordCursor.fromList(ints).filter(i -> { if (i == null) { @@ -292,7 +293,7 @@ public void filterTest() { }); assertTrue(cursor instanceof FilterCursor, "Creating a filter should create a filter cursor"); newInts = cursor.asList().join(); - assertEquals(Arrays.asList(1,3,5,7), newInts); + assertEquals(Arrays.asList(1, 3, 5, 7), newInts); cursor = RecordCursor.fromList(ints).filterAsync(i -> { if (i == null) { @@ -302,13 +303,89 @@ public void filterTest() { } }, 1); newInts = cursor.asList().join(); - assertTrue(cursor instanceof MapCursor, "Creating an async filter should create a map cursor"); - assertEquals(Arrays.asList(2,4,6), newInts); + assertTrue(cursor instanceof MapResultCursor, "Creating an async filter should create a map cursor"); + assertEquals(Arrays.asList(2, 4, 6), newInts); + } + + private static class PrefixAddingContinuationConvertor implements RecordCursor.ContinuationConvertor { + private final ByteString prefix; + + private PrefixAddingContinuationConvertor(ByteString prefix) { + this.prefix = prefix; + } + + @Nullable + @Override + public byte[] unwrapContinuation(@Nullable final byte[] continuation) { + if (continuation == null) { + return null; + } + ByteString wrappedBytes = ByteString.copyFrom(continuation); + assertTrue(wrappedBytes.startsWith(prefix), "continuation should begin with expected prefix"); + return wrappedBytes.substring(prefix.size()).toByteArray(); + } + + @Override + public RecordCursorContinuation wrapContinuation(@Nonnull final RecordCursorContinuation continuation) { + if (continuation.isEnd()) { + return RecordCursorEndContinuation.END; + } + return new RecordCursorContinuation() { + @Nonnull + @Override + public ByteString toByteString() { + return prefix.concat(continuation.toByteString()); + } + + @Nullable + @Override + public byte[] toBytes() { + return toByteString().toByteArray(); + } + + @Override + public boolean isEnd() { + return false; + } + }; + } } @Test - public void firstTest() throws Exception { - List ints = Arrays.asList(1,2,3,4); + void mapContinuationsTest() { + final List ints = Arrays.asList(1, 2, 3, 4, 5, 6); + final ByteString prefix = ByteString.copyFromUtf8("prefix+"); + final PrefixAddingContinuationConvertor continuationConvertor = new PrefixAddingContinuationConvertor(prefix); + + RecordCursor cursor = RecordCursor.mapContinuation(continuation -> RecordCursor.fromList(ints, continuation), continuationConvertor, null); + + List soFar = new ArrayList<>(); + RecordCursorResult result; + do { + result = cursor.getNext(); + if (result.getContinuation().isEnd()) { + assertEquals(ByteString.EMPTY, result.getContinuation().toByteString()); + assertNull(result.getContinuation().toBytes()); + } else { + soFar.add(result.get()); + + // Modified continuation should begin with the prefix + assertTrue(result.getContinuation().toByteString().startsWith(prefix)); + + // Stripping away the prefix and resuming the cursor should produce the rest of the list + byte[] continuation = result.getContinuation().toBytes(); + assertNotNull(continuation); + RecordCursor tailCursor = RecordCursor.mapContinuation(innerContinuation -> RecordCursor.fromList(ints, innerContinuation), continuationConvertor, continuation); + final List resultList = new ArrayList<>(soFar); + tailCursor.forEach(resultList::add).join(); + assertEquals(ints, resultList); + } + } while (result.hasNext()); + } + + @Test + void firstTest() throws Exception { + List ints = Arrays.asList(1, 2, 3, 4); RecordCursor cursor = RecordCursor.fromList(ints); assertEquals(Optional.of(1), cursor.first().get()); @@ -318,8 +395,8 @@ public void firstTest() throws Exception { } @Test - public void pipelineContinuationTest() throws Exception { - List ints = Lists.newArrayList(1,2,3,4,5); + void pipelineContinuationTest() throws Exception { + List ints = Lists.newArrayList(1, 2, 3, 4, 5); List expected = ints.stream().flatMap(o -> ints.stream().map(i -> o * 100 + i)).collect(Collectors.toList()); Function> outerFunc = cont -> RecordCursor.fromList(ints, cont); @@ -422,8 +499,8 @@ private int iterateGrid(@Nonnull Function ints = IntStream.range(0, 10).boxed().collect(Collectors.toList()); @@ -572,7 +649,7 @@ public void pipelineWithOuterLimitsWithSomeDelay(boolean outOfBand) { } @Test - public void flatMapPipelineErrorPropagation() throws ExecutionException, InterruptedException { + void flatMapPipelineErrorPropagation() throws ExecutionException, InterruptedException { FirableCursor firableCursor1 = new FirableCursor<>(RecordCursor.fromList(Collections.singletonList("hello"))); FirableCursor firableCursor2 = new FirableCursor<>(new BrokenCursor()); List> firableCursors = Arrays.asList(firableCursor1, firableCursor2); @@ -610,7 +687,7 @@ public void flatMapPipelineErrorPropagation() throws ExecutionException, Interru * @throws InterruptedException from futures joined in the test */ @Test - public void mapPipelinedErrorAtConcurrentCompletion() throws ExecutionException, InterruptedException { + void mapPipelinedErrorAtConcurrentCompletion() throws ExecutionException, InterruptedException { final RuntimeException runtimeEx = new RuntimeException("some random exception"); List> futures = Arrays.asList(new CompletableFuture<>(), new CompletableFuture<>(), new CompletableFuture<>()); @@ -649,7 +726,7 @@ public void mapPipelinedErrorAtConcurrentCompletion() throws ExecutionException, * @throws InterruptedException from futures joined in the test */ @Test - public void mapPipelinedErrorPropagationInPipeline() throws ExecutionException, InterruptedException { + void mapPipelinedErrorPropagationInPipeline() throws ExecutionException, InterruptedException { final RuntimeException runtimeEx = new RuntimeException("some random exception"); List> futures = Arrays.asList(new CompletableFuture<>(), new CompletableFuture<>(), new CompletableFuture<>()); RecordCursor cursor = RecordCursor.fromList(Arrays.asList(0, 1, 2)).mapPipelined(futures::get, 2); @@ -681,7 +758,7 @@ public void mapPipelinedErrorPropagationInPipeline() throws ExecutionException, * @throws InterruptedException from futures joined in the test */ @Test - public void mapPipelinedContinuationWithTimeLimit() throws ExecutionException, InterruptedException { + void mapPipelinedContinuationWithTimeLimit() throws ExecutionException, InterruptedException { List> futures = Arrays.asList(new CompletableFuture<>(), new CompletableFuture<>(), new CompletableFuture<>(), new CompletableFuture<>()); RecordCursor cursor = new FakeOutOfBandCursor<>(RecordCursor.fromList(Arrays.asList(0, 1, 2, 3)), 3).mapPipelined(futures::get, 3); futures.get(2).complete(1415); // complete a future that is not immediately returned @@ -715,7 +792,7 @@ public void mapPipelinedContinuationWithTimeLimit() throws ExecutionException, I * @throws InterruptedException from futures joined in the test */ @Test - public void mapPipelinedContinuationWithTimeLimitWithMoreToReturn() throws ExecutionException, InterruptedException { + void mapPipelinedContinuationWithTimeLimitWithMoreToReturn() throws ExecutionException, InterruptedException { List> futures = Arrays.asList(new CompletableFuture<>(), new CompletableFuture<>(), new CompletableFuture<>(), new CompletableFuture<>(), new CompletableFuture<>()); RecordCursor cursor = new FakeOutOfBandCursor<>(RecordCursor.fromList(Arrays.asList(0, 1, 2, 3, 4)), 4).mapPipelined(futures::get, 4); futures.get(1).complete(1415); @@ -757,7 +834,7 @@ public void mapPipelinedContinuationWithTimeLimitWithMoreToReturn() throws Execu * @throws InterruptedException from futures joined in the test */ @Test - public void mapPipelinedContinuationWithTimeLimitBeforeFirstEntry() throws ExecutionException, InterruptedException { + void mapPipelinedContinuationWithTimeLimitBeforeFirstEntry() throws ExecutionException, InterruptedException { List> futures = Arrays.asList(new CompletableFuture<>(), new CompletableFuture<>(), new CompletableFuture<>()); futures.get(1).complete(1415); RecordCursor cursor = new FakeOutOfBandCursor<>(RecordCursor.fromList(Arrays.asList(0, 1, 2)), 2).mapPipelined(futures::get, 3); @@ -788,26 +865,29 @@ public void mapPipelinedContinuationWithTimeLimitBeforeFirstEntry() throws Execu } @Test - public void lazyCursorTest() { + void lazyCursorTest() { RecordCursorIterator cursor = new LazyCursor<>( CompletableFuture.completedFuture(RecordCursor.fromList(Lists.newArrayList(1, 2, 3, 4, 5)))).asIterator(); int i = 1; while (i <= 5 && cursor.hasNext()) { - assertEquals(i, (int) cursor.next()); + assertEquals(i, (int)cursor.next()); ++i; } assertEquals(6, i); } @Test - public void lazyCursorExceptionTest() { + void lazyCursorExceptionTest() { LazyCursor cursor = new LazyCursor<>( - CompletableFuture.supplyAsync( () -> { throw new IllegalArgumentException("Uh oh"); })); + CompletableFuture.supplyAsync(() -> { + throw new IllegalArgumentException("Uh oh"); + })); assertThrows(RecordCoreException.class, () -> cursor.getNext()); } /** * A cursor that simulates out of band stopping by actually counting in band records returned. + * * @param type of elements of the cursor */ public static class FakeOutOfBandCursor extends RowLimitedCursor { @@ -836,7 +916,7 @@ public CompletableFuture> onNext() { } @Test - public void testFakeTimeLimitReasons() throws Exception { + void testFakeTimeLimitReasons() { final List list = Arrays.asList(1, 2, 3, 4, 5); RecordCursor cursor = new FakeOutOfBandCursor<>(RecordCursor.fromList(list), 3); assertEquals(Arrays.asList(1, 2, 3), cursor.asList().join()); @@ -847,7 +927,7 @@ public void testFakeTimeLimitReasons() throws Exception { } @Test - public void testMapAsyncTimeLimitReasons() throws Exception { + void testMapAsyncTimeLimitReasons() { // If stopped for a timeout, we additionally don't wait for incomplete items in the pipeline. final List list = Arrays.asList(1, 2, 3, 4, 5); final Function> map = i -> i != 2 ? CompletableFuture.completedFuture(i) : new CompletableFuture<>(); @@ -868,7 +948,7 @@ public void testMapAsyncTimeLimitReasons() throws Exception { } @Test - public void testMapAsyncScanLimitReasons() throws Exception { + void testMapAsyncScanLimitReasons() { // If stopped for a scan limit, no special handling. final List list = Arrays.asList(1, 2, 3, 4, 5); final Function> map = CompletableFuture::completedFuture; @@ -885,7 +965,7 @@ public void testMapAsyncScanLimitReasons() throws Exception { } @Test - public void testFilteredMapAsyncReasons1() throws Exception { + void testFilteredMapAsyncReasons1() { // May need continuation before the first record in the pipeline. final List list = Arrays.asList(1, 2, 3, 4, 5); final Function> map = CompletableFuture::completedFuture; @@ -924,7 +1004,7 @@ public void testFilteredMapAsyncReasons1() throws Exception { } @Test - public void testFilteredMapAsyncReasons2() throws Exception { + void testFilteredMapAsyncReasons2() { // May need continuation before the first record in the pipeline. final List list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); final ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(1); @@ -953,7 +1033,7 @@ public void testFilteredMapAsyncReasons2() throws Exception { } @Test - public void testFilteredMapAsyncReasons3() throws Exception { + void testFilteredMapAsyncReasons3() { final List list = Arrays.asList(1, 2, 3, 4, 5); final Function> map = CompletableFuture::completedFuture; final Function filter = i -> i % 2 == 0; @@ -970,7 +1050,7 @@ public void testFilteredMapAsyncReasons3() throws Exception { } @Test - public void testFlatMapReasons() throws Exception { + void testFlatMapReasons() { // If the inside stops prematurely, the whole pipeline shuts down. final List list = Arrays.asList(1, 2, 3, 4, 5); final Function> outer = continuation -> RecordCursor.fromList(list, continuation); @@ -1004,7 +1084,7 @@ public void testFlatMapReasons() throws Exception { } @Test - public void testOrElseReasons() throws Exception { + void testOrElseReasons() { // Don't take else path if inside stops prematurely. final List list = Arrays.asList(1, 2, 3, 4, 5); final BiFunction> orElse = (x, cont) -> RecordCursor.fromList(Collections.singletonList(0), cont); @@ -1022,7 +1102,7 @@ public void testOrElseReasons() throws Exception { } @Test - public void orElseWithEventuallyNonEmptyInner() { + void orElseWithEventuallyNonEmptyInner() { final List list = Arrays.asList(1, 2, 3, 4, 5); RecordCursor cursor = getOrElseOfFilteredFakeOutOfBandCursor(list, 3, 4, null); assertEquals(Collections.emptyList(), cursor.asList().join()); @@ -1037,7 +1117,7 @@ public void orElseWithEventuallyNonEmptyInner() { } @Test - public void orElseContinueWithInnerBranchAfterDecision() { + void orElseContinueWithInnerBranchAfterDecision() { final List longList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18); RecordCursor cursor = getOrElseOfFilteredFakeOutOfBandCursor(longList, 3, 10, null); @@ -1068,7 +1148,7 @@ public void orElseContinueWithInnerBranchAfterDecision() { } @Test - public void orElseContinueWithElseBranchAfterDecision() { + void orElseContinueWithElseBranchAfterDecision() { final List innerList = Arrays.asList(1, 2, 3, 4, 5); final List elseList = Arrays.asList(-1, -2, -3, -4, -5); final BiFunction> orElse = (x, cont) -> @@ -1098,7 +1178,7 @@ public void orElseContinueWithElseBranchAfterDecision() { @Nonnull private static RecordCursor getOrElseOfFilteredFakeOutOfBandCursor(@Nonnull List list, int limit, int threshold, - @Nullable byte[] continuation) { + @Nullable byte[] continuation) { final BiFunction> orElse = (x, cont) -> RecordCursor.fromList(Collections.singletonList(0), cont); return RecordCursor.orElse(cont -> new FakeOutOfBandCursor<>(RecordCursor.fromList(list, cont), limit) .filter(i -> i > threshold), orElse, continuation); @@ -1131,7 +1211,7 @@ public boolean accept(@Nonnull RecordCursorVisitor visitor) { } @Test - public void hasNextErrorStack() throws Exception { + void hasNextErrorStack() { final Iterator erring = new BrokenCursor().asIterator(); try { Iterators.getLast(erring, null); @@ -1146,7 +1226,7 @@ public void hasNextErrorStack() throws Exception { } @Test - public void reduceTest() throws Exception { + void reduceTest() throws Exception { RecordCursor cursor = RecordCursor.fromList(Arrays.asList(1, 2, 3, 4, 5, 6, 7)); CompletableFuture sum = cursor.reduce(0, Integer::sum); assertEquals(28, sum.get()); @@ -1161,14 +1241,14 @@ public void reduceTest() throws Exception { } @Test - public void asStreamTest() throws Exception { + void asStreamTest() { assertEquals(7, RecordCursor.fromList(Arrays.asList(1, 2, 3, 4, 5, 6, 7)).asStream().count()); assertEquals(2, RecordCursor.fromList(Arrays.asList(2, 3)).asStream().findFirst().get()); assertEquals(3, RecordCursor.fromList(Arrays.asList(2, 3)).map(x -> x + 1).asStream().findFirst().get()); assertThrows(RuntimeException.class, () -> RecordCursor.fromList(Arrays.asList(2, 3)).asStream( - () -> { - throw new RuntimeException("on close"); - }).close()); + () -> { + throw new RuntimeException("on close"); + }).close()); }