Skip to content

Commit

Permalink
Resolves FoundationDB#1430: Support non-idempotent target indexes whi…
Browse files Browse the repository at this point in the history
…le indexing by index

This adds support for building a non-idempotent target index when indexing from a different source index. This works in a manner that is analogous to the way that this operation works for non-idempotent indexes built by a record scan, except that as the range set contains ranges of index entries from the source index, the maintainer needs to be updated to: (1) check the indexing type stamp and (2) modify the range set check to use the index key instead of the primary key. Then there are some updates to the indexers to adjust logic that assumed the target index type would always be idempotent.

This resolves FoundationDB#1430.
  • Loading branch information
alecgrieser committed Nov 9, 2022
1 parent 032f1e4 commit 602d8eb
Show file tree
Hide file tree
Showing 9 changed files with 146 additions and 57 deletions.
2 changes: 1 addition & 1 deletion docs/ReleaseNotes.md
Expand Up @@ -27,7 +27,7 @@ The Guava dependency version has been updated to 31.1. Projects may need to chec
* **Performance** Improvement 5 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Feature** Feature 1 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Feature** Feature 2 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Feature** Feature 3 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Feature** Non-idempotent target indexes can now be built from an existing index [(Issue #1430)](https://github.com/FoundationDB/fdb-record-layer/issues/1430)
* **Feature** Feature 4 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Feature** Feature 5 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Feature** Support planning aggregate indexes in Cascades. [(Issue #1885)](https://github.com/FoundationDB/fdb-record-layer/issues/1885)
Expand Down
Expand Up @@ -42,6 +42,7 @@
import com.apple.foundationdb.record.ExecuteProperties;
import com.apple.foundationdb.record.ExecuteState;
import com.apple.foundationdb.record.FunctionNames;
import com.apple.foundationdb.record.IndexBuildProto;
import com.apple.foundationdb.record.IndexEntry;
import com.apple.foundationdb.record.IndexScanType;
import com.apple.foundationdb.record.IndexState;
Expand Down Expand Up @@ -605,22 +606,12 @@ private <M extends Message> void updateSecondaryIndexes(@Nullable final FDBIndex
for (Index index : indexes) {
final IndexMaintainer maintainer = getIndexMaintainer(index);
final CompletableFuture<Void> future;
if (!maintainer.isIdempotent() && isIndexWriteOnly(index)) {
// In this case, the index is still being built, so we are not
// going to update the record unless the rebuild job has already
// gotten to this range.
final Tuple primaryKey = newRecord == null ? oldRecord.getPrimaryKey() : newRecord.getPrimaryKey();
future = maintainer.addedRangeWithKey(primaryKey)
.thenCompose(present -> {
if (present) {
return maintainer.update(oldRecord, newRecord);
} else {
return AsyncUtil.DONE;
}
});
if (!MoreAsyncUtil.isCompletedNormally(future)) {
futures.add(future);
}
if (isIndexWriteOnly(index)) {
// In this case, the index is still being built. For some index
// types, the index update needs to check whether indexing
// process has already built the relevant ranges, and it
// may adjust the way the index is built in response.
future = maintainer.updateWhileWriteOnly(oldRecord, newRecord);
} else {
future = maintainer.update(oldRecord, newRecord);
}
Expand Down Expand Up @@ -3499,6 +3490,32 @@ public CompletableFuture<IndexBuildState> getIndexBuildStateAsync(Index index) {
return IndexBuildState.loadIndexBuildStateAsync(this, index);
}

@API(API.Status.INTERNAL)
@Nonnull
public CompletableFuture<IndexBuildProto.IndexBuildIndexingStamp> loadIndexBuildStampAsync(Index index) {
byte[] stampKey = IndexingBase.indexBuildTypeSubspace(this, index).pack();
return ensureContextActive().get(stampKey).thenApply(serializedStamp -> {
if (serializedStamp == null) {
return null;
}
try {
return IndexBuildProto.IndexBuildIndexingStamp.parseFrom(serializedStamp);
} catch (InvalidProtocolBufferException ex) {
RecordCoreException protoEx = new RecordCoreException("invalid indexing type stamp",
LogMessageKeys.INDEX_NAME, index.getName(),
LogMessageKeys.ACTUAL, ByteArrayUtil2.loggable(serializedStamp));
protoEx.initCause(ex);
throw protoEx;
}
});
}

@API(API.Status.INTERNAL)
public void saveIndexBuildStamp(Index index, IndexBuildProto.IndexBuildIndexingStamp stamp) {
byte[] stampKey = IndexingBase.indexBuildTypeSubspace(this, index).pack();
ensureContextActive().set(stampKey, stamp.toByteArray());
}

// Remove any indexes that do not match the filter.
// NOTE: This assumes that the filter will not filter out any indexes if all indexes are readable.
private List<Index> sanitizeIndexes(@Nonnull List<Index> indexes, @Nonnull Predicate<Index> filter) {
Expand Down
Expand Up @@ -121,6 +121,10 @@ public RecordCursor<IndexEntry> scan(@Nonnull IndexScanBounds scanBounds,
public abstract <M extends Message> CompletableFuture<Void> update(@Nullable FDBIndexableRecord<M> oldRecord,
@Nullable FDBIndexableRecord<M> newRecord);

@Nonnull
public abstract <M extends Message> CompletableFuture<Void> updateWhileWriteOnly(@Nullable FDBIndexableRecord<M> oldRecord,
@Nullable FDBIndexableRecord<M> newRecord);


/**
* Scans through the list of uniqueness violations within the database.
Expand Down
Expand Up @@ -47,7 +47,6 @@
import com.apple.foundationdb.subspace.Subspace;
import com.apple.foundationdb.tuple.ByteArrayUtil2;
import com.apple.foundationdb.tuple.Tuple;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
Expand Down Expand Up @@ -401,35 +400,32 @@ public void enforceStampOverwrite() {
@SuppressWarnings("PMD.CloseResource")
private CompletableFuture<Void> setIndexingTypeOrThrow(FDBRecordStore store, boolean continuedBuild) {
// continuedBuild is set if this session isn't a continuation of a previous indexing
Transaction transaction = store.getContext().ensureActive();
IndexBuildProto.IndexBuildIndexingStamp indexingTypeStamp = getIndexingTypeStamp(store);

return forEachTargetIndex(index -> setIndexingTypeOrThrow(store, continuedBuild, transaction, index, indexingTypeStamp));
return forEachTargetIndex(index -> setIndexingTypeOrThrow(store, continuedBuild, index, indexingTypeStamp));
}

@Nonnull
private CompletableFuture<Void> setIndexingTypeOrThrow(FDBRecordStore store, boolean continuedBuild, Transaction transaction, Index index, IndexBuildProto.IndexBuildIndexingStamp indexingTypeStamp) {
byte[] stampKey = indexBuildTypeSubspace(store, index).getKey();
private CompletableFuture<Void> setIndexingTypeOrThrow(FDBRecordStore store, boolean continuedBuild, Index index, IndexBuildProto.IndexBuildIndexingStamp indexingTypeStamp) {
if (forceStampOverwrite && !continuedBuild) {
// Fresh session + overwrite = no questions asked
transaction.set(stampKey, indexingTypeStamp.toByteArray());
store.saveIndexBuildStamp(index, indexingTypeStamp);
return AsyncUtil.DONE;
}
return transaction.get(stampKey)
.thenCompose(bytes -> {
if (bytes == null) {
return store.loadIndexBuildStampAsync(index)
.thenCompose(savedStamp -> {
if (savedStamp == null) {
if (continuedBuild && indexingTypeStamp.getMethod() !=
IndexBuildProto.IndexBuildIndexingStamp.Method.BY_RECORDS) {
// backward compatibility - maybe continuing an old BY_RECORD session
return isWriteOnlyButNoRecordScanned(store, index)
.thenCompose(noRecordScanned -> throwAsByRecordsUnlessNoRecordWasScanned(noRecordScanned, transaction, index, stampKey, indexingTypeStamp));
.thenCompose(noRecordScanned -> throwAsByRecordsUnlessNoRecordWasScanned(noRecordScanned, store, index, indexingTypeStamp));
}
// Here: either not a continuedBuild (new session), or a BY_RECORD session (allowed to overwrite the null stamp)
transaction.set(stampKey, indexingTypeStamp.toByteArray());
store.saveIndexBuildStamp(index, indexingTypeStamp);
return AsyncUtil.DONE;
}
// Here: has non-null type stamp
IndexBuildProto.IndexBuildIndexingStamp savedStamp = parseTypeStampOrThrow(bytes);
if (indexingTypeStamp.equals(savedStamp)) {
// A matching stamp is already there - One less thing to worry about
return AsyncUtil.DONE;
Expand All @@ -438,14 +434,14 @@ private CompletableFuture<Void> setIndexingTypeOrThrow(FDBRecordStore store, boo
indexingTypeStamp.getMethod() == IndexBuildProto.IndexBuildIndexingStamp.Method.BY_RECORDS &&
savedStamp.getMethod() == IndexBuildProto.IndexBuildIndexingStamp.Method.MULTI_TARGET_BY_RECORDS) {
// Special case: partly built with multi target, but may be continued indexing on its own
transaction.set(stampKey, indexingTypeStamp.toByteArray());
store.saveIndexBuildStamp(index, indexingTypeStamp);
return AsyncUtil.DONE;
}
if (forceStampOverwrite) { // and a continued Build
// check if partly built
return isWriteOnlyButNoRecordScanned(store, index)
.thenCompose(noRecordScanned ->
throwUnlessNoRecordWasScanned(noRecordScanned, transaction, index, stampKey, indexingTypeStamp,
throwUnlessNoRecordWasScanned(noRecordScanned, store, index, indexingTypeStamp,
savedStamp, continuedBuild));
}
// fall down to exception
Expand All @@ -454,8 +450,9 @@ private CompletableFuture<Void> setIndexingTypeOrThrow(FDBRecordStore store, boo
}

@Nonnull
private CompletableFuture<Void> throwAsByRecordsUnlessNoRecordWasScanned(boolean noRecordScanned, Transaction transaction,
Index index, byte[] stampKey,
private CompletableFuture<Void> throwAsByRecordsUnlessNoRecordWasScanned(boolean noRecordScanned,
FDBRecordStore store,
Index index,
IndexBuildProto.IndexBuildIndexingStamp indexingTypeStamp) {
// A complicated way to reduce complexity.
if (noRecordScanned) {
Expand All @@ -465,7 +462,7 @@ private CompletableFuture<Void> throwAsByRecordsUnlessNoRecordWasScanned(boolean
.addKeysAndValues(common.indexLogMessageKeyValues())
.toString());
}
transaction.set(stampKey, indexingTypeStamp.toByteArray());
store.saveIndexBuildStamp(index, indexingTypeStamp);
return AsyncUtil.DONE;
}
// Here: there is no type stamp, but indexing is ongoing. For backward compatibility reasons, we'll consider it a BY_RECORDS stamp
Expand All @@ -479,15 +476,16 @@ private CompletableFuture<Void> throwAsByRecordsUnlessNoRecordWasScanned(boolean
}

@Nonnull
private CompletableFuture<Void> throwUnlessNoRecordWasScanned(boolean noRecordScanned, Transaction transaction,
Index index, byte[] stampKey,
private CompletableFuture<Void> throwUnlessNoRecordWasScanned(boolean noRecordScanned,
FDBRecordStore store,
Index index,
IndexBuildProto.IndexBuildIndexingStamp indexingTypeStamp,
IndexBuildProto.IndexBuildIndexingStamp savedStamp,
boolean continuedBuild) {
// Ditto (a complicated way to reduce complexity)
if (noRecordScanned) {
// we can safely overwrite the previous type stamp
transaction.set(stampKey, indexingTypeStamp.toByteArray());
store.saveIndexBuildStamp(index, indexingTypeStamp);
return AsyncUtil.DONE;
}
// A force overwrite cannot be allowed when partly built
Expand Down Expand Up @@ -543,19 +541,6 @@ private CompletableFuture<Void> setScrubberTypeOrThrow(FDBRecordStore store) {

abstract CompletableFuture<Void> buildIndexInternalAsync();

private IndexBuildProto.IndexBuildIndexingStamp parseTypeStampOrThrow(byte[] bytes) {
try {
return IndexBuildProto.IndexBuildIndexingStamp.parseFrom(bytes);
} catch (InvalidProtocolBufferException ex) {
RecordCoreException protoEx = new RecordCoreException("invalid indexing type stamp",
LogMessageKeys.INDEX_NAME, common.getTargetIndexesNames(),
LogMessageKeys.INDEXER_ID, common.getUuid(),
LogMessageKeys.ACTUAL, bytes);
protoEx.initCause(ex);
throw protoEx;
}
}

private CompletableFuture<Boolean> isWriteOnlyButNoRecordScanned(FDBRecordStore store, Index index) {
RangeSet rangeSet = new RangeSet(store.indexRangeSubspace(index));
AsyncIterator<Range> ranges = rangeSet.missingRanges(store.ensureContextActive()).iterator();
Expand Down
Expand Up @@ -151,7 +151,7 @@ private CompletableFuture<Boolean> buildRangeOnly(@Nonnull FDBRecordStore store,
final IndexMaintainer maintainer = store.getIndexMaintainer(index);

// idempotence - We could have verified it at the first iteration only, but the repeating checks seem harmless
validateOrThrowEx(maintainer.isIdempotent(), "target index is not idempotent");
// validateOrThrowEx(maintainer.isIdempotent(), "target index is not idempotent");
// readability - This method shouldn't block if one has already opened the record store (as we did)
Index srcIndex = getSourceIndex(store.getRecordMetaData());
validateOrThrowEx(store.isIndexScannable(srcIndex), "source index is not scannable");
Expand All @@ -160,7 +160,7 @@ private CompletableFuture<Boolean> buildRangeOnly(@Nonnull FDBRecordStore store,
AsyncIterator<Range> ranges = rangeSet.missingRanges(store.ensureContextActive()).iterator();

final ExecuteProperties.Builder executeProperties = ExecuteProperties.newBuilder()
.setIsolationLevel(IsolationLevel.SNAPSHOT)
.setIsolationLevel(maintainer.isIdempotent() ? IsolationLevel.SNAPSHOT : IsolationLevel.SERIALIZABLE)
.setReturnedRowLimit(getLimit() + 1); // respect limit in this path; +1 allows a continuation item
final ScanProperties scanProperties = new ScanProperties(executeProperties.build());

Expand All @@ -179,10 +179,9 @@ private CompletableFuture<Boolean> buildRangeOnly(@Nonnull FDBRecordStore store,
final AtomicReference<RecordCursorResult<FDBIndexedRecord<Message>>> lastResult = new AtomicReference<>(RecordCursorResult.exhausted());
final AtomicBoolean hasMore = new AtomicBoolean(true);

final boolean isIdempotent = true ; // Note that currently indexing by index is online implemented for idempotent indexes
return iterateRangeOnly(store, cursor,
this::getRecordIfTypeMatch,
lastResult, hasMore, recordsScanned, isIdempotent)
lastResult, hasMore, recordsScanned, maintainer.isIdempotent())
.thenApply(vignore -> hasMore.get() ?
lastResult.get().get().getIndexEntry().getKey() :
rangeEnd)
Expand Down
Expand Up @@ -73,6 +73,12 @@ public <M extends Message> CompletableFuture<Void> update(@Nullable FDBIndexable
return AsyncUtil.DONE;
}

@Nonnull
@Override
public <M extends Message> CompletableFuture<Void> updateWhileWriteOnly(@Nullable final FDBIndexableRecord<M> oldRecord, @Nullable final FDBIndexableRecord<M> newRecord) {
return AsyncUtil.DONE;
}

@Nonnull
@Override
public RecordCursor<IndexEntry> scanUniquenessViolations(@Nonnull TupleRange range, @Nullable byte[] continuation, @Nonnull ScanProperties scanProperties) {
Expand Down

0 comments on commit 602d8eb

Please sign in to comment.