Skip to content

Commit

Permalink
[FLINK-23806][runtime] Avoid StackOverflowException when a large scal…
Browse files Browse the repository at this point in the history
…e job failed to acquire enough slots in time

This closes #16842.
  • Loading branch information
zhuzhurk committed Aug 19, 2021
1 parent 02f6ca4 commit f543e9a
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,10 @@ private void restartTasks(
}

private CompletableFuture<?> cancelTasksAsync(final Set<ExecutionVertexID> verticesToRestart) {
// clean up all the related pending requests to avoid that immediately returned slot
// is used to fulfill the pending requests of these tasks
verticesToRestart.stream().forEach(executionSlotAllocator::cancel);

final List<CompletableFuture<?>> cancelFutures =
verticesToRestart.stream()
.map(this::cancelExecutionVertex)
Expand All @@ -337,7 +341,6 @@ private CompletableFuture<?> cancelExecutionVertex(final ExecutionVertexID execu

notifyCoordinatorOfCancellation(vertex);

executionSlotAllocator.cancel(executionVertexId);
return executionVertexOperations.cancel(vertex);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,9 @@
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -1049,6 +1051,56 @@ public void allocationIsCanceledWhenVertexIsFailedOrCanceled() throws Exception
assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(), hasSize(0));
}

@Test
public void pendingSlotRequestsOfVerticesToRestartWillNotBeFulfilledByReturnedSlots()
throws Exception {
final int parallelism = 10;
final JobGraph jobGraph = sourceSinkJobGraph(parallelism);
testExecutionSlotAllocator.disableAutoCompletePendingRequests();
testExecutionSlotAllocator.enableCompletePendingRequestsWithReturnedSlots();

final DefaultScheduler scheduler =
createScheduler(
jobGraph,
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
new PipelinedRegionSchedulingStrategy.Factory(),
new RestartAllFailoverStrategy.Factory());
scheduler.startScheduling();

final ExecutionVertex ev1 =
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0);

final Set<CompletableFuture<LogicalSlot>> pendingLogicalSlotFutures =
testExecutionSlotAllocator.getPendingRequests().values().stream()
.map(SlotExecutionVertexAssignment::getLogicalSlotFuture)
.collect(Collectors.toSet());
assertThat(pendingLogicalSlotFutures, hasSize(parallelism * 2));

testExecutionSlotAllocator.completePendingRequest(ev1.getID());
assertThat(
pendingLogicalSlotFutures.stream().filter(CompletableFuture::isDone).count(),
is(1L));

final String exceptionMessage = "expected exception";
scheduler.updateTaskExecutionState(
new TaskExecutionState(
ev1.getCurrentExecutionAttempt().getAttemptId(),
ExecutionState.FAILED,
new RuntimeException(exceptionMessage)));

assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(), hasSize(0));

// the failed task will return its slot before triggering failover. And the slot
// will be returned and re-assigned to another task which is waiting for a slot.
// failover will be triggered after that and the re-assigned slot will be returned
// once the attached task is canceled, but the slot will not be assigned to other
// tasks which are identified to be restarted soon.
assertThat(testExecutionSlotAllocator.getReturnedSlots(), hasSize(2));
assertThat(
pendingLogicalSlotFutures.stream().filter(CompletableFuture::isCancelled).count(),
is(parallelism * 2L - 2L));
}

@Test
public void testExceptionHistoryWithGlobalFailOver() {
final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
Expand Down Expand Up @@ -1506,6 +1558,21 @@ private static JobGraph nonParallelSourceSinkJobGraph() {
return JobGraphTestUtils.streamingJobGraph(source, sink);
}

private static JobGraph sourceSinkJobGraph(final int parallelism) {
final JobVertex source = new JobVertex("source");
source.setParallelism(parallelism);
source.setInvokableClass(NoOpInvokable.class);

final JobVertex sink = new JobVertex("sink");
sink.setParallelism(parallelism);
sink.setInvokableClass(NoOpInvokable.class);

sink.connectNewDataSetAsInput(
source, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);

return JobGraphTestUtils.streamingJobGraph(source, sink);
}

private static JobVertex getOnlyJobVertex(final JobGraph jobGraph) {
final List<JobVertex> sortedVertices = jobGraph.getVerticesSortedTopologicallyFromSources();
Preconditions.checkState(sortedVertices.size() == 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ public class TestExecutionSlotAllocator implements ExecutionSlotAllocator, SlotO

private boolean autoCompletePendingRequests = true;

private boolean completePendingRequestsWithReturnedSlots = false;

private final List<LogicalSlot> returnedSlots = new ArrayList<>();

public TestExecutionSlotAllocator() {
Expand Down Expand Up @@ -133,6 +135,10 @@ public void disableAutoCompletePendingRequests() {
autoCompletePendingRequests = false;
}

public void enableCompletePendingRequestsWithReturnedSlots() {
completePendingRequestsWithReturnedSlots = true;
}

@Override
public void cancel(final ExecutionVertexID executionVertexId) {
final SlotExecutionVertexAssignment slotVertexAssignment =
Expand All @@ -145,6 +151,19 @@ public void cancel(final ExecutionVertexID executionVertexId) {
@Override
public void returnLogicalSlot(final LogicalSlot logicalSlot) {
returnedSlots.add(logicalSlot);

if (completePendingRequestsWithReturnedSlots) {
if (pendingRequests.size() > 0) {
// logical slots are not re-usable, creating a new one instead.
final LogicalSlot slot =
logicalSlotBuilder.setSlotOwner(this).createTestingLogicalSlot();

final SlotExecutionVertexAssignment slotVertexAssignment =
pendingRequests.remove(pendingRequests.keySet().stream().findAny().get());

slotVertexAssignment.getLogicalSlotFuture().complete(slot);
}
}
}

public List<LogicalSlot> getReturnedSlots() {
Expand Down

0 comments on commit f543e9a

Please sign in to comment.