Skip to content

Commit

Permalink
Resolves #1560: Index prefetch (#1561)
Browse files Browse the repository at this point in the history
* Initial commit of POC for index prefetch

* version

* Disable readyourqrites

* Handle embedded keys in index entries

* Small fixes. DisableReadYourWrites for tests, add plan constructor

* Add props to control which API to use. Custom unpacking. range scan in FDB.

* new test

* Finalize implementation:
1. Create IndexedRawRecord to hold indexed unsplit records
2. Unsplit the records in the index maintainer
3. Test reads
4. Deserialize records

* Handle new API

* Code cleanup - move unsplit records to record store
Support fallback execution

* Changes after rebase, style.

* Reimplement integration with planner after rebase, added tests with fallback and comparator plan

* Support old version format, test.

* Changes after getting latest FDB API

* Latest changes from main branch.
Add split record tests, fix performance tests, add prefetch tests to performance framework, cleanup code.

* Code style and findbugs changes.

* Handle old format

* Fixes while running all tests:
- Don't use prefetch when no common primary key
- Don't use prefetch for non-VALUE indexes
- Fix counter event

* Fallback mode for cases where the failure happens in the Async Future's onNext() call.
- Add FallbackCursor
- Use the cursor in fallback mode
- Tests for read-your-writes mode that exhibit this kind of failure

* Added handling of IndexOrphanBehavior: In case there are no record splits for an index entry, return empty entry, skip record or abort. Added tests.

* Changes after rebase from main and checkstyle

* Add better names to test

* Disable failing test

* Add mixed continuations test

* Add comments, javadoc and @nonnull annotations.
Review comments; Fix type issues with generics: Add serialized and changed <Message> to <M>, removed redundant queriedRecordOfMessage method.
Move tests to foundationdb package.

* Add version protection, fix test names.
Adjust two tests that would normally throw in prefetch mode to not throw exception since 630 API VERSION will cause the tests to run in scan mdoe.

* Fix sonarcube issue: CommonPromaryKey is @nonnull

* Fix sonarcube issues

* Dummy commit to launch build

* More sonar attempts

* Update fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/cursors/FallbackCursor.java

Co-authored-by: Alec Grieser <alloc@apple.com>

* PR Comments phase 1: Non-test code:
- Renamed "Index Prefetch" to "Remote Fetch" to avoid confusion with the other "prefetch". This includes parameters, method, enums and comments.
- Fixed a bug in the FallbackCursor that did not initialize nextResult. Removed nextResult and replaced with usage of nextResultFuture.
- FallbackCursor always use inner's executor.
- Simplified logic for FallBackCursor processing
- Replaced usage of TupleRange with IndexScanBounds
- Moved HopInfo (Mapper) creation to IndexMaintainer.
- Check for API_VERSION and scan type in the store scanIndexRemoteFetchInternal method to get protection even when not using queries.
- Counting index entries in the byte count.
- Added new timer entry to count prefetch.
- Changed log from error to warning.
- Check API version on executePlan to allow for version to change during runtime.
- Remove PlanWithIndex.getComparisons() as it is no longer needed.

* PR Comments: Remove `getCommonPrimaryKey` from the interface

* PR Comments:
- Refactor tests to extend a common baseclass
- Wrapped FallbackCursor exception with another exception with a descriptive message
- Removed index and indexSubspace from param list for createRemoteFetchMapper
- Marked public methods as API.Status.EXPERIMENTAL
- Fix bugs with logging too many warnings in executePlan
- Added test with index that has primary key component
- Fixed tests to handle different behavior per API_VERSION

* Sonar issues:
- Changed future.get() to future.join() to simplify exception handling
- Suppress parent warning for exceptions
- Description of ignored test

* Test failures: Reduce overall size of transaction for large record test.
Wrap exception from FallbackCursor to simplify exception handling between fallback and non-fallback executions.

* Dummy commit to kick off build

* PR Comments:
- Add exception wrapping logic in FallbackCursor in the case where the handed exception is not wrapped
- Remove recordSubspace parameter from IndexMaintainer.scanRemoteFetch
- Turn warning messages to debug for the cases where commonPrimaryKey is null and index type is not VALUE in IndexPLan
- Add assertions on the failure cause in FallbackCursorTest
- Added RemoteFetchMultiColumnKeyTest
- Replaced OrphanPolicyTests with end-to-end test
- Added tests for both in-range and out-of-range read-your-writes

* Release notes

Co-authored-by: Alec Grieser <alloc@apple.com>
  • Loading branch information
