Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve s3 backup store client reliability #10603

Merged
merged 3 commits into from
Oct 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 10 additions & 0 deletions backup-stores/s3/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,16 @@
<artifactId>auth</artifactId>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>aws-core</artifactId>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>netty-nio-client</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
*/
package io.camunda.zeebe.backup.s3;

import java.time.Duration;
import java.util.Optional;

/**
Expand All @@ -19,6 +20,8 @@
* try to discover an appropriate value from the environment.
* @param credentials If no value is provided, the AWS SDK will try to discover appropriate values
* from the environment.
* @param apiCallTimeout Used as the overall api call timeout for the AWS SDK. API calls that exceed

Check notice

Code scanning / CodeQL

Spurious Javadoc @param tags

@param tag "apiCallTimeout" does not match any actual type parameter of type "S3BackupConfig".
* the timeout may fail and result in failed backups.
* @see <a
* href=https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/region-selection.html#automatically-determine-the-aws-region-from-the-environment>
* Automatically determine the Region from the environment</a>
Expand All @@ -28,25 +31,27 @@ public record S3BackupConfig(
String bucketName,
Optional<String> endpoint,
Optional<String> region,
Optional<Credentials> credentials) {
Optional<Credentials> credentials,
Optional<Duration> apiCallTimeout) {

/**
* Creates a config with only the bucket name set. The endpoint, region, credentials and API call
* timeout are all left empty.
*
* @param bucketName Name of the bucket that will be used for storing backups
* @see S3BackupConfig#S3BackupConfig(String bucketName, Optional endpoint, Optional region,
* Optional credentials)
* Optional credentials, Optional apiCallTimeout)
*/
public S3BackupConfig(final String bucketName) {
this(bucketName, Optional.empty(), Optional.empty(), Optional.empty());
this(bucketName, Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty());
}

public static S3BackupConfig from(
final String bucketName,
final String endpoint,
final String region,
final String accessKey,
final String secretKey) {
final String secretKey,
final Duration apiCallTimeoutMs) {
Credentials credentials = null;
if (accessKey != null && secretKey != null) {
credentials = new Credentials(accessKey, secretKey);
Expand All @@ -55,7 +60,8 @@ public static S3BackupConfig from(
bucketName,
Optional.ofNullable(endpoint),
Optional.ofNullable(region),
Optional.ofNullable(credentials));
Optional.ofNullable(credentials),
Optional.ofNullable(apiCallTimeoutMs));
}

