From a415cf614b04294a8b0c4fbed9e77e67547b7422 Mon Sep 17 00:00:00 2001
From: David Turner
Date: Mon, 8 Feb 2021 17:11:26 +0000
Subject: [PATCH] Ignore disk watermarks on partial shards (#68673)

Today the disk threshold decider applies even to partially-restored
shards, which makes no sense since these shards effectively consume no
disk space of their own. With this commit the disk threshold decider now
freely permits the allocation of these shards.
---
 .../decider/DiskThresholdDecider.java         | 15 +++++
 .../common/settings/IndexScopedSettings.java  |  2 +
 .../elasticsearch/index/shard/IndexShard.java | 14 ++++-
 .../DiskThresholdDeciderUnitTests.java        | 55 ++++++++++++++++++
 .../SearchableSnapshotsIntegTests.java        | 56 +++++++++++++++++++
 ...ransportMountSearchableSnapshotAction.java |  4 +-
 6 files changed, 142 insertions(+), 4 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java
index 36eff3e3a5b4e7..bf5df649e83b7f 100644
--- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java
+++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java
@@ -72,6 +72,10 @@ public class DiskThresholdDecider extends AllocationDecider {
     public static final Setting ENABLE_FOR_SINGLE_DATA_NODE =
         Setting.boolSetting("cluster.routing.allocation.disk.watermark.enable_for_single_data_node", false, Setting.Property.NodeScope);
 
+    public static final Setting SETTING_IGNORE_DISK_WATERMARKS =
+        Setting.boolSetting("index.routing.allocation.disk.watermark.ignore", false,
+            Setting.Property.IndexScope, Setting.Property.PrivateIndex);
+
     private final DiskThresholdSettings diskThresholdSettings;
     private final boolean enableForSingleDataNode;
 
@@ -133,6 +137,9 @@ public static long sizeOfRelocatingShards(RoutingNode node, boolean subtractShar
private static final Decision YES_UNALLOCATED_PRIMARY_BETWEEN_WATERMARKS = Decision.single(Decision.Type.YES, NAME, "the node " + "is above the low watermark, but less than the high watermark, and this primary shard has never been allocated before"); + private static final Decision YES_DISK_WATERMARKS_IGNORED = Decision.single(Decision.Type.YES, NAME, + "disk watermarks are ignored on this index"); + @Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { ImmutableOpenMap usages = allocation.clusterInfo().getNodeMostAvailableDiskUsages(); @@ -141,6 +148,10 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing return decision; } + if (SETTING_IGNORE_DISK_WATERMARKS.get(allocation.metadata().index(shardRouting.index()).getSettings())) { + return YES_DISK_WATERMARKS_IGNORED; + } + final double usedDiskThresholdLow = 100.0 - diskThresholdSettings.getFreeDiskThresholdLow(); final double usedDiskThresholdHigh = 100.0 - diskThresholdSettings.getFreeDiskThresholdHigh(); @@ -308,6 +319,10 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl return decision; } + if (SETTING_IGNORE_DISK_WATERMARKS.get(allocation.metadata().index(shardRouting.index()).getSettings())) { + return YES_DISK_WATERMARKS_IGNORED; + } + // subtractLeavingShards is passed as true here, since this is only for shards remaining, we will *eventually* have enough disk // since shards are moving away. No new shards will be incoming since in canAllocate we pass false for this check. 
final DiskUsageWithRelocations usage = getDiskUsage(node, allocation, usages, true); diff --git a/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java index b9aeefccb231b8..393d28844df3b0 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java @@ -11,6 +11,7 @@ import org.elasticsearch.cluster.metadata.MetadataIndexStateService; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator; +import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider; @@ -154,6 +155,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexSettings.FINAL_PIPELINE, MetadataIndexStateService.VERIFIED_BEFORE_CLOSE_SETTING, ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_SETTING, + DiskThresholdDecider.SETTING_IGNORE_DISK_WATERMARKS, // validate that built-in similarities don't get redefined Setting.groupSetting( diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index ee0a9e9f1e152e..49249f71e1350b 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -45,6 +45,7 @@ import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource; import org.elasticsearch.cluster.routing.ShardRouting; +import 
org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider; import org.elasticsearch.common.Booleans; import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.CheckedFunction; @@ -1054,9 +1055,16 @@ public GetStats getStats() { public StoreStats storeStats() { try { - final RecoveryState recoveryState = this.recoveryState; - final long bytesStillToRecover = recoveryState == null ? -1L : recoveryState.getIndex().bytesStillToRecover(); - return store.stats(bytesStillToRecover == -1 ? StoreStats.UNKNOWN_RESERVED_BYTES : bytesStillToRecover); + final long reservedBytes; + if (DiskThresholdDecider.SETTING_IGNORE_DISK_WATERMARKS.get(indexSettings.getSettings())) { + // if this shard has no disk footprint then it also needs no reserved space + reservedBytes = 0L; + } else { + final RecoveryState recoveryState = this.recoveryState; + final long bytesStillToRecover = recoveryState == null ? -1L : recoveryState.getIndex().bytesStillToRecover(); + reservedBytes = bytesStillToRecover == -1 ? 
StoreStats.UNKNOWN_RESERVED_BYTES : bytesStillToRecover; + } + return store.stats(reservedBytes); } catch (IOException e) { failShard("Failing shard because of exception during storeStats", e); throw new ElasticsearchException("io exception while building 'store stats'", e); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java index fe12fb0ecaa245..1430a0e3e81a0d 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java @@ -463,4 +463,59 @@ public void testDiskUsageWithRelocations() { assertThat(new DiskThresholdDecider.DiskUsageWithRelocations( new DiskUsage("n", "n", "/dev/null", Long.MAX_VALUE, Long.MAX_VALUE), -10).getFreeBytes(), equalTo(Long.MAX_VALUE)); } + + public void testDecidesYesIfWatermarksIgnored() { + ClusterSettings nss = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + DiskThresholdDecider decider = new DiskThresholdDecider(Settings.EMPTY, nss); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test") + .settings(settings(Version.CURRENT).put(DiskThresholdDecider.SETTING_IGNORE_DISK_WATERMARKS.getKey(), true)) + .numberOfShards(1) + .numberOfReplicas(1)) + .build(); + + final Index index = metadata.index("test").getIndex(); + + ShardRouting test_0 = ShardRouting.newUnassigned(new ShardId(index, 0), true, EmptyStoreRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo")); + DiscoveryNode node_0 = new DiscoveryNode("node_0", buildNewFakeTransportAddress(), Collections.emptyMap(), + new HashSet<>(DiscoveryNodeRole.BUILT_IN_ROLES), Version.CURRENT); + DiscoveryNode node_1 = new DiscoveryNode("node_1", 
buildNewFakeTransportAddress(), Collections.emptyMap(), + new HashSet<>(DiscoveryNodeRole.BUILT_IN_ROLES), Version.CURRENT); + + RoutingTable routingTable = RoutingTable.builder() + .addAsNew(metadata.index("test")) + .build(); + + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata).routingTable(routingTable).build(); + + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder() + .add(node_0) + .add(node_1) + ).build(); + + // actual test -- after all that bloat :) + ImmutableOpenMap.Builder allFullUsages = ImmutableOpenMap.builder(); + allFullUsages.put("node_0", new DiskUsage("node_0", "node_0", "_na_", 100, 0)); // all full + allFullUsages.put("node_1", new DiskUsage("node_1", "node_1", "_na_", 100, 0)); // all full + + ImmutableOpenMap.Builder shardSizes = ImmutableOpenMap.builder(); + shardSizes.put("[test][0][p]", 10L); // 10 bytes + final ImmutableOpenMap usages = allFullUsages.build(); + final ClusterInfo clusterInfo = new ClusterInfo(usages, usages, shardSizes.build(), ImmutableOpenMap.of(), ImmutableOpenMap.of()); + RoutingAllocation allocation = new RoutingAllocation(new AllocationDeciders(Collections.singleton(decider)), + clusterState.getRoutingNodes(), clusterState, clusterInfo, null, System.nanoTime()); + allocation.debugDecision(true); + final RoutingNode routingNode = new RoutingNode("node_0", node_0); + Decision decision = decider.canAllocate(test_0, routingNode, allocation); + assertThat(decision.type(), equalTo(Decision.Type.YES)); + assertThat(decision.getExplanation(), containsString("disk watermarks are ignored on this index")); + + decision = decider.canRemain(test_0.initialize(node_0.getId(), null, 0L).moveToStarted(), routingNode, allocation); + assertThat(decision.type(), equalTo(Decision.Type.YES)); + assertThat(decision.getExplanation(), containsString("disk watermarks are ignored on this index")); + } + } diff --git 
a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java index 0bede379dab0bd..e81ad34dd6e5b5 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java @@ -27,6 +27,8 @@ import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.decider.Decision; +import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider; import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; @@ -38,11 +40,13 @@ import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexModule; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.index.shard.IndexLongFieldRange; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardPath; +import org.elasticsearch.indices.IndexClosedException; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.repositories.RepositoryData; @@ -77,6 +81,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import 
java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -98,6 +103,7 @@ import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.oneOf; import static org.hamcrest.Matchers.sameInstance; public class SearchableSnapshotsIntegTests extends BaseSearchableSnapshotsIntegTestCase { @@ -480,6 +486,28 @@ public void testCreateAndRestorePartialSearchableSnapshot() throws Exception { expectedDataTiersPreference = getDataTiersPreference(MountSearchableSnapshotRequest.Storage.SHARED_CACHE); } + final AtomicBoolean statsWatcherRunning = new AtomicBoolean(true); + final Thread statsWatcher = new Thread(() -> { + while (statsWatcherRunning.get()) { + final IndicesStatsResponse indicesStatsResponse; + try { + indicesStatsResponse = client().admin().indices().prepareStats(restoredIndexName).clear().setStore(true).get(); + } catch (IndexNotFoundException | IndexClosedException e) { + continue; + // ok + } + + for (ShardStats shardStats : indicesStatsResponse.getShards()) { + assertThat( + shardStats.getShardRouting().toString(), + shardStats.getStats().getStore().getReservedSize().getBytes(), + equalTo(0L) + ); + } + } + }, "test-stats-watcher"); + statsWatcher.start(); + final MountSearchableSnapshotRequest req = new MountSearchableSnapshotRequest( restoredIndexName, fsRepoName, @@ -494,6 +522,9 @@ public void testCreateAndRestorePartialSearchableSnapshot() throws Exception { final RestoreSnapshotResponse restoreSnapshotResponse = client().execute(MountSearchableSnapshotAction.INSTANCE, req).get(); assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(0)); + statsWatcherRunning.set(false); + statsWatcher.join(); + final Settings settings = client().admin() .indices() .prepareGetSettings(restoredIndexName) @@ -509,6 +540,8 @@ public void testCreateAndRestorePartialSearchableSnapshot() throws Exception 
{ assertThat(IndexMetadata.INDEX_AUTO_EXPAND_REPLICAS_SETTING.get(settings).toString(), equalTo("false")); assertThat(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.get(settings), equalTo(expectedReplicas)); assertThat(DataTierAllocationDecider.INDEX_ROUTING_PREFER_SETTING.get(settings), equalTo(expectedDataTiersPreference)); + assertTrue(SearchableSnapshots.SNAPSHOT_PARTIAL_SETTING.get(settings)); + assertTrue(DiskThresholdDecider.SETTING_IGNORE_DISK_WATERMARKS.get(settings)); assertTotalHits(restoredIndexName, originalAllHits, originalBarHits); assertRecoveryStats(restoredIndexName, false); @@ -548,6 +581,29 @@ public void testCreateAndRestorePartialSearchableSnapshot() throws Exception { assertThat(client().admin().indices().prepareGetAliases(aliasName).get().getAliases().size(), equalTo(1)); assertTotalHits(aliasName, originalAllHits, originalBarHits); + final Decision diskDeciderDecision = client().admin() + .cluster() + .prepareAllocationExplain() + .setIndex(restoredIndexName) + .setShard(0) + .setPrimary(true) + .setIncludeYesDecisions(true) + .get() + .getExplanation() + .getShardAllocationDecision() + .getMoveDecision() + .getCanRemainDecision() + .getDecisions() + .stream() + .filter(d -> d.label().equals(DiskThresholdDecider.NAME)) + .findFirst() + .orElseThrow(); + assertThat(diskDeciderDecision.type(), equalTo(Decision.Type.YES)); + assertThat( + diskDeciderDecision.getExplanation(), + oneOf("disk watermarks are ignored on this index", "there is only a single data node present") + ); + internalCluster().fullRestart(); assertTotalHits(restoredIndexName, originalAllHits, originalBarHits); assertRecoveryStats(restoredIndexName, false); diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportMountSearchableSnapshotAction.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportMountSearchableSnapshotAction.java index 
4082a25cd39660..3fa05b4b89c077 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportMountSearchableSnapshotAction.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportMountSearchableSnapshotAction.java @@ -20,6 +20,7 @@ import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator; +import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -129,7 +130,8 @@ private static Settings buildIndexSettings( .put(INDEX_RECOVERY_TYPE_SETTING.getKey(), SearchableSnapshotsConstants.SNAPSHOT_RECOVERY_STATE_FACTORY_KEY); if (storage == MountSearchableSnapshotRequest.Storage.SHARED_CACHE) { - settings.put(SearchableSnapshots.SNAPSHOT_PARTIAL_SETTING.getKey(), true); + settings.put(SearchableSnapshots.SNAPSHOT_PARTIAL_SETTING.getKey(), true) + .put(DiskThresholdDecider.SETTING_IGNORE_DISK_WATERMARKS.getKey(), true); } return settings.build();