From 138cf89b460835668d615be5419d75bb0e44e69c Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 8 Feb 2021 11:52:06 +0000 Subject: [PATCH] Avoid async cache-size fetch on partial shards (#68644) Today we perform an async fetch for every searchable snapshot shard while allocating it, so that we can prefer to allocate it to the node that holds the warmest cache for that shard. For partial shards, there is no persistently-cached data to reuse, so we can skip the async fetch. --- .../SearchableSnapshotAllocator.java | 7 +++ ...rtSearchableSnapshotCacheStoresAction.java | 6 ++ .../SearchableSnapshotAllocatorTests.java | 63 +++++++++++++++++-- 3 files changed, 71 insertions(+), 5 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotAllocator.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotAllocator.java index 3cec34198c1c3..3ff1e1b70c330 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotAllocator.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotAllocator.java @@ -56,6 +56,7 @@ import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_NAME_SETTING; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_PARTIAL_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_REPOSITORY_NAME_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_NAME_SETTING; @@ -262,6 +263,12 @@ public int getNumberOfInFlightFetches() { private AsyncShardFetch.FetchResult fetchData(ShardRouting shard, RoutingAllocation allocation) { final ShardId shardId = shard.shardId(); final Settings indexSettings = allocation.metadata().index(shard.index()).getSettings(); + + if (SNAPSHOT_PARTIAL_SETTING.get(indexSettings)) { + // cached data for partial indices is not persistent, no need to fetch it + return new AsyncShardFetch.FetchResult<>(shardId, Collections.emptyMap(), Collections.emptySet()); + } + final SnapshotId snapshotId = new SnapshotId( SNAPSHOT_SNAPSHOT_NAME_SETTING.get(indexSettings), SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings) diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/cache/TransportSearchableSnapshotCacheStoresAction.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/cache/TransportSearchableSnapshotCacheStoresAction.java index b8989aa902c9b..09035aa0b2738 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/cache/TransportSearchableSnapshotCacheStoresAction.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/cache/TransportSearchableSnapshotCacheStoresAction.java @@ -29,6 +29,9 @@ import java.io.IOException; import java.util.List; +import java.util.Optional; + +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_PARTIAL_SETTING; public class TransportSearchableSnapshotCacheStoresAction extends TransportNodesAction< TransportSearchableSnapshotCacheStoresAction.Request, @@ -87,6 +90,9 @@ protected NodeCacheFilesMetadata newNodeResponse(StreamInput in) throws IOExcept @Override protected NodeCacheFilesMetadata nodeOperation(NodeRequest request) { assert cacheService != null; + assert Optional.ofNullable(clusterService.state().metadata().index(request.shardId.getIndex())) + .map(indexMetadata -> SNAPSHOT_PARTIAL_SETTING.get(indexMetadata.getSettings())) + .orElse(false) == false : request.shardId + " is partial, should not be fetching its cached size"; return new NodeCacheFilesMetadata(clusterService.localNode(), cacheService.getCachedSize(request.shardId, request.snapshotId)); } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotAllocatorTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotAllocatorTests.java index 85e4347c8fd1c..a40bb484113c9 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotAllocatorTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotAllocatorTests.java @@ -46,9 +46,11 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; +import java.util.function.UnaryOperator; import java.util.stream.Collectors; import static org.elasticsearch.node.Node.NODE_NAME_SETTING; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_PARTIAL_SETTING; import static org.hamcrest.Matchers.empty; public class SearchableSnapshotAllocatorTests extends ESAllocationTestCase { @@ -153,7 +155,7 @@ public void doE Request request, ActionListener listener ) { - throw new AssertionError("Expecting no requests but received [" + action + "]"); + throw new AssertionError("Expecting no requests but received [" + action.name() + "]"); } }; @@ -167,15 +169,66 @@ public void doE assertTrue(allocation.routingTable().index(shardId.getIndex()).allPrimaryShardsUnassigned()); } + public void testNoFetchesForPartialIndex() { + final ShardId shardId = new ShardId("test", "_na_", 0); + final List nodes = randomList(1, 10, () -> newNode("node-" + UUIDs.randomBase64UUID(random()))); + final DiscoveryNode localNode = randomFrom(nodes); + final Settings localNodeSettings = Settings.builder().put(NODE_NAME_SETTING.getKey(), localNode.getName()).build(); + + final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(localNodeSettings, random()); + + final Metadata metadata = buildSingleShardIndexMetadata(shardId, builder -> builder.put(SNAPSHOT_PARTIAL_SETTING.getKey(), true)); + final RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + routingTableBuilder.addAsRestore(metadata.index(shardId.getIndex()), randomSnapshotSource(shardId)); + + final ClusterState state = buildClusterState(nodes, metadata, routingTableBuilder); + final RoutingAllocation allocation = buildAllocation( + deterministicTaskQueue, + state, + randomNonNegativeLong(), + yesAllocationDeciders() + ); + + final Client client = new NoOpNodeClient(deterministicTaskQueue.getThreadPool()) { + @Override + public void doExecute( + ActionType action, + Request request, + ActionListener listener + ) { + throw new AssertionError("Expecting no requests but received [" + action.name() + "]"); + } + }; + + final SearchableSnapshotAllocator allocator = new SearchableSnapshotAllocator( + client, + (reason, priority, listener) -> { throw new AssertionError("Expecting no reroutes"); } + ); + allocateAllUnassigned(allocation, allocator); + assertFalse(allocation.routingNodesChanged()); + assertThat(allocation.routingNodes().assignedShards(shardId), empty()); + assertTrue(allocation.routingTable().index(shardId.getIndex()).allPrimaryShardsUnassigned()); + } + private static Metadata buildSingleShardIndexMetadata(ShardId shardId) { + return buildSingleShardIndexMetadata(shardId, UnaryOperator.identity()); + } + + private static Metadata buildSingleShardIndexMetadata(ShardId shardId, UnaryOperator extraSettings) { return Metadata.builder() .put( IndexMetadata.builder(shardId.getIndexName()) .settings( - settings(Version.CURRENT).put( - ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_SETTING.getKey(), - SearchableSnapshotAllocator.ALLOCATOR_NAME - ).put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), SearchableSnapshotsConstants.SNAPSHOT_DIRECTORY_FACTORY_KEY) + extraSettings.apply( + settings(Version.CURRENT).put( + ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_SETTING.getKey(), + SearchableSnapshotAllocator.ALLOCATOR_NAME + ) + .put( + IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), + SearchableSnapshotsConstants.SNAPSHOT_DIRECTORY_FACTORY_KEY + ) + ) ) .numberOfShards(1) .numberOfReplicas(0)