Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

batch updater for counter, timer, and dist summary #1003

Merged
merged 3 commits into from Nov 11, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2019 Netflix, Inc.
* Copyright 2014-2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -50,4 +50,49 @@ default long count() {
* often a counter is reset depends on the underlying registry implementation.
*/
double actualCount();

/**
* Returns a helper that can be used to more efficiently update the counter within a
* single thread. For example, if you need to update a meter within a loop where the
* rest of the loop body is fairly cheap, the instrumentation code may add considerable
* overhead if done in the loop body. A batched updater can offset a fair amount of that
* cost, but the updates may be delayed a bit in reaching the meter. The updates will only
* be seen after the updater is explicitly flushed.
*
* The caller should ensure that the updater is closed after using to guarantee any resources
* associated with it are cleaned up. In some cases failure to close the updater could result
* in a memory leak.
*
* @param batchSize
* Number of updates to batch before forcing a flush to the meter.
* @return
* Batch updater implementation for this meter.
*/
default BatchUpdater batchUpdater(int batchSize) {
return new CounterBatchUpdater(this, batchSize);
}

/** See {@link #batchUpdater(int)}. */
interface BatchUpdater extends AutoCloseable {
/** Update the counter by one. */
default void increment() {
add(1.0);
}

/**
* Update the counter by {@code amount}.
*
* @param amount
* Amount to add to the counter.
*/
default void increment(long amount) {
add(amount);
}

/** Update the counter by the specified amount. */
void add(double amount);

/** Push updates to the associated counter. */
void flush();
}
}
@@ -0,0 +1,55 @@
/*
* Copyright 2014-2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.spectator.api;

final class CounterBatchUpdater implements Counter.BatchUpdater {

private final Counter counter;
private final int batchSize;

private int count;
private double sum;

CounterBatchUpdater(Counter counter, int batchSize) {
this.counter = counter;
this.batchSize = batchSize;
this.count = 0;
this.sum = 0.0;
}

@Override
public void add(double amount) {
if (Double.isFinite(amount) && amount > 0.0) {
sum += amount;
++count;
if (count >= batchSize) {
flush();
}
}
}

@Override
public void flush() {
counter.add(sum);
sum = 0.0;
count = 0;
}

@Override
public void close() throws Exception {
flush();
}
}
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2019 Netflix, Inc.
* Copyright 2014-2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -44,7 +44,7 @@ final class DefaultTimer extends AbstractTimer {
}

@Override public void record(long amount, TimeUnit unit) {
if (amount >= 0) {
if (amount >= 0L) {
final long nanos = TimeUnit.NANOSECONDS.convert(amount, unit);
totalTime.addAndGet(nanos);
count.incrementAndGet();
Expand Down
@@ -0,0 +1,53 @@
/*
* Copyright 2014-2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.spectator.api;

class DistSummaryBatchUpdater implements DistributionSummary.BatchUpdater {

private final DistributionSummary distSummary;
private final int batchSize;

private int count;
private final long[] amounts;

DistSummaryBatchUpdater(DistributionSummary distSummary, int batchSize) {
this.distSummary = distSummary;
this.batchSize = batchSize;
this.count = 0;
this.amounts = new long[batchSize];
}

@Override
public void record(long amount) {
if (amount >= 0) {
amounts[count++] = amount;
if (count >= batchSize) {
flush();
}
}
}

@Override
public void flush() {
distSummary.record(amounts, count);
count = 0;
}

@Override
public void close() throws Exception {
flush();
}
}
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2019 Netflix, Inc.
* Copyright 2014-2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -67,4 +67,40 @@ default void record(long[] amounts, int n) {
* How often a timer is reset depends on the underlying registry implementation.
*/
long totalAmount();

/**
* Returns a helper that can be used to more efficiently update the distribution summary
* within a single thread. For example, if you need to update a meter within a loop where the
* rest of the loop body is fairly cheap, the instrumentation code may add considerable
* overhead if done in the loop body. A batched updater can offset a fair amount of that
* cost, but the updates may be delayed a bit in reaching the meter. The updates will only
* be seen after the updater is explicitly flushed.
*
* The caller should ensure that the updater is closed after using to guarantee any resources
* associated with it are cleaned up. In some cases failure to close the updater could result
* in a memory leak.
*
* @param batchSize
* Number of updates to batch before forcing a flush to the meter.
* @return
* Batch updater implementation for this meter.
*/
default BatchUpdater batchUpdater(int batchSize) {
return new DistSummaryBatchUpdater(this, batchSize);
}

