Skip to content

Commit

Permalink
Backport serialization fix of put/delete shutdown requests to 8.14 (#…
Browse files Browse the repository at this point in the history
…108251)

Backport of #107862 to 8.14
  • Loading branch information
thecoop committed May 3, 2024
1 parent 176061e commit 810e86b
Show file tree
Hide file tree
Showing 5 changed files with 311 additions and 0 deletions.
2 changes: 2 additions & 0 deletions server/src/main/java/org/elasticsearch/TransportVersions.java
Expand Up @@ -133,6 +133,7 @@ static TransportVersion def(int id) {
public static final TransportVersion INDEX_REQUEST_NORMALIZED_BYTES_PARSED = def(8_593_00_0);
public static final TransportVersion INGEST_GRAPH_STRUCTURE_EXCEPTION = def(8_594_00_0);
public static final TransportVersion ML_MODEL_IN_SERVICE_SETTINGS = def(8_595_00_0);
public static final TransportVersion SHUTDOWN_REQUEST_TIMEOUTS_FIX_8_13 = def(8_595_00_1);
// 8.14.0+
public static final TransportVersion RANDOM_AGG_SHARD_SEED = def(8_596_00_0);
public static final TransportVersion ESQL_TIMINGS = def(8_597_00_0);
Expand Down Expand Up @@ -175,6 +176,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ML_INFERENCE_AZURE_OPENAI_EMBEDDINGS = def(8_634_00_0);
public static final TransportVersion ILM_SHRINK_ENABLE_WRITE = def(8_635_00_0);
public static final TransportVersion GEOIP_CACHE_STATS = def(8_636_00_0);
public static final TransportVersion SHUTDOWN_REQUEST_TIMEOUTS_FIX_8_14 = def(8_636_00_1);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Expand Up @@ -7,13 +7,15 @@

package org.elasticsearch.xpack.shutdown;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.tasks.TaskId;

import java.io.IOException;

Expand All @@ -35,11 +37,22 @@ public Request(String nodeId) {
}

public Request(StreamInput in) throws IOException {
if (in.getTransportVersion().isPatchFrom(TransportVersions.SHUTDOWN_REQUEST_TIMEOUTS_FIX_8_13)
|| in.getTransportVersion().onOrAfter(TransportVersions.SHUTDOWN_REQUEST_TIMEOUTS_FIX_8_14)) {
// effectively super(in):
setParentTask(TaskId.readFromStream(in));
masterNodeTimeout(in.readTimeValue());
timeout(in.readTimeValue());
}
this.nodeId = in.readString();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().isPatchFrom(TransportVersions.SHUTDOWN_REQUEST_TIMEOUTS_FIX_8_13)
|| out.getTransportVersion().onOrAfter(TransportVersions.SHUTDOWN_REQUEST_TIMEOUTS_FIX_8_14)) {
super.writeTo(out);
}
out.writeString(this.nodeId);
}

Expand Down
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.shutdown;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
Expand All @@ -17,6 +18,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.XContentParser;
Expand Down Expand Up @@ -96,6 +98,13 @@ public Request(
}

