diff --git a/core/src/main/java/dev/failsafe/internal/BulkheadImpl.java b/core/src/main/java/dev/failsafe/internal/BulkheadImpl.java index 5c85fc3e..477532ef 100644 --- a/core/src/main/java/dev/failsafe/internal/BulkheadImpl.java +++ b/core/src/main/java/dev/failsafe/internal/BulkheadImpl.java @@ -99,8 +99,10 @@ public synchronized void releasePermit() { if (permits < maxPermits) { permits += 1; CompletableFuture future = futures.pollFirst(); - if (future != null) + if (future != null){ + permits -= 1; future.complete(null); + } } } diff --git a/core/src/test/java/dev/failsafe/functional/BulkheadTest.java b/core/src/test/java/dev/failsafe/functional/BulkheadTest.java index b7bf1358..0f27f5ad 100644 --- a/core/src/test/java/dev/failsafe/functional/BulkheadTest.java +++ b/core/src/test/java/dev/failsafe/functional/BulkheadTest.java @@ -18,10 +18,15 @@ import dev.failsafe.Bulkhead; import dev.failsafe.BulkheadFullException; import dev.failsafe.Failsafe; +import dev.failsafe.FailsafeExecutor; +import dev.failsafe.function.CheckedRunnable; import dev.failsafe.testing.Testing; +import org.testng.Assert; import org.testng.annotations.Test; import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; /** * Tests various Bulkhead scenarios. @@ -46,6 +51,27 @@ public void testPermitAcquiredAfterWait() { }, "test"); } + public void testPermitAcquiredAfterWaitWithLargeQueue(){ + Bulkhead bulkhead = Bulkhead.builder(1).withMaxWaitTime(Duration.ofSeconds(15)).build(); + FailsafeExecutor exec = Failsafe.with(bulkhead); + CompletableFuture[] tasks = new CompletableFuture[10]; + for(int i = 0; i < tasks.length; i++){ + int index = i; + CheckedRunnable sleep = () -> { + ignoreExceptions(() ->{ + System.out.println("Running sleep task " + (index + 1)); + TimeUnit.MILLISECONDS.sleep(10); + System.out.println("Finished sleep task " + (index + 1)); + }); + }; + CompletableFuture task = exec.runAsync(sleep); + task.whenComplete((r, ex) -> Assert.assertNull(ex)); + tasks[i] = task; + } + + CompletableFuture.allOf(tasks).join(); + } + public void shouldThrowBulkheadFullExceptionAfterPermitsExceeded() { // Given Bulkhead bulkhead = Bulkhead.of(2);