Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ADAPTIVE_V2 retry mode to support the legacy behavior #5123

Merged
Expand Up @@ -18,6 +18,7 @@
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.awscore.internal.AwsErrorCode;
import software.amazon.awssdk.core.internal.retry.RetryPolicyAdapter;
import software.amazon.awssdk.core.internal.retry.SdkDefaultRetryStrategy;
import software.amazon.awssdk.core.retry.RetryMode;
import software.amazon.awssdk.retries.AdaptiveRetryStrategy;
Expand Down Expand Up @@ -54,10 +55,12 @@ private AwsRetryStrategy() {
switch (mode) {
case STANDARD:
return standardRetryStrategy();
case ADAPTIVE:
case ADAPTIVE_V2:
return adaptiveRetryStrategy();
case LEGACY:
return legacyRetryStrategy();
case ADAPTIVE:
return legacyAdaptiveRetryStrategy();
default:
throw new IllegalArgumentException("unknown retry mode: " + mode);
}
Expand All @@ -84,7 +87,6 @@ private AwsRetryStrategy() {
return DefaultRetryStrategy.none();
}


/**
* Returns a {@link StandardRetryStrategy} with AWS-specific conditions added.
*
Expand Down Expand Up @@ -121,8 +123,8 @@ public static AdaptiveRetryStrategy adaptiveRetryStrategy() {
* Configures a retry strategy using its builder to add AWS-specific retry exceptions.
*
* @param builder The builder to add the AWS-specific retry exceptions
* @param <T> The type of the builder extending {@link RetryStrategy.Builder}
* @return The given builder
* @param <T> The type of the builder extending {@link RetryStrategy.Builder}
*/
public static <T extends RetryStrategy.Builder<T, ?>> T configure(T builder) {
return builder.retryOnException(AwsRetryStrategy::retryOnAwsRetryableErrors);
Expand All @@ -135,6 +137,9 @@ public static AdaptiveRetryStrategy adaptiveRetryStrategy() {
* @return The given builder
*/
public static RetryStrategy.Builder<?, ?> configureStrategy(RetryStrategy.Builder<?, ?> builder) {
if (builder instanceof RetryPolicyAdapter.Builder) {
return builder;
}
return builder.retryOnException(AwsRetryStrategy::retryOnAwsRetryableErrors);
}

Expand All @@ -145,4 +150,16 @@ private static boolean retryOnAwsRetryableErrors(Throwable ex) {
}
return false;
}

/**
* Returns a {@link RetryStrategy<?, ?>} that implements the legacy {@link RetryMode#ADAPTIVE} mode.
*
* @return a {@link RetryStrategy<?, ?>} that implements the legacy {@link RetryMode#ADAPTIVE} mode.
*/
private static RetryStrategy<?, ?> legacyAdaptiveRetryStrategy() {
return RetryPolicyAdapter.builder()
.retryPolicy(AwsRetryPolicy.forRetryMode(RetryMode.ADAPTIVE))
.build();
}

}
Expand Up @@ -60,7 +60,6 @@ public final class RetryableStageHelper2 {
public static final String SDK_RETRY_INFO_HEADER = "amz-sdk-request";
private final SdkHttpFullRequest request;
private final RequestExecutionContext context;
private final RetryPolicy retryPolicy;
private RetryPolicyAdapter retryPolicyAdapter;
private final RetryStrategy<?, ?> retryStrategy;
private final HttpClientDependencies dependencies;
Expand All @@ -74,8 +73,16 @@ public RetryableStageHelper2(SdkHttpFullRequest request,
HttpClientDependencies dependencies) {
this.request = request;
this.context = context;
this.retryPolicy = dependencies.clientConfiguration().option(SdkClientOption.RETRY_POLICY);
this.retryStrategy = dependencies.clientConfiguration().option(SdkClientOption.RETRY_STRATEGY);
RetryPolicy retryPolicy = dependencies.clientConfiguration().option(SdkClientOption.RETRY_POLICY);
RetryStrategy<?, ?> retryStrategy = dependencies.clientConfiguration().option(SdkClientOption.RETRY_STRATEGY);
if (retryPolicy != null) {
retryPolicyAdapter = RetryPolicyAdapter.builder()
.retryPolicy(retryPolicy)
.build();
} else if (retryStrategy instanceof RetryPolicyAdapter) {
retryPolicyAdapter = (RetryPolicyAdapter) retryStrategy;
}
this.retryStrategy = retryStrategy;
this.dependencies = dependencies;
}

Expand Down Expand Up @@ -256,15 +263,14 @@ private int retriesAttemptedSoFar() {
* calling code.
*/
private RetryStrategy<?, ?> retryStrategy() {
if (retryPolicy != null) {
if (retryPolicyAdapter == null) {
retryPolicyAdapter = RetryPolicyAdapter.builder()
.retryPolicy(this.retryPolicy)
if (retryPolicyAdapter != null) {
if (retryPolicyAdapter.isInitialized()) {
retryPolicyAdapter = retryPolicyAdapter.toBuilder()
.retryPolicyContext(retryPolicyContext())
.build();
} else {
retryPolicyAdapter = retryPolicyAdapter.toBuilder()
.retryPolicyContext(retryPolicyContext())
.initialize(retryPolicyContext())
.build();
}
return retryPolicyAdapter;
Expand Down
Expand Up @@ -42,25 +42,26 @@
*/
@SdkInternalApi
public final class RetryPolicyAdapter implements RetryStrategy<RetryPolicyAdapter.Builder, RetryPolicyAdapter> {

private final RetryPolicy retryPolicy;
private final RetryPolicyContext retryPolicyContext;
private final RateLimitingTokenBucket rateLimitingTokenBucket;

private RetryPolicyAdapter(Builder builder) {
this.retryPolicy = Validate.paramNotNull(builder.retryPolicy, "retryPolicy");
this.retryPolicyContext = Validate.paramNotNull(builder.retryPolicyContext, "retryPolicyContext");
this.retryPolicyContext = builder.retryPolicyContext;
this.rateLimitingTokenBucket = builder.rateLimitingTokenBucket;
}

@Override
public AcquireInitialTokenResponse acquireInitialToken(AcquireInitialTokenRequest request) {
validateState();
RetryPolicyAdapterToken token = new RetryPolicyAdapterToken(request.scope());
return AcquireInitialTokenResponse.create(token, rateLimitingTokenAcquire());
}

@Override
public RefreshRetryTokenResponse refreshRetryToken(RefreshRetryTokenRequest request) {
validateState();
RetryPolicyAdapterToken token = getToken(request.token());
boolean willRetry = retryPolicy.aggregateRetryCondition().shouldRetry(retryPolicyContext);
if (!willRetry) {
Expand All @@ -73,6 +74,7 @@ public RefreshRetryTokenResponse refreshRetryToken(RefreshRetryTokenRequest requ

@Override
public RecordSuccessResponse recordSuccess(RecordSuccessRequest request) {
validateState();
RetryPolicyAdapterToken token = getToken(request.token());
retryPolicy.aggregateRetryCondition().requestSucceeded(retryPolicyContext);
return RecordSuccessResponse.create(token);
Expand All @@ -88,6 +90,16 @@ public Builder toBuilder() {
return new Builder(this);
}

public boolean isInitialized() {
return retryPolicyContext != null;
}

void validateState() {
if (retryPolicyContext == null) {
throw new IllegalStateException("This RetryPolicyAdapter instance has not been initialized.");
}
}

RetryPolicyAdapterToken getToken(RetryToken token) {
return Validate.isInstanceOf(RetryPolicyAdapterToken.class, token, "Object of class %s was not created by this retry "
+ "strategy", token.getClass().getName());
Expand Down Expand Up @@ -146,7 +158,6 @@ public static class Builder implements RetryStrategy.Builder<RetryPolicyAdapter.
private RateLimitingTokenBucket rateLimitingTokenBucket;

private Builder() {
rateLimitingTokenBucket = new RateLimitingTokenBucket();
}

private Builder(RetryPolicyAdapter adapter) {
Expand All @@ -162,7 +173,7 @@ public Builder retryOnException(Predicate<Throwable> shouldRetry) {

@Override
public Builder maxAttempts(int maxAttempts) {
throw new UnsupportedOperationException("RetryPolicyAdapter does not support calling retryOnException");
throw new UnsupportedOperationException("RetryPolicyAdapter does not support calling maxAttempts");
}

@Override
Expand All @@ -175,13 +186,14 @@ public Builder retryPolicy(RetryPolicy retryPolicy) {
return this;
}

public Builder rateLimitingTokenBucket(RateLimitingTokenBucket rateLimitingTokenBucket) {
this.rateLimitingTokenBucket = rateLimitingTokenBucket;
public Builder retryPolicyContext(RetryPolicyContext retryPolicyContext) {
this.retryPolicyContext = retryPolicyContext;
return this;
}

public Builder retryPolicyContext(RetryPolicyContext retryPolicyContext) {
public Builder initialize(RetryPolicyContext retryPolicyContext) {
this.retryPolicyContext = retryPolicyContext;
this.rateLimitingTokenBucket = new RateLimitingTokenBucket();
return this;
}

Expand Down
Expand Up @@ -19,6 +19,7 @@
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.core.exception.SdkServiceException;
import software.amazon.awssdk.core.retry.RetryMode;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.core.retry.RetryUtils;
import software.amazon.awssdk.retries.AdaptiveRetryStrategy;
import software.amazon.awssdk.retries.DefaultRetryStrategy;
Expand Down Expand Up @@ -55,6 +56,8 @@ private SdkDefaultRetryStrategy() {
case STANDARD:
return standardRetryStrategy();
case ADAPTIVE:
return legacyAdaptiveRetryStrategy();
case ADAPTIVE_V2:
return adaptiveRetryStrategy();
case LEGACY:
return legacyRetryStrategy();
Expand All @@ -74,11 +77,14 @@ public static RetryMode retryMode(RetryStrategy<?, ?> retryStrategy) {
return RetryMode.STANDARD;
}
if (retryStrategy instanceof AdaptiveRetryStrategy) {
return RetryMode.ADAPTIVE;
return RetryMode.ADAPTIVE_V2;
}
if (retryStrategy instanceof LegacyRetryStrategy) {
return RetryMode.LEGACY;
}
if (retryStrategy instanceof RetryPolicyAdapter) {
return RetryMode.ADAPTIVE;
}
throw new IllegalArgumentException("unknown retry strategy class: " + retryStrategy.getClass().getName());
}

Expand Down Expand Up @@ -193,4 +199,16 @@ private static boolean retryOnThrottlingCondition(Throwable ex) {
}
return false;
}

/**
* Returns a {@link RetryStrategy<?, ?>} that implements the legacy {@link RetryMode#ADAPTIVE} mode.
*
* @return a {@link RetryStrategy<?, ?>} that implements the legacy {@link RetryMode#ADAPTIVE} mode.
*/
private static RetryStrategy<?, ?> legacyAdaptiveRetryStrategy() {
return RetryPolicyAdapter.builder()
.retryPolicy(RetryPolicy.forRetryMode(RetryMode.ADAPTIVE))
.build();
}
}

Expand Up @@ -73,7 +73,7 @@ public enum RetryMode {
STANDARD,

/**
* Adaptive retry mode builds on {@code STANDARD} mode.
* Adaptive retry mode builds on {@link #STANDARD} mode.
* <p>
* Adaptive retry mode dynamically limits the rate of AWS requests to maximize success rate. This may be at the
* expense of request latency. Adaptive retry mode is not recommended when predictable latency is important.
Expand All @@ -84,9 +84,31 @@ public enum RetryMode {
* the same client. When using adaptive retry mode, we recommend using a single client per resource.
*
* @see RetryPolicy#isFastFailRateLimiting()
* @deprecated As of 2.25.xx, replaced by {@link #ADAPTIVE_V2}. The ADAPTIVE implementation has a bug that prevents it
* from remembering its state across requests which is needed to correctly estimate its sending rate. Given that
* this bug has been present since its introduction and that correct version might change the traffic patterns of the SDK we
* deemed too risky to fix this implementation.
*/
@Deprecated
ADAPTIVE,

/**
* Adaptive V2 retry mode builds on {@link #STANDARD} mode.
* <p>
* Adaptive retry mode qdynamically limits the rate of AWS requests to maximize success rate. This may be at the
* expense of request latency. Adaptive V2 retry mode is not recommended when predictable latency is important.
* <p>
* {@code ADAPTIVE_V2} mode differs from {@link #ADAPTIVE} mode in the computed delays between calls, including the first
* attempt
* that might be delayed if the algorithm considers that it's needed to increase the odds of a successful response.
* <p>
* <b>Warning:</b> Adaptive V2 retry mode assumes that the client is working against a single resource (e.g. one
* DynamoDB Table or one S3 Bucket). If you use a single client for multiple resources, throttling or outages
* associated with one resource will result in increased latency and failures when accessing all other resources via
* the same client. When using adaptive retry mode, we recommend using a single client per resource.
*/
ADAPTIVE_V2,

;

/**
Expand Down Expand Up @@ -176,6 +198,8 @@ private static Optional<RetryMode> fromString(String string) {
return Optional.of(STANDARD);
case "adaptive":
return Optional.of(ADAPTIVE);
case "adaptive_v2":
return Optional.of(ADAPTIVE_V2);
default:
throw new IllegalStateException("Unsupported retry policy mode configured: " + string);
}
Expand Down
Expand Up @@ -372,6 +372,10 @@ private static final class BuilderImpl implements Builder {
private Boolean fastFailRateLimiting;

private BuilderImpl(RetryMode retryMode) {
if (retryMode == RetryMode.ADAPTIVE_V2) {
throw new UnsupportedOperationException("ADAPTIVE_V2 is not supported by retry policies, use a RetryStrategy "
+ "instead");
}
this.retryMode = retryMode;
this.numRetries = SdkDefaultRetrySetting.maxAttempts(retryMode) - 1;
this.additionalRetryConditionsAllowed = true;
Expand Down
Expand Up @@ -23,6 +23,7 @@
import software.amazon.awssdk.awscore.retry.AwsRetryStrategy;
import software.amazon.awssdk.core.client.config.SdkClientConfiguration;
import software.amazon.awssdk.core.client.config.SdkClientOption;
import software.amazon.awssdk.core.internal.retry.RetryPolicyAdapter;
import software.amazon.awssdk.core.internal.retry.SdkDefaultRetrySetting;
import software.amazon.awssdk.core.retry.RetryMode;
import software.amazon.awssdk.core.retry.RetryPolicy;
Expand Down Expand Up @@ -76,18 +77,8 @@ public static RetryPolicy resolveRetryPolicy(SdkClientConfiguration config) {
return configuredRetryPolicy;
}

RetryMode retryMode = RetryMode.resolver()
.profileFile(config.option(SdkClientOption.PROFILE_FILE_SUPPLIER))
.profileName(config.option(SdkClientOption.PROFILE_NAME))
.defaultRetryMode(config.option(SdkClientOption.DEFAULT_RETRY_MODE))
.resolve();

return AwsRetryPolicy.forRetryMode(retryMode)
.toBuilder()
.additionalRetryConditionsAllowed(false)
.numRetries(MAX_ERROR_RETRY)
.backoffStrategy(BACKOFF_STRATEGY)
.build();
RetryMode retryMode = resolveRetryMode(config);
return retryPolicyFor(retryMode);
}

public static RetryStrategy<?, ?> resolveRetryStrategy(SdkClientConfiguration config) {
Expand All @@ -96,16 +87,35 @@ public static RetryPolicy resolveRetryPolicy(SdkClientConfiguration config) {
return configuredRetryStrategy;
}

RetryMode retryMode = RetryMode.resolver()
.profileFile(config.option(SdkClientOption.PROFILE_FILE_SUPPLIER))
.profileName(config.option(SdkClientOption.PROFILE_NAME))
.defaultRetryMode(config.option(SdkClientOption.DEFAULT_RETRY_MODE))
.resolve();
RetryMode retryMode = resolveRetryMode(config);

if (retryMode == RetryMode.ADAPTIVE) {
return RetryPolicyAdapter.builder()
.retryPolicy(retryPolicyFor(retryMode))
.build();
}

return AwsRetryStrategy.forRetryMode(retryMode)
.toBuilder()
.maxAttempts(MAX_ATTEMPTS)
.backoffStrategy(exponentialDelay(BASE_DELAY, SdkDefaultRetrySetting.MAX_BACKOFF))
.build();
}

private static RetryPolicy retryPolicyFor(RetryMode retryMode) {
return AwsRetryPolicy.forRetryMode(retryMode)
.toBuilder()
.additionalRetryConditionsAllowed(false)
.numRetries(MAX_ERROR_RETRY)
.backoffStrategy(BACKOFF_STRATEGY)
.build();
}

private static RetryMode resolveRetryMode(SdkClientConfiguration config) {
return RetryMode.resolver()
.profileFile(config.option(SdkClientOption.PROFILE_FILE_SUPPLIER))
.profileName(config.option(SdkClientOption.PROFILE_NAME))
.defaultRetryMode(config.option(SdkClientOption.DEFAULT_RETRY_MODE))
.resolve();
}
}