Reject task updates addressed to previous failed node #21744

Draft: findepi wants to merge 4 commits into master from findepi/reject-task-updates-addressed-to-previous-failed-node-c1e964

Conversation

findepi (Member) commented on Apr 29, 2024:

In case of a worker node failure, a new node can be started with the same IP address. Once the new node has finished initializing and registers with the discovery service, the coordinator will eventually become aware of it and use it for processing queries. Before then, and before the coordinator is aware of the node failure, it may try to reach the old, failed node. Since the old and the new node may have the same IP address, the coordinator may effectively talk to an uninitialized worker node as if it were ready to service queries. This can lead to failures such as "unknown catalog" or, more precisely, "unknown handle id" when decoding task update requests.

Prevent such hard-to-understand failures by passing the node ID in task update requests. Provided that the new worker is started with a different node ID, these requests will be cleanly rejected instead of being serviced.

Per #21735 (comment), fixes #21735.
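
For illustration, a minimal sketch of the kind of rejection described above, assuming a JAX-RS-style task resource; the class name, the way the node ID is passed (here a query parameter), and the error message are placeholders rather than the PR's actual diff:

```java
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.container.AsyncResponse;
import jakarta.ws.rs.container.Suspended;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.Response.Status;

// Sketch only: a worker-side resource that rejects task updates addressed to a
// node identifier other than its own.
@Path("/v1/task")
public class TaskUpdateValidationSketch
{
    private final String nodeIdentifier;

    public TaskUpdateValidationSketch(String nodeIdentifier)
    {
        this.nodeIdentifier = nodeIdentifier;
    }

    @POST
    @Path("{taskId}")
    public void createOrUpdateTask(
            @QueryParam("targetNodeId") String requestNodeId,
            String taskUpdateRequestJson,
            @Suspended AsyncResponse asyncResponse)
    {
        if (requestNodeId != null && !nodeIdentifier.equals(requestNodeId)) {
            // The coordinator addressed a node instance that no longer exists,
            // for example a restarted worker reusing the failed worker's IP address.
            asyncResponse.resume(Response.status(Status.BAD_REQUEST)
                    .entity("Task update addressed to node %s but this is node %s"
                            .formatted(requestNodeId, nodeIdentifier))
                    .build());
            return;
        }
        // ... decode and process the task update as before ...
        asyncResponse.resume(Response.ok().build());
    }
}
```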

wendigo (Contributor) commented on Apr 29, 2024:

One of my favorite bug fixes of all time.

findepi force-pushed the findepi/reject-task-updates-addressed-to-previous-failed-node-c1e964 branch from d0c0010 to ded32f1 on April 29, 2024, 10:42
Review thread on TaskResource (code context):

```java
        TaskUpdateRequest taskUpdateRequest,
        @Context UriInfo uriInfo,
        @Suspended AsyncResponse asyncResponse)
{
    requireNonNull(taskUpdateRequest, "taskUpdateRequest is null");
    if (!nodeIdentifier.equals(requestNodeId)) {
        asyncResponse.resume(Response.status(Status.BAD_REQUEST)
```
Member: would it make sense to put this in an HTTP filter?

Member Author (findepi): would it require updating all the comms requests?

Member: should be able to filter by a common path prefix, if one exists, IIRC

Member:

```java
if (path.startsWith("/v1/task")) {
    // filter logic
}
```

Member Author (findepi): I somewhat prefer the explicit approach, at least for now, while not all communication channels are updated.
I think a filter-based approach is a reasonable follow-up once the approach is proven and we just want to reduce boilerplate and ensure it's applied uniformly. WDYT?

Member: An HTTP filter on the receiving side would be easy. An HTTP client request filter wouldn't be possible, as the filter wouldn't have access to the expected announcement ID for the target.

martint (Member) commented on Apr 29, 2024:

What happens when a node fails and restarts with the same IP and node ID? This is a legitimate case, as the node ID is expected to be stable for a given node across restarts (especially when the node may have persistent data: Raptor, etc.).

findepi (Member, Author) commented on Apr 29, 2024:

> What happens when a node fails and restarts with the same IP and node ID?

That's definitely not handled by this PR.

I was initially thinking about introducing something like an "incarnation ID" (e.g., a UUID randomly chosen at node start). I switched over to the node ID on the assumption that (1) very quick node restarts with IP reuse are most likely to happen in a Kubernetes environment, and (2) the node ID can be generated there (it is generated when not set, right?)

cc @elonazoulay

findepi marked this pull request as draft on April 29, 2024, 20:22
elonazoulay (Member) commented on Apr 29, 2024:

> > What happens when a node fails and restarts with the same IP and node ID?
>
> That's definitely not handled by this PR.
>
> I was initially thinking about introducing something like an "incarnation ID" (e.g., a UUID randomly chosen at node start). I switched over to the node ID on the assumption that (1) very quick node restarts with IP reuse are most likely to happen in a Kubernetes environment, and (2) the node ID can be generated there (it is generated when not set, right?)
>
> cc @elonazoulay

That sounds like a good idea! It looks like (1) and (2) are correct. This is the behavior if the node ID is unset.
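
For reference, a small sketch of the distinction discussed here, assuming Airlift's NodeInfo: when node.id is not configured, a random node ID is generated, while the instance ID is freshly generated for every JVM start regardless:

```java
import io.airlift.node.NodeInfo;

// Sketch: node ID vs. instance ID in Airlift.
public final class NodeIdentitySketch
{
    private NodeIdentitySketch() {}

    public static void main(String[] args)
    {
        NodeInfo first = new NodeInfo("test");
        NodeInfo second = new NodeInfo("test");

        // Without an explicit node.id, both node IDs are generated, so they differ;
        // when node.id is configured, it stays stable across restarts.
        System.out.println("node IDs: " + first.getNodeId() + " / " + second.getNodeId());

        // Instance IDs always differ between JVM starts, which is what makes it
        // possible to detect that a worker at a known address has restarted.
        System.out.println("instance IDs: " + first.getInstanceId() + " / " + second.getInstanceId());
    }
}
```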

electrum (Member) left a comment:

This is using the node ID, which is expected to be stable across restarts. For the Docker image, we set this to the hostname. The node identifier uniquely identifies the deployment, which may have persistent cached data, etc.

Instead, this should use instance ID via NodeInfo.getInstanceId(), which will be unique on every restart. Unfortunately, we don't provide this in discovery announcements. I think the easiest approach is to modify the code in Server.updateConnectorIds() to add instance ID as a property.

I also like @jklamer's idea of using an HTTP request filter. Instead of modifying the request paths, we can add a header X-Trino-Instance-Id that is validated in a filter. This makes it easy to add validations to more resources by adding the header in the request.
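
A receiving-side filter along the lines suggested above might look like the following sketch; the class name, wiring, and response status are assumptions (only the X-Trino-Instance-Id header name comes from the comment), and the check is applied only when the header is present:

```java
import jakarta.ws.rs.container.ContainerRequestContext;
import jakarta.ws.rs.container.ContainerRequestFilter;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.ext.Provider;

// Sketch: reject requests whose expected instance ID does not match the
// instance ID of the currently running server.
@Provider
public class InstanceIdValidationFilter
        implements ContainerRequestFilter
{
    private final String instanceId;

    public InstanceIdValidationFilter(String instanceId)
    {
        this.instanceId = instanceId;
    }

    @Override
    public void filter(ContainerRequestContext request)
    {
        String expectedInstanceId = request.getHeaderString("X-Trino-Instance-Id");
        if (expectedInstanceId != null && !expectedInstanceId.equals(instanceId)) {
            // The caller addressed a previous incarnation of this node,
            // e.g. the worker that crashed before this one started.
            request.abortWith(Response.status(Response.Status.BAD_REQUEST)
                    .entity("Expected instance %s but this is instance %s"
                            .formatted(expectedInstanceId, instanceId))
                    .build());
        }
    }
}
```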

electrum (Member) commented on Apr 29, 2024:

Related, I noticed that we have X-Trino-Task-Instance-Id, which is created on the remote task and returned in task result responses. This was added as part of FTE. With this change, we should be able to remove it, as this change seems to serve the same purpose. cc @dain @losipiuk

electrum (Member) commented:

I think the problem of node restarts is more complicated than it appears. We probably should track the instance ID and node IDs for remote tasks in the coordinator and proactively fail them once we detect that the node has restarted (same node ID with different instance ID, or different IDs for the same task host/port).

@losipiuk How does this affect FTE?

findepi force-pushed the findepi/reject-task-updates-addressed-to-previous-failed-node-c1e964 branch from ded32f1 to eb0b960 on April 30, 2024, 07:38
findepi (Member, Author) commented on Apr 30, 2024:

> We probably should track the instance ID and node IDs for remote tasks in the coordinator and proactively fail them once we detect that the node has restarted

This might be needed, but it may also turn out to be unnecessary. The task ID identifies the task a node was doing. If a node restarts, it won't know that task ID, so it won't update the coordinator on its progress. This should result in task failure.
What I think we can improve is the speed of failure recovery. That probably requires a more deliberate approach, with some well-chosen error codes to prevent retries from the coordinator.

> we have X-Trino-Task-Instance-Id

Indeed. It seems it was added in 8d15b14, apparently aiming to address the "proactively fail fast" goal in some situations.

findepi force-pushed the findepi/reject-task-updates-addressed-to-previous-failed-node-c1e964 branch from eb0b960 to 22405b9 on April 30, 2024, 07:42
findepi (Member, Author) commented on Apr 30, 2024:

> Instead, this should use instance ID via NodeInfo.getInstanceId(), which will be unique on every restart. Unfortunately, we don't provide this in discovery announcements. I think the easiest approach is to modify the code in Server.updateConnectorIds() to add instance ID as a property.

Done

> I also like @jklamer's idea of using an HTTP request filter. Instead of modifying the request paths, we can add a header X-Trino-Instance-Id that is validated in a filter. This makes it easy to add validations to more resources by adding the header in the request.

Not done at this point.

It is easy to inject validation when the header is present, but it's harder to inject validation that requires the header to be present. For instance, I have trouble finding the usage of io.trino.server.TaskResource#getAllTaskInfo, and that usage would need to be updated to pass the header. Also, a really good filter should not be limited to TaskResource only.
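
For context, adding the instance ID to the discovery announcement could look roughly like this sketch, assuming Airlift's ServiceAnnouncement builder; the service type and the property key are illustrative, not necessarily what this PR uses:

```java
import io.airlift.discovery.client.ServiceAnnouncement;
import io.airlift.node.NodeInfo;

import static io.airlift.discovery.client.ServiceAnnouncement.serviceAnnouncement;

// Sketch: include the per-JVM instance ID as an announcement property so that
// the coordinator can notice that a worker at a known address has restarted.
public final class AnnouncementSketch
{
    private AnnouncementSketch() {}

    public static ServiceAnnouncement workerAnnouncement(NodeInfo nodeInfo, String connectorIds)
    {
        return serviceAnnouncement("trino")
                .addProperty("connectorIds", connectorIds)
                // NodeInfo.getInstanceId() is unique for every JVM start, unlike
                // the node ID, which is expected to stay stable across restarts.
                .addProperty("instance_id", nodeInfo.getInstanceId())
                .build();
    }
}
```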

github-actions bot added the bigquery (BigQuery connector) and mongodb (MongoDB connector) labels on May 7, 2024
findepi force-pushed the findepi/reject-task-updates-addressed-to-previous-failed-node-c1e964 branch from 39d6298 to 3d62a27 on May 7, 2024, 20:21
findepi (Member, Author) commented on May 7, 2024:

Added a test.

findepi force-pushed the findepi/reject-task-updates-addressed-to-previous-failed-node-c1e964 branch from 3d62a27 to 95115f7 on May 7, 2024, 20:34
Review thread on the new test (code context):

```java
            queryRunner.restartWorker(worker);
        }

        assertThatThrownBy(future::get)
```
Member: why do you expect an error if `nodeRestartBeforeQueryStart == true`?

Member Author (findepi): Because the coordinator's copy of the services isn't updated yet, and the coordinator tries to schedule a new task on the old instance of a worker. This is exactly the kind of production problem that sparked this PR: a worker was restarted after a crash, and the coordinator was trying to use the new worker before it was initialized (had catalogs loaded), because it thought it was the previously announced instance.

In my local testing, testNodeRestartBeforeQueryStart passed 50 out of 50 runs.
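
The scenario under discussion, as a rough sketch: restart the worker first, then submit a query while the coordinator's view of the cluster is still stale. The RestartableQueryRunner interface below is a hypothetical stand-in for the query runner used in the PR's test (only restartWorker mirrors the snippet above), and the query text is illustrative:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import static org.assertj.core.api.Assertions.assertThatThrownBy;

// Sketch of the restart-before-query scenario.
class WorkerRestartScenarioSketch
{
    // Hypothetical minimal interface standing in for the real query runner.
    interface RestartableQueryRunner
    {
        void restartWorker(String nodeId) throws Exception;

        void execute(String sql);
    }

    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    void nodeRestartBeforeQueryStart(RestartableQueryRunner queryRunner)
            throws Exception
    {
        // Restart the worker first; the coordinator's copy of the discovered
        // services is not refreshed yet, so it still points at the old instance.
        queryRunner.restartWorker("worker-1");

        // Submit a query right away, before the new worker instance announces itself.
        Future<?> future = executor.submit(() -> queryRunner.execute("SELECT count(*) FROM some_table"));

        // The coordinator schedules a task against the stale entry and the new
        // worker rejects it, so the query is expected to fail rather than hang.
        assertThatThrownBy(future::get);
    }
}
```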

Member Author (findepi): To your point, there is no strong guarantee, as there is no synchronization or delayed registration for a newly started worker. Moreover, the worker startup is synchronous.

losipiuk (Member) commented on May 7, 2024: OK, so the assumption here is that we are scheduling the query quickly enough that the coordinator state is still stale, right? Worth a comment?

Member Author (findepi): BTW, this indicates a bigger problem: a worker crash and restart has a more prolonged effect on query execution than it needs to. That is, it affects not only currently executing queries, but also new queries. A user can retry a query after it failed due to a worker crash, and the retried query can still fail because of that very same worker crash.

@electrum @dain is this a known limitation?

Member Author (findepi): I ran the test with the instance_id-related changes removed (both the checks in TaskResource and the discovery propagation).

The results for testRestartDuringQuery were better before the changes in this PR in terms of how quickly the cluster heals itself. After the worker is started, the next query succeeds. But that doesn't mean a retried query is guaranteed to succeed; it won't succeed if the retry happens before the worker is fully restarted.

The results for testRestartBeforeQuery were much worse before the changes in this PR. The worker crash pretty deterministically leads to the next query hanging indefinitely. This also means that in the testRestartDuringQuery case, if the retried query were sent too quickly (before the worker fully restarted), it would hang instead of failing.

Thus, I consider this PR an improvement and want to continue with it and get it merged. But we may want to iterate further and heal from crashes faster.

cc @losipiuk @electrum

Member Author (findepi):

> The results for testRestartBeforeQuery were much worse before the changes in this PR.
> The worker crash pretty deterministically leads to the next query hanging indefinitely.

That was just an observation mistake on my part. My test query was simply never completing (it is very long), but it was executing just fine.

findepi force-pushed the findepi/reject-task-updates-addressed-to-previous-failed-node-c1e964 branch 2 times, most recently from 7bc02a7 to f8d2826, on May 8, 2024, 13:17
findepi force-pushed the findepi/reject-task-updates-addressed-to-previous-failed-node-c1e964 branch 7 times, most recently from 7719662 to af33656, on May 9, 2024, 10:53
Propagate Airlift's `NodeInfo.instanceId` to Trino's `InternalNode`.
The instance ID makes it possible to identify node restarts.

In case of a worker node failure, a new node can be started with the same IP address. Once the new node has finished initializing and registers with the discovery service, the coordinator will eventually become aware of it and use it for processing queries. Before then, and before the coordinator is aware of the node failure, it may try to reach the old, failed node. Since the old and the new node may have the same IP address, the coordinator may effectively talk to an uninitialized worker node as if it were ready to service queries. This can lead to failures such as "unknown catalog" or, more precisely, "unknown handle id" when decoding task update requests.

Prevent such hard-to-understand failures by passing the node ID in task update requests. Provided that the new worker is started with a different node ID, these requests will be cleanly rejected instead of being serviced.
findepi force-pushed the findepi/reject-task-updates-addressed-to-previous-failed-node-c1e964 branch from af33656 to 4242f87 on May 10, 2024, 13:40
findepi (Member, Author) commented on May 10, 2024:

#21921 is a simpler alternative that avoids the problem described here: #21744 (comment)

Labels: bigquery (BigQuery connector), cla-signed, delta-lake (Delta Lake connector), hive (Hive connector), hudi (Hudi connector), iceberg (Iceberg connector), mongodb (MongoDB connector)
Development: Successfully merging this pull request may close the issue "Failed worker doesn't handle requests properly on recovery" (#21735).
8 participants