Skip to content

Commit

Permalink
fix(engine): complete closeFuture when streamprocessor fails
Browse files Browse the repository at this point in the history
When the actor fails the closeFuture is completed, so any caller that waits for the close is not blocked for ever.
  • Loading branch information
deepthidevaki committed Feb 10, 2022
1 parent 69becfd commit e3e7ab2
Showing 1 changed file with 4 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public class StreamProcessor extends Actor implements HealthMonitorable, LogReco
private volatile Phase phase = Phase.INITIAL;

private CompletableActorFuture<Void> openFuture;
private CompletableActorFuture<Void> closeFuture = CompletableActorFuture.completed(null);
private final CompletableActorFuture<Void> closeFuture = new CompletableActorFuture<>();
private volatile long lastTickTime;
private boolean shouldProcess = true;
private ActorFuture<LastProcessingPositions> replayCompletedFuture;
Expand Down Expand Up @@ -190,10 +190,8 @@ protected void onActorCloseRequested() {

@Override
public ActorFuture<Void> closeAsync() {
if (isOpened.compareAndSet(true, false)) {
closeFuture = new CompletableActorFuture<>();
actor.close();
}
isOpened.set(false);
actor.close();
return closeFuture;
}

Expand All @@ -205,10 +203,10 @@ protected void handleFailure(final Throwable failure) {
@Override
public void onActorFailed() {
phase = Phase.FAILED;
closeFuture = CompletableActorFuture.completed(null);
isOpened.set(false);
lifecycleAwareListeners.forEach(StreamProcessorLifecycleAware::onFailed);
tearDown();
closeFuture.complete(null);
}

private boolean shouldProcessNext() {
Expand Down

0 comments on commit e3e7ab2

Please sign in to comment.