ohadzeliger and alecgrieser committed May 16, 2022
1 parent d80d1db commit 29edea5
Show file tree
Hide file tree
Showing 32 changed files with 2,303 additions and 63 deletions.
4 changes: 2 additions & 2 deletions docs/ReleaseNotes.md
Expand Up @@ -32,11 +32,11 @@ This release also updates downstream dependency versions. Most notably, the prot
* **Performance** Improvement 5 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Feature** The FDB API version can now be configured through the `FDBDatabaseFactory` [(Issue #1639)](https://github.com/FoundationDB/fdb-record-layer/issues/1639)
* **Feature** Match index hints in a query [(Issue #1671)](https://github.com/FoundationDB/fdb-record-layer/issues/1671)
* **Feature** Feature 2 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Feature** Remote Fetch feature using FDB's getMappedRange new API [(Issue #1560)](https://github.com/FoundationDB/fdb-record-layer/issues/1560)
* **Feature** Feature 3 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Feature** Feature 4 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Feature** Feature 5 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Breaking change** Change 1 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Breaking change** Extenders of StandardIndexMaintainers will inherit the scanRemoteFetch implementation. If this is not suppoted for the subclass, override the method [(Issue #1560)](https://github.com/FoundationDB/fdb-record-layer/issues/1560)
* **Breaking change** Change 2 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Breaking change** Change 3 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Breaking change** Change 4 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
Expand Down
@@ -0,0 +1,168 @@
/*
* FallbackCursor.java
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2015-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.apple.foundationdb.record.cursors;

import com.apple.foundationdb.annotation.API;
import com.apple.foundationdb.record.RecordCoreException;
import com.apple.foundationdb.record.RecordCursor;
import com.apple.foundationdb.record.RecordCursorResult;
import com.apple.foundationdb.record.RecordCursorVisitor;
import com.apple.foundationdb.util.LoggableException;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

/**
* Provide an alternative cursor in case the primary cursor fails. This cursor has an <code>inner</code> cursor that is
* used to return the results in the sunny day scenarios, and an alternative provider of a <code>fallback</code> cursor
* that will be used if the primary cursor encounters an error.
*
* Note that there are business rules around when a fallback may happen. For example, because of the way records are
* returned to the consumer, once a record is returned from the primary cursor, no fallback may be permitted: This is
* done in order to prevent the case where a few records are returned, then a failure happens and the fallback cursor
* starts again from the beginning, resulting in duplicate records being returned.
* In practice, since many errors are observed when the request is sent to FDB (which coincide with the cursor's
* first <code>onNext()</code> call, many such failures will be caught by that first result.
*
* A note about continuations: As written, the cursor assumes that the <code>inner</code> and <code>fallback</code>
* cursors each have their own continuation to pick up from. A future enhancement can be to have this cursor store the
* state of the failover in its continuation and then package that with the appropriate inner continuation so that it
* can continue from that same state.
*
* @param <T> the type of cursor result returned by the cursor
*/
@API(API.Status.MAINTAINED)
public class FallbackCursor<T> implements RecordCursor<T> {
@Nonnull
private final Supplier<RecordCursor<T>> fallbackCursorSupplier;
@Nonnull
private final Executor executor;
@Nonnull
private RecordCursor<T> inner;
@Nullable
private CompletableFuture<RecordCursorResult<T>> nextResultFuture;

private boolean alreadyFailed = false;
private boolean allowedToFail = true;

/**
* Creates a new fallback cursor.
*
* @param inner the primary (default) cursor to be used when results are successfully returned
* @param fallbackCursorSupplier the fallback cursor provider to be used when the primary cursor fails
*/
public FallbackCursor(@Nonnull RecordCursor<T> inner, @Nonnull Supplier<RecordCursor<T>> fallbackCursorSupplier) {
this.inner = inner;
this.fallbackCursorSupplier = fallbackCursorSupplier;
this.executor = inner.getExecutor();
}

@Nonnull
@Override
public CompletableFuture<RecordCursorResult<T>> onNext() {
try {
if (nextResultFuture != null && nextResultFuture.isDone() && !nextResultFuture.join().hasNext()) {
// This is needed to ensure we return the same terminal value once the cursor is exhausted
return nextResultFuture;
}
} catch (Exception ignored) {
// This will happen if the future finished exceptionally - just keep going (client will get
// the exception when they observe the future).
}
// The first stage (handle) will calculate the result of the operation if successful, or replace the inner
// with the fallback cursor if failed, and store future to the result in nextResultFuture.
// The second stage (thenCompose) will return nextResultFuture once the first stage is done.
return inner.onNext().handle((result, throwable) -> {
if (throwable == null) {
nextResultFuture = CompletableFuture.completedFuture(result);
// Cannot fail after the first result was delivered
allowedToFail = false;
} else {
if (alreadyFailed) {
nextResultFuture = CompletableFuture.failedFuture(wrapException("Fallback cursor failed, cannot fallback again", throwable));
} else if (!allowedToFail) {
nextResultFuture = CompletableFuture.failedFuture(wrapException("Cannot fallback to alternate cursor since inner already produced a record", throwable));
} else {
inner.close();
inner = fallbackCursorSupplier.get();
nextResultFuture = inner.onNext();
}
alreadyFailed = true;
}
return null; // return value is ignored by next stage
}).thenCompose(vignore -> nextResultFuture);
}

@Override
public void close() {
inner.close();
}

@Nonnull
@Override
public Executor getExecutor() {
return this.executor;
}

private RecordCursor<T> getInner() {
return inner;
}

@Override
public boolean accept(@Nonnull RecordCursorVisitor visitor) {
if (visitor.visitEnter(this)) {
getInner().accept(visitor);
}
return visitor.visitLeave(this);
}

private Throwable wrapException(final String msg, final Throwable ex) {
if (ex instanceof LoggableException) {
// In the case of loggable exception, maintain the original exception to simplify exception handling across
// fallback and non-fallback executions
LoggableException loggableException = (LoggableException)ex;
loggableException.addLogInfo("fallback_failed", msg);
return ex;
} else if ((ex.getCause() != null) && (ex.getCause() instanceof LoggableException)) {
// Same but in case the throwable is already wrapping the LoggableException
LoggableException loggableException = (LoggableException)(ex.getCause());
loggableException.addLogInfo("fallback_failed", msg);
return ex;
} else {
return new FallbackExecutionFailedException(msg, ex);
}
}

/**
* Exception thrown when the fallback cursor fails.
*/
@SuppressWarnings("java:S110")
public static class FallbackExecutionFailedException extends RecordCoreException {
public static final long serialVersionUID = 1;

public FallbackExecutionFailedException(@Nonnull final String msg, @Nullable final Throwable cause) {
super(msg, cause);
}
}
}
Expand Up @@ -514,6 +514,28 @@ public Tuple getEntryPrimaryKey(@Nonnull Tuple entry) {
return Tuple.fromList(primaryKeys);
}

/**
* Get the primary key positions of an index entry.
* @param primaryKeyLength the number of elements in the primary key for the record
* @return a list of the primary key positions for an entry of this index
*/
@Nonnull
public List<Integer> getEntryPrimaryKeyPositions(int primaryKeyLength) {
List<Integer> primaryKeys = new ArrayList<>(primaryKeyLength);
int columnSize = getColumnSize();
if (primaryKeyComponentPositions == null) {
for (int i = columnSize; i < (columnSize + primaryKeyLength); i++ ) {
primaryKeys.add(i);
}
} else {
int after = columnSize;
for (int position : primaryKeyComponentPositions) {
primaryKeys.add((position < 0) ? after++ : position);
}
}
return primaryKeys;
}

/**
* Get the version at which the index was first added.
* @return the added version
Expand Down
@@ -0,0 +1,106 @@
/*
* FDBIndexedRecord.java
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2015-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.apple.foundationdb.record.provider.foundationdb;

import com.apple.foundationdb.MappedKeyValue;
import com.apple.foundationdb.annotation.API;
import com.apple.foundationdb.record.IndexEntry;
import com.apple.foundationdb.record.metadata.Index;

import javax.annotation.Nonnull;
import java.util.Objects;

/**
* A raw record that has been loaded via an index. This is the case where the record was loaded via the
* {@link IndexMaintainer#scanRemoteFetch}. The raw record may contain versions, splits and other raw record data.
*/
@API(API.Status.EXPERIMENTAL)
public class FDBIndexedRawRecord {
@Nonnull
private final IndexEntry indexEntry;
@Nonnull
private final MappedKeyValue rawRecord;

/**
* Wrap a stored record with an index entry that pointed to it. This method is internal, and it generally
* should not be called be external clients.
*
* @param indexEntry the index entry that produced this record
* @param rawRecord the {@link FDBRawRecord} containing the record's data
*/
@API(API.Status.INTERNAL)
public FDBIndexedRawRecord(@Nonnull IndexEntry indexEntry, @Nonnull MappedKeyValue rawRecord) {
this.indexEntry = indexEntry;
this.rawRecord = rawRecord;
}

/**
* Get the index for this record.
* @return the index that contained the entry pointing to this record
*/
@Nonnull
public Index getIndex() {
return indexEntry.getIndex();
}

/**
* Get the index entry for this record.
* @return the index entry that pointed to this record
*/
@Nonnull
public IndexEntry getIndexEntry() {
return indexEntry;
}

@Nonnull
public MappedKeyValue getRawRecord() {
return rawRecord;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

FDBIndexedRawRecord that = (FDBIndexedRawRecord)o;

if (!indexEntry.equals(that.indexEntry)) {
return false;
}
return Objects.equals(this.rawRecord, that.rawRecord);
}

@Override
public int hashCode() {
int result = indexEntry.hashCode();
result = 31 * result + Objects.hashCode(rawRecord);
return result;
}

@Override
public String toString() {
return indexEntry + " -> " + rawRecord;
}
}

0 comments on commit 29edea5

Please sign in to comment.