Fixed an issue where CloudWatchMetricPublisher might not always complete flushing pending metrics on `close`.
millems committed Aug 12, 2020
1 parent 09c7d03 commit 901a51f
Showing 6 changed files with 123 additions and 39 deletions.
@@ -0,0 +1,5 @@
{
"type": "bugfix",
"category": "CloudWatch Metrics Publisher",
"description": "Fixed a bug where `CloudWatchPublisher#close` would not always complete flushing pending metrics before returning."
}
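
For context, a minimal usage sketch of the behavior this changelog entry describes (not part of the commit; the metric value is arbitrary and the default CloudWatch client is assumed to pick up credentials and region from the environment). After this fix, close() blocks until metrics still queued in the publisher have been flushed:

import java.time.Duration;
import software.amazon.awssdk.http.HttpMetric;
import software.amazon.awssdk.metrics.MetricCollector;
import software.amazon.awssdk.metrics.publishers.cloudwatch.CloudWatchMetricPublisher;

public class PublisherCloseSketch {
    public static void main(String[] args) {
        // Long upload frequency, so nothing is flushed until close() is called.
        CloudWatchMetricPublisher publisher = CloudWatchMetricPublisher.builder()
                                                                       .uploadFrequency(Duration.ofMinutes(60))
                                                                       .build();

        MetricCollector collector = MetricCollector.create("ApiCall");
        collector.reportMetric(HttpMetric.AVAILABLE_CONCURRENCY, 5);
        publisher.publish(collector.collect());

        // With this fix, close() waits (up to internal 60-second timeouts) for the
        // pending PutMetricData upload to start and finish before returning.
        publisher.close();
    }
}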
@@ -18,7 +18,6 @@
import software.amazon.awssdk.annotations.SdkPreviewApi;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.annotations.ThreadSafe;
import software.amazon.awssdk.metrics.MetricCollection;
import software.amazon.awssdk.utils.SdkAutoCloseable;

/**
@@ -26,13 +26,17 @@
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import software.amazon.awssdk.annotations.Immutable;
@@ -236,7 +240,7 @@ private CloudWatchMetricPublisher(Builder builder) {
threadFactory);

long flushFrequencyInMillis = resolveUploadFrequency(builder).toMillis();
this.scheduledExecutor.scheduleAtFixedRate(this::flushMetrics,
this.scheduledExecutor.scheduleAtFixedRate(this::flushMetricsQuietly,
flushFrequencyInMillis, flushFrequencyInMillis, TimeUnit.MILLISECONDS);
}

@@ -290,51 +294,66 @@ public void publish(MetricCollection metricCollection) {
/**
* Flush the metrics (via a {@link UploadMetricsTasks}). In the event that the {@link #executor} task queue is full, this
will retry automatically.
*
* This returns when the {@code UploadMetricsTask} has been submitted to the executor. The returned future is completed
* when the metrics upload to cloudwatch has started. The inner-most future is finally completed when the upload to cloudwatch
* has finished.
*/
private void flushMetrics() {
while (!scheduledExecutor.isShutdown() &&
!executor.isShutdown() &&
!Thread.currentThread().isInterrupted()) {
private Future<CompletableFuture<?>> flushMetrics() throws InterruptedException {
while (!executor.isShutdown()) {
try {
executor.submit(new UploadMetricsTasks(metricAggregator, metricUploader, maximumCallsPerUpload));
break;
return executor.submit(new UploadMetricsTasks(metricAggregator, metricUploader, maximumCallsPerUpload));
} catch (RejectedExecutionException e) {
sleepQuietly(100);
Thread.sleep(100);
}
}

return CompletableFuture.completedFuture(CompletableFuture.completedFuture(null));
}

private void sleepQuietly(int duration) {
private void flushMetricsQuietly() {
try {
Thread.sleep(duration);
flushMetrics();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
METRIC_LOGGER.error(() -> "Interrupted during metric flushing.", e);
}
}

@Override
public void close() {
flushMetrics();

scheduledExecutor.shutdownNow();
executor.shutdown();
try {
scheduledExecutor.shutdownNow();

Future<CompletableFuture<?>> flushFuture = flushMetrics();
executor.shutdown();

flushFuture.get(60, TimeUnit.SECONDS) // Wait for flush to start
.get(60, TimeUnit.SECONDS); // Wait for flush to finish

if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
throw new TimeoutException("Internal executor did not shut down in 60 seconds.");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
executor.shutdownNow();
METRIC_LOGGER.error(() -> "Interrupted during graceful metric publisher shutdown.", e);
} catch (ExecutionException e) {
METRIC_LOGGER.error(() -> "Failed during graceful metric publisher shutdown.", e);
} catch (TimeoutException e) {
METRIC_LOGGER.error(() -> "Timed out during graceful metric publisher shutdown.", e);
} finally {
runQuietly(scheduledExecutor::shutdownNow, "shutting down scheduled executor");
runQuietly(executor::shutdownNow, "shutting down executor");
runQuietly(() -> metricUploader.close(closeClientWithPublisher), "closing metric uploader");
}

metricUploader.close(closeClientWithPublisher);
}

/**
* Returns {@code true} when the internal executor has been shutdown.
*/
public boolean isShutdown() {
return executor.isShutdown() && scheduledExecutor.isShutdown();
private void runQuietly(Runnable runnable, String taskName) {
try {
runnable.run();
} catch (Exception e) {
METRIC_LOGGER.warn(() -> "Failed while " + taskName + ".", e);
}
}

/**
@@ -351,6 +370,13 @@ public static CloudWatchMetricPublisher create() {
return builder().build();
}

/**
* Returns {@code true} when the internal executors for this publisher are shut down.
*/
boolean isShutdown() {
return scheduledExecutor.isShutdown() && executor.isShutdown();
}

/**
* Builder class to construct {@link CloudWatchMetricPublisher} instances. See the individual properties for which
* configuration settings are available.
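
To make the nested return type of flushMetrics() above concrete, here is a stand-alone sketch (not SDK code; the class and variable names are invented) of the two-stage wait that close() performs: the outer Future completes when the executor has started the upload, and the inner CompletableFuture completes when the upload itself has finished.

import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class TwoStageWaitSketch {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();

        // Stands in for UploadMetricsTasks: the task kicks off an asynchronous upload
        // and returns a future that completes when that upload finishes.
        Callable<CompletableFuture<?>> uploadTask = () ->
                CompletableFuture.runAsync(() -> System.out.println("uploading metrics..."));

        // Outer future: completes once the task has run, i.e. the upload has started.
        Future<CompletableFuture<?>> flushFuture = executor.submit(uploadTask);

        // The same two-stage wait used by close().
        flushFuture.get(60, TimeUnit.SECONDS)   // wait for the flush task to start the upload
                   .get(60, TimeUnit.SECONDS);  // wait for the upload itself to finish

        executor.shutdown();
        executor.awaitTermination(60, TimeUnit.SECONDS);
    }
}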
@@ -18,18 +18,21 @@
import static software.amazon.awssdk.metrics.publishers.cloudwatch.internal.CloudWatchMetricLogger.METRIC_LOGGER;

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.metrics.publishers.cloudwatch.CloudWatchMetricPublisher;
import software.amazon.awssdk.metrics.publishers.cloudwatch.internal.MetricUploader;
import software.amazon.awssdk.metrics.publishers.cloudwatch.internal.transform.MetricCollectionAggregator;
import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataRequest;
import software.amazon.awssdk.utils.CompletableFutureUtils;

/**
* A task that is executed on the {@link CloudWatchMetricPublisher}'s executor to collect requests from a
* {@link MetricCollectionAggregator} and write them to a {@link MetricUploader}.
*/
@SdkInternalApi
public class UploadMetricsTasks implements Runnable {
public class UploadMetricsTasks implements Callable<CompletableFuture<?>> {
private final MetricCollectionAggregator collectionAggregator;
private final MetricUploader uploader;
private int maximumRequestsPerFlush;
@@ -43,17 +46,20 @@ public UploadMetricsTasks(MetricCollectionAggregator collectionAggregator,
}

@Override
public void run() {
List<PutMetricDataRequest> allRequests = collectionAggregator.getRequests();
List<PutMetricDataRequest> requests = allRequests;
if (requests.size() > maximumRequestsPerFlush) {
METRIC_LOGGER.warn(() -> "Maximum AWS SDK client-side metric call count exceeded: " + allRequests.size() +
" > " + maximumRequestsPerFlush + ". Some metric requests will be dropped. This occurs when "
+ "the caller has configured too many metrics or too unique of dimensions without an "
+ "associated increase in the maximum-calls-per-upload configured on the publisher.");
requests = requests.subList(0, maximumRequestsPerFlush);
public CompletableFuture<?> call() {
try {
List<PutMetricDataRequest> allRequests = collectionAggregator.getRequests();
List<PutMetricDataRequest> requests = allRequests;
if (requests.size() > maximumRequestsPerFlush) {
METRIC_LOGGER.warn(() -> "Maximum AWS SDK client-side metric call count exceeded: " + allRequests.size() +
" > " + maximumRequestsPerFlush + ". Some metric requests will be dropped. This occurs "
+ "when the caller has configured too many metrics or too unique of dimensions without "
+ "an associated increase in the maximum-calls-per-upload configured on the publisher.");
requests = requests.subList(0, maximumRequestsPerFlush);
}
return uploader.upload(requests);
} catch (Throwable t) {
return CompletableFutureUtils.failedFuture(t);
}

uploader.upload(requests);
}
}
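
A note on the change above from Runnable to Callable<CompletableFuture<?>>: call() deliberately never throws. A synchronous failure is converted into an already-failed future, so the two-stage wait in CloudWatchMetricPublisher#close still observes the error through the inner future. Below is a rough stand-alone sketch of that pattern using plain CompletableFuture instead of the SDK's CompletableFutureUtils helper (class and method names here are invented):

import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;

public class UploadTaskSketch implements Callable<CompletableFuture<?>> {
    @Override
    public CompletableFuture<?> call() {
        try {
            List<String> requests = buildRequests();   // may throw synchronously
            return startAsyncUpload(requests);         // completes when the upload finishes
        } catch (Throwable t) {
            // Equivalent of CompletableFutureUtils.failedFuture(t), without the SDK utility.
            CompletableFuture<Object> failed = new CompletableFuture<>();
            failed.completeExceptionally(t);
            return failed;
        }
    }

    private List<String> buildRequests() {
        return Collections.singletonList("request-1"); // placeholder for the aggregator's requests
    }

    private CompletableFuture<Void> startAsyncUpload(List<String> requests) {
        return CompletableFuture.runAsync(() -> System.out.println("uploading " + requests.size() + " request(s)"));
    }
}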
Expand Up @@ -15,13 +15,18 @@

package software.amazon.awssdk.metrics.publishers.cloudwatch;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.never;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
@@ -69,8 +74,7 @@ public void interruptedShutdownStillTerminates() {
Thread.currentThread().interrupt();
publisher.close();
assertThat(publisher.isShutdown()).isTrue();

Thread.interrupted(); // Clear interrupt flag
assertThat(Thread.interrupted()).isTrue(); // Clear interrupt flag
}

@Test
@@ -79,6 +83,50 @@ public void closeDoesNotCloseConfiguredClient() {
Mockito.verify(cloudWatch, never()).close();
}

@Test(timeout = 10_000)
public void closeWaitsForUploadToComplete() throws InterruptedException {
CountDownLatch cloudwatchPutCalledLatch = new CountDownLatch(1);
CompletableFuture<PutMetricDataResponse> result = new CompletableFuture<>();

CloudWatchAsyncClient cloudWatch = Mockito.mock(CloudWatchAsyncClient.class);
try (CloudWatchMetricPublisher publisher = CloudWatchMetricPublisher.builder()
.cloudWatchClient(cloudWatch)
.uploadFrequency(Duration.ofMinutes(60))
.build()) {
MetricCollector collector = newCollector();
collector.reportMetric(HttpMetric.AVAILABLE_CONCURRENCY, 5);
publisher.publish(new FixedTimeMetricCollection(collector.collect()));

Mockito.when(cloudWatch.putMetricData(any(PutMetricDataRequest.class))).thenAnswer(x -> {
cloudwatchPutCalledLatch.countDown();
return result;
});

publisher.publish(MetricCollector.create("test").collect());

Thread closeThread = new Thread(publisher::close);

assertThat(publisher.isShutdown()).isFalse();

closeThread.start();

// Wait until cloudwatch is called
cloudwatchPutCalledLatch.await();

// Wait to make sure the close thread seems to be waiting for the cloudwatch call to complete
Thread.sleep(1_000);

assertThat(closeThread.isAlive()).isTrue();

// Complete the cloudwatch call
result.complete(null);

// Make sure the close thread finishes
closeThread.join(5_000);
assertThat(closeThread.isAlive()).isFalse();
}
}

@Test
public void defaultNamespaceIsCorrect() {
try (CloudWatchMetricPublisher publisher = CloudWatchMetricPublisher.builder()
@@ -46,7 +46,7 @@ public void extraTasksAboveMaximumAreDropped() {
PutMetricDataRequest.builder().build(),
PutMetricDataRequest.builder().build());
Mockito.when(aggregator.getRequests()).thenReturn(requests);
task.run();
task.call();


ArgumentCaptor<List> captor = ArgumentCaptor.forClass(List.class);