Skip to content

Commit

Permalink
Add ADAPTIVE_V2 retry mode to support the legacy behavior (#5123)
Browse files Browse the repository at this point in the history
* Add a new ADAPTIVE2 mode to support the legacy behavior

* Fix dynamodb test to use adaptive2 mode

* Fixes and tests for the expected behaviors

* Rename the new adaptive mode to ADAPTIVE_V2

* More fixes related to the rename from adaptive2 to adaptive_v2

* Fix dynamodb retry resolver logic for adaptive mode

* Properly clean up the test state

* Address PR comments
  • Loading branch information
sugmanue committed May 1, 2024
1 parent c21eeed commit 67b7844
Show file tree
Hide file tree
Showing 11 changed files with 866 additions and 37 deletions.
Expand Up @@ -18,6 +18,7 @@
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.awscore.internal.AwsErrorCode;
import software.amazon.awssdk.core.internal.retry.RetryPolicyAdapter;
import software.amazon.awssdk.core.internal.retry.SdkDefaultRetryStrategy;
import software.amazon.awssdk.core.retry.RetryMode;
import software.amazon.awssdk.retries.AdaptiveRetryStrategy;
Expand Down Expand Up @@ -54,10 +55,12 @@ private AwsRetryStrategy() {
switch (mode) {
case STANDARD:
return standardRetryStrategy();
case ADAPTIVE:
case ADAPTIVE_V2:
return adaptiveRetryStrategy();
case LEGACY:
return legacyRetryStrategy();
case ADAPTIVE:
return legacyAdaptiveRetryStrategy();
default:
throw new IllegalArgumentException("unknown retry mode: " + mode);
}
Expand All @@ -84,7 +87,6 @@ private AwsRetryStrategy() {
return DefaultRetryStrategy.none();
}


/**
* Returns a {@link StandardRetryStrategy} with AWS-specific conditions added.
*
Expand Down Expand Up @@ -121,8 +123,8 @@ public static AdaptiveRetryStrategy adaptiveRetryStrategy() {
* Configures a retry strategy using its builder to add AWS-specific retry exceptions.
*
* @param builder The builder to add the AWS-specific retry exceptions
* @param <T> The type of the builder extending {@link RetryStrategy.Builder}
* @return The given builder
* @param <T> The type of the builder extending {@link RetryStrategy.Builder}
*/
public static <T extends RetryStrategy.Builder<T, ?>> T configure(T builder) {
return builder.retryOnException(AwsRetryStrategy::retryOnAwsRetryableErrors);
Expand All @@ -135,6 +137,9 @@ public static AdaptiveRetryStrategy adaptiveRetryStrategy() {
* @return The given builder
*/
public static RetryStrategy.Builder<?, ?> configureStrategy(RetryStrategy.Builder<?, ?> builder) {
if (builder instanceof RetryPolicyAdapter.Builder) {
return builder;
}
return builder.retryOnException(AwsRetryStrategy::retryOnAwsRetryableErrors);
}

Expand All @@ -145,4 +150,16 @@ private static boolean retryOnAwsRetryableErrors(Throwable ex) {
}
return false;
}

/**
* Returns a {@link RetryStrategy<?, ?>} that implements the legacy {@link RetryMode#ADAPTIVE} mode.
*
* @return a {@link RetryStrategy<?, ?>} that implements the legacy {@link RetryMode#ADAPTIVE} mode.
*/
private static RetryStrategy<?, ?> legacyAdaptiveRetryStrategy() {
return RetryPolicyAdapter.builder()
.retryPolicy(AwsRetryPolicy.forRetryMode(RetryMode.ADAPTIVE))
.build();
}

}
Expand Up @@ -60,7 +60,6 @@ public final class RetryableStageHelper2 {
public static final String SDK_RETRY_INFO_HEADER = "amz-sdk-request";
private final SdkHttpFullRequest request;
private final RequestExecutionContext context;
private final RetryPolicy retryPolicy;
private RetryPolicyAdapter retryPolicyAdapter;
private final RetryStrategy<?, ?> retryStrategy;
private final HttpClientDependencies dependencies;
Expand All @@ -74,8 +73,16 @@ public RetryableStageHelper2(SdkHttpFullRequest request,
HttpClientDependencies dependencies) {
this.request = request;
this.context = context;
this.retryPolicy = dependencies.clientConfiguration().option(SdkClientOption.RETRY_POLICY);
this.retryStrategy = dependencies.clientConfiguration().option(SdkClientOption.RETRY_STRATEGY);
RetryPolicy retryPolicy = dependencies.clientConfiguration().option(SdkClientOption.RETRY_POLICY);
RetryStrategy<?, ?> retryStrategy = dependencies.clientConfiguration().option(SdkClientOption.RETRY_STRATEGY);
if (retryPolicy != null) {
retryPolicyAdapter = RetryPolicyAdapter.builder()
.retryPolicy(retryPolicy)
.build();
} else if (retryStrategy instanceof RetryPolicyAdapter) {
retryPolicyAdapter = (RetryPolicyAdapter) retryStrategy;
}
this.retryStrategy = retryStrategy;
this.dependencies = dependencies;
}

Expand Down Expand Up @@ -256,15 +263,14 @@ private int retriesAttemptedSoFar() {
* calling code.
*/
private RetryStrategy<?, ?> retryStrategy() {
if (retryPolicy != null) {
if (retryPolicyAdapter == null) {
retryPolicyAdapter = RetryPolicyAdapter.builder()
.retryPolicy(this.retryPolicy)
if (retryPolicyAdapter != null) {
if (retryPolicyAdapter.isInitialized()) {
retryPolicyAdapter = retryPolicyAdapter.toBuilder()
.retryPolicyContext(retryPolicyContext())
.build();
} else {
retryPolicyAdapter = retryPolicyAdapter.toBuilder()
.retryPolicyContext(retryPolicyContext())
.initialize(retryPolicyContext())
.build();
}
return retryPolicyAdapter;
Expand Down
Expand Up @@ -42,25 +42,26 @@
*/
@SdkInternalApi
public final class RetryPolicyAdapter implements RetryStrategy<RetryPolicyAdapter.Builder, RetryPolicyAdapter> {

private final RetryPolicy retryPolicy;
private final RetryPolicyContext retryPolicyContext;
private final RateLimitingTokenBucket rateLimitingTokenBucket;

private RetryPolicyAdapter(Builder builder) {
this.retryPolicy = Validate.paramNotNull(builder.retryPolicy, "retryPolicy");
this.retryPolicyContext = Validate.paramNotNull(builder.retryPolicyContext, "retryPolicyContext");
this.retryPolicyContext = builder.retryPolicyContext;
this.rateLimitingTokenBucket = builder.rateLimitingTokenBucket;
}

@Override
public AcquireInitialTokenResponse acquireInitialToken(AcquireInitialTokenRequest request) {
validateState();
RetryPolicyAdapterToken token = new RetryPolicyAdapterToken(request.scope());
return AcquireInitialTokenResponse.create(token, rateLimitingTokenAcquire());
}

@Override
public RefreshRetryTokenResponse refreshRetryToken(RefreshRetryTokenRequest request) {
validateState();
RetryPolicyAdapterToken token = getToken(request.token());
boolean willRetry = retryPolicy.aggregateRetryCondition().shouldRetry(retryPolicyContext);
if (!willRetry) {
Expand All @@ -73,6 +74,7 @@ public RefreshRetryTokenResponse refreshRetryToken(RefreshRetryTokenRequest requ

@Override
public RecordSuccessResponse recordSuccess(RecordSuccessRequest request) {
validateState();
RetryPolicyAdapterToken token = getToken(request.token());
retryPolicy.aggregateRetryCondition().requestSucceeded(retryPolicyContext);
return RecordSuccessResponse.create(token);
Expand All @@ -88,6 +90,16 @@ public Builder toBuilder() {
return new Builder(this);
}

public boolean isInitialized() {
return retryPolicyContext != null;
}

void validateState() {
if (retryPolicyContext == null) {
throw new IllegalStateException("This RetryPolicyAdapter instance has not been initialized.");
}
}

RetryPolicyAdapterToken getToken(RetryToken token) {
return Validate.isInstanceOf(RetryPolicyAdapterToken.class, token, "Object of class %s was not created by this retry "
+ "strategy", token.getClass().getName());
Expand Down Expand Up @@ -146,7 +158,6 @@ public static class Builder implements RetryStrategy.Builder<RetryPolicyAdapter.
private RateLimitingTokenBucket rateLimitingTokenBucket;

private Builder() {
rateLimitingTokenBucket = new RateLimitingTokenBucket();
}

private Builder(RetryPolicyAdapter adapter) {
Expand All @@ -162,7 +173,7 @@ public Builder retryOnException(Predicate<Throwable> shouldRetry) {

@Override
public Builder maxAttempts(int maxAttempts) {
throw new UnsupportedOperationException("RetryPolicyAdapter does not support calling retryOnException");
throw new UnsupportedOperationException("RetryPolicyAdapter does not support calling maxAttempts");
}

@Override
Expand All @@ -175,13 +186,14 @@ public Builder retryPolicy(RetryPolicy retryPolicy) {
return this;
}

public Builder rateLimitingTokenBucket(RateLimitingTokenBucket rateLimitingTokenBucket) {
this.rateLimitingTokenBucket = rateLimitingTokenBucket;
public Builder retryPolicyContext(RetryPolicyContext retryPolicyContext) {
this.retryPolicyContext = retryPolicyContext;
return this;
}

public Builder retryPolicyContext(RetryPolicyContext retryPolicyContext) {
public Builder initialize(RetryPolicyContext retryPolicyContext) {
this.retryPolicyContext = retryPolicyContext;
this.rateLimitingTokenBucket = new RateLimitingTokenBucket();
return this;
}

Expand Down
Expand Up @@ -19,6 +19,7 @@
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.core.exception.SdkServiceException;
import software.amazon.awssdk.core.retry.RetryMode;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.core.retry.RetryUtils;
import software.amazon.awssdk.retries.AdaptiveRetryStrategy;
import software.amazon.awssdk.retries.DefaultRetryStrategy;
Expand Down Expand Up @@ -55,6 +56,8 @@ private SdkDefaultRetryStrategy() {
case STANDARD:
return standardRetryStrategy();
case ADAPTIVE:
return legacyAdaptiveRetryStrategy();
case ADAPTIVE_V2:
return adaptiveRetryStrategy();
case LEGACY:
return legacyRetryStrategy();
Expand All @@ -74,11 +77,14 @@ public static RetryMode retryMode(RetryStrategy<?, ?> retryStrategy) {
return RetryMode.STANDARD;
}
if (retryStrategy instanceof AdaptiveRetryStrategy) {
return RetryMode.ADAPTIVE;
return RetryMode.ADAPTIVE_V2;
}
if (retryStrategy instanceof LegacyRetryStrategy) {
return RetryMode.LEGACY;
}
if (retryStrategy instanceof RetryPolicyAdapter) {
return RetryMode.ADAPTIVE;
}
throw new IllegalArgumentException("unknown retry strategy class: " + retryStrategy.getClass().getName());
}

Expand Down Expand Up @@ -193,4 +199,16 @@ private static boolean retryOnThrottlingCondition(Throwable ex) {
}
return false;
}

/**
* Returns a {@link RetryStrategy<?, ?>} that implements the legacy {@link RetryMode#ADAPTIVE} mode.
*
* @return a {@link RetryStrategy<?, ?>} that implements the legacy {@link RetryMode#ADAPTIVE} mode.
*/
private static RetryStrategy<?, ?> legacyAdaptiveRetryStrategy() {
return RetryPolicyAdapter.builder()
.retryPolicy(RetryPolicy.forRetryMode(RetryMode.ADAPTIVE))
.build();
}
}

Expand Up @@ -73,7 +73,7 @@ public enum RetryMode {
STANDARD,

/**
* Adaptive retry mode builds on {@code STANDARD} mode.
* Adaptive retry mode builds on {@link #STANDARD} mode.
* <p>
* Adaptive retry mode dynamically limits the rate of AWS requests to maximize success rate. This may be at the
* expense of request latency. Adaptive retry mode is not recommended when predictable latency is important.
Expand All @@ -84,9 +84,31 @@ public enum RetryMode {
* the same client. When using adaptive retry mode, we recommend using a single client per resource.
*
* @see RetryPolicy#isFastFailRateLimiting()
* @deprecated As of 2.25.xx, replaced by {@link #ADAPTIVE_V2}. The ADAPTIVE implementation has a bug that prevents it
* from remembering its state across requests which is needed to correctly estimate its sending rate. Given that
* this bug has been present since its introduction and that correct version might change the traffic patterns of the SDK we
* deemed too risky to fix this implementation.
*/
@Deprecated
ADAPTIVE,

/**
* Adaptive V2 retry mode builds on {@link #STANDARD} mode.
* <p>
* Adaptive retry mode qdynamically limits the rate of AWS requests to maximize success rate. This may be at the
* expense of request latency. Adaptive V2 retry mode is not recommended when predictable latency is important.
* <p>
* {@code ADAPTIVE_V2} mode differs from {@link #ADAPTIVE} mode in the computed delays between calls, including the first
* attempt
* that might be delayed if the algorithm considers that it's needed to increase the odds of a successful response.
* <p>
* <b>Warning:</b> Adaptive V2 retry mode assumes that the client is working against a single resource (e.g. one
* DynamoDB Table or one S3 Bucket). If you use a single client for multiple resources, throttling or outages
* associated with one resource will result in increased latency and failures when accessing all other resources via
* the same client. When using adaptive retry mode, we recommend using a single client per resource.
*/
ADAPTIVE_V2,

;

/**
Expand Down Expand Up @@ -176,6 +198,8 @@ private static Optional<RetryMode> fromString(String string) {
return Optional.of(STANDARD);
case "adaptive":
return Optional.of(ADAPTIVE);
case "adaptive_v2":
return Optional.of(ADAPTIVE_V2);
default:
throw new IllegalStateException("Unsupported retry policy mode configured: " + string);
}
Expand Down
Expand Up @@ -372,6 +372,10 @@ private static final class BuilderImpl implements Builder {
private Boolean fastFailRateLimiting;

private BuilderImpl(RetryMode retryMode) {
if (retryMode == RetryMode.ADAPTIVE_V2) {
throw new UnsupportedOperationException("ADAPTIVE_V2 is not supported by retry policies, use a RetryStrategy "
+ "instead");
}
this.retryMode = retryMode;
this.numRetries = SdkDefaultRetrySetting.maxAttempts(retryMode) - 1;
this.additionalRetryConditionsAllowed = true;
Expand Down
Expand Up @@ -23,6 +23,7 @@
import software.amazon.awssdk.awscore.retry.AwsRetryStrategy;
import software.amazon.awssdk.core.client.config.SdkClientConfiguration;
import software.amazon.awssdk.core.client.config.SdkClientOption;
import software.amazon.awssdk.core.internal.retry.RetryPolicyAdapter;
import software.amazon.awssdk.core.internal.retry.SdkDefaultRetrySetting;
import software.amazon.awssdk.core.retry.RetryMode;
import software.amazon.awssdk.core.retry.RetryPolicy;
Expand Down Expand Up @@ -76,18 +77,8 @@ public static RetryPolicy resolveRetryPolicy(SdkClientConfiguration config) {
return configuredRetryPolicy;
}

RetryMode retryMode = RetryMode.resolver()
.profileFile(config.option(SdkClientOption.PROFILE_FILE_SUPPLIER))
.profileName(config.option(SdkClientOption.PROFILE_NAME))
.defaultRetryMode(config.option(SdkClientOption.DEFAULT_RETRY_MODE))
.resolve();

return AwsRetryPolicy.forRetryMode(retryMode)
.toBuilder()
.additionalRetryConditionsAllowed(false)
.numRetries(MAX_ERROR_RETRY)
.backoffStrategy(BACKOFF_STRATEGY)
.build();
RetryMode retryMode = resolveRetryMode(config);
return retryPolicyFor(retryMode);
}

public static RetryStrategy<?, ?> resolveRetryStrategy(SdkClientConfiguration config) {
Expand All @@ -96,16 +87,35 @@ public static RetryPolicy resolveRetryPolicy(SdkClientConfiguration config) {
return configuredRetryStrategy;
}

RetryMode retryMode = RetryMode.resolver()
.profileFile(config.option(SdkClientOption.PROFILE_FILE_SUPPLIER))
.profileName(config.option(SdkClientOption.PROFILE_NAME))
.defaultRetryMode(config.option(SdkClientOption.DEFAULT_RETRY_MODE))
.resolve();
RetryMode retryMode = resolveRetryMode(config);

if (retryMode == RetryMode.ADAPTIVE) {
return RetryPolicyAdapter.builder()
.retryPolicy(retryPolicyFor(retryMode))
.build();
}

return AwsRetryStrategy.forRetryMode(retryMode)
.toBuilder()
.maxAttempts(MAX_ATTEMPTS)
.backoffStrategy(exponentialDelay(BASE_DELAY, SdkDefaultRetrySetting.MAX_BACKOFF))
.build();
}

private static RetryPolicy retryPolicyFor(RetryMode retryMode) {
return AwsRetryPolicy.forRetryMode(retryMode)
.toBuilder()
.additionalRetryConditionsAllowed(false)
.numRetries(MAX_ERROR_RETRY)
.backoffStrategy(BACKOFF_STRATEGY)
.build();
}

private static RetryMode resolveRetryMode(SdkClientConfiguration config) {
return RetryMode.resolver()
.profileFile(config.option(SdkClientOption.PROFILE_FILE_SUPPLIER))
.profileName(config.option(SdkClientOption.PROFILE_NAME))
.defaultRetryMode(config.option(SdkClientOption.DEFAULT_RETRY_MODE))
.resolve();
}
}

0 comments on commit 67b7844

Please sign in to comment.