Fixed an issue where CloudWatchMetricPublisher might not always complete flushing pending metrics on close. #1984

Merged 1 commit on Aug 12, 2020.
@@ -0,0 +1,5 @@
+{
+    "type": "bugfix",
+    "category": "CloudWatch Metrics Publisher",
+    "description": "Fixed a bug where `CloudWatchMetricPublisher#close` would not always complete flushing pending metrics before returning."
+}
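
The practical effect of the fix is easiest to see from the caller's side. The following is a minimal usage sketch, not part of this PR: the "ApiCall" collector name and the reported metric are illustrative, and the publisher is built with its defaults. With this change, metrics published shortly before close() are uploaded before close() returns, so a try-with-resources block no longer risks dropping them:

import software.amazon.awssdk.http.HttpMetric;
import software.amazon.awssdk.metrics.MetricCollector;
import software.amazon.awssdk.metrics.publishers.cloudwatch.CloudWatchMetricPublisher;

public class FlushOnCloseExample {
    public static void main(String[] args) {
        // create() uses the default CloudWatch client and credential chain.
        try (CloudWatchMetricPublisher publisher = CloudWatchMetricPublisher.create()) {
            MetricCollector collector = MetricCollector.create("ApiCall");
            collector.reportMetric(HttpMetric.AVAILABLE_CONCURRENCY, 5);
            publisher.publish(collector.collect());
        } // close() now blocks (bounded by 60-second internal timeouts) until the upload finishes
    }
}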
@@ -18,7 +18,6 @@
 import software.amazon.awssdk.annotations.SdkPreviewApi;
 import software.amazon.awssdk.annotations.SdkPublicApi;
 import software.amazon.awssdk.annotations.ThreadSafe;
-import software.amazon.awssdk.metrics.MetricCollection;
 import software.amazon.awssdk.utils.SdkAutoCloseable;
 
 /**
@@ -26,13 +26,17 @@
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import software.amazon.awssdk.annotations.Immutable;
@@ -236,7 +240,7 @@ private CloudWatchMetricPublisher(Builder builder) {
                                                    threadFactory);
 
         long flushFrequencyInMillis = resolveUploadFrequency(builder).toMillis();
-        this.scheduledExecutor.scheduleAtFixedRate(this::flushMetrics,
+        this.scheduledExecutor.scheduleAtFixedRate(this::flushMetricsQuietly,
                                                    flushFrequencyInMillis, flushFrequencyInMillis, TimeUnit.MILLISECONDS);
     }

@@ -290,51 +294,66 @@ public void publish(MetricCollection metricCollection) {
     /**
      * Flush the metrics (via a {@link UploadMetricsTasks}). In the event that the {@link #executor} task queue is full,
      * this will retry automatically.
+     *
+     * This returns when the {@code UploadMetricsTask} has been submitted to the executor. The returned future is completed
+     * when the metrics upload to CloudWatch has started. The inner-most future is finally completed when the upload to
+     * CloudWatch has finished.
      */
-    private void flushMetrics() {
-        while (!scheduledExecutor.isShutdown() &&
-               !executor.isShutdown() &&
-               !Thread.currentThread().isInterrupted()) {
+    private Future<CompletableFuture<?>> flushMetrics() throws InterruptedException {
+        while (!executor.isShutdown()) {
             try {
-                executor.submit(new UploadMetricsTasks(metricAggregator, metricUploader, maximumCallsPerUpload));
-                break;
+                return executor.submit(new UploadMetricsTasks(metricAggregator, metricUploader, maximumCallsPerUpload));
             } catch (RejectedExecutionException e) {
-                sleepQuietly(100);
+                Thread.sleep(100);
             }
         }
+
+        return CompletableFuture.completedFuture(CompletableFuture.completedFuture(null));
     }
 
-    private void sleepQuietly(int duration) {
+    private void flushMetricsQuietly() {
         try {
-            Thread.sleep(duration);
+            flushMetrics();
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
+            METRIC_LOGGER.error(() -> "Interrupted during metric flushing.", e);
         }
     }
 
     @Override
     public void close() {
-        flushMetrics();
-
-        scheduledExecutor.shutdownNow();
-        executor.shutdown();
         try {
+            scheduledExecutor.shutdownNow();
+
+            Future<CompletableFuture<?>> flushFuture = flushMetrics();
+            executor.shutdown();
+
+            flushFuture.get(60, TimeUnit.SECONDS)  // Wait for flush to start
+                       .get(60, TimeUnit.SECONDS); // Wait for flush to finish
+
             if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
-                executor.shutdownNow();
+                throw new TimeoutException("Internal executor did not shut down in 60 seconds.");
             }
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
-            executor.shutdownNow();
+            METRIC_LOGGER.error(() -> "Interrupted during graceful metric publisher shutdown.", e);
+        } catch (ExecutionException e) {
+            METRIC_LOGGER.error(() -> "Failed during graceful metric publisher shutdown.", e);
+        } catch (TimeoutException e) {
+            METRIC_LOGGER.error(() -> "Timed out during graceful metric publisher shutdown.", e);
+        } finally {
+            runQuietly(scheduledExecutor::shutdownNow, "shutting down scheduled executor");
+            runQuietly(executor::shutdownNow, "shutting down executor");
+            runQuietly(() -> metricUploader.close(closeClientWithPublisher), "closing metric uploader");
         }
-
-        metricUploader.close(closeClientWithPublisher);
     }
 
-    /**
-     * Returns {@code true} when the internal executor has been shutdown.
-     */
-    public boolean isShutdown() {
-        return executor.isShutdown() && scheduledExecutor.isShutdown();
+    private void runQuietly(Runnable runnable, String taskName) {
+        try {
+            runnable.run();
+        } catch (Exception e) {
+            METRIC_LOGGER.warn(() -> "Failed while " + taskName + ".", e);
+        }
     }

     /**
@@ -351,6 +370,13 @@ public static CloudWatchMetricPublisher create() {
         return builder().build();
     }
 
+    /**
+     * Returns {@code true} when the internal executors for this publisher are shut down.
+     */
+    boolean isShutdown() {
+        return scheduledExecutor.isShutdown() && executor.isShutdown();
+    }
+
     /**
      * Builder class to construct {@link CloudWatchMetricPublisher} instances. See the individual properties for which
      * configuration settings are available.
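The close() logic above hinges on a two-level future: the outer Future completes when the flush task has been picked up by the executor, and the inner CompletableFuture completes when the CloudWatch upload itself finishes. Below is a self-contained sketch of that handoff pattern (hypothetical names, not SDK code):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class TwoLevelWaitSketch {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture<Void> asyncWork = new CompletableFuture<>();

        // The submitted task starts some asynchronous work and returns its future,
        // mirroring what UploadMetricsTasks.call() does with the upload.
        Future<CompletableFuture<?>> handle =
                executor.submit(() -> (CompletableFuture<?>) asyncWork);

        executor.shutdown();
        asyncWork.complete(null); // simulate the asynchronous work finishing

        handle.get(60, TimeUnit.SECONDS)  // outer: the task ran, so the work has started
              .get(60, TimeUnit.SECONDS); // inner: the work itself has finished
        executor.awaitTermination(60, TimeUnit.SECONDS);
    }
}

Waiting on both levels is what lets close() distinguish "the flush was scheduled" from "the flush actually reached CloudWatch".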
@@ -18,18 +18,21 @@
 import static software.amazon.awssdk.metrics.publishers.cloudwatch.internal.CloudWatchMetricLogger.METRIC_LOGGER;
 
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import software.amazon.awssdk.annotations.SdkInternalApi;
 import software.amazon.awssdk.metrics.publishers.cloudwatch.CloudWatchMetricPublisher;
 import software.amazon.awssdk.metrics.publishers.cloudwatch.internal.MetricUploader;
 import software.amazon.awssdk.metrics.publishers.cloudwatch.internal.transform.MetricCollectionAggregator;
 import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataRequest;
+import software.amazon.awssdk.utils.CompletableFutureUtils;
 
 /**
  * A task that is executed on the {@link CloudWatchMetricPublisher}'s executor to collect requests from a
  * {@link MetricCollectionAggregator} and write them to a {@link MetricUploader}.
  */
 @SdkInternalApi
-public class UploadMetricsTasks implements Runnable {
+public class UploadMetricsTasks implements Callable<CompletableFuture<?>> {
     private final MetricCollectionAggregator collectionAggregator;
     private final MetricUploader uploader;
     private int maximumRequestsPerFlush;
@@ -43,17 +46,20 @@ public UploadMetricsTasks(MetricCollectionAggregator collectionAggregator,
     }
 
     @Override
-    public void run() {
-        List<PutMetricDataRequest> allRequests = collectionAggregator.getRequests();
-        List<PutMetricDataRequest> requests = allRequests;
-        if (requests.size() > maximumRequestsPerFlush) {
-            METRIC_LOGGER.warn(() -> "Maximum AWS SDK client-side metric call count exceeded: " + allRequests.size() +
-                               " > " + maximumRequestsPerFlush + ". Some metric requests will be dropped. This occurs when "
-                               + "the caller has configured too many metrics or too unique of dimensions without an "
-                               + "associated increase in the maximum-calls-per-upload configured on the publisher.");
-            requests = requests.subList(0, maximumRequestsPerFlush);
+    public CompletableFuture<?> call() {
+        try {
+            List<PutMetricDataRequest> allRequests = collectionAggregator.getRequests();
+            List<PutMetricDataRequest> requests = allRequests;
+            if (requests.size() > maximumRequestsPerFlush) {
+                METRIC_LOGGER.warn(() -> "Maximum AWS SDK client-side metric call count exceeded: " + allRequests.size() +
+                                   " > " + maximumRequestsPerFlush + ". Some metric requests will be dropped. This occurs "
+                                   + "when the caller has configured too many metrics or too unique of dimensions without "
+                                   + "an associated increase in the maximum-calls-per-upload configured on the publisher.");
+                requests = requests.subList(0, maximumRequestsPerFlush);
+            }
+            return uploader.upload(requests);
+        } catch (Throwable t) {
+            return CompletableFutureUtils.failedFuture(t);
         }
-
-        uploader.upload(requests);
     }
 }
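
A design note on the Runnable-to-Callable change above: because call() catches Throwable and returns a failed future via CompletableFutureUtils.failedFuture, synchronous failures (for example, in the aggregator) and asynchronous upload failures surface through the same channel, namely the inner future that close() waits on. Below is a minimal sketch of that behavior, using a hypothetical stand-alone harness and a plain completeExceptionally in place of the SDK utility:

import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class FailedFutureSketch {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();

        Callable<CompletableFuture<?>> task = () -> {
            try {
                throw new IllegalStateException("simulated synchronous failure");
            } catch (Throwable t) {
                CompletableFuture<Object> failed = new CompletableFuture<>();
                failed.completeExceptionally(t); // same idea as CompletableFutureUtils.failedFuture(t)
                return failed;
            }
        };

        Future<CompletableFuture<?>> handle = executor.submit(task);
        try {
            handle.get(5, TimeUnit.SECONDS)  // the task itself completes normally...
                  .get(5, TimeUnit.SECONDS); // ...and the failure surfaces here
        } catch (ExecutionException e) {
            System.out.println("Upload failed: " + e.getCause());
        } finally {
            executor.shutdownNow();
        }
    }
}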
@@ -15,13 +15,18 @@
 
 package software.amazon.awssdk.metrics.publishers.cloudwatch;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.never;
 
 import java.time.Duration;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
@@ -69,8 +74,7 @@ public void interruptedShutdownStillTerminates() {
         Thread.currentThread().interrupt();
         publisher.close();
         assertThat(publisher.isShutdown()).isTrue();
-
-        Thread.interrupted(); // Clear interrupt flag
+        assertThat(Thread.interrupted()).isTrue(); // Clear interrupt flag
     }
 
     @Test
@@ -79,6 +83,50 @@ public void closeDoesNotCloseConfiguredClient() {
         Mockito.verify(cloudWatch, never()).close();
     }
 
+    @Test(timeout = 10_000)
+    public void closeWaitsForUploadToComplete() throws InterruptedException {
+        CountDownLatch cloudwatchPutCalledLatch = new CountDownLatch(1);
+        CompletableFuture<PutMetricDataResponse> result = new CompletableFuture<>();
+
+        CloudWatchAsyncClient cloudWatch = Mockito.mock(CloudWatchAsyncClient.class);
+        try (CloudWatchMetricPublisher publisher = CloudWatchMetricPublisher.builder()
+                                                                            .cloudWatchClient(cloudWatch)
+                                                                            .uploadFrequency(Duration.ofMinutes(60))
+                                                                            .build()) {
+            MetricCollector collector = newCollector();
+            collector.reportMetric(HttpMetric.AVAILABLE_CONCURRENCY, 5);
+            publisher.publish(new FixedTimeMetricCollection(collector.collect()));
+
+            Mockito.when(cloudWatch.putMetricData(any(PutMetricDataRequest.class))).thenAnswer(x -> {
+                cloudwatchPutCalledLatch.countDown();
+                return result;
+            });
+
+            publisher.publish(MetricCollector.create("test").collect());
+
+            Thread closeThread = new Thread(publisher::close);
+
+            assertThat(publisher.isShutdown()).isFalse();
+
+            closeThread.start();
+
+            // Wait until cloudwatch is called
+            cloudwatchPutCalledLatch.await();
+
+            // Wait to make sure the close thread seems to be waiting for the cloudwatch call to complete
+            Thread.sleep(1_000);
+
+            assertThat(closeThread.isAlive()).isTrue();
+
+            // Complete the cloudwatch call
+            result.complete(null);
+
+            // Make sure the close thread finishes
+            closeThread.join(5_000);
+            assertThat(closeThread.isAlive()).isFalse();
+        }
+    }
+
     @Test
     public void defaultNamespaceIsCorrect() {
         try (CloudWatchMetricPublisher publisher = CloudWatchMetricPublisher.builder()
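One subtlety in the interruptedShutdownStillTerminates change above: Thread.interrupted() both reads and clears the current thread's interrupt flag, so wrapping it in assertThat(...).isTrue() strengthens the test from merely clearing the flag to verifying that close() restored the caller's interrupt status rather than swallowing it. A tiny standalone illustration of that semantics:

public class InterruptFlagSketch {
    public static void main(String[] args) {
        Thread.currentThread().interrupt();       // simulate an interrupted caller
        System.out.println(Thread.interrupted()); // true: flag was set; this call also clears it
        System.out.println(Thread.interrupted()); // false: the first call cleared the flag
    }
}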
@@ -46,7 +46,7 @@ public void extraTasksAboveMaximumAreDropped() {
                              PutMetricDataRequest.builder().build(),
                              PutMetricDataRequest.builder().build());
         Mockito.when(aggregator.getRequests()).thenReturn(requests);
-        task.run();
+        task.call();
 
 
         ArgumentCaptor<List> captor = ArgumentCaptor.forClass(List.class);