Skip to content

Commit

Permalink
Ignore disk watermarks on partial shards (elastic#68673)
Browse files Browse the repository at this point in the history
Today the disk threshold decider applies even to partially-restored
shards, which makes no sense since these shards effectively consume no
disk space of their own. With this commit the disk threshold decider now
freely permits the allocation of these shards.
  • Loading branch information
DaveCTurner authored and easyice committed Mar 25, 2021
1 parent 6a5fa78 commit a415cf6
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 4 deletions.
Expand Up @@ -72,6 +72,10 @@ public class DiskThresholdDecider extends AllocationDecider {
public static final Setting<Boolean> ENABLE_FOR_SINGLE_DATA_NODE =
Setting.boolSetting("cluster.routing.allocation.disk.watermark.enable_for_single_data_node", false, Setting.Property.NodeScope);

/**
 * Boolean index setting, {@code false} by default, registered with the {@code IndexScope} and {@code PrivateIndex}
 * properties. When it is {@code true} for the index a shard belongs to, {@code canAllocate} and {@code canRemain}
 * return a YES decision before any watermark threshold or disk usage is evaluated.
 */
public static final Setting<Boolean> SETTING_IGNORE_DISK_WATERMARKS =
Setting.boolSetting("index.routing.allocation.disk.watermark.ignore", false,
Setting.Property.IndexScope, Setting.Property.PrivateIndex);

private final DiskThresholdSettings diskThresholdSettings;
private final boolean enableForSingleDataNode;

Expand Down Expand Up @@ -133,6 +137,9 @@ public static long sizeOfRelocatingShards(RoutingNode node, boolean subtractShar
private static final Decision YES_UNALLOCATED_PRIMARY_BETWEEN_WATERMARKS = Decision.single(Decision.Type.YES, NAME, "the node " +
"is above the low watermark, but less than the high watermark, and this primary shard has never been allocated before");

// Shared YES decision returned by canAllocate and canRemain when the shard's index has SETTING_IGNORE_DISK_WATERMARKS enabled.
private static final Decision YES_DISK_WATERMARKS_IGNORED = Decision.single(Decision.Type.YES, NAME,
"disk watermarks are ignored on this index");

@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
ImmutableOpenMap<String, DiskUsage> usages = allocation.clusterInfo().getNodeMostAvailableDiskUsages();
Expand All @@ -141,6 +148,10 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
return decision;
}

// An index that sets SETTING_IGNORE_DISK_WATERMARKS is always allowed here, so none of the watermark thresholds
// computed below are consulted for its shards.
if (SETTING_IGNORE_DISK_WATERMARKS.get(allocation.metadata().index(shardRouting.index()).getSettings())) {
return YES_DISK_WATERMARKS_IGNORED;
}

final double usedDiskThresholdLow = 100.0 - diskThresholdSettings.getFreeDiskThresholdLow();
final double usedDiskThresholdHigh = 100.0 - diskThresholdSettings.getFreeDiskThresholdHigh();

Expand Down Expand Up @@ -308,6 +319,10 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl
return decision;
}

// An index that sets SETTING_IGNORE_DISK_WATERMARKS may always keep its shards where they are, so the node's disk
// usage is not looked up for them.
if (SETTING_IGNORE_DISK_WATERMARKS.get(allocation.metadata().index(shardRouting.index()).getSettings())) {
return YES_DISK_WATERMARKS_IGNORED;
}

// subtractLeavingShards is passed as true here: this check only concerns shards remaining on the node, and we will *eventually*
// have enough disk since shards are moving away. No new shards will be incoming, because canAllocate passes false for this flag.
final DiskUsageWithRelocations usage = getDiskUsage(node, allocation, usages, true);
Expand Down
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.cluster.metadata.MetadataIndexStateService;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
Expand Down Expand Up @@ -154,6 +155,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexSettings.FINAL_PIPELINE,
MetadataIndexStateService.VERIFIED_BEFORE_CLOSE_SETTING,
ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_SETTING,
DiskThresholdDecider.SETTING_IGNORE_DISK_WATERMARKS,

// validate that built-in similarities don't get redefined
Setting.groupSetting(
Expand Down
14 changes: 11 additions & 3 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Expand Up @@ -45,6 +45,7 @@
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.CheckedFunction;
Expand Down Expand Up @@ -1054,9 +1055,16 @@ public GetStats getStats() {

public StoreStats storeStats() {
try {
final RecoveryState recoveryState = this.recoveryState;
final long bytesStillToRecover = recoveryState == null ? -1L : recoveryState.getIndex().bytesStillToRecover();
return store.stats(bytesStillToRecover == -1 ? StoreStats.UNKNOWN_RESERVED_BYTES : bytesStillToRecover);
// Work out the reserved-bytes figure handed to store.stats(): zero for indices that ignore the disk watermarks,
// otherwise the bytes the recovery still has to fetch, or UNKNOWN_RESERVED_BYTES when that is not known.
final long reservedBytes;
if (DiskThresholdDecider.SETTING_IGNORE_DISK_WATERMARKS.get(indexSettings.getSettings())) {
// if this shard has no disk footprint then it also needs no reserved space
reservedBytes = 0L;
} else {
// NOTE(review): the field is copied into a local before use, presumably because it can change concurrently -- confirm
final RecoveryState recoveryState = this.recoveryState;
// -1 stands for "not known", both when there is no recovery state and when the recovery state reports -1 itself
final long bytesStillToRecover = recoveryState == null ? -1L : recoveryState.getIndex().bytesStillToRecover();
reservedBytes = bytesStillToRecover == -1 ? StoreStats.UNKNOWN_RESERVED_BYTES : bytesStillToRecover;
}
} catch (IOException e) {
failShard("Failing shard because of exception during storeStats", e);
throw new ElasticsearchException("io exception while building 'store stats'", e);
Expand Down
Expand Up @@ -463,4 +463,59 @@ public void testDiskUsageWithRelocations() {
assertThat(new DiskThresholdDecider.DiskUsageWithRelocations(
new DiskUsage("n", "n", "/dev/null", Long.MAX_VALUE, Long.MAX_VALUE), -10).getFreeBytes(), equalTo(Long.MAX_VALUE));
}

/**
 * Checks that {@code canAllocate} and {@code canRemain} both answer YES, with the "watermarks are ignored" explanation,
 * for a shard of an index that enables {@link DiskThresholdDecider#SETTING_IGNORE_DISK_WATERMARKS}, even though every
 * node in the cluster info is reported as completely full.
 */
public void testDecidesYesIfWatermarksIgnored() {
    final String expectedExplanation = "disk watermarks are ignored on this index";

    final DiskThresholdDecider decider = new DiskThresholdDecider(
        Settings.EMPTY,
        new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
    );

    // one index with a single primary and a single replica that opts out of the disk watermarks
    final Metadata metadata = Metadata.builder()
        .put(
            IndexMetadata.builder("test")
                .settings(settings(Version.CURRENT).put(DiskThresholdDecider.SETTING_IGNORE_DISK_WATERMARKS.getKey(), true))
                .numberOfShards(1)
                .numberOfReplicas(1)
        )
        .build();
    final IndexMetadata indexMetadata = metadata.index("test");
    final Index index = indexMetadata.getIndex();

    final ShardRouting unassignedPrimary = ShardRouting.newUnassigned(
        new ShardId(index, 0),
        true,
        EmptyStoreRecoverySource.INSTANCE,
        new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo")
    );

    // two nodes carrying all built-in roles
    final DiscoveryNode firstNode = new DiscoveryNode(
        "node_0",
        buildNewFakeTransportAddress(),
        Collections.emptyMap(),
        new HashSet<>(DiscoveryNodeRole.BUILT_IN_ROLES),
        Version.CURRENT
    );
    final DiscoveryNode secondNode = new DiscoveryNode(
        "node_1",
        buildNewFakeTransportAddress(),
        Collections.emptyMap(),
        new HashSet<>(DiscoveryNodeRole.BUILT_IN_ROLES),
        Version.CURRENT
    );

    // build the cluster state in two steps: metadata and routing table first, then the nodes on top of that
    final ClusterState stateWithoutNodes = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
        .metadata(metadata)
        .routingTable(RoutingTable.builder().addAsNew(indexMetadata).build())
        .build();
    final ClusterState clusterState = ClusterState.builder(stateWithoutNodes)
        .nodes(DiscoveryNodes.builder().add(firstNode).add(secondNode))
        .build();

    // cluster info in which both nodes are all full
    final ImmutableOpenMap.Builder<String, DiskUsage> fullDisks = ImmutableOpenMap.builder();
    fullDisks.put("node_0", new DiskUsage("node_0", "node_0", "_na_", 100, 0));
    fullDisks.put("node_1", new DiskUsage("node_1", "node_1", "_na_", 100, 0));
    final ImmutableOpenMap<String, DiskUsage> usages = fullDisks.build();

    final ImmutableOpenMap.Builder<String, Long> sizesByShard = ImmutableOpenMap.builder();
    sizesByShard.put("[test][0][p]", 10L); // 10 bytes

    final ClusterInfo clusterInfo = new ClusterInfo(
        usages,
        usages,
        sizesByShard.build(),
        ImmutableOpenMap.of(),
        ImmutableOpenMap.of()
    );
    final RoutingAllocation allocation = new RoutingAllocation(
        new AllocationDeciders(Collections.singleton(decider)),
        clusterState.getRoutingNodes(),
        clusterState,
        clusterInfo,
        null,
        System.nanoTime()
    );
    allocation.debugDecision(true);
    final RoutingNode routingNode = new RoutingNode("node_0", firstNode);

    // allocating the unassigned primary is permitted despite the full disk
    final Decision allocateDecision = decider.canAllocate(unassignedPrimary, routingNode, allocation);
    assertThat(allocateDecision.type(), equalTo(Decision.Type.YES));
    assertThat(allocateDecision.getExplanation(), containsString(expectedExplanation));

    // and so is leaving the started primary where it is
    final ShardRouting startedPrimary = unassignedPrimary.initialize(firstNode.getId(), null, 0L).moveToStarted();
    final Decision remainDecision = decider.canRemain(startedPrimary, routingNode, allocation);
    assertThat(remainDecision.type(), equalTo(Decision.Type.YES));
    assertThat(remainDecision.getExplanation(), containsString(expectedExplanation));
}

}
Expand Up @@ -27,6 +27,8 @@
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -38,11 +40,13 @@
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.shard.IndexLongFieldRange;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.indices.IndexClosedException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.repositories.RepositoryData;
Expand Down Expand Up @@ -77,6 +81,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
Expand All @@ -98,6 +103,7 @@
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.oneOf;
import static org.hamcrest.Matchers.sameInstance;

public class SearchableSnapshotsIntegTests extends BaseSearchableSnapshotsIntegTestCase {
Expand Down Expand Up @@ -480,6 +486,28 @@ public void testCreateAndRestorePartialSearchableSnapshot() throws Exception {
expectedDataTiersPreference = getDataTiersPreference(MountSearchableSnapshotRequest.Storage.SHARED_CACHE);
}

// Background thread that keeps requesting the store stats of the restored index for as long as statsWatcherRunning
// is true, asserting on every response that each shard reports a reserved size of zero bytes.
final AtomicBoolean statsWatcherRunning = new AtomicBoolean(true);
final Thread statsWatcher = new Thread(() -> {
while (statsWatcherRunning.get()) {
final IndicesStatsResponse indicesStatsResponse;
try {
indicesStatsResponse = client().admin().indices().prepareStats(restoredIndexName).clear().setStore(true).get();
} catch (IndexNotFoundException | IndexClosedException e) {
// ok: presumably the index does not exist yet or is still closed while the mount is in progress, so just retry
continue;
}

for (ShardStats shardStats : indicesStatsResponse.getShards()) {
// the shard routing is passed as the assertion's description so a failure names the offending shard
assertThat(
shardStats.getShardRouting().toString(),
shardStats.getStats().getStore().getReservedSize().getBytes(),
equalTo(0L)
);
}
}
}, "test-stats-watcher");
statsWatcher.start();

final MountSearchableSnapshotRequest req = new MountSearchableSnapshotRequest(
restoredIndexName,
fsRepoName,
Expand All @@ -494,6 +522,9 @@ public void testCreateAndRestorePartialSearchableSnapshot() throws Exception {
final RestoreSnapshotResponse restoreSnapshotResponse = client().execute(MountSearchableSnapshotAction.INSTANCE, req).get();
assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(0));

statsWatcherRunning.set(false);
statsWatcher.join();

final Settings settings = client().admin()
.indices()
.prepareGetSettings(restoredIndexName)
Expand All @@ -509,6 +540,8 @@ public void testCreateAndRestorePartialSearchableSnapshot() throws Exception {
assertThat(IndexMetadata.INDEX_AUTO_EXPAND_REPLICAS_SETTING.get(settings).toString(), equalTo("false"));
assertThat(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.get(settings), equalTo(expectedReplicas));
assertThat(DataTierAllocationDecider.INDEX_ROUTING_PREFER_SETTING.get(settings), equalTo(expectedDataTiersPreference));
assertTrue(SearchableSnapshots.SNAPSHOT_PARTIAL_SETTING.get(settings));
assertTrue(DiskThresholdDecider.SETTING_IGNORE_DISK_WATERMARKS.get(settings));

assertTotalHits(restoredIndexName, originalAllHits, originalBarHits);
assertRecoveryStats(restoredIndexName, false);
Expand Down Expand Up @@ -548,6 +581,29 @@ public void testCreateAndRestorePartialSearchableSnapshot() throws Exception {
assertThat(client().admin().indices().prepareGetAliases(aliasName).get().getAliases().size(), equalTo(1));
assertTotalHits(aliasName, originalAllHits, originalBarHits);

// Ask the allocation-explain API, with YES decisions included, about the primary of shard 0 of the restored index and
// pick the disk threshold decider's entry out of the can-remain decisions; orElseThrow() fails if there is none.
final Decision diskDeciderDecision = client().admin()
.cluster()
.prepareAllocationExplain()
.setIndex(restoredIndexName)
.setShard(0)
.setPrimary(true)
.setIncludeYesDecisions(true)
.get()
.getExplanation()
.getShardAllocationDecision()
.getMoveDecision()
.getCanRemainDecision()
.getDecisions()
.stream()
.filter(d -> d.label().equals(DiskThresholdDecider.NAME))
.findFirst()
.orElseThrow();
assertThat(diskDeciderDecision.type(), equalTo(Decision.Type.YES));
// NOTE(review): the single-data-node explanation is accepted as well, presumably because the decider can answer with
// it before reaching the ignore-watermarks check when the test cluster has only one data node -- confirm
assertThat(
diskDeciderDecision.getExplanation(),
oneOf("disk watermarks are ignored on this index", "there is only a single data node present")
);

internalCluster().fullRestart();
assertTotalHits(restoredIndexName, originalAllHits, originalBarHits);
assertRecoveryStats(restoredIndexName, false);
Expand Down
Expand Up @@ -20,6 +20,7 @@
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -129,7 +130,8 @@ private static Settings buildIndexSettings(
.put(INDEX_RECOVERY_TYPE_SETTING.getKey(), SearchableSnapshotsConstants.SNAPSHOT_RECOVERY_STATE_FACTORY_KEY);

if (storage == MountSearchableSnapshotRequest.Storage.SHARED_CACHE) {
settings.put(SearchableSnapshots.SNAPSHOT_PARTIAL_SETTING.getKey(), true);
settings.put(SearchableSnapshots.SNAPSHOT_PARTIAL_SETTING.getKey(), true)
.put(DiskThresholdDecider.SETTING_IGNORE_DISK_WATERMARKS.getKey(), true);
}

return settings.build();
Expand Down

0 comments on commit a415cf6

Please sign in to comment.