Skip to content

Commit

Permalink
Fix serialization of put/delete shutdown requests (#107862)
Browse files Browse the repository at this point in the history
Co-authored-by: Simon Cooper <simon.cooper@elastic.co>
  • Loading branch information
DaveCTurner and thecoop committed May 3, 2024
1 parent 75e02bc commit eb90e36
Show file tree
Hide file tree
Showing 6 changed files with 320 additions and 0 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/107862.yaml
@@ -0,0 +1,6 @@
pr: 107862
summary: Fix serialization of put-shutdown request
area: Infra/Node Lifecycle
type: bug
issues:
- 107857
3 changes: 3 additions & 0 deletions server/src/main/java/org/elasticsearch/TransportVersions.java
Expand Up @@ -133,6 +133,7 @@ static TransportVersion def(int id) {
public static final TransportVersion INDEX_REQUEST_NORMALIZED_BYTES_PARSED = def(8_593_00_0);
public static final TransportVersion INGEST_GRAPH_STRUCTURE_EXCEPTION = def(8_594_00_0);
public static final TransportVersion V_8_13_0 = def(8_595_00_0);
public static final TransportVersion SHUTDOWN_REQUEST_TIMEOUTS_FIX_8_13 = def(8_595_00_1);
// 8.14.0+
public static final TransportVersion RANDOM_AGG_SHARD_SEED = def(8_596_00_0);
public static final TransportVersion ESQL_TIMINGS = def(8_597_00_0);
Expand Down Expand Up @@ -175,6 +176,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ML_INFERENCE_AZURE_OPENAI_EMBEDDINGS = def(8_634_00_0);
public static final TransportVersion ILM_SHRINK_ENABLE_WRITE = def(8_635_00_0);
public static final TransportVersion GEOIP_CACHE_STATS = def(8_636_00_0);
public static final TransportVersion SHUTDOWN_REQUEST_TIMEOUTS_FIX_8_14 = def(8_636_00_1);
public static final TransportVersion WATERMARK_THRESHOLDS_STATS = def(8_637_00_0);
public static final TransportVersion ENRICH_CACHE_ADDITIONAL_STATS = def(8_638_00_0);
public static final TransportVersion ML_INFERENCE_RATE_LIMIT_SETTINGS_ADDED = def(8_639_00_0);
Expand All @@ -189,6 +191,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ESQL_REQUEST_TABLES = def(8_648_00_0);
public static final TransportVersion ROLE_REMOTE_CLUSTER_PRIVS = def(8_649_00_0);
public static final TransportVersion NO_GLOBAL_RETENTION_FOR_SYSTEM_DATA_STREAMS = def(8_650_00_0);
public static final TransportVersion SHUTDOWN_REQUEST_TIMEOUTS_FIX = def(8_651_00_0);
/*
* STOP! READ THIS FIRST! No, really,
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _
Expand Down
Expand Up @@ -7,13 +7,15 @@

package org.elasticsearch.xpack.shutdown;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.tasks.TaskId;

import java.io.IOException;

Expand All @@ -35,11 +37,24 @@ public Request(String nodeId) {
}

public Request(StreamInput in) throws IOException {
if (in.getTransportVersion().isPatchFrom(TransportVersions.SHUTDOWN_REQUEST_TIMEOUTS_FIX_8_13)
|| in.getTransportVersion().isPatchFrom(TransportVersions.SHUTDOWN_REQUEST_TIMEOUTS_FIX_8_14)
|| in.getTransportVersion().onOrAfter(TransportVersions.SHUTDOWN_REQUEST_TIMEOUTS_FIX)) {
// effectively super(in):
setParentTask(TaskId.readFromStream(in));
masterNodeTimeout(in.readTimeValue());
ackTimeout(in.readTimeValue());
}
this.nodeId = in.readString();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().isPatchFrom(TransportVersions.SHUTDOWN_REQUEST_TIMEOUTS_FIX_8_13)
|| out.getTransportVersion().isPatchFrom(TransportVersions.SHUTDOWN_REQUEST_TIMEOUTS_FIX_8_14)
|| out.getTransportVersion().onOrAfter(TransportVersions.SHUTDOWN_REQUEST_TIMEOUTS_FIX)) {
super.writeTo(out);
}
out.writeString(this.nodeId);
}

Expand Down
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.shutdown;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
Expand All @@ -17,6 +18,8 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.UpdateForV9;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.XContentParser;
Expand Down Expand Up @@ -95,7 +98,16 @@ public Request(
this.gracePeriod = gracePeriod;
}

@UpdateForV9 // TODO call super(in) instead of explicitly reading superclass contents once bwc no longer needed
public Request(StreamInput in) throws IOException {
if (in.getTransportVersion().isPatchFrom(TransportVersions.SHUTDOWN_REQUEST_TIMEOUTS_FIX_8_13)
|| in.getTransportVersion().isPatchFrom(TransportVersions.SHUTDOWN_REQUEST_TIMEOUTS_FIX_8_14)
|| in.getTransportVersion().onOrAfter(TransportVersions.SHUTDOWN_REQUEST_TIMEOUTS_FIX)) {
// effectively super(in):
setParentTask(TaskId.readFromStream(in));
masterNodeTimeout(in.readTimeValue());
ackTimeout(in.readTimeValue());
}
this.nodeId = in.readString();
this.type = in.readEnum(SingleNodeShutdownMetadata.Type.class);
this.reason = in.readString();
Expand All @@ -114,6 +126,11 @@ public Request(StreamInput in) throws IOException {

@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().isPatchFrom(TransportVersions.SHUTDOWN_REQUEST_TIMEOUTS_FIX_8_13)
|| out.getTransportVersion().isPatchFrom(TransportVersions.SHUTDOWN_REQUEST_TIMEOUTS_FIX_8_14)
|| out.getTransportVersion().onOrAfter(TransportVersions.SHUTDOWN_REQUEST_TIMEOUTS_FIX)) {
super.writeTo(out);
}
out.writeString(nodeId);
if (out.getTransportVersion().before(REPLACE_SHUTDOWN_TYPE_ADDED_VERSION)
&& this.type == SingleNodeShutdownMetadata.Type.REPLACE) {
Expand Down Expand Up @@ -207,5 +224,6 @@ public ActionRequestValidationException validate() {
return null;
}
}

}
}
@@ -0,0 +1,82 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.shutdown;

import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.test.ESTestCase;

import java.io.IOException;

public class DeleteShutdownRequestTests extends AbstractWireSerializingTestCase<DeleteShutdownRequestTests.RequestWrapper> {

/**
* Wraps a {@link DeleteShutdownNodeAction.Request} to add proper equality checks
*/
record RequestWrapper(String nodeId, TaskId parentTask, TimeValue masterNodeTimeout, TimeValue ackTimeout) implements Writeable {
@Override
public void writeTo(StreamOutput out) throws IOException {
final var request = new DeleteShutdownNodeAction.Request(nodeId);
request.setParentTask(parentTask);
request.ackTimeout(ackTimeout);
request.masterNodeTimeout(masterNodeTimeout);
request.writeTo(out);
}
}

@Override
protected Writeable.Reader<RequestWrapper> instanceReader() {
return in -> {
final var request = new DeleteShutdownNodeAction.Request(in);
return new RequestWrapper(request.getNodeId(), request.getParentTask(), request.masterNodeTimeout(), request.ackTimeout());
};
}

@Override
protected RequestWrapper createTestInstance() {
return new RequestWrapper(randomIdentifier(), randomTaskId(), randomTimeValue(), randomTimeValue());
}

private static TaskId randomTaskId() {
return randomBoolean() ? TaskId.EMPTY_TASK_ID : new TaskId(randomIdentifier(), randomNonNegativeLong());
}

@Override
protected RequestWrapper mutateInstance(RequestWrapper instance) {
return switch (between(1, 4)) {
case 1 -> new RequestWrapper(
randomValueOtherThan(instance.nodeId, ESTestCase::randomIdentifier),
instance.parentTask,
instance.ackTimeout,
instance.masterNodeTimeout
);
case 2 -> new RequestWrapper(
instance.nodeId,
randomValueOtherThan(instance.parentTask, DeleteShutdownRequestTests::randomTaskId),
instance.ackTimeout,
instance.masterNodeTimeout
);
case 3 -> new RequestWrapper(
instance.nodeId,
instance.parentTask,
randomValueOtherThan(instance.ackTimeout, ESTestCase::randomTimeValue),
instance.masterNodeTimeout
);
case 4 -> new RequestWrapper(
instance.nodeId,
instance.parentTask,
instance.ackTimeout,
randomValueOtherThan(instance.masterNodeTimeout, ESTestCase::randomTimeValue)
);
default -> throw new AssertionError("impossible");
};
}
}

0 comments on commit eb90e36

Please sign in to comment.