Skip to content

Commit

Permalink
Merge pull request #1664 from alecgrieser/cursor-map-continuations
Browse files Browse the repository at this point in the history
Resolves #1663: Add .map variant to RecordCursor that supports changing a result's continuation
  • Loading branch information
ScottDugas committed May 19, 2022
2 parents 506648f + d04a5a5 commit f0bda52
Show file tree
Hide file tree
Showing 5 changed files with 333 additions and 72 deletions.
2 changes: 1 addition & 1 deletion docs/ReleaseNotes.md
Expand Up @@ -34,7 +34,7 @@ This release also updates downstream dependency versions. Most notably, the prot
* **Feature** Feature 2 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Feature** Feature 3 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Feature** Feature 4 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Feature** Feature 5 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Feature** Cursors now can map updates to their continuations via the `.mapContinuation` method [(Issue #1663)](https://github.com/FoundationDB/fdb-record-layer/issues/1663)
* **Breaking change** Change 1 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Breaking change** Change 2 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Breaking change** Change 3 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
Expand Down
Expand Up @@ -20,8 +20,8 @@

package com.apple.foundationdb.record;

import com.apple.foundationdb.annotation.SpotBugsSuppressWarnings;
import com.apple.foundationdb.annotation.API;
import com.apple.foundationdb.annotation.SpotBugsSuppressWarnings;
import com.apple.foundationdb.async.AsyncIterator;
import com.apple.foundationdb.async.AsyncUtil;
import com.apple.foundationdb.record.cursors.AsyncIteratorCursor;
Expand All @@ -31,8 +31,8 @@
import com.apple.foundationdb.record.cursors.FutureCursor;
import com.apple.foundationdb.record.cursors.IteratorCursor;
import com.apple.foundationdb.record.cursors.ListCursor;
import com.apple.foundationdb.record.cursors.MapCursor;
import com.apple.foundationdb.record.cursors.MapPipelinedCursor;
import com.apple.foundationdb.record.cursors.MapResultCursor;
import com.apple.foundationdb.record.cursors.OrElseCursor;
import com.apple.foundationdb.record.cursors.RowLimitedCursor;
import com.apple.foundationdb.record.cursors.SkipCursor;
Expand Down Expand Up @@ -327,12 +327,95 @@ default CompletableFuture<Optional<T>> first() {
/**
* Get a new cursor by applying the given function to the records in this cursor.
* @param func the function to apply
* @param <V> the type of the record elements
* @param <V> the type of the elements of the new cursor
* @return a new cursor that applies the given function
* @see #mapResult(Function) if the continuation also needs to be mapped
*/
@Nonnull
default <V> RecordCursor<V> map(@Nonnull Function<T, V> func) {
return new MapCursor<>(this, func);
return mapResult(result -> result.map(func));
}

/**
* Get a new cursor by applying the given function to the results returned by this cursor.
* This allows the caller to change both the values and the continuations returned by
* this cursor. There are alternative methods available if only one or the other needs to
* be adjusted.
*
* @param func the function to apply
* @param <V> the type of the elements of the new cursor
* @return a new cursor that applies the given function to each result of this cursor
* @see #map(Function) to change only the values returned by the cursor
* @see #mapContinuation(Function, ContinuationConvertor, byte[]) to change only the continuations returned by the cursor
*/
@API(API.Status.EXPERIMENTAL)
@Nonnull
default <V> RecordCursor<V> mapResult(@Nonnull Function<RecordCursorResult<T>, RecordCursorResult<V>> func) {
return new MapResultCursor<>(this, func);
}

/**
* Transformation to apply to a continuation. This interface requires that there by a two-way transformation,
* one ({@link #wrapContinuation(RecordCursorContinuation)}) that is applied to results from one cursor and
* another ({@link #unwrapContinuation(byte[])}) that is used to re-create the original cursor's continuation.
*
* @see #mapContinuation(Function, ContinuationConvertor, byte[])
*/
@API(API.Status.EXPERIMENTAL)
interface ContinuationConvertor {
/**
* Extract a continuation to resume a new cursor. This method is used by
* {@link #mapContinuation(Function, ContinuationConvertor, byte[])} when constructing
* the new cursor to un-transform a continuation from {@link #wrapContinuation(RecordCursorContinuation)}
* to be handed to a copy of the original inner cursor.
*
* @param continuation a continuation returned from {@link #wrapContinuation(RecordCursorContinuation)}
* or {@code null} if a cursor is being restarted from the beginning
* @return a continuation to use to resume execution of a cursor
*/
@Nullable
byte[] unwrapContinuation(@Nullable byte[] continuation);

/**
* Transform a continuation that came from an existing cursor. This method is used by
* {@link #mapContinuation(Function, ContinuationConvertor, byte[])} to transform the
* continuations from one cursor and attach them to a new cursor
*
* @param continuation the continuation from an existing cursor
* @return the new continuation to return
*/
RecordCursorContinuation wrapContinuation(@Nonnull RecordCursorContinuation continuation);
}

/**
* Get a new cursor by applying a transformation to the continuation of each result. This function creates
* a new cursor by unwrapping the passed in {@code continuation} parameter (using the
* {@link ContinuationConvertor#unwrapContinuation(byte[]) unwrapContinuation} method on the supplied
* {@code convertor}) and passing that to the given {@code cursorFunction}. It then will modify
* the continuation of each result from that new cursor by applying the {@code convertor}'s
* {@link ContinuationConvertor#wrapContinuation(RecordCursorContinuation) wrapContinuation}
* method to each result.
*
* <p>
* One common use case for this function is to allow the user to attach additional context to the
* to the continuation that might come from the cursor execution environment. So the final cursor's
* continuation might be a serialized data structure containing that context plus the original cursor's
* continuation. The {@link ContinuationConvertor#unwrapContinuation(byte[])} should then be able to
* extract the original cursor's continuation and use it to resume a copy of the original cursor.
* </p>
*
* @param cursorFunction a function that constructs a cursor based on the unwrapped continuation
* @param convertor a convertor that can transform continuations
* @param continuation the continuation from a previous run (or {@code null} to start from the beginning)
* @param <T> the type of elements in the cursor
* @return a cursor that has the values taken from the cursor produced by the {@code cursorFunction} but
* continuations that have been transformed by the {@code convertor}
*/
@API(API.Status.EXPERIMENTAL)
static <T> RecordCursor<T> mapContinuation(@Nonnull Function<byte[], RecordCursor<T>> cursorFunction, @Nonnull ContinuationConvertor convertor, @Nullable byte[] continuation) {
byte[] innerContinuation = convertor.unwrapContinuation(continuation);
return cursorFunction.apply(innerContinuation)
.mapResult(result -> result.withContinuation(convertor.wrapContinuation(result.getContinuation())));
}

/**
Expand All @@ -342,9 +425,9 @@ default <V> RecordCursor<V> map(@Nonnull Function<T, V> func) {
*/
@Nonnull
default RecordCursor<T> mapEffect(@Nonnull Consumer<T> consumer) {
return new MapCursor<>(this, record -> {
consumer.accept(record);
return record;
return map(rec -> {
consumer.accept(rec);
return rec;
});
}

Expand All @@ -355,9 +438,9 @@ default RecordCursor<T> mapEffect(@Nonnull Consumer<T> consumer) {
*/
@Nonnull
default RecordCursor<T> mapEffect(@Nonnull Runnable runnable) {
return new MapCursor<>(this, record -> {
return mapResult(result -> {
runnable.run();
return record;
return result;
});
}

Expand Down
Expand Up @@ -35,8 +35,10 @@
* A cursor that applies a function to the elements of another cursor.
* @param <T> the type of elements of the source cursor
* @param <V> the type of elements of the cursor after applying the function
* @deprecated Use {@link RecordCursor#map} or {@link MapResultCursor} instead
*/
@API(API.Status.MAINTAINED)
@API(API.Status.DEPRECATED)
@Deprecated
public class MapCursor<T, V> implements RecordCursor<V> {
@Nonnull
private final RecordCursor<T> inner;
Expand Down
@@ -0,0 +1,96 @@
/*
* MapResultCursor.java
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2015-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.apple.foundationdb.record.cursors;

import com.apple.foundationdb.annotation.API;
import com.apple.foundationdb.record.RecordCursor;
import com.apple.foundationdb.record.RecordCursorResult;
import com.apple.foundationdb.record.RecordCursorVisitor;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;

/**
* A cursor that applies a function to the elements of another cursor.
* @param <T> the type of elements of the source cursor
* @param <V> the type of elements of the cursor after applying the function
*/
@API(API.Status.MAINTAINED)
public class MapResultCursor<T, V> implements RecordCursor<V> {
@Nonnull
private final RecordCursor<T> inner;
@Nonnull
private final Function<RecordCursorResult<T>, RecordCursorResult<V>> func;

@Nullable
private RecordCursorResult<V> nextResult;

/**
* Internal constructor. Adopters of the library should call {@link RecordCursor#mapResult(Function) inner.mapResult()}
* or one of its variants instead.
*
* @param inner the inner cursor to apply the function on
* @param func the function to apply
* @see RecordCursor#map(Function)
* @see RecordCursor#mapContinuation(Function, ContinuationConvertor, byte[])
* @see RecordCursor#mapResult(Function)
*/
@API(API.Status.INTERNAL)
public MapResultCursor(@Nonnull RecordCursor<T> inner, @Nonnull Function<RecordCursorResult<T>, RecordCursorResult<V>> func) {
this.inner = inner;
this.func = func;
}

@Nonnull
@Override
public CompletableFuture<RecordCursorResult<V>> onNext() {
if (nextResult != null && !nextResult.hasNext()) {
return CompletableFuture.completedFuture(nextResult);
}
return inner.onNext().thenApply(func)
.thenApply(result -> {
nextResult = result;
return result;
});
}

@Override
public void close() {
inner.close();
}

@Nonnull
@Override
public Executor getExecutor() {
return inner.getExecutor();
}

@Override
public boolean accept(@Nonnull final RecordCursorVisitor visitor) {
if (visitor.visitEnter(this)) {
inner.accept(visitor);
}
return visitor.visitLeave(this);
}
}

0 comments on commit f0bda52

Please sign in to comment.