From a2ce9f2131f22424f8bc9a545b5113c0bdaba6ca Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Sat, 12 Dec 2020 10:30:18 -0500 Subject: [PATCH] Add index commit id to searcher (#63963) This change assigns the id of an index commit to a searcher, so we can retry search requests on another shard copy if they have the same index commit. --- .../indices/state/CloseIndexIT.java | 30 +++++++++++++++++++ .../elasticsearch/common/lucene/Lucene.java | 8 +++++ .../index/engine/CommitStats.java | 3 +- .../elasticsearch/index/engine/Engine.java | 9 ++++++ .../index/engine/ReadOnlyEngine.java | 23 ++++++++++++++ .../index/engine/FrozenEngine.java | 5 ++++ 6 files changed, 76 insertions(+), 2 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/state/CloseIndexIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/state/CloseIndexIT.java index be3d36babde7d0..06529d7b77c181 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/state/CloseIndexIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/state/CloseIndexIT.java @@ -34,9 +34,11 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.indices.IndexClosedException; import org.elasticsearch.indices.IndicesService; @@ -472,6 +474,34 @@ public void testResyncPropagatePrimaryTerm() throws Exception { } } + public void testCommitIdInSearcher() throws Exception { + final String indexName = "test_commit_id"; + createIndex(indexName, Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .build()); + indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, randomIntBetween(0, 50)) + .mapToObj(n -> client().prepareIndex(indexName).setSource("num", n)).collect(toList())); + ensureGreen(indexName); + assertAcked(client().admin().indices().prepareClose(indexName)); + assertIndexIsClosed(indexName); + ensureGreen(indexName); + final String nodeWithPrimary = Iterables.get(internalCluster().nodesInclude(indexName), 0); + IndexShard shard = internalCluster().getInstance(IndicesService.class, nodeWithPrimary) + .indexService(resolveIndex(indexName)).getShard(0); + final String commitId; + try (Engine.SearcherSupplier searcherSupplier = shard.acquireSearcherSupplier(randomFrom(Engine.SearcherScope.values()))) { + assertNotNull(searcherSupplier.getCommitId()); + commitId = searcherSupplier.getCommitId(); + } + internalCluster().restartNode(nodeWithPrimary); + ensureGreen(indexName); + shard = internalCluster().getInstance(IndicesService.class, nodeWithPrimary).indexService(resolveIndex(indexName)).getShard(0); + try (Engine.SearcherSupplier searcherSupplier = shard.acquireSearcherSupplier(randomFrom(Engine.SearcherScope.values()))) { + assertThat(searcherSupplier.getCommitId(), equalTo(commitId)); + } + } + private static void closeIndices(final String... indices) { closeIndices(client().admin().indices().prepareClose(indices)); } diff --git a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java index b9f8bb98b4273c..53f1b3dc39c5e0 100644 --- a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java +++ b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java @@ -99,6 +99,7 @@ import java.text.ParseException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Base64; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -808,6 +809,13 @@ public void delete() { } } + /** + * Returns a base64 encoded string of the commit id of the given {@link SegmentInfos} + */ + public static String getCommitId(SegmentInfos segmentInfos) { + return Base64.getEncoder().encodeToString(segmentInfos.getId()); + } + /** * Return a {@link Bits} view of the provided scorer. * NOTE: that the returned {@link Bits} instance MUST be consumed in order. diff --git a/server/src/main/java/org/elasticsearch/index/engine/CommitStats.java b/server/src/main/java/org/elasticsearch/index/engine/CommitStats.java index 383adf2ac2e3ca..9f6acf712be68e 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CommitStats.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CommitStats.java @@ -29,7 +29,6 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Base64; import java.util.Map; import static java.util.Map.entry; @@ -47,7 +46,7 @@ public CommitStats(SegmentInfos segmentInfos) { userData = Map.copyOf(segmentInfos.getUserData()); // lucene calls the current generation, last generation. generation = segmentInfos.getLastGeneration(); - id = Base64.getEncoder().encodeToString(segmentInfos.getId()); + id = Lucene.getCommitId(segmentInfos); numDocs = Lucene.getNumDocs(segmentInfos); } diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 5259c1da80a482..393f53914b1440 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -1206,6 +1206,15 @@ public final void close() { protected abstract void doClose(); protected abstract Searcher acquireSearcherInternal(String source); + + /** + * Returns a commit id associated with this searcher if it's opened from an index commit; otherwise, return null. Two searchers + * with the same commit id must have identical Lucene level indices (i.e., identical segments with same docs using same doc-ids). + */ + @Nullable + public String getCommitId() { + return null; + } } public static final class Searcher extends IndexSearcher implements Releasable { diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 899863ee7c4781..2b16312b5dca66 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -79,6 +79,7 @@ public class ReadOnlyEngine extends Engine { private final boolean requireCompleteHistory; protected volatile TranslogStats translogStats; + protected final String commitId; /** * Creates a new ReadOnlyEngine. This ctor can also be used to open a read-only engine on top of an already opened @@ -110,6 +111,7 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats // yet this makes sure nobody else does. including some testing tools that try to be messy indexWriterLock = obtainLock ? directory.obtainLock(IndexWriter.WRITE_LOCK_NAME) : null; this.lastCommittedSegmentInfos = Lucene.readSegmentInfos(directory); + this.commitId = Lucene.getCommitId(lastCommittedSegmentInfos); if (seqNoStats == null) { seqNoStats = buildSeqNoStats(config, lastCommittedSegmentInfos); ensureMaxSeqNoEqualsToGlobalCheckpoint(seqNoStats); @@ -523,4 +525,25 @@ public ShardLongFieldRange getRawFieldRange(String field) throws IOException { } } + + @Override + public SearcherSupplier acquireSearcherSupplier(Function wrapper, SearcherScope scope) throws EngineException { + final SearcherSupplier delegate = super.acquireSearcherSupplier(wrapper, scope); + return new SearcherSupplier(Function.identity()) { + @Override + protected void doClose() { + delegate.close(); + } + + @Override + protected Searcher acquireSearcherInternal(String source) { + return delegate.acquireSearcherInternal(source); + } + + @Override + public String getCommitId() { + return commitId; + } + }; + } } diff --git a/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java b/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java index 3ebe32c933c8b8..aa122453f44f46 100644 --- a/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java +++ b/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java @@ -193,6 +193,11 @@ public Searcher acquireSearcherInternal(String source) { protected void doClose() { store.decRef(); } + + @Override + public String getCommitId() { + return commitId; + } }; }