Skip to content

Commit

Permalink
[Transform] Retry Destination IndexNotFoundException (#108394)
Browse files Browse the repository at this point in the history
A Destination Index can be removed from its previous shard in the
middle of a Transform run.  Ideally, this happens as part of the Delete
API, and the Transform has already been stopped, but in the case
that it isn't, we want to retry the checkpoint.

If the Transform had been stopped, the retry will move the Indexer into
a graceful shutdown.

If the Transform had not been stopped, the retry will check if the Index
exists or recreate the Index if it does not exist.

This is currently how unattended Transforms work, and this change will
make it so regular Transforms can also auto-recover from this error.

Fix #107263
  • Loading branch information
prwhelan committed May 9, 2024
1 parent ff20164 commit 0b71746
Show file tree
Hide file tree
Showing 7 changed files with 278 additions and 121 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/108394.yaml
@@ -0,0 +1,6 @@
pr: 108394
summary: Handle `IndexNotFoundException`
area: Transform
type: bug
issues:
- 107263
Expand Up @@ -193,7 +193,11 @@ protected void handleBulkResponse(BulkResponse bulkResponse, ActionListener<Bulk

for (BulkItemResponse item : bulkResponse.getItems()) {
if (item.isFailed()) {
deduplicatedFailures.putIfAbsent(item.getFailure().getCause().getClass().getSimpleName(), item);
var exceptionClass = item.getFailure().getCause().getClass();
if (IndexNotFoundException.class.isAssignableFrom(exceptionClass)) {
context.setShouldRecreateDestinationIndex(true);
}
deduplicatedFailures.putIfAbsent(exceptionClass.getSimpleName(), item);
failureCount++;
}
}
Expand Down
Expand Up @@ -45,6 +45,7 @@ public interface Listener {
private volatile Instant changesLastDetectedAt;
private volatile Instant lastSearchTime;
private volatile boolean shouldStopAtCheckpoint = false;
private volatile boolean shouldRecreateDestinationIndex = false;
private volatile AuthorizationState authState;
private volatile int pageSize = 0;

Expand Down Expand Up @@ -174,6 +175,14 @@ public void setShouldStopAtCheckpoint(boolean shouldStopAtCheckpoint) {
this.shouldStopAtCheckpoint = shouldStopAtCheckpoint;
}

public boolean shouldRecreateDestinationIndex() {
return shouldRecreateDestinationIndex;
}

public void setShouldRecreateDestinationIndex(boolean shouldRecreateDestinationIndex) {
this.shouldRecreateDestinationIndex = shouldRecreateDestinationIndex;
}

public AuthorizationState getAuthState() {
return authState;
}
Expand Down
Expand Up @@ -348,8 +348,12 @@ protected void onStart(long now, ActionListener<Boolean> listener) {
}, listener::onFailure);

var deducedDestIndexMappings = new SetOnce<Map<String, String>>();
var shouldMaybeCreateDestIndexForUnattended = context.getCheckpoint() == 0
&& TransformEffectiveSettings.isUnattended(transformConfig.getSettings());

// if the unattended transform had not created the destination index yet, or if the destination index was deleted for any
// type of transform during the last run, then we try to create the destination index.
// This is important to create the destination index explicitly before indexing documents. Otherwise, the destination
// index aliases may be missing.
var shouldMaybeCreateDestIndex = isFirstUnattendedRun() || context.shouldRecreateDestinationIndex();

ActionListener<Map<String, String>> fieldMappingsListener = ActionListener.wrap(destIndexMappings -> {
if (destIndexMappings.isEmpty() == false) {
Expand All @@ -359,11 +363,12 @@ protected void onStart(long now, ActionListener<Boolean> listener) {
// ... otherwise we fall back to index mappings deduced based on source indices
this.fieldMappings = deducedDestIndexMappings.get();
}
// Since the unattended transform could not have created the destination index yet, we do it here.
// This is important to create the destination index explicitly before indexing first documents. Otherwise, the destination
// index aliases may be missing.
if (destIndexMappings.isEmpty() && shouldMaybeCreateDestIndexForUnattended) {
doMaybeCreateDestIndex(deducedDestIndexMappings.get(), configurationReadyListener);

if (destIndexMappings.isEmpty() && shouldMaybeCreateDestIndex) {
doMaybeCreateDestIndex(deducedDestIndexMappings.get(), configurationReadyListener.delegateFailure((delegate, response) -> {
context.setShouldRecreateDestinationIndex(false);
delegate.onResponse(response);
}));
} else {
configurationReadyListener.onResponse(null);
}
Expand All @@ -380,7 +385,7 @@ protected void onStart(long now, ActionListener<Boolean> listener) {
deducedDestIndexMappings.set(validationResponse.getDestIndexMappings());
if (isContinuous()) {
transformsConfigManager.getTransformConfiguration(getJobId(), ActionListener.wrap(config -> {
if (transformConfig.equals(config) && fieldMappings != null && shouldMaybeCreateDestIndexForUnattended == false) {
if (transformConfig.equals(config) && fieldMappings != null && shouldMaybeCreateDestIndex == false) {
logger.trace("[{}] transform config has not changed.", getJobId());
configurationReadyListener.onResponse(null);
} else {
Expand Down Expand Up @@ -415,7 +420,7 @@ protected void onStart(long now, ActionListener<Boolean> listener) {
}, listener::onFailure);

Instant instantOfTrigger = Instant.ofEpochMilli(now);
// If we are not on the initial batch checkpoint and its the first pass of whatever continuous checkpoint we are on,
// If we are not on the initial batch checkpoint and it's the first pass of whatever continuous checkpoint we are on,
// we should verify if there are local changes based on the sync config. If not, do not proceed further and exit.
if (context.getCheckpoint() > 0 && initialRun()) {
checkpointProvider.sourceHasChanged(getLastCheckpoint(), ActionListener.wrap(hasChanged -> {
Expand All @@ -436,8 +441,7 @@ protected void onStart(long now, ActionListener<Boolean> listener) {
hasSourceChanged = true;
listener.onFailure(failure);
}));
} else if (context.getCheckpoint() == 0 && TransformEffectiveSettings.isUnattended(transformConfig.getSettings())) {
// this transform runs in unattended mode and has never run, to go on
} else if (shouldMaybeCreateDestIndex) {
validate(changedSourceListener);
} else {
hasSourceChanged = true;
Expand All @@ -447,6 +451,13 @@ protected void onStart(long now, ActionListener<Boolean> listener) {
}
}

/**
* Returns true if this transform runs in unattended mode and has never run.
*/
private boolean isFirstUnattendedRun() {
return context.getCheckpoint() == 0 && TransformEffectiveSettings.isUnattended(transformConfig.getSettings());
}

protected void initializeFunction() {
// create the function
function = FunctionFactory.create(getConfig());
Expand Down
Expand Up @@ -9,6 +9,7 @@

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchContextMissingException;
import org.elasticsearch.tasks.TaskCancelledException;
Expand Down Expand Up @@ -63,7 +64,7 @@ public static Throwable getFirstIrrecoverableExceptionFromBulkResponses(Collecti
}

if (unwrappedThrowable instanceof ElasticsearchException elasticsearchException) {
if (isExceptionIrrecoverable(elasticsearchException)) {
if (isExceptionIrrecoverable(elasticsearchException) && isNotIndexNotFoundException(elasticsearchException)) {
return elasticsearchException;
}
}
Expand All @@ -72,6 +73,15 @@ public static Throwable getFirstIrrecoverableExceptionFromBulkResponses(Collecti
return null;
}

/**
* We can safely recover from IndexNotFoundExceptions on Bulk responses.
* If the transform is running, the next checkpoint will recreate the index.
* If the transform is not running, the next start request will recreate the index.
*/
private static boolean isNotIndexNotFoundException(ElasticsearchException elasticsearchException) {
return elasticsearchException instanceof IndexNotFoundException == false;
}

public static boolean isExceptionIrrecoverable(ElasticsearchException elasticsearchException) {
if (IRRECOVERABLE_REST_STATUSES.contains(elasticsearchException.status())) {

Expand Down

0 comments on commit 0b71746

Please sign in to comment.