From 1c7a675d684ecf0ffabf74b8dc6afd4a29889d0e Mon Sep 17 00:00:00 2001 From: Rory Hunter Date: Mon, 8 Feb 2021 13:47:00 +0000 Subject: [PATCH] Migrate `.tasks` to be managed automatically (#67351) Re-apply changes from 0c9b9c1, which migrates the `.tasks` system index to be managed automatically by the system indices infrastructure. Changes went into #67114 that, I hope, will avoid the problems we saw before in the BWC tests in CI. --- .../elasticsearch/indices/SystemIndices.java | 4 +- .../tasks/TaskResultsService.java | 192 ++++++++++-------- .../tasks/task-index-mapping.json | 60 ------ 3 files changed, 105 insertions(+), 151 deletions(-) delete mode 100644 server/src/main/resources/org/elasticsearch/tasks/task-index-mapping.json diff --git a/server/src/main/java/org/elasticsearch/indices/SystemIndices.java b/server/src/main/java/org/elasticsearch/indices/SystemIndices.java index f486b24eb8c6e6..cd97756b16429a 100644 --- a/server/src/main/java/org/elasticsearch/indices/SystemIndices.java +++ b/server/src/main/java/org/elasticsearch/indices/SystemIndices.java @@ -27,7 +27,7 @@ import java.util.stream.Collectors; import static java.util.stream.Collectors.toUnmodifiableList; -import static org.elasticsearch.tasks.TaskResultsService.TASK_INDEX; +import static org.elasticsearch.tasks.TaskResultsService.TASKS_DESCRIPTOR; /** * This class holds the {@link SystemIndexDescriptor} objects that represent system indices the @@ -36,7 +36,7 @@ */ public class SystemIndices { private static final Map> SERVER_SYSTEM_INDEX_DESCRIPTORS = Map.of( - TaskResultsService.class.getName(), List.of(new SystemIndexDescriptor(TASK_INDEX + "*", "Task Result Index")) + TaskResultsService.class.getName(), List.of(TASKS_DESCRIPTOR) ); private final CharacterRunAutomaton runAutomaton; diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java b/server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java index d0a21046acc0e4..8b125d95fa35cb 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java @@ -11,21 +11,15 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.ResourceAlreadyExistsException; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; -import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.client.Client; import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.client.Requests; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetadata; -import org.elasticsearch.cluster.metadata.MappingMetadata; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -33,19 +27,16 @@ import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.core.internal.io.Streams; +import org.elasticsearch.indices.SystemIndexDescriptor; import org.elasticsearch.threadpool.ThreadPool; -import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.StandardCharsets; +import java.io.UncheckedIOException; import java.util.Iterator; -import java.util.Map; import static org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction.TASKS_ORIGIN; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; /** * Service that can store task results. @@ -55,12 +46,17 @@ public class TaskResultsService { private static final Logger logger = LogManager.getLogger(TaskResultsService.class); public static final String TASK_INDEX = ".tasks"; - - public static final String TASK_RESULT_INDEX_MAPPING_FILE = "task-index-mapping.json"; - public static final String TASK_RESULT_MAPPING_VERSION_META_FIELD = "version"; - public static final int TASK_RESULT_MAPPING_VERSION = 3; + public static final SystemIndexDescriptor TASKS_DESCRIPTOR = SystemIndexDescriptor.builder() + .setIndexPattern(TASK_INDEX + "*") + .setPrimaryIndex(TASK_INDEX) + .setDescription("Task Result Index") + .setSettings(getTaskResultIndexSettings()) + .setMappings(getTaskResultIndexMappings()) + .setVersionMetaKey(TASK_RESULT_MAPPING_VERSION_META_FIELD) + .setOrigin(TASKS_ORIGIN) + .build(); /** * The backoff policy to use when saving a task result fails. The total wait @@ -71,75 +67,15 @@ public class TaskResultsService { private final Client client; - private final ClusterService clusterService; - private final ThreadPool threadPool; @Inject - public TaskResultsService(Client client, ClusterService clusterService, ThreadPool threadPool) { + public TaskResultsService(Client client, ThreadPool threadPool) { this.client = new OriginSettingClient(client, TASKS_ORIGIN); - this.clusterService = clusterService; this.threadPool = threadPool; } public void storeResult(TaskResult taskResult, ActionListener listener) { - - ClusterState state = clusterService.state(); - - if (state.routingTable().hasIndex(TASK_INDEX) == false) { - CreateIndexRequest createIndexRequest = new CreateIndexRequest(); - createIndexRequest.settings(taskResultIndexSettings()); - createIndexRequest.index(TASK_INDEX); - createIndexRequest.mapping(taskResultIndexMapping()); - createIndexRequest.cause("auto(task api)"); - - client.admin().indices().create(createIndexRequest, new ActionListener() { - @Override - public void onResponse(CreateIndexResponse result) { - doStoreResult(taskResult, listener); - } - - @Override - public void onFailure(Exception e) { - if (ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException) { - // we have the index, do it - try { - doStoreResult(taskResult, listener); - } catch (Exception inner) { - inner.addSuppressed(e); - listener.onFailure(inner); - } - } else { - listener.onFailure(e); - } - } - }); - } else { - IndexMetadata metadata = state.getMetadata().index(TASK_INDEX); - if (getTaskResultMappingVersion(metadata) < TASK_RESULT_MAPPING_VERSION) { - // The index already exists but doesn't have our mapping - client.admin().indices().preparePutMapping(TASK_INDEX) - .setSource(taskResultIndexMapping(), XContentType.JSON) - .execute(ActionListener.delegateFailure(listener, (l, r) -> doStoreResult(taskResult, listener))); - } else { - doStoreResult(taskResult, listener); - } - } - } - - private int getTaskResultMappingVersion(IndexMetadata metadata) { - MappingMetadata mappingMetadata = metadata.mapping(); - if (mappingMetadata == null) { - return 0; - } - @SuppressWarnings("unchecked") Map meta = (Map) mappingMetadata.sourceAsMap().get("_meta"); - if (meta == null || meta.containsKey(TASK_RESULT_MAPPING_VERSION_META_FIELD) == false) { - return 1; // The mapping was created before meta field was introduced - } - return (int) meta.get(TASK_RESULT_MAPPING_VERSION_META_FIELD); - } - - private void doStoreResult(TaskResult taskResult, ActionListener listener) { IndexRequestBuilder index = client.prepareIndex(TASK_INDEX).setId(taskResult.getTask().getTaskId().toString()); try (XContentBuilder builder = XContentFactory.contentBuilder(Requests.INDEX_CONTENT_TYPE)) { taskResult.toXContent(builder, ToXContent.EMPTY_PARAMS); @@ -171,7 +107,7 @@ public void onFailure(Exception e) { }); } - private Settings taskResultIndexSettings() { + private static Settings getTaskResultIndexSettings() { return Settings.builder() .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) .put(IndexMetadata.INDEX_AUTO_EXPAND_REPLICAS_SETTING.getKey(), "0-1") @@ -179,16 +115,94 @@ private Settings taskResultIndexSettings() { .build(); } - public String taskResultIndexMapping() { - try (InputStream is = getClass().getResourceAsStream(TASK_RESULT_INDEX_MAPPING_FILE)) { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - Streams.copy(is, out); - return out.toString(StandardCharsets.UTF_8.name()); - } catch (Exception e) { - logger.error(() -> new ParameterizedMessage( - "failed to create tasks results index template [{}]", TASK_RESULT_INDEX_MAPPING_FILE), e); - throw new IllegalStateException("failed to create tasks results index template [" + TASK_RESULT_INDEX_MAPPING_FILE + "]", e); - } + private static XContentBuilder getTaskResultIndexMappings() { + try { + final XContentBuilder builder = jsonBuilder(); + + builder.startObject(); + { + builder.startObject("_meta"); + builder.field(TASK_RESULT_MAPPING_VERSION_META_FIELD, Version.CURRENT.toString()); + builder.endObject(); + + builder.field("dynamic", "strict"); + builder.startObject("properties"); + { + builder.startObject("completed"); + builder.field("type", "boolean"); + builder.endObject(); + + builder.startObject("task"); + { + builder.startObject("properties"); + { + builder.startObject("action"); + builder.field("type", "keyword"); + builder.endObject(); + + builder.startObject("cancellable"); + builder.field("type", "boolean"); + builder.endObject(); + + builder.startObject("id"); + builder.field("type", "long"); + builder.endObject(); + + builder.startObject("parent_task_id"); + builder.field("type", "keyword"); + builder.endObject(); + + builder.startObject("node"); + builder.field("type", "keyword"); + builder.endObject(); + + builder.startObject("running_time_in_nanos"); + builder.field("type", "long"); + builder.endObject(); + + builder.startObject("start_time_in_millis"); + builder.field("type", "long"); + builder.endObject(); + + builder.startObject("type"); + builder.field("type", "keyword"); + builder.endObject(); + + builder.startObject("status"); + builder.field("type", "object"); + builder.field("enabled", false); + builder.endObject(); + + builder.startObject("description"); + builder.field("type", "text"); + builder.endObject(); + + builder.startObject("headers"); + builder.field("type", "object"); + builder.field("enabled", false); + builder.endObject(); + } + builder.endObject(); + } + builder.endObject(); + + builder.startObject("response"); + builder.field("type", "object"); + builder.field("enabled", false); + builder.endObject(); + builder.startObject("error"); + builder.field("type", "object"); + builder.field("enabled", false); + builder.endObject(); + } + builder.endObject(); + } + + builder.endObject(); + return builder; + } catch (IOException e) { + throw new UncheckedIOException("Failed to build " + TASK_INDEX + " index mappings", e); + } } } diff --git a/server/src/main/resources/org/elasticsearch/tasks/task-index-mapping.json b/server/src/main/resources/org/elasticsearch/tasks/task-index-mapping.json deleted file mode 100644 index ef5873ae53c584..00000000000000 --- a/server/src/main/resources/org/elasticsearch/tasks/task-index-mapping.json +++ /dev/null @@ -1,60 +0,0 @@ -{ - "_doc" : { - "_meta": { - "version": 3 - }, - "dynamic" : "strict", - "properties" : { - "completed": { - "type": "boolean" - }, - "task" : { - "properties": { - "action": { - "type": "keyword" - }, - "cancellable": { - "type": "boolean" - }, - "id": { - "type": "long" - }, - "parent_task_id": { - "type": "keyword" - }, - "node": { - "type": "keyword" - }, - "running_time_in_nanos": { - "type": "long" - }, - "start_time_in_millis": { - "type": "long" - }, - "type": { - "type": "keyword" - }, - "status": { - "type" : "object", - "enabled" : false - }, - "description": { - "type": "text" - }, - "headers": { - "type" : "object", - "enabled" : false - } - } - }, - "response" : { - "type" : "object", - "enabled" : false - }, - "error" : { - "type" : "object", - "enabled" : false - } - } - } -}