Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix serialization of put/delete shutdown requests #107862

Merged
6 changes: 6 additions & 0 deletions docs/changelog/107862.yaml
@@ -0,0 +1,6 @@
pr: 107862
summary: Fix serialization of put-shutdown request
area: Infra/Node Lifecycle
type: bug
issues:
- 107857
Expand Up @@ -183,6 +183,7 @@ static TransportVersion def(int id) {
public static final TransportVersion INDEX_SEGMENTS_VECTOR_FORMATS = def(8_642_00_0);
public static final TransportVersion ADD_RESOURCE_ALREADY_UPLOADED_EXCEPTION = def(8_643_00_0);
public static final TransportVersion ESQL_MV_ORDERING_SORTED_ASCENDING = def(8_644_00_0);
public static final TransportVersion SHUTDOWN_REQUEST_TIMEOUTS_FIX = def(8_645_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Expand Up @@ -7,13 +7,15 @@

package org.elasticsearch.xpack.shutdown;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.tasks.TaskId;

import java.io.IOException;

Expand All @@ -35,11 +37,20 @@ public Request(String nodeId) {
}

public Request(StreamInput in) throws IOException {
if (in.getTransportVersion().onOrAfter(TransportVersions.SHUTDOWN_REQUEST_TIMEOUTS_FIX)) {
// effectively super(in):
setParentTask(TaskId.readFromStream(in));
masterNodeTimeout(in.readTimeValue());
ackTimeout(in.readTimeValue());
}
this.nodeId = in.readString();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(TransportVersions.SHUTDOWN_REQUEST_TIMEOUTS_FIX)) {
super.writeTo(out);
}
out.writeString(this.nodeId);
}

Expand Down
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.shutdown;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
Expand All @@ -17,6 +18,8 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.UpdateForV9;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.XContentParser;
Expand Down Expand Up @@ -95,7 +98,14 @@ public Request(
this.gracePeriod = gracePeriod;
}

@UpdateForV9 // TODO call super(in) instead of explicitly reading superclass contents once bwc no longer needed
public Request(StreamInput in) throws IOException {
if (in.getTransportVersion().onOrAfter(TransportVersions.SHUTDOWN_REQUEST_TIMEOUTS_FIX)) {
// effectively super(in):
setParentTask(TaskId.readFromStream(in));
masterNodeTimeout(in.readTimeValue());
ackTimeout(in.readTimeValue());
}
this.nodeId = in.readString();
this.type = in.readEnum(SingleNodeShutdownMetadata.Type.class);
this.reason = in.readString();
Expand All @@ -114,6 +124,9 @@ public Request(StreamInput in) throws IOException {

@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(TransportVersions.SHUTDOWN_REQUEST_TIMEOUTS_FIX)) {
super.writeTo(out);
}
out.writeString(nodeId);
if (out.getTransportVersion().before(REPLACE_SHUTDOWN_TYPE_ADDED_VERSION)
&& this.type == SingleNodeShutdownMetadata.Type.REPLACE) {
Expand Down Expand Up @@ -207,5 +220,6 @@ public ActionRequestValidationException validate() {
return null;
}
}

}
}
@@ -0,0 +1,82 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.shutdown;

import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.test.ESTestCase;

import java.io.IOException;

public class DeleteShutdownRequestTests extends AbstractWireSerializingTestCase<DeleteShutdownRequestTests.RequestWrapper> {

/**
* Wraps a {@link DeleteShutdownNodeAction.Request} to add proper equality checks
*/
record RequestWrapper(String nodeId, TaskId parentTask, TimeValue masterNodeTimeout, TimeValue ackTimeout) implements Writeable {
@Override
public void writeTo(StreamOutput out) throws IOException {
final var request = new DeleteShutdownNodeAction.Request(nodeId);
request.setParentTask(parentTask);
request.ackTimeout(ackTimeout);
request.masterNodeTimeout(masterNodeTimeout);
request.writeTo(out);
}
}

@Override
protected Writeable.Reader<RequestWrapper> instanceReader() {
return in -> {
final var request = new DeleteShutdownNodeAction.Request(in);
return new RequestWrapper(request.getNodeId(), request.getParentTask(), request.masterNodeTimeout(), request.ackTimeout());
};
}

@Override
protected RequestWrapper createTestInstance() {
return new RequestWrapper(randomIdentifier(), randomTaskId(), randomTimeValue(), randomTimeValue());
}

private static TaskId randomTaskId() {
return randomBoolean() ? TaskId.EMPTY_TASK_ID : new TaskId(randomIdentifier(), randomNonNegativeLong());
}

@Override
protected RequestWrapper mutateInstance(RequestWrapper instance) {
return switch (between(1, 4)) {
case 1 -> new RequestWrapper(
randomValueOtherThan(instance.nodeId, ESTestCase::randomIdentifier),
instance.parentTask,
instance.ackTimeout,
instance.masterNodeTimeout
);
case 2 -> new RequestWrapper(
instance.nodeId,
randomValueOtherThan(instance.parentTask, DeleteShutdownRequestTests::randomTaskId),
instance.ackTimeout,
instance.masterNodeTimeout
);
case 3 -> new RequestWrapper(
instance.nodeId,
instance.parentTask,
randomValueOtherThan(instance.ackTimeout, ESTestCase::randomTimeValue),
instance.masterNodeTimeout
);
case 4 -> new RequestWrapper(
instance.nodeId,
instance.parentTask,
instance.ackTimeout,
randomValueOtherThan(instance.masterNodeTimeout, ESTestCase::randomTimeValue)
);
default -> throw new AssertionError("impossible");
};
}
}
@@ -0,0 +1,196 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.shutdown;

