
Bulkhead policy may drop requests when maxWaitTime is specified #365

Closed
timothybasanov opened this issue Jun 7, 2023 · 8 comments

@timothybasanov

NOT A CONTRIBUTION

Steps:

  • Create a Failsafe executor with a single Bulkhead policy with a concurrency of 1
  • Call getStageAsync 3 times in a row, with a supplier that returns a future which completes asynchronously after about a second

Expected: All three tasks are done one at a time taking a few seconds.
Actual: Two tasks are done successfully. The third one is never called.

Notes: The suspicion is that permits are miscalculated when a task finishes its execution and the permit counter is at a max already.

@jhalterman
Member

@timothybasanov Does this test describe the scenario you're seeing?

  public void test() throws Throwable {
    Bulkhead<Object> bulkhead = Bulkhead.of(1);
    List<CompletableFuture<Object>> stages = new ArrayList<>();

    Waiter waiter = new Waiter();
    for (int i = 0; i < 3; i++) {
      stages.add(Failsafe.with(bulkhead).getStageAsync(() -> CompletableFuture.supplyAsync(() -> {
        Testing.sleep(10);
        waiter.resume();
        return null;
      })));
      Testing.sleep(200); // Needed to avoid BulkheadFullException
    }

    waiter.await(1, TimeUnit.MINUTES, 3);
    for (CompletableFuture<Object> stage : stages)
      assertTrue(stage.isDone());
  }

In my testing, the only time a task is not run is when it's rejected with BulkheadFullException, which will happen if the previous task is still running.

@timothybasanov
Author

Yep, you're right. I was not specific enough in the original issue. It only happens if the Bulkhead has a max wait time configured. With 3 requests of 1 second each and a max wait time of 10 seconds, I expect all of them to succeed.
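The expectation here can be sketched with a plain java.util.concurrent.Semaphore, assuming a bulkhead with maxWaitTime behaves like a fair semaphore where each caller may block up to the wait time for a permit. The class name ExpectedSemantics is invented for illustration, and the timings are scaled down from the 1s/10s figures above to keep the demo fast; this is not Failsafe code:

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class ExpectedSemantics {
  static int run() throws InterruptedException {
    Semaphore permits = new Semaphore(1, true); // concurrency of 1, fair ordering
    int[] ok = new int[1];                      // count of tasks that succeeded
    Thread[] workers = new Thread[3];
    for (int i = 0; i < 3; i++) {
      workers[i] = new Thread(() -> {
        try {
          // Wait up to the "maxWaitTime" budget for a permit.
          if (permits.tryAcquire(10, TimeUnit.SECONDS)) {
            try {
              Thread.sleep(100);                // the "1 second" task, scaled down
            } finally {
              permits.release();
            }
            synchronized (ok) { ok[0]++; }
          }
        } catch (InterruptedException ignored) {}
      });
      workers[i].start();
    }
    for (Thread t : workers) t.join();
    return ok[0];
  }

  public static void main(String[] args) throws InterruptedException {
    // All three tasks fit within the wait budget, so all should succeed.
    System.out.println("succeeded: " + run()); // prints "succeeded: 3"
  }
}
```

With these semantics, a queued caller simply runs later rather than being dropped, which is what the issue title says is violated.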

@timothybasanov changed the title from "Bulkhead policy hangs" to "Bulkhead policy may drop requests when maxWaitTime is specified" on Jun 8, 2023
@Tembrel
Contributor

Tembrel commented Jun 8, 2023

This reproduces it as a JUnit test.

package com.example.failsafe;

import dev.failsafe.*;
import java.time.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

import org.junit.*;
import static org.junit.Assert.*;


/**
 * See https://github.com/failsafe-lib/failsafe/issues/365
 */
public class Issue365 {

    static final int NUM_TASKS = 3;
    static final long MAX_WAIT_SECS = 10;
    static final long TASK_SLEEP_SECS = 1;
    static final int CONCURRENCY = 1;

    Bulkhead<Integer> BULKHEAD = Bulkhead.<Integer>builder(CONCURRENCY)
        .withMaxWaitTime(Duration.ofSeconds(MAX_WAIT_SECS))
        .build();

    FailsafeExecutor<Integer> EXEC = Failsafe.with(BULKHEAD);


    @Test public void threeTasks() {
        AtomicInteger count = new AtomicInteger(NUM_TASKS);
        try {
            CompletableFuture<?>[] tasks = new CompletableFuture<?>[NUM_TASKS];
            for (int id = 0; id < NUM_TASKS; ++id) {
                int ID = id;
                tasks[ID] = EXEC.getStageAsync(() ->
                    CompletableFuture.supplyAsync(() -> sleepThenReturn(count, ID)));
            }
            CompletableFuture.allOf(tasks).get();
        } catch (InterruptedException ex) {
            System.out.println("interrupted get");
        } catch (ExecutionException ex) {
            if (ex.getCause() instanceof BulkheadFullException) {
                System.out.printf("Bulkhead full: %s%n", ex.getCause());
            }
        }

        assertEquals(0, count.get());
    }

    private Integer sleepThenReturn(AtomicInteger count, int id) {
        try {
            TimeUnit.SECONDS.sleep(TASK_SLEEP_SECS);
            int remaining = count.decrementAndGet();
            System.out.printf("task %d done sleeping, %d remaining%n", id, remaining);
            return id;
        } catch (InterruptedException ex) {
            System.out.println("interrupted sleep");
            return -1;
        }
    }
}

The assertEquals test fails, and the output is:

task 0 done sleeping, 2 remaining
task 1 done sleeping, 1 remaining
Bulkhead full: dev.failsafe.BulkheadFullException

@nicky9door
Contributor

nicky9door commented Jun 22, 2023

I recently came across this issue and think I have found the culprit. In BulkheadImpl, the releasePermit method looks like this:

public synchronized void releasePermit() {
  if (permits < maxPermits) {
    permits += 1;
    CompletableFuture<Void> future = futures.pollFirst();
    if (future != null)
      future.complete(null);
  }
}

Which I think needs to be changed to this:

public synchronized void releasePermit() {
  if (permits < maxPermits) {
    permits += 1;
    CompletableFuture<Void> future = futures.pollFirst();
    if (future != null) {
      permits -= 1;
      future.complete(null);
    }
  }
}

This ensures that we properly claim the permit when we execute the next pending task. What is currently happening is that each time a job from the queue is completed, the number of permits increments. If there are more jobs in the queue than maxPermits, this results in permits eventually equaling maxPermits and the initial if-statement never fires. By claiming a permit before running the queued task, it should keep the count in order.
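The accounting failure described above can be reproduced with a small single-threaded model. The class and method names here (PermitModel, releasePermitBuggy, releasePermitFixed, enqueueWaiter) are invented for illustration and are not the actual BulkheadImpl code; they only mirror the permit arithmetic quoted in this comment:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

public class PermitModel {
  final int maxPermits;
  int permits;                                  // permits currently available
  final Deque<CompletableFuture<Void>> futures = new ArrayDeque<>();

  PermitModel(int maxPermits) {
    this.maxPermits = maxPermits;
    this.permits = maxPermits;
  }

  boolean tryAcquirePermit() {                  // returns false when full
    if (permits > 0) { permits -= 1; return true; }
    return false;
  }

  CompletableFuture<Void> enqueueWaiter() {     // a full bulkhead queues the caller
    CompletableFuture<Void> f = new CompletableFuture<>();
    futures.addLast(f);
    return f;
  }

  void releasePermitBuggy() {                   // pre-fix behavior
    if (permits < maxPermits) {
      permits += 1;
      CompletableFuture<Void> future = futures.pollFirst();
      if (future != null)
        future.complete(null);                  // waiter runs, but the permit it
    }                                           // now holds was never deducted
  }

  void releasePermitFixed() {                   // post-fix behavior
    if (permits < maxPermits) {
      permits += 1;
      CompletableFuture<Void> future = futures.pollFirst();
      if (future != null) {
        permits -= 1;                           // re-claim for the woken waiter
        future.complete(null);
      }
    }
  }

  public static void main(String[] args) {
    // Buggy accounting: task A holds the permit, B and C are queued.
    PermitModel m = new PermitModel(1);
    m.tryAcquirePermit();                       // A runs, permits 1 -> 0
    CompletableFuture<Void> b = m.enqueueWaiter();
    CompletableFuture<Void> c = m.enqueueWaiter();
    m.releasePermitBuggy();                     // A done: wakes B, permits -> 1
    m.releasePermitBuggy();                     // B done: permits == maxPermits,
                                                // so the guard fails; C never wakes
    System.out.println("buggy: b=" + b.isDone() + " c=" + c.isDone());
    // prints "buggy: b=true c=false"

    PermitModel m2 = new PermitModel(1);
    m2.tryAcquirePermit();
    CompletableFuture<Void> b2 = m2.enqueueWaiter();
    CompletableFuture<Void> c2 = m2.enqueueWaiter();
    m2.releasePermitFixed();                    // A done: wakes B, permits back to 0
    m2.releasePermitFixed();                    // B done: wakes C
    System.out.println("fixed: b=" + b2.isDone() + " c=" + c2.isDone());
    // prints "fixed: b=true c=true"
  }
}
```

The model shows the same shape as the failing JUnit output: with concurrency 1 and three callers, the buggy release leaves the third waiter stranded, while re-claiming the permit before completing the waiter's future lets every queued caller run in turn.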

@jhalterman
Member

Thanks for the test @Tembrel.

@nicky9door Well spotted! Would you like to submit a PR for this?

@jhalterman jhalterman added the bug label Jun 24, 2023
nicky9door pushed a commit to nicky9door/failsafe that referenced this issue Jun 24, 2023
…when running a task from the pending queue

Before this change, releasePermit would check the queue but not modify the permit field. This could eventually
result in permits being equal to maxPermits and would erroneously report the bulkhead as being full

This should fix issue failsafe-lib#365
@nicky9door
Contributor

@jhalterman PR #366 submitted. I added a test case based on @Tembrel's comment. Wasn't too sure of the correct way to write the test, but it seems to work OK.

jhalterman pushed a commit that referenced this issue Jun 24, 2023
…when running a task from the pending queue (#366)

* Changed BulkheadImpl.releasePermit to ensure that it claims a permit when running a task from the pending queue

Before this change, releasePermit would check the queue but not modify the permit field. This could eventually
result in permits being equal to maxPermits and would erroneously report the bulkhead as being full

This should fix issue #365

---------

Co-authored-by: Nick Merlo <nick.merlo@aligntrac.com>
@jhalterman
Member

Fixed by #366

@jhalterman
Member

This fix has been released in 3.3.2.
