Skip to content

Commit

Permalink
Properly support capabilities checks with mixed version clusters (#10…
Browse files Browse the repository at this point in the history
…8513)

When a capabilities check is done on a cluster with some nodes that do not support capabilities, always return false
  • Loading branch information
thecoop committed May 10, 2024
1 parent b6874a5 commit d6cb12e
Show file tree
Hide file tree
Showing 7 changed files with 147 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.upgrades;

import com.carrotsearch.randomizedtesting.annotations.Name;

import org.elasticsearch.client.ResponseException;
import org.elasticsearch.core.UpdateForV9;
import org.junit.Before;

import java.io.IOException;
import java.util.List;

import static org.elasticsearch.test.hamcrest.OptionalMatchers.isPresentWith;

@UpdateForV9
public class NodesCapabilitiesUpgradeIT extends AbstractRollingUpgradeTestCase {

private static Boolean upgradingBeforeCapabilities;

public NodesCapabilitiesUpgradeIT(@Name("upgradedNodes") int upgradedNodes) {
super(upgradedNodes);
}

@Before
public void checkBeforeHasNoCapabilities() throws IOException {
if (upgradingBeforeCapabilities == null) {
// try to do a _capabilities query on a node before we upgrade
try {
clusterHasCapability("GET", "_capabilities", List.of(), List.of());
upgradingBeforeCapabilities = false;
} catch (ResponseException e) {
if (e.getResponse().getStatusLine().getStatusCode() == 400) {
upgradingBeforeCapabilities = true;
} else {
throw e;
}
}
}

assumeTrue("Only valid when upgrading from versions without capabilities API", upgradingBeforeCapabilities);
}

public void testCapabilitiesReturnsFalsePartiallyUpgraded() throws IOException {
if (isMixedCluster()) {
// capabilities checks should either fail (if talking to an old node),
// or return false as not all nodes have the API (if talking to a new node)
try {
assertThat(
"Upgraded node should report no capabilities supported",
clusterHasCapability("GET", "_capabilities", List.of(), List.of()),
isPresentWith(false)
);
} catch (ResponseException e) {
if (e.getResponse().getStatusLine().getStatusCode() != 400) {
// throw explicitly to capture exception too
throw new AssertionError("Old node should not have the capabilities API", e);
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
---
"Capabilities API":

- skip:
awaits_fix: "https://github.com/elastic/elasticsearch/issues/108509"

- requires:
capabilities:
- method: GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,13 @@ protected void writeNodesTo(StreamOutput out, List<NodeCapability> nodes) throws
}

public Optional<Boolean> isSupported() {
// if there are any failures, we don't know if it is fully supported by all nodes in the cluster
if (hasFailures() || getNodes().isEmpty()) return Optional.empty();
if (hasFailures() || getNodes().isEmpty()) {
// there's no nodes in the response (uh? what about ourselves?)
// or there's a problem (hopefully transient) talking to one or more nodes.
// We don't have enough information to decide if it's supported or not, so return unknown
return Optional.empty();
}

return Optional.of(getNodes().stream().allMatch(NodeCapability::isSupported));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.action.admin.cluster.node.capabilities;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
Expand All @@ -18,15 +19,18 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.features.FeatureService;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.admin.cluster.RestNodesCapabilitiesAction;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.Set;

public class TransportNodesCapabilitiesAction extends TransportNodesAction<
Expand All @@ -38,14 +42,16 @@ public class TransportNodesCapabilitiesAction extends TransportNodesAction<
public static final ActionType<NodesCapabilitiesResponse> TYPE = new ActionType<>("cluster:monitor/nodes/capabilities");

private final RestController restController;
private final FeatureService featureService;

@Inject
public TransportNodesCapabilitiesAction(
ThreadPool threadPool,
ClusterService clusterService,
TransportService transportService,
ActionFilters actionFilters,
RestController restController
RestController restController,
FeatureService featureService
) {
super(
TYPE.name(),
Expand All @@ -56,6 +62,23 @@ public TransportNodesCapabilitiesAction(
threadPool.executor(ThreadPool.Names.MANAGEMENT)
);
this.restController = restController;
this.featureService = featureService;
}

@Override
protected void doExecute(Task task, NodesCapabilitiesRequest request, ActionListener<NodesCapabilitiesResponse> listener) {
if (featureService.clusterHasFeature(clusterService.state(), RestNodesCapabilitiesAction.CAPABILITIES_ACTION) == false) {
// not everything in the cluster supports capabilities.
// Therefore we don't support whatever it is we're being asked for
listener.onResponse(new NodesCapabilitiesResponse(clusterService.getClusterName(), List.of(), List.of()) {
@Override
public Optional<Boolean> isSupported() {
return Optional.of(false);
}
});
} else {
super.doExecute(task, request, listener);
}
}

@Override
Expand Down
7 changes: 7 additions & 0 deletions server/src/main/java/org/elasticsearch/rest/RestFeatures.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,17 @@
import org.elasticsearch.features.FeatureSpecification;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.rest.action.admin.cluster.RestClusterGetSettingsAction;
import org.elasticsearch.rest.action.admin.cluster.RestNodesCapabilitiesAction;

import java.util.Map;
import java.util.Set;

public class RestFeatures implements FeatureSpecification {
@Override
public Set<NodeFeature> getFeatures() {
return Set.of(RestNodesCapabilitiesAction.CAPABILITIES_ACTION);
}

@Override
public Map<NodeFeature, Version> getHistoricalFeatures() {
return Map.of(RestClusterGetSettingsAction.SUPPORTS_GET_SETTINGS_ACTION, Version.V_8_3_0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.action.admin.cluster.node.capabilities.NodesCapabilitiesRequest;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.Scope;
Expand All @@ -26,6 +27,8 @@
@ServerlessScope(Scope.INTERNAL)
public class RestNodesCapabilitiesAction extends BaseRestHandler {

public static final NodeFeature CAPABILITIES_ACTION = new NodeFeature("rest.capabilities_action");

@Override
public List<Route> routes() {
return List.of(new Route(RestRequest.Method.GET, "/_capabilities"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
Expand Down Expand Up @@ -261,6 +262,43 @@ protected static Set<String> readVersionsFromNodesInfo(RestClient adminClient) t
.collect(Collectors.toUnmodifiableMap(entry -> entry.getKey().toString(), entry -> (Map<?, ?>) entry.getValue()));
}

/**
* Does the cluster being tested support the set of capabilities
* for specified path and method.
*/
protected static Optional<Boolean> clusterHasCapability(
String method,
String path,
Collection<String> parameters,
Collection<String> capabilities
) throws IOException {
return clusterHasCapability(adminClient, method, path, parameters, capabilities);
}

/**
* Does the cluster on the other side of {@code client} support the set
* of capabilities for specified path and method.
*/
protected static Optional<Boolean> clusterHasCapability(
RestClient client,
String method,
String path,
Collection<String> parameters,
Collection<String> capabilities
) throws IOException {
Request request = new Request("GET", "_capabilities");
request.addParameter("method", method);
request.addParameter("path", path);
if (parameters.isEmpty() == false) {
request.addParameter("parameters", String.join(",", parameters));
}
if (capabilities.isEmpty() == false) {
request.addParameter("capabilities", String.join(",", capabilities));
}
Map<String, Object> response = entityAsMap(client.performRequest(request).getEntity());
return Optional.ofNullable((Boolean) response.get("supported"));
}

protected static boolean clusterHasFeature(String featureId) {
return testFeatureService.clusterHasFeature(featureId);
}
Expand Down

0 comments on commit d6cb12e

Please sign in to comment.