Skip to content

Commit

Permalink
Add a Bulkhead policy
Browse files Browse the repository at this point in the history
  • Loading branch information
jhalterman committed Dec 23, 2021
1 parent 0b5d1b4 commit 9c1e034
Show file tree
Hide file tree
Showing 12 changed files with 612 additions and 3 deletions.
10 changes: 8 additions & 2 deletions CHANGELOG.md
@@ -1,8 +1,14 @@
# 3.1
# 3.2.0

### Improvements

- Issue #308 - Introduce a `RateLimiter` policy.
- Issue #309 - Introduced a `Bulkhead` policy.

# 3.1.0

### Improvements

- Issue #308 - Introduced a `RateLimiter` policy.

# 3.0.2

Expand Down
2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -6,7 +6,7 @@
[![JavaDoc](https://img.shields.io/maven-central/v/dev.failsafe/failsafe.svg?maxAge=60&label=javadoc)](https://failsafe.dev/javadoc/)
[![Join the chat at https://gitter.im/jhalterman/failsafe](https://badges.gitter.im/jhalterman/failsafe.svg)](https://gitter.im/jhalterman/failsafe)

Failsafe is a lightweight, zero-dependency library for handling failures in Java 8+, with a concise API for handling everyday use cases and the flexibility to handle everything else. It works by wrapping executable logic with one or more resilience policies, which can be combined and composed as needed. Current policies include [Retry](https://failsafe.dev/retry/), [CircuitBreaker](https://failsafe.dev/circuit-breaker/), [RateLimiter](https://failsafe.dev/rate-limiter/), [Timeout](https://failsafe.dev/timeout/), and [Fallback](https://failsafe.dev/fallback/).
Failsafe is a lightweight, zero-dependency library for handling failures in Java 8+, with a concise API for handling everyday use cases and the flexibility to handle everything else. It works by wrapping executable logic with one or more resilience policies, which can be combined and composed as needed. Current policies include [Retry](https://failsafe.dev/retry/), [CircuitBreaker](https://failsafe.dev/circuit-breaker/), [RateLimiter](https://failsafe.dev/rate-limiter/), [Timeout](https://failsafe.dev/timeout/), [Bulkhead](https://failsafe.dev/bulkhead/), and [Fallback](https://failsafe.dev/fallback/).

## Usage

Expand Down
120 changes: 120 additions & 0 deletions src/main/java/dev/failsafe/Bulkhead.java
@@ -0,0 +1,120 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License
*/
package dev.failsafe;

import dev.failsafe.internal.BulkheadImpl;

import java.time.Duration;

/**
* A bulkhead allows you to restrict concurrent executions as a way of preventing system overload.
* <p>
* This class is threadsafe.
* </p>
*
* @param <R> result type
* @author Jonathan Halterman
* @see BulkheadConfig
* @see BulkheadBuilder
* @see BulkheadFullException
*/
public interface Bulkhead<R> extends Policy<R> {
/**
* Returns a Bulkhead for the {@code maxConcurrency} that has {@link BulkheadBuilder#withMaxWaitTime(Duration) zero
* wait} and is {@link BulkheadBuilder#withFairness() not fair} by default.
*
* @param maxConcurrency controls the max concurrent executions that are permitted within the bulkhead
*/
static <R> BulkheadBuilder<R> builder(int maxConcurrency) {
return new BulkheadBuilder<>(maxConcurrency);
}

/**
* Creates a new BulkheadBuilder that will be based on the {@code config}. The built bulkhead is {@link
* BulkheadBuilder#withFairness() not fair} by default.
*/
static <R> BulkheadBuilder<R> builder(BulkheadConfig<R> config) {
return new BulkheadBuilder<>(config);
}

/**
* Returns a Bulkhead for the {@code maxConcurrency} that has {@link BulkheadBuilder#withMaxWaitTime(Duration) zero
* wait} and is {@link BulkheadBuilder#withFairness() not fair} by default. Alias for {@code
* Bulkhead.builder(maxConcurrency).build()}. To configure additional options on a Bulkhead, use {@link #builder(int)}
* instead.
*
* @param maxConcurrency controls the max concurrent executions that are permitted within the bulkhead
* @see #builder(int)
*/
static <R> Bulkhead<R> of(int maxConcurrency) {
return new BulkheadImpl<>(new BulkheadConfig<>(maxConcurrency));
}

/**
* Returns the {@link BulkheadConfig} that the Bulkhead was built with.
*/
@Override
BulkheadConfig<R> getConfig();

/**
* Attempts to acquire a permit to perform an execution against within the bulkhead, waiting until one is available or
* the thread is interrupted. After execution is complete, the permit should be {@link #releasePermit() released} back
* to the bulkhead.
*
* @throws InterruptedException if the current thread is interrupted while waiting to acquire a permit
* @see #tryAcquirePermit()
*/
void acquirePermit() throws InterruptedException;

/**
* Attempts to acquire a permit to perform an execution within the bulkhead, waiting up to the {@code maxWaitTime}
* until one is available, else throwing {@link BulkheadFullException} if a permit will not be available in time.
* After execution is complete, the permit should be {@link #releasePermit() released} back to the bulkhead.
*
* @throws NullPointerException if {@code maxWaitTime} is null
* @throws BulkheadFullException if the bulkhead cannot acquire a permit within the {@code maxWaitTime}
* @throws InterruptedException if the current thread is interrupted while waiting to acquire a permit
* @see #tryAcquirePermit(Duration)
*/
default void acquirePermit(Duration maxWaitTime) throws InterruptedException {
if (!tryAcquirePermit(maxWaitTime))
throw new BulkheadFullException(this);
}

/**
* Tries to acquire a permit to perform an execution within the bulkhead, returning immediately without waiting. After
* execution is complete, the permit should be {@link #releasePermit() released} back to the bulkhead.
*
* @return whether the requested {@code permits} are successfully acquired or not
*/
boolean tryAcquirePermit();

/**
* Tries to acquire a permit to perform an execution within the bulkhead, waiting up to the {@code maxWaitTime} until
* they are available. After execution is complete, the permit should be {@link #releasePermit() released} back to the
* bulkhead.
*
* @return whether a permit is successfully acquired
* @throws NullPointerException if {@code maxWaitTime} is null
* @throws InterruptedException if the current thread is interrupted while waiting to acquire a permit
*/
boolean tryAcquirePermit(Duration maxWaitTime) throws InterruptedException;

/**
* Releases a permit to execute.
*/
void releasePermit();
}
68 changes: 68 additions & 0 deletions src/main/java/dev/failsafe/BulkheadBuilder.java
@@ -0,0 +1,68 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License
*/
package dev.failsafe;

import dev.failsafe.internal.BulkheadImpl;
import dev.failsafe.internal.util.Assert;

import java.time.Duration;

/**
* Builds {@link Bulkhead} instances.
* <p>
* This class is <i>not</i> threadsafe.
* </p>
*
* @param <R> result type
* @author Jonathan Halterman
* @see BulkheadConfig
* @see BulkheadFullException
*/
public class BulkheadBuilder<R> extends PolicyBuilder<BulkheadBuilder<R>, BulkheadConfig<R>, R> {
BulkheadBuilder(int maxConcurrency) {
super(new BulkheadConfig<>(maxConcurrency));
}

BulkheadBuilder(BulkheadConfig<R> config) {
super(new BulkheadConfig<>(config));
}

/**
* Builds a new {@link Bulkhead} using the builder's configuration.
*/
public Bulkhead<R> build() {
return new BulkheadImpl<>(new BulkheadConfig<>(config));
}

/**
* Configures the {@code maxWaitTime} to wait for permits to be available. If permits cannot be acquired before the
* {@code maxWaitTime} is exceeded, then the bulkhead will throw {@link BulkheadFullException}.
*
* @throws NullPointerException if {@code maxWaitTime} is null
*/
public BulkheadBuilder<R> withMaxWaitTime(Duration maxWaitTime) {
config.maxWaitTime = Assert.notNull(maxWaitTime, "maxWaitTime");
return this;
}

/**
* Configures the bulkhead to be fair in permitting waiting execution in order.
*/
public BulkheadBuilder<R> withFairness() {
config.fair = true;
return this;
}
}
70 changes: 70 additions & 0 deletions src/main/java/dev/failsafe/BulkheadConfig.java
@@ -0,0 +1,70 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License
*/
package dev.failsafe;

import java.time.Duration;

/**
* Configuration for a {@link Bulkhead}.
*
* @param <R> result type
* @author Jonathan Halterman
*/
public class BulkheadConfig<R> extends PolicyConfig<R> {
int maxConcurrency;
Duration maxWaitTime;
boolean fair;

BulkheadConfig(int maxConcurrency) {
this.maxConcurrency = maxConcurrency;
maxWaitTime = Duration.ZERO;
}

BulkheadConfig(BulkheadConfig<R> config) {
super(config);
maxConcurrency = config.maxConcurrency;
maxWaitTime = config.maxWaitTime;
fair = config.fair;
}

/**
* Returns that max concurrent executions that are permitted within the bulkhead.
*
* @see Bulkhead#builder(int)
*/
public int getMaxConcurrency() {
return maxConcurrency;
}

/**
* Returns the max time to wait for permits to be available. If permits cannot be acquired before the max wait time is
* exceeded, then the bulkhead will throw {@link BulkheadFullException}.
*
* @see BulkheadBuilder#withMaxWaitTime(Duration)
*/
public Duration getMaxWaitTime() {
return maxWaitTime;
}

/**
* Returns whether the Bulkhead is fair in permitting waiting executions in order.
*
* @see BulkheadBuilder#withFairness()
*/
public boolean isFair() {
return fair;
}
}
36 changes: 36 additions & 0 deletions src/main/java/dev/failsafe/BulkheadFullException.java
@@ -0,0 +1,36 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License
*/
package dev.failsafe;

/**
* Thrown when an execution is attempted against a {@link Bulkhead} that is full.
*
* @author Jonathan Halterman
*/
public class BulkheadFullException extends FailsafeException {
private static final long serialVersionUID = 1L;

private final Bulkhead<?> bulkhead;

public BulkheadFullException(Bulkhead<?> bulkhead) {
this.bulkhead = bulkhead;
}

/** Returns the {@link Bulkhead} that caused the exception. */
public Bulkhead<?> getBulkhead() {
return bulkhead;
}
}
66 changes: 66 additions & 0 deletions src/main/java/dev/failsafe/internal/BulkheadExecutor.java
@@ -0,0 +1,66 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License
*/
package dev.failsafe.internal;

import dev.failsafe.Bulkhead;
import dev.failsafe.BulkheadFullException;
import dev.failsafe.ExecutionContext;
import dev.failsafe.RateLimitExceededException;
import dev.failsafe.spi.ExecutionResult;
import dev.failsafe.spi.PolicyExecutor;

import java.time.Duration;

/**
* A PolicyExecutor that handles failures according to a {@link Bulkhead}.
*
* @param <R> result type
* @author Jonathan Halterman
*/
public class BulkheadExecutor<R> extends PolicyExecutor<R> {
private final BulkheadImpl<R> bulkhead;
private final Duration maxWaitTime;

public BulkheadExecutor(BulkheadImpl<R> bulkhead, int policyIndex) {
super(bulkhead, policyIndex);
this.bulkhead = bulkhead;
maxWaitTime = bulkhead.getConfig().getMaxWaitTime();
}

@Override
protected ExecutionResult<R> preExecute() {
try {
return bulkhead.tryAcquirePermit(maxWaitTime) ?
null :
ExecutionResult.failure(new BulkheadFullException(bulkhead));
} catch (InterruptedException e) {
// Set interrupt flag
Thread.currentThread().interrupt();
return ExecutionResult.failure(e);
}
}

@Override
public void onSuccess(ExecutionResult<R> result) {
bulkhead.releasePermit();
}

@Override
protected ExecutionResult<R> onFailure(ExecutionContext<R> context, ExecutionResult<R> result) {
bulkhead.releasePermit();
return result;
}
}

0 comments on commit 9c1e034

Please sign in to comment.