Skip to content

Commit

Permalink
merge: #10619
Browse files Browse the repository at this point in the history
10619: [Backport stable/8.1] Improve s3 backup store client reliability r=oleschoenburg a=backport-action

# Description
Backport of #10603 to `stable/8.1`.

relates to 

Co-authored-by: Ole Schönburg <ole.schoenburg@gmail.com>
  • Loading branch information
zeebe-bors-camunda[bot] and lenaschoenburg committed Oct 7, 2022
2 parents 9fa438c + cad8699 commit 4bbf8b2
Show file tree
Hide file tree
Showing 11 changed files with 85 additions and 11 deletions.
10 changes: 10 additions & 0 deletions backup-stores/s3/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,16 @@
<artifactId>auth</artifactId>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>aws-core</artifactId>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>netty-nio-client</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
*/
package io.camunda.zeebe.backup.s3;

import java.time.Duration;
import java.util.Optional;

/**
Expand All @@ -19,6 +20,8 @@
* try to discover an appropriate value from the environment.
* @param credentials If no value is provided, the AWS SDK will try to discover appropriate values
* from the environment.
* @param apiCallTimeout Used as the overall api call timeout for the AWS SDK. API calls that exceed
* the timeout may fail and result in failed backups.
* @see <a
* href=https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/region-selection.html#automatically-determine-the-aws-region-from-the-environment>
* Automatically determine the Region from the environment</a>
Expand All @@ -28,25 +31,27 @@ public record S3BackupConfig(
String bucketName,
Optional<String> endpoint,
Optional<String> region,
Optional<Credentials> credentials) {
Optional<Credentials> credentials,
Optional<Duration> apiCallTimeout) {

/**
* Creates a config without setting the region and credentials.
*
* @param bucketName Name of the backup that will be used for storing backups
* @see S3BackupConfig#S3BackupConfig(String bucketName, Optional endpoint, Optional region,
* Optional credentials)
* Optional credentials, Optional apiCallTimeout)
*/
public S3BackupConfig(final String bucketName) {
this(bucketName, Optional.empty(), Optional.empty(), Optional.empty());
this(bucketName, Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty());
}

public static S3BackupConfig from(
final String bucketName,
final String endpoint,
final String region,
final String accessKey,
final String secretKey) {
final String secretKey,
final Duration apiCallTimeoutMs) {
Credentials credentials = null;
if (accessKey != null && secretKey != null) {
credentials = new Credentials(accessKey, secretKey);
Expand All @@ -55,7 +60,8 @@ public static S3BackupConfig from(
bucketName,
Optional.ofNullable(endpoint),
Optional.ofNullable(region),
Optional.ofNullable(credentials));
Optional.ofNullable(credentials),
Optional.ofNullable(apiCallTimeoutMs));
}