/** See {@link #batchUpdater(int)}. */
interface BatchUpdater extends AutoCloseable {
/**
* Updates the statistics kept by the summary with the specified amount.
*
* @param amount
* Amount for an event being measured. For example, if the size in bytes of responses
* from a server. If the amount is less than 0 the value will be dropped.
*/
void record(long amount);

/** Push updates to the associated timer. */
void flush();
}
}
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2019 Netflix, Inc.
* Copyright 2014-2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -38,4 +38,8 @@ final class SwapCounter extends SwapMeter<Counter> implements Counter {
@Override public double actualCount() {
return get().actualCount();
}

@Override public BatchUpdater batchUpdater(int batchSize) {
return get().batchUpdater(batchSize);
}
}
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2019 Netflix, Inc.
* Copyright 2014-2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -52,4 +52,8 @@ public long count() {
public long totalAmount() {
return get().totalAmount();
}

@Override public BatchUpdater batchUpdater(int batchSize) {
return get().batchUpdater(batchSize);
}
}
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2019 Netflix, Inc.
* Copyright 2014-2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -64,4 +64,8 @@ final class SwapTimer extends SwapMeter<Timer> implements Timer {
@Override public long totalTime() {
return get().totalTime();
}

@Override public BatchUpdater batchUpdater(int batchSize) {
return get().batchUpdater(batchSize);
}
}
52 changes: 50 additions & 2 deletions spectator-api/src/main/java/com/netflix/spectator/api/Timer.java
Expand Up @@ -31,7 +31,7 @@
*/
public interface Timer extends Meter {
/**
* Updates the statistics kept by the counter with the specified amount.
* Updates the statistics kept by the timer with the specified amount.
*
* @param amount
* Duration of a single event being measured by this timer. If the amount is less than 0
Expand All @@ -42,7 +42,7 @@ public interface Timer extends Meter {
void record(long amount, TimeUnit unit);

/**
* Updates the statistics kept by the counter with the specified amount.
* Updates the statistics kept by the timer with the specified amount.
*
* @param amount
* Duration of a single event being measured by this timer.
Expand Down Expand Up @@ -124,4 +124,52 @@ default void record(Duration[] amounts, int n) {
* How often a timer is reset depends on the underlying registry implementation.
*/
long totalTime();

/**
* Returns a helper that can be used to more efficiently update the timer within a
* single thread. For example, if you need to update a meter within a loop where the
* rest of the loop body is fairly cheap, the instrumentation code may add considerable
* overhead if done in the loop body. A batched updater can offset a fair amount of that
* cost, but the updates may be delayed a bit in reaching the meter. The updates will only
* be seen after the updater is explicitly flushed.
*
* The caller should ensure that the updater is closed after using to guarantee any resources
* associated with it are cleaned up. In some cases failure to close the updater could result
* in a memory leak.
*
* @param batchSize
* Number of updates to batch before forcing a flush to the meter.
* @return
* Batch updater implementation for this meter.
*/
default BatchUpdater batchUpdater(int batchSize) {
return new TimerBatchUpdater(this, batchSize);
}

/** See {@link #batchUpdater(int)}. */
interface BatchUpdater extends AutoCloseable {
/**
* Updates the statistics kept by the timer with the specified amount.
*
* @param amount
* Duration of a single event being measured by this timer. If the amount is less than 0
* the value will be dropped.
* @param unit
* Time unit for the amount being recorded.
*/
void record(long amount, TimeUnit unit);

/**
* Updates the statistics kept by the timer with the specified amount.
*
* @param amount
* Duration of a single event being measured by this timer.
*/
default void record(Duration amount) {
record(amount.toNanos(), TimeUnit.NANOSECONDS);
}

/** Push updates to the associated timer. */
void flush();
}
}