Skip to content

Commit

Permalink
Resolves #1430: Support non-idempotent target indexes while indexing …
Browse files Browse the repository at this point in the history
…by index (#1904)

* Resolves #1430: Support non-idempotent target indexes while indexing by index

This adds support for building a non-idempotent target index when indexing from a different source index. This works in a manner that is analogous to the way that this operation works for non-idempotent indexes built by a record scan, except that as the range set contains ranges of index entries from the source index, the maintainer needs to be updated to: (1) check the indexing type stamp and (2) modify the range set check to use the index key instead of the primary key. Then there are some updates to the indexers to adjust logic that assumed the target index type would always be idempotent.

This resolves #1430.
  • Loading branch information
alecgrieser authored and ammolitor committed Nov 17, 2022
1 parent 621f16c commit 087e5b2
Show file tree
Hide file tree
Showing 22 changed files with 1,464 additions and 406 deletions.
2 changes: 1 addition & 1 deletion docs/ReleaseNotes.md
Expand Up @@ -27,7 +27,7 @@ The Guava dependency version has been updated to 31.1. Projects may need to chec
* **Performance** Improvement 5 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Feature** Feature 1 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Feature** Feature 2 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Feature** Feature 3 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Feature** Non-idempotent target indexes can now be built from an existing index [(Issue #1430)](https://github.com/FoundationDB/fdb-record-layer/issues/1430)
* **Feature** Feature 4 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Feature** Feature 5 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Breaking change** Change 1 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
Expand Down
Expand Up @@ -42,6 +42,7 @@
import com.apple.foundationdb.record.ExecuteProperties;
import com.apple.foundationdb.record.ExecuteState;
import com.apple.foundationdb.record.FunctionNames;
import com.apple.foundationdb.record.IndexBuildProto;
import com.apple.foundationdb.record.IndexEntry;
import com.apple.foundationdb.record.IndexScanType;
import com.apple.foundationdb.record.IndexState;
Expand Down Expand Up @@ -189,11 +190,13 @@ public class FDBRecordStore extends FDBStoreBase implements FDBRecordStoreBase<M
public static final int CACHEABLE_STATE_FORMAT_VERSION = 7;
// 8 - add custom fields to store header
public static final int HEADER_USER_FIELDS_FORMAT_VERSION = 8;

// 9 - add READABLE_UNIQUE_PENDING index state
public static final int READABLE_UNIQUE_PENDING_FORMAT_VERSION = 9;
// 10 - check index build type during update
public static final int CHECK_INDEX_BUILD_TYPE_DURING_UPDATE_FORMAT_VERSION = 10;

// The current code can read and write up to the format version below
public static final int MAX_SUPPORTED_FORMAT_VERSION = READABLE_UNIQUE_PENDING_FORMAT_VERSION;
public static final int MAX_SUPPORTED_FORMAT_VERSION = CHECK_INDEX_BUILD_TYPE_DURING_UPDATE_FORMAT_VERSION;

// By default, record stores attempt to upgrade to this version
// NOTE: Updating this can break certain users during upgrades.
Expand Down Expand Up @@ -605,22 +608,12 @@ private <M extends Message> void updateSecondaryIndexes(@Nullable final FDBIndex
for (Index index : indexes) {
final IndexMaintainer maintainer = getIndexMaintainer(index);
final CompletableFuture<Void> future;
if (!maintainer.isIdempotent() && isIndexWriteOnly(index)) {
// In this case, the index is still being built, so we are not
// going to update the record unless the rebuild job has already
// gotten to this range.
final Tuple primaryKey = newRecord == null ? oldRecord.getPrimaryKey() : newRecord.getPrimaryKey();
future = maintainer.addedRangeWithKey(primaryKey)
.thenCompose(present -> {
if (present) {
return maintainer.update(oldRecord, newRecord);
} else {
return AsyncUtil.DONE;
}
});
if (!MoreAsyncUtil.isCompletedNormally(future)) {
futures.add(future);
}
if (isIndexWriteOnly(index)) {
// In this case, the index is still being built. For some index
// types, the index update needs to check whether indexing
// process has already built the relevant ranges, and it
// may adjust the way the index is built in response.
future = maintainer.updateWhileWriteOnly(oldRecord, newRecord);
} else {
future = maintainer.update(oldRecord, newRecord);
}
Expand Down Expand Up @@ -3499,6 +3492,49 @@ public CompletableFuture<IndexBuildState> getIndexBuildStateAsync(Index index) {
return IndexBuildState.loadIndexBuildStateAsync(this, index);
}

/**
* Load the indexing type stamp for an index. This stamp contains information about the kind of
* index build being used to construct a new index. This method is {@link API.Status#INTERNAL}.
*
* @param index the index being built
* @return the indexing type stamp for the index's current build
* @see #saveIndexingTypeStamp(Index, IndexBuildProto.IndexBuildIndexingStamp)
*/
@API(API.Status.INTERNAL)
@Nonnull
public CompletableFuture<IndexBuildProto.IndexBuildIndexingStamp> loadIndexingTypeStampAsync(Index index) {
byte[] stampKey = IndexingBase.indexBuildTypeSubspace(this, index).pack();
return ensureContextActive().get(stampKey).thenApply(serializedStamp -> {
if (serializedStamp == null) {
return null;
}
try {
return IndexBuildProto.IndexBuildIndexingStamp.parseFrom(serializedStamp);
} catch (InvalidProtocolBufferException ex) {
RecordCoreException protoEx = new RecordCoreException("invalid indexing type stamp",
LogMessageKeys.INDEX_NAME, index.getName(),
LogMessageKeys.ACTUAL, ByteArrayUtil2.loggable(serializedStamp));
protoEx.initCause(ex);
throw protoEx;
}
});
}

/**
* Update the indexing type stamp for the given index. This is used by the {@link OnlineIndexer}
* to document what kind of indexing procedure is being used to build the given index. This method
* is {@link API.Status#INTERNAL}.
*
* @param index the index being built
* @param stamp the new value of the index's indexing type stamp
* @see #loadIndexingTypeStampAsync(Index)
*/
@API(API.Status.INTERNAL)
public void saveIndexingTypeStamp(Index index, IndexBuildProto.IndexBuildIndexingStamp stamp) {
byte[] stampKey = IndexingBase.indexBuildTypeSubspace(this, index).pack();
ensureContextActive().set(stampKey, stamp.toByteArray());
}

// Remove any indexes that do not match the filter.
// NOTE: This assumes that the filter will not filter out any indexes if all indexes are readable.
private List<Index> sanitizeIndexes(@Nonnull List<Index> indexes, @Nonnull Predicate<Index> filter) {
Expand Down
Expand Up @@ -115,12 +115,30 @@ public RecordCursor<IndexEntry> scan(@Nonnull IndexScanBounds scanBounds,
* @param oldRecord the previous stored record or <code>null</code> if a new record is being created
* @param newRecord the new record or <code>null</code> if an old record is being deleted
* @param <M> type of message
* @return a future that is complete when the record update is done
* @return a future that is complete when the index update is done
*/
@Nonnull
public abstract <M extends Message> CompletableFuture<Void> update(@Nullable FDBIndexableRecord<M> oldRecord,
@Nullable FDBIndexableRecord<M> newRecord);

/**
* Update the associated index for a changed record while the index is in
* {@link com.apple.foundationdb.record.IndexState#WRITE_ONLY} mode. For most indexes, this should do the
* same thing that a normal update does, but if the index is not {@linkplain #isIdempotent() idempotent},
* then during an index build, it may need to perform additional checks to make sure each record is only
* added to the index once. In particular, it can check the {@link com.apple.foundationdb.async.RangeSet}
* associated with the index build to check to see if the record has already been indexed, and then decide
* to update (or not update) the index as appropriate.
*
* @param oldRecord the previous stored record or <code>null</code> if a new record is being created
* @param newRecord the new record or <code>null</code> if an old record is being deleted
* @param <M> type of message
* @return a future that is complete when the index update is done
*/
@Nonnull
public abstract <M extends Message> CompletableFuture<Void> updateWhileWriteOnly(@Nullable FDBIndexableRecord<M> oldRecord,
@Nullable FDBIndexableRecord<M> newRecord);


/**
* Scans through the list of uniqueness violations within the database.
Expand Down
Expand Up @@ -47,7 +47,6 @@
import com.apple.foundationdb.subspace.Subspace;
import com.apple.foundationdb.tuple.ByteArrayUtil2;
import com.apple.foundationdb.tuple.Tuple;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
Expand Down Expand Up @@ -401,35 +400,32 @@ public void enforceStampOverwrite() {
@SuppressWarnings("PMD.CloseResource")
private CompletableFuture<Void> setIndexingTypeOrThrow(FDBRecordStore store, boolean continuedBuild) {
// continuedBuild is set if this session isn't a continuation of a previous indexing
Transaction transaction = store.getContext().ensureActive();
IndexBuildProto.IndexBuildIndexingStamp indexingTypeStamp = getIndexingTypeStamp(store);

return forEachTargetIndex(index -> setIndexingTypeOrThrow(store, continuedBuild, transaction, index, indexingTypeStamp));
return forEachTargetIndex(index -> setIndexingTypeOrThrow(store, continuedBuild, index, indexingTypeStamp));
}

@Nonnull
private CompletableFuture<Void> setIndexingTypeOrThrow(FDBRecordStore store, boolean continuedBuild, Transaction transaction, Index index, IndexBuildProto.IndexBuildIndexingStamp indexingTypeStamp) {
byte[] stampKey = indexBuildTypeSubspace(store, index).getKey();
private CompletableFuture<Void> setIndexingTypeOrThrow(FDBRecordStore store, boolean continuedBuild, Index index, IndexBuildProto.IndexBuildIndexingStamp indexingTypeStamp) {
if (forceStampOverwrite && !continuedBuild) {
// Fresh session + overwrite = no questions asked
transaction.set(stampKey, indexingTypeStamp.toByteArray());
store.saveIndexingTypeStamp(index, indexingTypeStamp);
return AsyncUtil.DONE;
}
return transaction.get(stampKey)
.thenCompose(bytes -> {
if (bytes == null) {
return store.loadIndexingTypeStampAsync(index)
.thenCompose(savedStamp -> {
if (savedStamp == null) {
if (continuedBuild && indexingTypeStamp.getMethod() !=
IndexBuildProto.IndexBuildIndexingStamp.Method.BY_RECORDS) {
// backward compatibility - maybe continuing an old BY_RECORD session
return isWriteOnlyButNoRecordScanned(store, index)
.thenCompose(noRecordScanned -> throwAsByRecordsUnlessNoRecordWasScanned(noRecordScanned, transaction, index, stampKey, indexingTypeStamp));
.thenCompose(noRecordScanned -> throwAsByRecordsUnlessNoRecordWasScanned(noRecordScanned, store, index, indexingTypeStamp));
}
// Here: either not a continuedBuild (new session), or a BY_RECORD session (allowed to overwrite the null stamp)
transaction.set(stampKey, indexingTypeStamp.toByteArray());
store.saveIndexingTypeStamp(index, indexingTypeStamp);
return AsyncUtil.DONE;
}
// Here: has non-null type stamp
IndexBuildProto.IndexBuildIndexingStamp savedStamp = parseTypeStampOrThrow(bytes);
if (indexingTypeStamp.equals(savedStamp)) {
// A matching stamp is already there - One less thing to worry about
return AsyncUtil.DONE;
Expand All @@ -438,14 +434,14 @@ private CompletableFuture<Void> setIndexingTypeOrThrow(FDBRecordStore store, boo
indexingTypeStamp.getMethod() == IndexBuildProto.IndexBuildIndexingStamp.Method.BY_RECORDS &&
savedStamp.getMethod() == IndexBuildProto.IndexBuildIndexingStamp.Method.MULTI_TARGET_BY_RECORDS) {
// Special case: partly built with multi target, but may be continued indexing on its own
transaction.set(stampKey, indexingTypeStamp.toByteArray());
store.saveIndexingTypeStamp(index, indexingTypeStamp);
return AsyncUtil.DONE;
}
if (forceStampOverwrite) { // and a continued Build
// check if partly built
return isWriteOnlyButNoRecordScanned(store, index)
.thenCompose(noRecordScanned ->
throwUnlessNoRecordWasScanned(noRecordScanned, transaction, index, stampKey, indexingTypeStamp,
throwUnlessNoRecordWasScanned(noRecordScanned, store, index, indexingTypeStamp,
savedStamp, continuedBuild));
}
// fall down to exception
Expand All @@ -454,8 +450,9 @@ private CompletableFuture<Void> setIndexingTypeOrThrow(FDBRecordStore store, boo
}

@Nonnull
private CompletableFuture<Void> throwAsByRecordsUnlessNoRecordWasScanned(boolean noRecordScanned, Transaction transaction,
Index index, byte[] stampKey,
private CompletableFuture<Void> throwAsByRecordsUnlessNoRecordWasScanned(boolean noRecordScanned,
FDBRecordStore store,
Index index,
IndexBuildProto.IndexBuildIndexingStamp indexingTypeStamp) {
// A complicated way to reduce complexity.
if (noRecordScanned) {
Expand All @@ -465,7 +462,7 @@ private CompletableFuture<Void> throwAsByRecordsUnlessNoRecordWasScanned(boolean
.addKeysAndValues(common.indexLogMessageKeyValues())
.toString());
}
transaction.set(stampKey, indexingTypeStamp.toByteArray());
store.saveIndexingTypeStamp(index, indexingTypeStamp);
return AsyncUtil.DONE;
}
// Here: there is no type stamp, but indexing is ongoing. For backward compatibility reasons, we'll consider it a BY_RECORDS stamp
Expand All @@ -479,15 +476,16 @@ private CompletableFuture<Void> throwAsByRecordsUnlessNoRecordWasScanned(boolean
}

@Nonnull
private CompletableFuture<Void> throwUnlessNoRecordWasScanned(boolean noRecordScanned, Transaction transaction,
Index index, byte[] stampKey,
private CompletableFuture<Void> throwUnlessNoRecordWasScanned(boolean noRecordScanned,
FDBRecordStore store,
Index index,
IndexBuildProto.IndexBuildIndexingStamp indexingTypeStamp,
IndexBuildProto.IndexBuildIndexingStamp savedStamp,
boolean continuedBuild) {
// Ditto (a complicated way to reduce complexity)
if (noRecordScanned) {
// we can safely overwrite the previous type stamp
transaction.set(stampKey, indexingTypeStamp.toByteArray());
store.saveIndexingTypeStamp(index, indexingTypeStamp);
return AsyncUtil.DONE;
}
// A force overwrite cannot be allowed when partly built
Expand Down Expand Up @@ -543,19 +541,6 @@ private CompletableFuture<Void> setScrubberTypeOrThrow(FDBRecordStore store) {

abstract CompletableFuture<Void> buildIndexInternalAsync();

private IndexBuildProto.IndexBuildIndexingStamp parseTypeStampOrThrow(byte[] bytes) {
try {
return IndexBuildProto.IndexBuildIndexingStamp.parseFrom(bytes);
} catch (InvalidProtocolBufferException ex) {
RecordCoreException protoEx = new RecordCoreException("invalid indexing type stamp",
LogMessageKeys.INDEX_NAME, common.getTargetIndexesNames(),
LogMessageKeys.INDEXER_ID, common.getUuid(),
LogMessageKeys.ACTUAL, bytes);
protoEx.initCause(ex);
throw protoEx;
}
}

private CompletableFuture<Boolean> isWriteOnlyButNoRecordScanned(FDBRecordStore store, Index index) {
RangeSet rangeSet = new RangeSet(store.indexRangeSubspace(index));
AsyncIterator<Range> ranges = rangeSet.missingRanges(store.ensureContextActive()).iterator();
Expand Down

0 comments on commit 087e5b2

Please sign in to comment.