From ae8bac601acac5ad919fa71a2e1671f2263ac0ea Mon Sep 17 00:00:00 2001 From: nicky9door Date: Sat, 24 Jun 2023 11:41:05 -0400 Subject: [PATCH] Changed BulkheadImpl.releasePermit to ensure that it claims a permit when running a task from the pending queue (#366) * Changed BulkheadImpl.releasePermit to ensure that it claims a permit when running a task from the pending queue Before this change, releasePermit would check the queue but not modify the permit field. This could eventually result in permits being equal to maxPermits and would erroneously report the bulkhead as being full This should fix issue #365 --------- Co-authored-by: Nick Merlo --- .../dev/failsafe/internal/BulkheadImpl.java | 4 ++- .../dev/failsafe/functional/BulkheadTest.java | 26 +++++++++++++++++++ 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/dev/failsafe/internal/BulkheadImpl.java b/core/src/main/java/dev/failsafe/internal/BulkheadImpl.java index 5c85fc3e..477532ef 100644 --- a/core/src/main/java/dev/failsafe/internal/BulkheadImpl.java +++ b/core/src/main/java/dev/failsafe/internal/BulkheadImpl.java @@ -99,8 +99,10 @@ public synchronized void releasePermit() { if (permits < maxPermits) { permits += 1; CompletableFuture future = futures.pollFirst(); - if (future != null) + if (future != null){ + permits -= 1; future.complete(null); + } } } diff --git a/core/src/test/java/dev/failsafe/functional/BulkheadTest.java b/core/src/test/java/dev/failsafe/functional/BulkheadTest.java index b7bf1358..0f27f5ad 100644 --- a/core/src/test/java/dev/failsafe/functional/BulkheadTest.java +++ b/core/src/test/java/dev/failsafe/functional/BulkheadTest.java @@ -18,10 +18,15 @@ import dev.failsafe.Bulkhead; import dev.failsafe.BulkheadFullException; import dev.failsafe.Failsafe; +import dev.failsafe.FailsafeExecutor; +import dev.failsafe.function.CheckedRunnable; import dev.failsafe.testing.Testing; +import org.testng.Assert; import org.testng.annotations.Test; import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; /** * Tests various Bulkhead scenarios. @@ -46,6 +51,27 @@ public void testPermitAcquiredAfterWait() { }, "test"); } + public void testPermitAcquiredAfterWaitWithLargeQueue(){ + Bulkhead bulkhead = Bulkhead.builder(1).withMaxWaitTime(Duration.ofSeconds(15)).build(); + FailsafeExecutor exec = Failsafe.with(bulkhead); + CompletableFuture[] tasks = new CompletableFuture[10]; + for(int i = 0; i < tasks.length; i++){ + int index = i; + CheckedRunnable sleep = () -> { + ignoreExceptions(() ->{ + System.out.println("Running sleep task " + (index + 1)); + TimeUnit.MILLISECONDS.sleep(10); + System.out.println("Finished sleep task " + (index + 1)); + }); + }; + CompletableFuture task = exec.runAsync(sleep); + task.whenComplete((r, ex) -> Assert.assertNull(ex)); + tasks[i] = task; + } + + CompletableFuture.allOf(tasks).join(); + } + public void shouldThrowBulkheadFullExceptionAfterPermitsExceeded() { // Given Bulkhead bulkhead = Bulkhead.of(2);