Skip to content

Commit

Permalink
Handle blocked cluster state in file settings (#108481)
Browse files Browse the repository at this point in the history
When file settings is first loaded on a master node starting up, the
cluster state will likely be in a blocked state before it recovers. In
that case the file settings will not be processable since the metadata
will be missing in cluster state.

This commit makes watching for file settings not start until the cluster
state is in a recovered state. It also updates the reserved state
update task to handle a similar case where a task may be queued and then
run at a time when the node is no longer master, but before the watcher is
stopped.
  • Loading branch information
rjernst committed May 10, 2024
1 parent ef12b99 commit 2d14095
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ public void testReservedStatePersistsOnRestart() throws Exception {

FileSettingsService masterFileSettingsService = internalCluster().getInstance(FileSettingsService.class, masterNode);

assertTrue(masterFileSettingsService.watching());
assertBusy(() -> assertTrue(masterFileSettingsService.watching()));

logger.info("--> write some settings");
writeJSONFile(masterNode, testJSON);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.gateway.GatewayService;

import java.io.IOException;
import java.nio.file.Files;
Expand Down Expand Up @@ -58,7 +59,8 @@ protected void doStop() {
@Override
public final void clusterChanged(ClusterChangedEvent event) {
ClusterState clusterState = event.state();
if (clusterState.nodes().isLocalNodeElectedMaster()) {
if (clusterState.nodes().isLocalNodeElectedMaster()
&& clusterState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK) == false) {
synchronized (this) {
if (watching() || active == false) {
refreshExistingFileStateIfNeeded(clusterState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.cluster.metadata.ReservedStateErrorMetadata;
import org.elasticsearch.cluster.metadata.ReservedStateHandlerMetadata;
import org.elasticsearch.cluster.metadata.ReservedStateMetadata;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.reservedstate.NonStateTransformResult;
import org.elasticsearch.reservedstate.ReservedClusterStateHandler;
import org.elasticsearch.reservedstate.TransformState;
Expand Down Expand Up @@ -80,6 +81,13 @@ ActionListener<ActionResponse.Empty> listener() {
}

protected ClusterState execute(final ClusterState currentState) {
if (currentState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
// If cluster state has become blocked, this task was submitted while the node was master but is now not master.
// The new master will re-read file settings, so whatever update was to be written here will be handled
// by the new master.
return currentState;
}

ReservedStateMetadata existingMetadata = currentState.metadata().reservedStateMetadata().get(namespace);
Map<String, Object> reservedState = stateChunk.state();
ReservedStateVersion reservedStateVersion = stateChunk.metadata();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.common.file;

import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.node.NodeRoleSettings;
import org.elasticsearch.test.ESTestCase;
import org.junit.After;
import org.junit.Before;

import java.io.IOException;
import java.nio.file.Path;
import java.util.concurrent.ExecutionException;

import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class MasterNodeFileWatchingServiceTests extends ESTestCase {

static final DiscoveryNode localNode = DiscoveryNodeUtils.create("local-node");
MasterNodeFileWatchingService testService;
Path watchedFile;
Runnable fileChangedCallback;

@Before
public void setupTestService() throws IOException {
watchedFile = createTempFile();
ClusterService clusterService = mock(ClusterService.class);
Settings settings = Settings.builder()
.put(NodeRoleSettings.NODE_ROLES_SETTING.getKey(), DiscoveryNodeRole.MASTER_ROLE.roleName())
.build();
when(clusterService.getSettings()).thenReturn(settings);
fileChangedCallback = () -> {};
testService = new MasterNodeFileWatchingService(clusterService, watchedFile) {

@Override
protected void processFileChanges() throws InterruptedException, ExecutionException, IOException {
fileChangedCallback.run();
}

@Override
protected void processInitialFileMissing() throws InterruptedException, ExecutionException, IOException {
// file always exists, but we don't care about the missing case for master node behavior
}
};
testService.start();
}

@After
public void stopTestService() {
testService.stop();
}

public void testBecomingMasterNodeStartsWatcher() {
ClusterState notRecoveredClusterState = ClusterState.builder(ClusterName.DEFAULT)
.nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).masterNodeId(localNode.getId()))
.blocks(ClusterBlocks.builder().addGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK))
.build();
testService.clusterChanged(new ClusterChangedEvent("test", notRecoveredClusterState, ClusterState.EMPTY_STATE));
// just a master node isn't sufficient, cluster state also must be recovered
assertThat(testService.watching(), is(false));

ClusterState recoveredClusterState = ClusterState.builder(notRecoveredClusterState)
.blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK)
.build();
testService.clusterChanged(new ClusterChangedEvent("test", recoveredClusterState, notRecoveredClusterState));
// just a master node isn't sufficient, cluster state also must be recovered
assertThat(testService.watching(), is(true));
}

public void testChangingMasterStopsWatcher() {
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).masterNodeId(localNode.getId()))
.build();
testService.clusterChanged(new ClusterChangedEvent("test", clusterState, ClusterState.EMPTY_STATE));
assertThat(testService.watching(), is(true));

final DiscoveryNode anotherNode = DiscoveryNodeUtils.create("another-node");
ClusterState differentMasterClusterState = ClusterState.builder(ClusterName.DEFAULT)
.nodes(
DiscoveryNodes.builder().add(localNode).add(anotherNode).localNodeId(localNode.getId()).masterNodeId(anotherNode.getId())
)
.build();
testService.clusterChanged(new ClusterChangedEvent("test", differentMasterClusterState, clusterState));
assertThat(testService.watching(), is(false));
}

public void testBlockingClusterStateStopsWatcher() {
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).masterNodeId(localNode.getId()))
.build();
testService.clusterChanged(new ClusterChangedEvent("test", clusterState, ClusterState.EMPTY_STATE));
assertThat(testService.watching(), is(true));

ClusterState blockedClusterState = ClusterState.builder(ClusterName.DEFAULT)
.nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).masterNodeId(localNode.getId()))
.blocks(ClusterBlocks.builder().addGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK))
.build();
testService.clusterChanged(new ClusterChangedEvent("test", blockedClusterState, clusterState));
assertThat(testService.watching(), is(false));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.reservedstate.service;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.test.ESTestCase;

import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.sameInstance;

public class ReservedStateUpdateTaskTests extends ESTestCase {
public void testBlockedClusterState() {
var task = new ReservedStateUpdateTask("dummy", null, List.of(), Map.of(), List.of(), e -> {}, ActionListener.noop());
ClusterState notRecoveredClusterState = ClusterState.builder(ClusterName.DEFAULT)
.blocks(ClusterBlocks.builder().addGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK))
.build();
assertThat(task.execute(notRecoveredClusterState), sameInstance(notRecoveredClusterState));
}
}

0 comments on commit 2d14095

Please sign in to comment.