Skip to content

Commit

Permalink
Fix timeouts in Datastream Lifecycle module (#108621)
Browse files Browse the repository at this point in the history
Relates #107984
  • Loading branch information
DaveCTurner committed May 15, 2024
1 parent 2dceafc commit 6922441
Show file tree
Hide file tree
Showing 10 changed files with 31 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.action.datastreams.CreateDataStreamAction;
import org.elasticsearch.action.datastreams.lifecycle.GetDataStreamLifecycleAction;
import org.elasticsearch.action.datastreams.lifecycle.PutDataStreamLifecycleAction;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.datastreams.DataStreamsPlugin;
Expand Down Expand Up @@ -229,6 +230,8 @@ public void testDeleteLifecycle() throws Exception {
// Remove lifecycle from concrete data stream
{
DeleteDataStreamLifecycleAction.Request deleteDataLifecycleRequest = new DeleteDataStreamLifecycleAction.Request(
TimeValue.THIRTY_SECONDS,
AcknowledgedRequest.DEFAULT_ACK_TIMEOUT,
new String[] { "with-lifecycle-1" }
);
assertThat(
Expand All @@ -254,6 +257,8 @@ public void testDeleteLifecycle() throws Exception {
// Remove lifecycle from all data streams
{
DeleteDataStreamLifecycleAction.Request deleteDataLifecycleRequest = new DeleteDataStreamLifecycleAction.Request(
TimeValue.THIRTY_SECONDS,
AcknowledgedRequest.DEFAULT_ACK_TIMEOUT,
new String[] { "*" }
);
assertThat(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ public void testSystemDataStreamRetention() throws Exception {
client().execute(
PutDataStreamGlobalRetentionAction.INSTANCE,
new PutDataStreamGlobalRetentionAction.Request(
TimeValue.THIRTY_SECONDS,
TimeValue.timeValueSeconds(globalRetentionSeconds),
TimeValue.timeValueSeconds(globalRetentionSeconds)
)
Expand Down Expand Up @@ -290,7 +291,10 @@ public void testSystemDataStreamRetention() throws Exception {

client().execute(DeleteDataStreamAction.INSTANCE, new DeleteDataStreamAction.Request(SYSTEM_DATA_STREAM_NAME)).actionGet();
} finally {
client().execute(DeleteDataStreamGlobalRetentionAction.INSTANCE, new DeleteDataStreamGlobalRetentionAction.Request());
client().execute(
DeleteDataStreamGlobalRetentionAction.INSTANCE,
new DeleteDataStreamGlobalRetentionAction.Request(TimeValue.THIRTY_SECONDS)
);
}
} finally {
dataStreamLifecycleServices.forEach(dataStreamLifecycleService -> dataStreamLifecycleService.setNowSupplier(clock::millis));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ public void testSystemExplainLifecycle() throws Exception {
client().execute(
PutDataStreamGlobalRetentionAction.INSTANCE,
new PutDataStreamGlobalRetentionAction.Request(
TimeValue.THIRTY_SECONDS,
TimeValue.timeValueSeconds(globalRetentionSeconds),
TimeValue.timeValueSeconds(globalRetentionSeconds)
)
Expand Down Expand Up @@ -260,7 +261,10 @@ public void testSystemExplainLifecycle() throws Exception {
);
}
} finally {
client().execute(DeleteDataStreamGlobalRetentionAction.INSTANCE, new DeleteDataStreamGlobalRetentionAction.Request());
client().execute(
DeleteDataStreamGlobalRetentionAction.INSTANCE,
new DeleteDataStreamGlobalRetentionAction.Request(TimeValue.THIRTY_SECONDS)
);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.datastreams.lifecycle.UpdateDataStreamGlobalRetentionService;
import org.elasticsearch.features.FeatureService;
import org.elasticsearch.tasks.Task;
Expand Down Expand Up @@ -64,8 +65,8 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(dryRun);
}

public Request() {
super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT);
public Request(TimeValue masterNodeTimeout) {
super(masterNodeTimeout);
}

public boolean dryRun() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.TimeValue;

import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -47,8 +48,8 @@ public void writeTo(StreamOutput out) throws IOException {
indicesOptions.writeIndicesOptions(out);
}

public Request(String[] names) {
super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT, DEFAULT_ACK_TIMEOUT);
public Request(TimeValue masterNodeTimeout, TimeValue ackTimeout, String[] names) {
super(masterNodeTimeout, ackTimeout);
this.names = names;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,6 @@ private GetDataStreamGlobalRetentionAction() {/* no instances */}

public static final class Request extends MasterNodeReadRequest<Request> {

public Request() {
super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT);
}

public Request(StreamInput in) throws IOException {
super(in);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ public Request(StreamInput in) throws IOException {
super(in);
}

public Request() {
super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT);
public Request(TimeValue masterNodeTimeout) {
super(masterNodeTimeout);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,6 @@
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ObjectParser;
import org.elasticsearch.xcontent.XContentParser;

import java.io.IOException;
import java.util.List;
Expand All @@ -53,34 +50,9 @@ private PutDataStreamGlobalRetentionAction() {/* no instances */}

public static final class Request extends MasterNodeRequest<Request> {

public static final ConstructingObjectParser<PutDataStreamGlobalRetentionAction.Request, Void> PARSER =
new ConstructingObjectParser<>(
"put_data_stream_global_retention_request",
args -> new PutDataStreamGlobalRetentionAction.Request((TimeValue) args[0], (TimeValue) args[1])
);

static {
PARSER.declareField(
ConstructingObjectParser.optionalConstructorArg(),
(p, c) -> TimeValue.parseTimeValue(p.textOrNull(), DataStreamGlobalRetention.DEFAULT_RETENTION_FIELD.getPreferredName()),
DataStreamGlobalRetention.DEFAULT_RETENTION_FIELD,
ObjectParser.ValueType.STRING_OR_NULL
);
PARSER.declareField(
ConstructingObjectParser.optionalConstructorArg(),
(p, c) -> TimeValue.parseTimeValue(p.textOrNull(), DataStreamGlobalRetention.MAX_RETENTION_FIELD.getPreferredName()),
DataStreamGlobalRetention.MAX_RETENTION_FIELD,
ObjectParser.ValueType.STRING_OR_NULL
);
}

private final DataStreamGlobalRetention globalRetention;
private boolean dryRun = false;

public static PutDataStreamGlobalRetentionAction.Request parseRequest(XContentParser parser) {
return PARSER.apply(parser, null);
}

public Request(StreamInput in) throws IOException {
super(in);
globalRetention = DataStreamGlobalRetention.read(in);
Expand All @@ -107,8 +79,8 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(dryRun);
}

public Request(@Nullable TimeValue defaultRetention, @Nullable TimeValue maxRetention) {
super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT);
public Request(TimeValue masterNodeTimeout, @Nullable TimeValue defaultRetention, @Nullable TimeValue maxRetention) {
super(masterNodeTimeout);
this.globalRetention = new DataStreamGlobalRetention(defaultRetention, maxRetention);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ public List<Route> routes() {

@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) {
GetDataStreamLifecycleStatsAction.Request request = new GetDataStreamLifecycleStatsAction.Request();
request.masterNodeTimeout(getMasterNodeTimeout(restRequest));
final var request = new GetDataStreamLifecycleStatsAction.Request(getMasterNodeTimeout(restRequest));
return channel -> client.execute(
GetDataStreamLifecycleStatsAction.INSTANCE,
request,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.elasticsearch.datastreams.lifecycle.rest;

import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.datastreams.lifecycle.action.DeleteDataStreamLifecycleAction;
Expand All @@ -20,6 +21,7 @@
import java.util.List;

import static org.elasticsearch.rest.RestRequest.Method.DELETE;
import static org.elasticsearch.rest.RestUtils.getMasterNodeTimeout;

@ServerlessScope(Scope.INTERNAL)
public class RestDeleteDataStreamLifecycleAction extends BaseRestHandler {
Expand All @@ -36,7 +38,9 @@ public List<Route> routes() {

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
DeleteDataStreamLifecycleAction.Request deleteDataLifecycleRequest = new DeleteDataStreamLifecycleAction.Request(
final var deleteDataLifecycleRequest = new DeleteDataStreamLifecycleAction.Request(
getMasterNodeTimeout(request),
request.paramAsTime("timeout", AcknowledgedRequest.DEFAULT_ACK_TIMEOUT),
Strings.splitStringByCommaToArray(request.param("name"))
);
deleteDataLifecycleRequest.indicesOptions(IndicesOptions.fromRequest(request, deleteDataLifecycleRequest.indicesOptions()));
Expand Down

0 comments on commit 6922441

Please sign in to comment.