public Request(StreamInput in) throws IOException {
if (in.getTransportVersion().isPatchFrom(TransportVersions.SHUTDOWN_REQUEST_TIMEOUTS_FIX_8_13)
|| in.getTransportVersion().onOrAfter(TransportVersions.SHUTDOWN_REQUEST_TIMEOUTS_FIX_8_14)) {
// effectively super(in):
setParentTask(TaskId.readFromStream(in));
masterNodeTimeout(in.readTimeValue());
timeout(in.readTimeValue());
}
this.nodeId = in.readString();
this.type = in.readEnum(SingleNodeShutdownMetadata.Type.class);
this.reason = in.readString();
Expand All @@ -114,6 +123,10 @@ public Request(StreamInput in) throws IOException {

@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().isPatchFrom(TransportVersions.SHUTDOWN_REQUEST_TIMEOUTS_FIX_8_13)
|| out.getTransportVersion().onOrAfter(TransportVersions.SHUTDOWN_REQUEST_TIMEOUTS_FIX_8_14)) {
super.writeTo(out);
}
out.writeString(nodeId);
if (out.getTransportVersion().before(REPLACE_SHUTDOWN_TYPE_ADDED_VERSION)
&& this.type == SingleNodeShutdownMetadata.Type.REPLACE) {
Expand Down
@@ -0,0 +1,87 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.shutdown;

import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.test.ESTestCase;

import java.io.IOException;

public class DeleteShutdownRequestTests extends AbstractWireSerializingTestCase<DeleteShutdownRequestTests.RequestWrapper> {

/**
* Wraps a {@link DeleteShutdownNodeAction.Request} to add proper equality checks
*/
record RequestWrapper(String nodeId, TaskId parentTask, TimeValue masterNodeTimeout, TimeValue ackTimeout) implements Writeable {
@Override
public void writeTo(StreamOutput out) throws IOException {
final var request = new DeleteShutdownNodeAction.Request(nodeId);
request.setParentTask(parentTask);
request.timeout(ackTimeout);
request.masterNodeTimeout(masterNodeTimeout);
request.writeTo(out);
}
}

@Override
protected Writeable.Reader<RequestWrapper> instanceReader() {
return in -> {
final var request = new DeleteShutdownNodeAction.Request(in);
return new RequestWrapper(request.getNodeId(), request.getParentTask(), request.masterNodeTimeout(), request.ackTimeout());
};
}

@Override
protected RequestWrapper createTestInstance() {
return new RequestWrapper(
randomIdentifier(),
randomTaskId(),
TimeValue.parseTimeValue(randomTimeValue(), getTestName()),
TimeValue.parseTimeValue(randomTimeValue(), getTestName())
);
}

private static TaskId randomTaskId() {
return randomBoolean() ? TaskId.EMPTY_TASK_ID : new TaskId(randomIdentifier(), randomNonNegativeLong());
}

@Override
protected RequestWrapper mutateInstance(RequestWrapper instance) {
return switch (between(1, 4)) {
case 1 -> new RequestWrapper(
randomValueOtherThan(instance.nodeId, ESTestCase::randomIdentifier),
instance.parentTask,
instance.ackTimeout,
instance.masterNodeTimeout
);
case 2 -> new RequestWrapper(
instance.nodeId,
randomValueOtherThan(instance.parentTask, DeleteShutdownRequestTests::randomTaskId),
instance.ackTimeout,
instance.masterNodeTimeout
);
case 3 -> new RequestWrapper(
instance.nodeId,
instance.parentTask,
randomValueOtherThan(instance.ackTimeout, () -> TimeValue.parseTimeValue(randomTimeValue(), getTestName())),
instance.masterNodeTimeout
);
case 4 -> new RequestWrapper(
instance.nodeId,
instance.parentTask,
instance.ackTimeout,
randomValueOtherThan(instance.masterNodeTimeout, () -> TimeValue.parseTimeValue(randomTimeValue(), getTestName()))
);
default -> throw new AssertionError("impossible");
};
}
}
@@ -0,0 +1,196 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.shutdown;

import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.test.ESTestCase;

import java.io.IOException;

public class PutShutdownRequestTests extends AbstractWireSerializingTestCase<PutShutdownRequestTests.RequestWrapper> {

/**
* Wraps a {@link org.elasticsearch.xpack.shutdown.PutShutdownNodeAction.Request} to add proper equality checks
*/
record RequestWrapper(
String nodeId,
SingleNodeShutdownMetadata.Type type,
String reason,
TimeValue allocationDelay,
String targetNodeName,
TimeValue gracePeriod,
TaskId parentTask,
TimeValue masterNodeTimeout,
TimeValue ackTimeout
) implements Writeable {
@Override
public void writeTo(StreamOutput out) throws IOException {
final var request = new PutShutdownNodeAction.Request(nodeId, type, reason, allocationDelay, targetNodeName, gracePeriod);
request.setParentTask(parentTask);
request.timeout(ackTimeout);
request.masterNodeTimeout(masterNodeTimeout);
request.writeTo(out);
}
}

@Override
protected Writeable.Reader<RequestWrapper> instanceReader() {
return in -> {
final var request = new PutShutdownNodeAction.Request(in);
return new RequestWrapper(
request.getNodeId(),
request.getType(),
request.getReason(),
request.getAllocationDelay(),
request.getTargetNodeName(),
request.getGracePeriod(),
request.getParentTask(),
request.masterNodeTimeout(),
request.ackTimeout()
);
};
}

@Override
protected RequestWrapper createTestInstance() {
return new RequestWrapper(
randomIdentifier(),
randomFrom(SingleNodeShutdownMetadata.Type.values()),
randomIdentifier(),
randomOptionalTimeValue(),
randomOptionalIdentifier(),
randomOptionalTimeValue(),
randomTaskId(),
TimeValue.parseTimeValue(randomTimeValue(), getTestName()),
TimeValue.parseTimeValue(randomTimeValue(), getTestName())
);
}

private static String randomOptionalIdentifier() {
return randomBoolean() ? null : randomIdentifier();
}

private TimeValue randomOptionalTimeValue() {
return randomBoolean() ? null : TimeValue.parseTimeValue(randomTimeValue(), getTestName());
}

private static TaskId randomTaskId() {
return randomBoolean() ? TaskId.EMPTY_TASK_ID : new TaskId(randomIdentifier(), randomNonNegativeLong());
}

@Override
protected RequestWrapper mutateInstance(RequestWrapper instance) {
return switch (between(1, 9)) {
case 1 -> new RequestWrapper(
randomValueOtherThan(instance.nodeId, ESTestCase::randomIdentifier),
instance.type,
instance.reason,
instance.allocationDelay,
instance.targetNodeName,
instance.gracePeriod,
instance.parentTask,
instance.ackTimeout,
instance.masterNodeTimeout
);
case 2 -> new RequestWrapper(
instance.nodeId,
randomValueOtherThan(instance.type, () -> randomFrom(SingleNodeShutdownMetadata.Type.values())),
instance.reason,
instance.allocationDelay,
instance.targetNodeName,
instance.gracePeriod,
instance.parentTask,
instance.ackTimeout,
instance.masterNodeTimeout
);
case 3 -> new RequestWrapper(
instance.nodeId,
instance.type,
randomValueOtherThan(instance.reason, ESTestCase::randomIdentifier),
instance.allocationDelay,
instance.targetNodeName,
instance.gracePeriod,
instance.parentTask,
instance.ackTimeout,
instance.masterNodeTimeout
);
case 4 -> new RequestWrapper(
instance.nodeId,
instance.type,
instance.reason,
randomValueOtherThan(instance.allocationDelay, this::randomOptionalTimeValue),
instance.targetNodeName,
instance.gracePeriod,
instance.parentTask,
instance.ackTimeout,
instance.masterNodeTimeout
);
case 5 -> new RequestWrapper(
instance.nodeId,
instance.type,
instance.reason,
instance.allocationDelay,
randomValueOtherThan(instance.targetNodeName, PutShutdownRequestTests::randomOptionalIdentifier),
instance.gracePeriod,
instance.parentTask,
instance.ackTimeout,
instance.masterNodeTimeout
);
case 6 -> new RequestWrapper(
instance.nodeId,
instance.type,
instance.reason,
instance.allocationDelay,
instance.targetNodeName,
randomValueOtherThan(instance.gracePeriod, this::randomOptionalTimeValue),
instance.parentTask,
instance.ackTimeout,
instance.masterNodeTimeout
);
case 7 -> new RequestWrapper(
instance.nodeId,
instance.type,
instance.reason,
instance.allocationDelay,
instance.targetNodeName,
instance.gracePeriod,
randomValueOtherThan(instance.parentTask, PutShutdownRequestTests::randomTaskId),
instance.ackTimeout,
instance.masterNodeTimeout
);
case 8 -> new RequestWrapper(
instance.nodeId,
instance.type,
instance.reason,
instance.allocationDelay,
instance.targetNodeName,
instance.gracePeriod,
instance.parentTask,
randomValueOtherThan(instance.ackTimeout, () -> TimeValue.parseTimeValue(randomTimeValue(), getTestName())),
instance.masterNodeTimeout
);
case 9 -> new RequestWrapper(
instance.nodeId,
instance.type,
instance.reason,
instance.allocationDelay,
instance.targetNodeName,
instance.gracePeriod,
instance.parentTask,
instance.ackTimeout,
randomValueOtherThan(instance.masterNodeTimeout, () -> TimeValue.parseTimeValue(randomTimeValue(), getTestName()))
);
default -> throw new AssertionError("impossible");
};
}
}

0 comments on commit 810e86b

Please sign in to comment.