record Credentials(String accessKey, String secretKey) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.io.IOException;
import java.net.URI;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
Expand All @@ -47,9 +48,12 @@
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.awscore.defaultsmode.DefaultsMode;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.core.retry.RetryMode;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
Expand Down Expand Up @@ -428,6 +432,19 @@ private CompletableFuture<PutObjectResponse> saveNamedFile(

public static S3AsyncClient buildClient(final S3BackupConfig config) {
final var builder = S3AsyncClient.builder();

// Enable auto-tuning of various parameters based on the environment
builder.defaultsMode(DefaultsMode.AUTO);

builder.httpClient(
NettyNioAsyncHttpClient.builder()
// We'd rather wait longer for a connection than have a failed backup. This helps in
// smoothing out spikes when taking a backup.
// Default is 10s: `SdkHttpConfigurationOption.DEFAULT_CONNECTION_ACQUIRE_TIMEOUT`.
.connectionAcquisitionTimeout(Duration.ofSeconds(45))
.build());

builder.overrideConfiguration(cfg -> cfg.retryPolicy(RetryMode.ADAPTIVE));
config.endpoint().ifPresent(endpoint -> builder.endpointOverride(URI.create(endpoint)));
config.region().ifPresent(region -> builder.region(Region.of(region)));
config
Expand All @@ -438,6 +455,9 @@ public static S3AsyncClient buildClient(final S3BackupConfig config) {
StaticCredentialsProvider.create(
AwsBasicCredentials.create(
credentials.accessKey(), credentials.secretKey()))));
config
.apiCallTimeout()
.ifPresent(timeout -> builder.overrideConfiguration(cfg -> cfg.apiCallTimeout(timeout)));
return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ private void validateBackupCfg(final BackupStoreCfg backup) {
s3Config.getEndpoint(),
s3Config.getRegion(),
s3Config.getAccessKey(),
s3Config.getSecretKey());
s3Config.getSecretKey(),
s3Config.getApiCallTimeout());
try {
S3BackupStore.validateConfig(storeConfig);
} catch (final Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package io.camunda.zeebe.broker.system.configuration.backup;

import io.camunda.zeebe.broker.system.configuration.ConfigurationEntry;
import java.time.Duration;
import java.util.Objects;

public class S3BackupStoreConfig implements ConfigurationEntry {
Expand All @@ -17,6 +18,7 @@ public class S3BackupStoreConfig implements ConfigurationEntry {
private String region;
private String accessKey;
private String secretKey;
private Duration apiCallTimeout = Duration.ofSeconds(180);

public String getBucketName() {
return bucketName;
Expand Down Expand Up @@ -58,13 +60,22 @@ public void setSecretKey(final String secretKey) {
this.secretKey = secretKey;
}

public Duration getApiCallTimeout() {
return apiCallTimeout;
}

public void setApiCallTimeout(final Duration apiCallTimeout) {
this.apiCallTimeout = apiCallTimeout;
}

@Override
public int hashCode() {
int result = bucketName != null ? bucketName.hashCode() : 0;
result = 31 * result + (endpoint != null ? endpoint.hashCode() : 0);
result = 31 * result + (region != null ? region.hashCode() : 0);
result = 31 * result + (accessKey != null ? accessKey.hashCode() : 0);
result = 31 * result + (secretKey != null ? secretKey.hashCode() : 0);
result = 31 * result + (apiCallTimeout != null ? apiCallTimeout.hashCode() : 0);
return result;
}

Expand All @@ -91,7 +102,10 @@ public boolean equals(final Object o) {
if (!Objects.equals(accessKey, that.accessKey)) {
return false;
}
return Objects.equals(secretKey, that.secretKey);
if (!Objects.equals(secretKey, that.secretKey)) {
return false;
}
return Objects.equals(apiCallTimeout, that.apiCallTimeout);
}

@Override
Expand All @@ -112,6 +126,9 @@ public String toString() {
+ ", secretKey='"
+ "<redacted>"
+ '\''
+ ", apiCallTimeout='"
+ apiCallTimeout
+ '\''
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ private static void installS3Store(
s3Config.getEndpoint(),
s3Config.getRegion(),
s3Config.getAccessKey(),
s3Config.getSecretKey());
s3Config.getSecretKey(),
s3Config.getApiCallTimeout());
final S3BackupStore backupStore = new S3BackupStore(storeConfig);
context.setBackupStore(backupStore);
installed.complete(null);
Expand Down
6 changes: 6 additions & 0 deletions dist/src/main/config/broker.standalone.yaml.template
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,12 @@
# This setting can also be overridden using the environment variable ZEEBE_BROKER_DATA_BACKUP_S3_SECRETKEY
# secretKey:

# Configure a maximum duration for all S3 client API calls.
# Lower values will ensure that failed or slow API calls don't block other backups but may increase the risk
# that backups can't be stored if uploading parts of the backup takes longer than the configured timeout.
# See https://github.com/aws/aws-sdk-java-v2/blob/master/docs/BestPractices.md#utilize-timeout-configurations
# apiCallTimeout: PT180S

# cluster:
# This section contains all cluster related configurations, to setup a zeebe cluster

Expand Down
6 changes: 6 additions & 0 deletions dist/src/main/config/broker.yaml.template
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,12 @@
# This setting can also be overridden using the environment variable ZEEBE_BROKER_DATA_BACKUP_S3_SECRETKEY
# secretKey:

# Configure a maximum duration for all S3 client API calls.
# Lower values will ensure that failed or slow API calls don't block other backups but may increase the risk
# that backups can't be stored if uploading parts of the backup takes longer than the configured timeout.
# See https://github.com/aws/aws-sdk-java-v2/blob/master/docs/BestPractices.md#utilize-timeout-configurations
# apiCallTimeout: PT180S

# cluster:
# This section contains all cluster related configurations, to setup a zeebe cluster

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ private BackupStore buildBackupStore(final BackupStoreCfg backupCfg) {
s3Config.getEndpoint(),
s3Config.getRegion(),
s3Config.getAccessKey(),
s3Config.getSecretKey());
s3Config.getSecretKey(),
s3Config.getApiCallTimeout());
return new S3BackupStore(storeConfig);
} else {
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ void beforeEach() {
minio.externalEndpoint(),
minio.region(),
minio.accessKey(),
minio.secretKey());
minio.secretKey(),
Duration.ofSeconds(25));
store = new S3BackupStore(config);

try (final var client = S3BackupStore.buildClient(config)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,12 @@ void createBackupStoreForTest() {
// Create bucket before for storing backups
s3ClientConfig =
S3BackupConfig.from(
bucketName, S3.externalEndpoint(), S3.region(), S3.accessKey(), S3.secretKey());
bucketName,
S3.externalEndpoint(),
S3.region(),
S3.accessKey(),
S3.secretKey(),
Duration.ofSeconds(15));
s3BackupStore = new S3BackupStore(s3ClientConfig);
try (final var s3Client = S3BackupStore.buildClient(s3ClientConfig)) {
s3Client.createBucket(builder -> builder.bucket(bucketName).build()).join();
Expand Down

0 comments on commit 4bbf8b2

Please sign in to comment.