record Credentials(String accessKey, String secretKey) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.io.IOException;
import java.net.URI;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
Expand All @@ -47,9 +48,12 @@
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.awscore.defaultsmode.DefaultsMode;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.core.retry.RetryMode;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
Expand Down Expand Up @@ -428,6 +432,19 @@ private CompletableFuture<PutObjectResponse> saveNamedFile(

public static S3AsyncClient buildClient(final S3BackupConfig config) {
final var builder = S3AsyncClient.builder();

// Enable auto-tuning of various parameters based on the environment
builder.defaultsMode(DefaultsMode.AUTO);

builder.httpClient(
NettyNioAsyncHttpClient.builder()
// We'd rather wait longer for a connection than have a failed backup. This helps in
// smoothing out spikes when taking a backup.
// Default is 10s: `SdkHttpConfigurationOption.DEFAULT_CONNECTION_ACQUIRE_TIMEOUT`.
.connectionAcquisitionTimeout(Duration.ofSeconds(45))
.build());

builder.overrideConfiguration(cfg -> cfg.retryPolicy(RetryMode.ADAPTIVE));
config.endpoint().ifPresent(endpoint -> builder.endpointOverride(URI.create(endpoint)));
config.region().ifPresent(region -> builder.region(Region.of(region)));
config
Expand All @@ -438,6 +455,9 @@ public static S3AsyncClient buildClient(final S3BackupConfig config) {
StaticCredentialsProvider.create(
AwsBasicCredentials.create(
credentials.accessKey(), credentials.secretKey()))));
config
.apiCallTimeout()
.ifPresent(timeout -> builder.overrideConfiguration(cfg -> cfg.apiCallTimeout(timeout)));
return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ private void validateBackupCfg(final BackupStoreCfg backup) {
s3Config.getEndpoint(),
s3Config.getRegion(),
s3Config.getAccessKey(),
s3Config.getSecretKey());
s3Config.getSecretKey(),
s3Config.getApiCallTimeout());
try {
S3BackupStore.validateConfig(storeConfig);
} catch (final Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package io.camunda.zeebe.broker.system.configuration.backup;

import io.camunda.zeebe.broker.system.configuration.ConfigurationEntry;
import java.time.Duration;
import java.util.Objects;

public class S3BackupStoreConfig implements ConfigurationEntry {
Expand All @@ -17,6 +18,7 @@ public class S3BackupStoreConfig implements ConfigurationEntry {
private String region;
private String accessKey;
private String secretKey;
private Duration apiCallTimeout = Duration.ofSeconds(180);

public String getBucketName() {
return bucketName;
Expand Down Expand Up @@ -58,13 +60,22 @@ public void setSecretKey(final String secretKey) {
this.secretKey = secretKey;
}

/**
 * Returns the timeout currently configured for S3 client API calls.
 *
 * @return the stored {@code apiCallTimeout} value, exactly as it was last set
 */
public Duration getApiCallTimeout() {
  return this.apiCallTimeout;
}

/**
 * Replaces the timeout used for S3 client API calls.
 *
 * <p>The value is stored as given; no validation or normalization happens here.
 *
 * @param apiCallTimeout the new timeout to store
 */
public void setApiCallTimeout(final Duration apiCallTimeout) {
this.apiCallTimeout = apiCallTimeout;
}

/**
 * Computes a hash over bucket name, endpoint, region, access key, secret key and API call
 * timeout, in that order. Unset ({@code null}) fields contribute {@code 0}.
 *
 * @return the combined hash, folded with the multiplier 31 and starting from the bucket name's
 *     hash
 */
@Override
public int hashCode() {
  // Objects.hashCode(x) yields 0 for null, matching the explicit null checks it replaces.
  int hash = Objects.hashCode(bucketName);
  hash = 31 * hash + Objects.hashCode(endpoint);
  hash = 31 * hash + Objects.hashCode(region);
  hash = 31 * hash + Objects.hashCode(accessKey);
  hash = 31 * hash + Objects.hashCode(secretKey);
  hash = 31 * hash + Objects.hashCode(apiCallTimeout);
  return hash;
}

Expand All @@ -91,7 +102,10 @@ public boolean equals(final Object o) {
if (!Objects.equals(accessKey, that.accessKey)) {
return false;
}
return Objects.equals(secretKey, that.secretKey);
if (!Objects.equals(secretKey, that.secretKey)) {
return false;
}
return Objects.equals(apiCallTimeout, that.apiCallTimeout);
}

@Override
Expand All @@ -112,6 +126,9 @@ public String toString() {
+ ", secretKey='"
+ "<redacted>"
+ '\''
+ ", apiCallTimeout='"
+ apiCallTimeout
+ '\''
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ private static void installS3Store(
s3Config.getEndpoint(),
s3Config.getRegion(),
s3Config.getAccessKey(),
s3Config.getSecretKey());
s3Config.getSecretKey(),
s3Config.getApiCallTimeout());
final S3BackupStore backupStore = new S3BackupStore(storeConfig);
context.setBackupStore(backupStore);
installed.complete(null);
Expand Down
6 changes: 6 additions & 0 deletions dist/src/main/config/broker.standalone.yaml.template
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,12 @@
# This setting can also be overridden using the environment variable ZEEBE_BROKER_DATA_BACKUP_S3_SECRETKEY
# secretKey:

# Configure a maximum duration for all S3 client API calls.
# Lower values will ensure that failed or slow API calls don't block other backups but may increase the risk
# that backups can't be stored if uploading parts of the backup takes longer than the configured timeout.
# See https://github.com/aws/aws-sdk-java-v2/blob/master/docs/BestPractices.md#utilize-timeout-configurations
# apiCallTimeout: PT180S

# cluster:
# This section contains all cluster related configurations, to setup a zeebe cluster

Expand Down
6 changes: 6 additions & 0 deletions dist/src/main/config/broker.yaml.template
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,12 @@
# This setting can also be overridden using the environment variable ZEEBE_BROKER_DATA_BACKUP_S3_SECRETKEY
# secretKey:

# Configure a maximum duration for all S3 client API calls.
# Lower values will ensure that failed or slow API calls don't block other backups but may increase the risk
# that backups can't be stored if uploading parts of the backup takes longer than the configured timeout.
# See https://github.com/aws/aws-sdk-java-v2/blob/master/docs/BestPractices.md#utilize-timeout-configurations
# apiCallTimeout: PT180S

# cluster:
# This section contains all cluster related configurations, to setup a zeebe cluster

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ private BackupStore buildBackupStore(final BackupStoreCfg backupCfg) {
s3Config.getEndpoint(),
s3Config.getRegion(),
s3Config.getAccessKey(),
s3Config.getSecretKey());
s3Config.getSecretKey(),
s3Config.getApiCallTimeout());
return new S3BackupStore(storeConfig);
} else {
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ void beforeEach() {
minio.externalEndpoint(),
minio.region(),
minio.accessKey(),
minio.secretKey());
minio.secretKey(),
Duration.ofSeconds(25));
store = new S3BackupStore(config);

try (final var client = S3BackupStore.buildClient(config)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,12 @@ void createBackupStoreForTest() {
// Create bucket before for storing backups
s3ClientConfig =
S3BackupConfig.from(
bucketName, S3.externalEndpoint(), S3.region(), S3.accessKey(), S3.secretKey());
bucketName,
S3.externalEndpoint(),
S3.region(),
S3.accessKey(),
S3.secretKey(),
Duration.ofSeconds(15));
s3BackupStore = new S3BackupStore(s3ClientConfig);
try (final var s3Client = S3BackupStore.buildClient(s3ClientConfig)) {
s3Client.createBucket(builder -> builder.bucket(bucketName).build()).join();
Expand Down