import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.test.ESTestCase;

import java.io.IOException;

public class PutShutdownRequestTests extends AbstractWireSerializingTestCase<PutShutdownRequestTests.RequestWrapper> {

/**
* Wraps a {@link org.elasticsearch.xpack.shutdown.PutShutdownNodeAction.Request} to add proper equality checks
*/
record RequestWrapper(
String nodeId,
SingleNodeShutdownMetadata.Type type,
String reason,
TimeValue allocationDelay,
String targetNodeName,
TimeValue gracePeriod,
TaskId parentTask,
TimeValue masterNodeTimeout,
TimeValue ackTimeout
) implements Writeable {
@Override
public void writeTo(StreamOutput out) throws IOException {
final var request = new PutShutdownNodeAction.Request(nodeId, type, reason, allocationDelay, targetNodeName, gracePeriod);
request.setParentTask(parentTask);
request.ackTimeout(ackTimeout);
request.masterNodeTimeout(masterNodeTimeout);
request.writeTo(out);
}
}

@Override
protected Writeable.Reader<RequestWrapper> instanceReader() {
return in -> {
final var request = new PutShutdownNodeAction.Request(in);
return new RequestWrapper(
request.getNodeId(),
request.getType(),
request.getReason(),
request.getAllocationDelay(),
request.getTargetNodeName(),
request.getGracePeriod(),
request.getParentTask(),
request.masterNodeTimeout(),
request.ackTimeout()
);
};
}

@Override
protected RequestWrapper createTestInstance() {
return new RequestWrapper(
randomIdentifier(),
randomFrom(SingleNodeShutdownMetadata.Type.values()),
randomIdentifier(),
randomOptionalTimeValue(),
randomOptionalIdentifier(),
randomOptionalTimeValue(),
randomTaskId(),
randomTimeValue(),
randomTimeValue()
);
}

private static String randomOptionalIdentifier() {
return randomBoolean() ? null : randomIdentifier();
}

private static TimeValue randomOptionalTimeValue() {
return randomBoolean() ? null : randomTimeValue();
}

private static TaskId randomTaskId() {
return randomBoolean() ? TaskId.EMPTY_TASK_ID : new TaskId(randomIdentifier(), randomNonNegativeLong());
}

@Override
protected RequestWrapper mutateInstance(RequestWrapper instance) {
return switch (between(1, 9)) {
case 1 -> new RequestWrapper(
randomValueOtherThan(instance.nodeId, ESTestCase::randomIdentifier),
instance.type,
instance.reason,
instance.allocationDelay,
instance.targetNodeName,
instance.gracePeriod,
instance.parentTask,
instance.ackTimeout,
instance.masterNodeTimeout
);
case 2 -> new RequestWrapper(
instance.nodeId,
randomValueOtherThan(instance.type, () -> randomFrom(SingleNodeShutdownMetadata.Type.values())),
instance.reason,
instance.allocationDelay,
instance.targetNodeName,
instance.gracePeriod,
instance.parentTask,
instance.ackTimeout,
instance.masterNodeTimeout
);
case 3 -> new RequestWrapper(
instance.nodeId,
instance.type,
randomValueOtherThan(instance.reason, ESTestCase::randomIdentifier),
instance.allocationDelay,
instance.targetNodeName,
instance.gracePeriod,
instance.parentTask,
instance.ackTimeout,
instance.masterNodeTimeout
);
case 4 -> new RequestWrapper(
instance.nodeId,
instance.type,
instance.reason,
randomValueOtherThan(instance.allocationDelay, PutShutdownRequestTests::randomOptionalTimeValue),
instance.targetNodeName,
instance.gracePeriod,
instance.parentTask,
instance.ackTimeout,
instance.masterNodeTimeout
);
case 5 -> new RequestWrapper(
instance.nodeId,
instance.type,
instance.reason,
instance.allocationDelay,
randomValueOtherThan(instance.targetNodeName, PutShutdownRequestTests::randomOptionalIdentifier),
instance.gracePeriod,
instance.parentTask,
instance.ackTimeout,
instance.masterNodeTimeout
);
case 6 -> new RequestWrapper(
instance.nodeId,
instance.type,
instance.reason,
instance.allocationDelay,
instance.targetNodeName,
randomValueOtherThan(instance.gracePeriod, PutShutdownRequestTests::randomOptionalTimeValue),
instance.parentTask,
instance.ackTimeout,
instance.masterNodeTimeout
);
case 7 -> new RequestWrapper(
instance.nodeId,
instance.type,
instance.reason,
instance.allocationDelay,
instance.targetNodeName,
instance.gracePeriod,
randomValueOtherThan(instance.parentTask, PutShutdownRequestTests::randomTaskId),
instance.ackTimeout,
instance.masterNodeTimeout
);
case 8 -> new RequestWrapper(
instance.nodeId,
instance.type,
instance.reason,
instance.allocationDelay,
instance.targetNodeName,
instance.gracePeriod,
instance.parentTask,
randomValueOtherThan(instance.ackTimeout, ESTestCase::randomTimeValue),
instance.masterNodeTimeout
);
case 9 -> new RequestWrapper(
instance.nodeId,
instance.type,
instance.reason,
instance.allocationDelay,
instance.targetNodeName,
instance.gracePeriod,
instance.parentTask,
instance.ackTimeout,
randomValueOtherThan(instance.masterNodeTimeout, ESTestCase::randomTimeValue)
);
default -> throw new AssertionError("impossible");
};
}
}