Skip to content

Commit

Permalink
Adding a wait-for-status parameter to the health_report API
Browse files Browse the repository at this point in the history
  • Loading branch information
masseyke committed Apr 26, 2024
1 parent 13dd169 commit 6d3683e
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 11 deletions.
6 changes: 6 additions & 0 deletions docs/reference/health/health.asciidoc
@@ -1,3 +1,4 @@

[[health-api]]
=== Health API
++++
Expand Down Expand Up @@ -98,6 +99,11 @@ for health status set `verbose` to `false` to disable the more expensive analysi
`1000` affected indices and `1000` affected nodes).
Defaults to `1000`.

`wait_for_status`::
(Optional, string) One of `green`, `yellow` or `red`. Will wait until the status of
the cluster changes to the one provided or better, i.e. `green` > `yellow` > `red`.
By default, will not wait for any status.

[role="child_attributes"]
[[health-api-response-body]]
==== {api-response-body-title}
Expand Down
Expand Up @@ -47,6 +47,15 @@
"type": "int",
"description": "Limit the number of affected resources the health API returns",
"default": 1000
},
"wait_for_status":{
"type":"enum",
"options":[
"green",
"yellow",
"red"
],
"description":"Wait until cluster is in a specific state"
}
}
}
Expand Down
Expand Up @@ -5,7 +5,8 @@
reason: "health was added in 8.2.0, master_is_stable in 8.4.0, and REST API updated in 8.7"

- do:
health_report: { }
health_report:
wait_for_status: green

- is_true: cluster_name
- match: { status: "green" }
Expand Down
89 changes: 79 additions & 10 deletions server/src/main/java/org/elasticsearch/health/GetHealthAction.java
Expand Up @@ -8,6 +8,8 @@

package org.elasticsearch.health;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
Expand All @@ -18,16 +20,22 @@
import org.elasticsearch.client.internal.ParentTaskAssigningClient;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.health.node.selection.HealthNode;
import org.elasticsearch.health.stats.HealthApiStats;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xcontent.ToXContent;

Expand All @@ -37,6 +45,8 @@
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;

import static org.elasticsearch.action.ValidateActions.addValidationError;

Expand Down Expand Up @@ -152,6 +162,7 @@ public static class Request extends ActionRequest {
private final String indicatorName;
private final boolean verbose;
private final int size;
private HealthStatus waitForStatus;

public Request(boolean verbose, int size) {
this(null, verbose, size);
Expand Down Expand Up @@ -181,26 +192,36 @@ public Task createTask(long id, String type, String action, TaskId parentTaskId,
public void writeTo(StreamOutput out) throws IOException {
TransportAction.localOnly();
}

public void waitForStatus(HealthStatus waitForStatus) {
this.waitForStatus = waitForStatus;
}
}

public static class LocalAction extends TransportAction<Request, Response> {

private final ClusterService clusterService;
private final ThreadPool threadPool;
private final ExecutorService executor;
private final HealthService healthService;
private final NodeClient client;
private final HealthApiStats healthApiStats;
protected Logger logger = LogManager.getLogger(GetHealthAction.class);

@Inject
public LocalAction(
ActionFilters actionFilters,
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
HealthService healthService,
NodeClient client,
HealthApiStats healthApiStats
) {
super(NAME, actionFilters, transportService.getTaskManager());
this.clusterService = clusterService;
this.threadPool = threadPool;
this.executor = threadPool.executor(ThreadPool.Names.MANAGEMENT);
this.healthService = healthService;
this.client = client;
this.healthApiStats = healthApiStats;
Expand All @@ -213,21 +234,69 @@ protected void doExecute(Task task, Request request, ActionListener<Response> re
if (cancellableTask.notifyIfCancelled(responseListener)) {
return;
}

Consumer<ClusterState> onNewClusterStateAfterDelay = clusterState -> doExecute(task, request, responseListener);
healthService.getHealth(
new ParentTaskAssigningClient(client, clusterService.localNode(), task),
request.indicatorName,
request.verbose,
request.size,
responseListener.map(healthIndicatorResults -> {
Response response = new Response(
clusterService.getClusterName(),
healthIndicatorResults,
request.indicatorName == null
);
healthApiStats.track(request.verbose, response);
return response;
})
new ActionListener<>() {
@Override
public void onResponse(List<HealthIndicatorResult> healthIndicatorResults) {
if (request.waitForStatus == null
|| request.waitForStatus.value() <= HealthStatus.merge(
healthIndicatorResults.stream().map(HealthIndicatorResult::status)
).value()) {
Response response = getResponseFromHealthIndicators(healthIndicatorResults);
responseListener.onResponse(response);
} else {
final ClusterStateObserver observer = new ClusterStateObserver(
clusterService.state(),
clusterService,
null,
logger,
threadPool.getThreadContext()
);
final ClusterStateObserver.Listener stateListener = new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState newState) {
executor.execute(() -> onNewClusterStateAfterDelay.accept(newState));
}

@Override
public void onClusterServiceClose() {
responseListener.onFailure(new NodeClosedException(clusterService.localNode()));
}

@Override
public void onTimeout(TimeValue timeout) {
Response response = getResponseFromHealthIndicators(healthIndicatorResults);
responseListener.onResponse(response);
}
};
observer.waitForNextChange(
stateListener,
clusterState -> HealthNode.findHealthNode(clusterState) != null,
null
);
}
}

private Response getResponseFromHealthIndicators(List<HealthIndicatorResult> healthIndicatorResults) {
Response response = new Response(
clusterService.getClusterName(),
healthIndicatorResults,
request.indicatorName == null
);
healthApiStats.track(request.verbose, response);
return response;
}

@Override
public void onFailure(Exception e) {
responseListener.onFailure(e);
}
}
);
}
}
Expand Down
Expand Up @@ -18,6 +18,7 @@

import java.io.IOException;
import java.util.List;
import java.util.Locale;

import static org.elasticsearch.rest.RestRequest.Method.GET;

Expand Down Expand Up @@ -45,6 +46,10 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
boolean verbose = request.paramAsBoolean(VERBOSE_PARAM, true);
int size = request.paramAsInt(SIZE_PARAM, 1000);
GetHealthAction.Request getHealthRequest = new GetHealthAction.Request(indicatorName, verbose, size);
String waitForStatus = request.param("wait_for_status");
if (waitForStatus != null) {
getHealthRequest.waitForStatus(HealthStatus.valueOf(waitForStatus.toUpperCase(Locale.ROOT)));
}
return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).execute(
GetHealthAction.INSTANCE,
getHealthRequest,
Expand Down

0 comments on commit 6d3683e

Please sign in to comment.