diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java index 36eff3e3a5b4e7..bf5df649e83b7f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java @@ -72,6 +72,10 @@ public class DiskThresholdDecider extends AllocationDecider { public static final Setting ENABLE_FOR_SINGLE_DATA_NODE = Setting.boolSetting("cluster.routing.allocation.disk.watermark.enable_for_single_data_node", false, Setting.Property.NodeScope); + public static final Setting SETTING_IGNORE_DISK_WATERMARKS = + Setting.boolSetting("index.routing.allocation.disk.watermark.ignore", false, + Setting.Property.IndexScope, Setting.Property.PrivateIndex); + private final DiskThresholdSettings diskThresholdSettings; private final boolean enableForSingleDataNode; @@ -133,6 +137,9 @@ public static long sizeOfRelocatingShards(RoutingNode node, boolean subtractShar private static final Decision YES_UNALLOCATED_PRIMARY_BETWEEN_WATERMARKS = Decision.single(Decision.Type.YES, NAME, "the node " + "is above the low watermark, but less than the high watermark, and this primary shard has never been allocated before"); + private static final Decision YES_DISK_WATERMARKS_IGNORED = Decision.single(Decision.Type.YES, NAME, + "disk watermarks are ignored on this index"); + @Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { ImmutableOpenMap usages = allocation.clusterInfo().getNodeMostAvailableDiskUsages(); @@ -141,6 +148,10 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing return decision; } + if (SETTING_IGNORE_DISK_WATERMARKS.get(allocation.metadata().index(shardRouting.index()).getSettings())) { + return 
YES_DISK_WATERMARKS_IGNORED; + } + final double usedDiskThresholdLow = 100.0 - diskThresholdSettings.getFreeDiskThresholdLow(); final double usedDiskThresholdHigh = 100.0 - diskThresholdSettings.getFreeDiskThresholdHigh(); @@ -308,6 +319,10 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl return decision; } + if (SETTING_IGNORE_DISK_WATERMARKS.get(allocation.metadata().index(shardRouting.index()).getSettings())) { + return YES_DISK_WATERMARKS_IGNORED; + } + // subtractLeavingShards is passed as true here, since this is only for shards remaining, we will *eventually* have enough disk // since shards are moving away. No new shards will be incoming since in canAllocate we pass false for this check. final DiskUsageWithRelocations usage = getDiskUsage(node, allocation, usages, true); diff --git a/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java index b9aeefccb231b8..393d28844df3b0 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java @@ -11,6 +11,7 @@ import org.elasticsearch.cluster.metadata.MetadataIndexStateService; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator; +import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider; @@ -154,6 +155,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexSettings.FINAL_PIPELINE, MetadataIndexStateService.VERIFIED_BEFORE_CLOSE_SETTING, 
ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_SETTING, + DiskThresholdDecider.SETTING_IGNORE_DISK_WATERMARKS, // validate that built-in similarities don't get redefined Setting.groupSetting( diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index ee0a9e9f1e152e..49249f71e1350b 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -45,6 +45,7 @@ import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider; import org.elasticsearch.common.Booleans; import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.CheckedFunction; @@ -1054,9 +1055,16 @@ public GetStats getStats() { public StoreStats storeStats() { try { - final RecoveryState recoveryState = this.recoveryState; - final long bytesStillToRecover = recoveryState == null ? -1L : recoveryState.getIndex().bytesStillToRecover(); - return store.stats(bytesStillToRecover == -1 ? StoreStats.UNKNOWN_RESERVED_BYTES : bytesStillToRecover); + final long reservedBytes; + if (DiskThresholdDecider.SETTING_IGNORE_DISK_WATERMARKS.get(indexSettings.getSettings())) { + // watermarks are ignored for this index: its shards are treated as having no disk footprint, so reserve no space + reservedBytes = 0L; + } else { + final RecoveryState recoveryState = this.recoveryState; + final long bytesStillToRecover = recoveryState == null ? -1L : recoveryState.getIndex().bytesStillToRecover(); + reservedBytes = bytesStillToRecover == -1 ? 
StoreStats.UNKNOWN_RESERVED_BYTES : bytesStillToRecover; + } + return store.stats(reservedBytes); } catch (IOException e) { failShard("Failing shard because of exception during storeStats", e); throw new ElasticsearchException("io exception while building 'store stats'", e); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java index fe12fb0ecaa245..1430a0e3e81a0d 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java @@ -463,4 +463,59 @@ public void testDiskUsageWithRelocations() { assertThat(new DiskThresholdDecider.DiskUsageWithRelocations( new DiskUsage("n", "n", "/dev/null", Long.MAX_VALUE, Long.MAX_VALUE), -10).getFreeBytes(), equalTo(Long.MAX_VALUE)); } + + public void testDecidesYesIfWatermarksIgnored() { + ClusterSettings nss = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + DiskThresholdDecider decider = new DiskThresholdDecider(Settings.EMPTY, nss); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test") + .settings(settings(Version.CURRENT).put(DiskThresholdDecider.SETTING_IGNORE_DISK_WATERMARKS.getKey(), true)) + .numberOfShards(1) + .numberOfReplicas(1)) + .build(); + + final Index index = metadata.index("test").getIndex(); + + ShardRouting test_0 = ShardRouting.newUnassigned(new ShardId(index, 0), true, EmptyStoreRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo")); + DiscoveryNode node_0 = new DiscoveryNode("node_0", buildNewFakeTransportAddress(), Collections.emptyMap(), + new HashSet<>(DiscoveryNodeRole.BUILT_IN_ROLES), Version.CURRENT); + DiscoveryNode node_1 = new DiscoveryNode("node_1", 
buildNewFakeTransportAddress(), Collections.emptyMap(), + new HashSet<>(DiscoveryNodeRole.BUILT_IN_ROLES), Version.CURRENT); + + RoutingTable routingTable = RoutingTable.builder() + .addAsNew(metadata.index("test")) + .build(); + + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata).routingTable(routingTable).build(); + + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder() + .add(node_0) + .add(node_1) + ).build(); + + // actual test -- after all that bloat :) + ImmutableOpenMap.Builder allFullUsages = ImmutableOpenMap.builder(); + allFullUsages.put("node_0", new DiskUsage("node_0", "node_0", "_na_", 100, 0)); // all full + allFullUsages.put("node_1", new DiskUsage("node_1", "node_1", "_na_", 100, 0)); // all full + + ImmutableOpenMap.Builder shardSizes = ImmutableOpenMap.builder(); + shardSizes.put("[test][0][p]", 10L); // 10 bytes + final ImmutableOpenMap usages = allFullUsages.build(); + final ClusterInfo clusterInfo = new ClusterInfo(usages, usages, shardSizes.build(), ImmutableOpenMap.of(), ImmutableOpenMap.of()); + RoutingAllocation allocation = new RoutingAllocation(new AllocationDeciders(Collections.singleton(decider)), + clusterState.getRoutingNodes(), clusterState, clusterInfo, null, System.nanoTime()); + allocation.debugDecision(true); + final RoutingNode routingNode = new RoutingNode("node_0", node_0); + Decision decision = decider.canAllocate(test_0, routingNode, allocation); + assertThat(decision.type(), equalTo(Decision.Type.YES)); + assertThat(decision.getExplanation(), containsString("disk watermarks are ignored on this index")); + + decision = decider.canRemain(test_0.initialize(node_0.getId(), null, 0L).moveToStarted(), routingNode, allocation); + assertThat(decision.type(), equalTo(Decision.Type.YES)); + assertThat(decision.getExplanation(), containsString("disk watermarks are ignored on this index")); + } + } diff --git 
a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java index 0bede379dab0bd..e81ad34dd6e5b5 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java @@ -27,6 +27,8 @@ import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.decider.Decision; +import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider; import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; @@ -38,11 +40,13 @@ import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexModule; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.index.shard.IndexLongFieldRange; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardPath; +import org.elasticsearch.indices.IndexClosedException; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.repositories.RepositoryData; @@ -77,6 +81,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import 
java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -98,6 +103,7 @@ import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.oneOf; import static org.hamcrest.Matchers.sameInstance; public class SearchableSnapshotsIntegTests extends BaseSearchableSnapshotsIntegTestCase { @@ -480,6 +486,28 @@ public void testCreateAndRestorePartialSearchableSnapshot() throws Exception { expectedDataTiersPreference = getDataTiersPreference(MountSearchableSnapshotRequest.Storage.SHARED_CACHE); } + final AtomicBoolean statsWatcherRunning = new AtomicBoolean(true); + final Thread statsWatcher = new Thread(() -> { + while (statsWatcherRunning.get()) { + final IndicesStatsResponse indicesStatsResponse; + try { + indicesStatsResponse = client().admin().indices().prepareStats(restoredIndexName).clear().setStore(true).get(); + } catch (IndexNotFoundException | IndexClosedException e) { + // ok: the stats request may race with the mount (index not created yet, or closed), so just retry + continue; + } + + for (ShardStats shardStats : indicesStatsResponse.getShards()) { + assertThat( + shardStats.getShardRouting().toString(), + shardStats.getStats().getStore().getReservedSize().getBytes(), + equalTo(0L) + ); + } + } + }, "test-stats-watcher"); + statsWatcher.start(); + final MountSearchableSnapshotRequest req = new MountSearchableSnapshotRequest( restoredIndexName, fsRepoName, @@ -494,6 +522,9 @@ public void testCreateAndRestorePartialSearchableSnapshot() throws Exception { final RestoreSnapshotResponse restoreSnapshotResponse = client().execute(MountSearchableSnapshotAction.INSTANCE, req).get(); assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(0)); + statsWatcherRunning.set(false); + statsWatcher.join(); + final Settings settings = client().admin() .indices() .prepareGetSettings(restoredIndexName) @@ -509,6 +540,8 @@ public void testCreateAndRestorePartialSearchableSnapshot() throws Exception 
{ assertThat(IndexMetadata.INDEX_AUTO_EXPAND_REPLICAS_SETTING.get(settings).toString(), equalTo("false")); assertThat(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.get(settings), equalTo(expectedReplicas)); assertThat(DataTierAllocationDecider.INDEX_ROUTING_PREFER_SETTING.get(settings), equalTo(expectedDataTiersPreference)); + assertTrue(SearchableSnapshots.SNAPSHOT_PARTIAL_SETTING.get(settings)); + assertTrue(DiskThresholdDecider.SETTING_IGNORE_DISK_WATERMARKS.get(settings)); assertTotalHits(restoredIndexName, originalAllHits, originalBarHits); assertRecoveryStats(restoredIndexName, false); @@ -548,6 +581,29 @@ public void testCreateAndRestorePartialSearchableSnapshot() throws Exception { assertThat(client().admin().indices().prepareGetAliases(aliasName).get().getAliases().size(), equalTo(1)); assertTotalHits(aliasName, originalAllHits, originalBarHits); + final Decision diskDeciderDecision = client().admin() + .cluster() + .prepareAllocationExplain() + .setIndex(restoredIndexName) + .setShard(0) + .setPrimary(true) + .setIncludeYesDecisions(true) + .get() + .getExplanation() + .getShardAllocationDecision() + .getMoveDecision() + .getCanRemainDecision() + .getDecisions() + .stream() + .filter(d -> d.label().equals(DiskThresholdDecider.NAME)) + .findFirst() + .orElseThrow(); + assertThat(diskDeciderDecision.type(), equalTo(Decision.Type.YES)); + assertThat( + diskDeciderDecision.getExplanation(), + oneOf("disk watermarks are ignored on this index", "there is only a single data node present") + ); + internalCluster().fullRestart(); assertTotalHits(restoredIndexName, originalAllHits, originalBarHits); assertRecoveryStats(restoredIndexName, false); diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportMountSearchableSnapshotAction.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportMountSearchableSnapshotAction.java index 
4082a25cd39660..3fa05b4b89c077 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportMountSearchableSnapshotAction.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportMountSearchableSnapshotAction.java @@ -20,6 +20,7 @@ import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator; +import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -129,7 +130,8 @@ private static Settings buildIndexSettings( .put(INDEX_RECOVERY_TYPE_SETTING.getKey(), SearchableSnapshotsConstants.SNAPSHOT_RECOVERY_STATE_FACTORY_KEY); if (storage == MountSearchableSnapshotRequest.Storage.SHARED_CACHE) { - settings.put(SearchableSnapshots.SNAPSHOT_PARTIAL_SETTING.getKey(), true); + settings.put(SearchableSnapshots.SNAPSHOT_PARTIAL_SETTING.getKey(), true) + .put(DiskThresholdDecider.SETTING_IGNORE_DISK_WATERMARKS.getKey(), true); } return settings.build();