From bf209018b3dffed8e9e99825baeacf8886bab2c1 Mon Sep 17 00:00:00 2001 From: Brian Harrington Date: Thu, 10 Nov 2022 19:06:10 -0600 Subject: [PATCH 1/3] batch updater for counter, timer, and dist summary Adds a helper that can be used to batch updates to the delegate meter. The main use-case for this is timers or distributions summaries that need to be updated from a single thread with a high throughput. For counters, this can easily be achieved by the user with a local variable to track the count and then calling increment with the amount when complete or at the desired batch interval. However, for timers and distribution summaries that doesn't work because the implementation needs the raw samples. In #941 a method was added so the samples could be accumulated in an array and posted in a batch. This helps a bit, but it is cumbersome to use, incurs memory overhead, and the full set of samples have to be traversed and processed when recorded. This change introduces batch updaters that can be customized for a given implementation. So for registries like Atlas where it is possible to accumulate the stats without the memory overhead, it can provide an optimized implementation to do so. --- .../com/netflix/spectator/api/Counter.java | 47 +++++- .../spectator/api/CounterBatchUpdater.java | 55 +++++++ .../netflix/spectator/api/DefaultTimer.java | 4 +- .../api/DistSummaryBatchUpdater.java | 53 +++++++ .../spectator/api/DistributionSummary.java | 38 ++++- .../netflix/spectator/api/SwapCounter.java | 6 +- .../api/SwapDistributionSummary.java | 6 +- .../com/netflix/spectator/api/SwapTimer.java | 6 +- .../java/com/netflix/spectator/api/Timer.java | 52 ++++++- .../spectator/api/TimerBatchUpdater.java | 55 +++++++ .../spectator/api/DefaultCounterTest.java | 62 +++++++- .../api/DefaultDistributionSummaryTest.java | 36 ++++- .../spectator/api/DefaultTimerTest.java | 46 +++++- .../netflix/spectator/atlas/BatchUpdates.java | 143 ++++++++++++++++++ .../atlas/AtlasDistSummaryBatchUpdater.java | 70 +++++++++ .../atlas/AtlasDistributionSummary.java | 17 ++- .../netflix/spectator/atlas/AtlasMeter.java | 17 ++- .../netflix/spectator/atlas/AtlasTimer.java | 15 ++ .../atlas/AtlasTimerBatchUpdater.java | 73 +++++++++ .../atlas/AtlasDistributionSummaryTest.java | 21 +++ .../spectator/atlas/AtlasTimerTest.java | 21 +++ 21 files changed, 829 insertions(+), 14 deletions(-) create mode 100644 spectator-api/src/main/java/com/netflix/spectator/api/CounterBatchUpdater.java create mode 100644 spectator-api/src/main/java/com/netflix/spectator/api/DistSummaryBatchUpdater.java create mode 100644 spectator-api/src/main/java/com/netflix/spectator/api/TimerBatchUpdater.java create mode 100644 spectator-reg-atlas/src/jmh/java/com/netflix/spectator/atlas/BatchUpdates.java create mode 100644 spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasDistSummaryBatchUpdater.java create mode 100644 spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasTimerBatchUpdater.java diff --git a/spectator-api/src/main/java/com/netflix/spectator/api/Counter.java b/spectator-api/src/main/java/com/netflix/spectator/api/Counter.java index 2a2df0595..bef9e9c15 100644 --- a/spectator-api/src/main/java/com/netflix/spectator/api/Counter.java +++ b/spectator-api/src/main/java/com/netflix/spectator/api/Counter.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2019 Netflix, Inc. + * Copyright 2014-2022 Netflix, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -50,4 +50,49 @@ default long count() { * often a counter is reset depends on the underlying registry implementation. */ double actualCount(); + + /** + * Returns a helper that can be used to more efficiently update the counter within a + * single thread. For example, if you need to update a meter within a loop where the + * rest of the loop body is fairly cheap, the instrumentation code may add considerable + * overhead if done in the loop body. A batched updater can offset a fair amount of that + * cost, but the updates may be delayed a bit in reaching the meter. The updates will only + * be seen after the updater is explicitly flushed. + * + * The caller should ensure that the updater is closed after using to guarantee any resources + * associated with it are cleaned up. In some cases failure to close the updater could result + * in a memory leak. + * + * @param batchSize + * Number of updates to batch before forcing a flush to the meter. + * @return + * Batch updater implementation for this meter. + */ + default BatchUpdater batchUpdater(int batchSize) { + return new CounterBatchUpdater(this, batchSize); + } + + /** See {@link #batchUpdater(int)}. */ + interface BatchUpdater extends AutoCloseable { + /** Update the counter by one. */ + default void increment() { + add(1.0); + } + + /** + * Update the counter by {@code amount}. + * + * @param amount + * Amount to add to the counter. + */ + default void increment(long amount) { + add(amount); + } + + /** Update the counter by the specified amount. */ + void add(double amount); + + /** Push updates to the associated counter. */ + void flush(); + } } diff --git a/spectator-api/src/main/java/com/netflix/spectator/api/CounterBatchUpdater.java b/spectator-api/src/main/java/com/netflix/spectator/api/CounterBatchUpdater.java new file mode 100644 index 000000000..9e50c5c17 --- /dev/null +++ b/spectator-api/src/main/java/com/netflix/spectator/api/CounterBatchUpdater.java @@ -0,0 +1,55 @@ +/* + * Copyright 2014-2022 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.spectator.api; + +final class CounterBatchUpdater implements Counter.BatchUpdater { + + private final Counter counter; + private final int batchSize; + + private int count; + private double sum; + + CounterBatchUpdater(Counter counter, int batchSize) { + this.counter = counter; + this.batchSize = batchSize; + this.count = 0; + this.sum = 0.0; + } + + @Override + public void add(double amount) { + if (Double.isFinite(amount) && amount > 0.0) { + sum += amount; + ++count; + if (count >= batchSize) { + flush(); + } + } + } + + @Override + public void flush() { + counter.add(sum); + sum = 0.0; + count = 0; + } + + @Override + public void close() throws Exception { + flush(); + } +} diff --git a/spectator-api/src/main/java/com/netflix/spectator/api/DefaultTimer.java b/spectator-api/src/main/java/com/netflix/spectator/api/DefaultTimer.java index 9d3392ade..07bba0de3 100644 --- a/spectator-api/src/main/java/com/netflix/spectator/api/DefaultTimer.java +++ b/spectator-api/src/main/java/com/netflix/spectator/api/DefaultTimer.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2019 Netflix, Inc. + * Copyright 2014-2022 Netflix, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -44,7 +44,7 @@ final class DefaultTimer extends AbstractTimer { } @Override public void record(long amount, TimeUnit unit) { - if (amount >= 0) { + if (amount >= 0L) { final long nanos = TimeUnit.NANOSECONDS.convert(amount, unit); totalTime.addAndGet(nanos); count.incrementAndGet(); diff --git a/spectator-api/src/main/java/com/netflix/spectator/api/DistSummaryBatchUpdater.java b/spectator-api/src/main/java/com/netflix/spectator/api/DistSummaryBatchUpdater.java new file mode 100644 index 000000000..12165eb2e --- /dev/null +++ b/spectator-api/src/main/java/com/netflix/spectator/api/DistSummaryBatchUpdater.java @@ -0,0 +1,53 @@ +/* + * Copyright 2014-2022 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.spectator.api; + +class DistSummaryBatchUpdater implements DistributionSummary.BatchUpdater { + + private final DistributionSummary distSummary; + private final int batchSize; + + private int count; + private final long[] amounts; + + DistSummaryBatchUpdater(DistributionSummary distSummary, int batchSize) { + this.distSummary = distSummary; + this.batchSize = batchSize; + this.count = 0; + this.amounts = new long[batchSize]; + } + + @Override + public void record(long amount) { + if (amount >= 0) { + amounts[count++] = amount; + if (count >= batchSize) { + flush(); + } + } + } + + @Override + public void flush() { + distSummary.record(amounts, count); + count = 0; + } + + @Override + public void close() throws Exception { + flush(); + } +} diff --git a/spectator-api/src/main/java/com/netflix/spectator/api/DistributionSummary.java b/spectator-api/src/main/java/com/netflix/spectator/api/DistributionSummary.java index 897ec8992..95a90d9eb 100644 --- a/spectator-api/src/main/java/com/netflix/spectator/api/DistributionSummary.java +++ b/spectator-api/src/main/java/com/netflix/spectator/api/DistributionSummary.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2019 Netflix, Inc. + * Copyright 2014-2022 Netflix, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -67,4 +67,40 @@ default void record(long[] amounts, int n) { * How often a timer is reset depends on the underlying registry implementation. */ long totalAmount(); + + /** + * Returns a helper that can be used to more efficiently update the distribution summary + * within a single thread. For example, if you need to update a meter within a loop where the + * rest of the loop body is fairly cheap, the instrumentation code may add considerable + * overhead if done in the loop body. A batched updater can offset a fair amount of that + * cost, but the updates may be delayed a bit in reaching the meter. The updates will only + * be seen after the updater is explicitly flushed. + * + * The caller should ensure that the updater is closed after using to guarantee any resources + * associated with it are cleaned up. In some cases failure to close the updater could result + * in a memory leak. + * + * @param batchSize + * Number of updates to batch before forcing a flush to the meter. + * @return + * Batch updater implementation for this meter. + */ + default BatchUpdater batchUpdater(int batchSize) { + return new DistSummaryBatchUpdater(this, batchSize); + } + + /** See {@link #batchUpdater(int)}. */ + interface BatchUpdater extends AutoCloseable { + /** + * Updates the statistics kept by the summary with the specified amount. + * + * @param amount + * Amount for an event being measured. For example, if the size in bytes of responses + * from a server. If the amount is less than 0 the value will be dropped. + */ + void record(long amount); + + /** Push updates to the associated timer. */ + void flush(); + } } diff --git a/spectator-api/src/main/java/com/netflix/spectator/api/SwapCounter.java b/spectator-api/src/main/java/com/netflix/spectator/api/SwapCounter.java index 45869ab46..7ec984fa6 100644 --- a/spectator-api/src/main/java/com/netflix/spectator/api/SwapCounter.java +++ b/spectator-api/src/main/java/com/netflix/spectator/api/SwapCounter.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2019 Netflix, Inc. + * Copyright 2014-2022 Netflix, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,4 +38,8 @@ final class SwapCounter extends SwapMeter implements Counter { @Override public double actualCount() { return get().actualCount(); } + + @Override public BatchUpdater batchUpdater(int batchSize) { + return get().batchUpdater(batchSize); + } } diff --git a/spectator-api/src/main/java/com/netflix/spectator/api/SwapDistributionSummary.java b/spectator-api/src/main/java/com/netflix/spectator/api/SwapDistributionSummary.java index 644813796..f62360511 100644 --- a/spectator-api/src/main/java/com/netflix/spectator/api/SwapDistributionSummary.java +++ b/spectator-api/src/main/java/com/netflix/spectator/api/SwapDistributionSummary.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2019 Netflix, Inc. + * Copyright 2014-2022 Netflix, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -52,4 +52,8 @@ public long count() { public long totalAmount() { return get().totalAmount(); } + + @Override public BatchUpdater batchUpdater(int batchSize) { + return get().batchUpdater(batchSize); + } } diff --git a/spectator-api/src/main/java/com/netflix/spectator/api/SwapTimer.java b/spectator-api/src/main/java/com/netflix/spectator/api/SwapTimer.java index 6873c29f7..f371deebb 100644 --- a/spectator-api/src/main/java/com/netflix/spectator/api/SwapTimer.java +++ b/spectator-api/src/main/java/com/netflix/spectator/api/SwapTimer.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2019 Netflix, Inc. + * Copyright 2014-2022 Netflix, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -64,4 +64,8 @@ final class SwapTimer extends SwapMeter implements Timer { @Override public long totalTime() { return get().totalTime(); } + + @Override public BatchUpdater batchUpdater(int batchSize) { + return get().batchUpdater(batchSize); + } } diff --git a/spectator-api/src/main/java/com/netflix/spectator/api/Timer.java b/spectator-api/src/main/java/com/netflix/spectator/api/Timer.java index 96134257b..8eed5eb51 100644 --- a/spectator-api/src/main/java/com/netflix/spectator/api/Timer.java +++ b/spectator-api/src/main/java/com/netflix/spectator/api/Timer.java @@ -31,7 +31,7 @@ */ public interface Timer extends Meter { /** - * Updates the statistics kept by the counter with the specified amount. + * Updates the statistics kept by the timer with the specified amount. * * @param amount * Duration of a single event being measured by this timer. If the amount is less than 0 @@ -42,7 +42,7 @@ public interface Timer extends Meter { void record(long amount, TimeUnit unit); /** - * Updates the statistics kept by the counter with the specified amount. + * Updates the statistics kept by the timer with the specified amount. * * @param amount * Duration of a single event being measured by this timer. @@ -124,4 +124,52 @@ default void record(Duration[] amounts, int n) { * How often a timer is reset depends on the underlying registry implementation. */ long totalTime(); + + /** + * Returns a helper that can be used to more efficiently update the timer within a + * single thread. For example, if you need to update a meter within a loop where the + * rest of the loop body is fairly cheap, the instrumentation code may add considerable + * overhead if done in the loop body. A batched updater can offset a fair amount of that + * cost, but the updates may be delayed a bit in reaching the meter. The updates will only + * be seen after the updater is explicitly flushed. + * + * The caller should ensure that the updater is closed after using to guarantee any resources + * associated with it are cleaned up. In some cases failure to close the updater could result + * in a memory leak. + * + * @param batchSize + * Number of updates to batch before forcing a flush to the meter. + * @return + * Batch updater implementation for this meter. + */ + default BatchUpdater batchUpdater(int batchSize) { + return new TimerBatchUpdater(this, batchSize); + } + + /** See {@link #batchUpdater(int)}. */ + interface BatchUpdater extends AutoCloseable { + /** + * Updates the statistics kept by the timer with the specified amount. + * + * @param amount + * Duration of a single event being measured by this timer. If the amount is less than 0 + * the value will be dropped. + * @param unit + * Time unit for the amount being recorded. + */ + void record(long amount, TimeUnit unit); + + /** + * Updates the statistics kept by the timer with the specified amount. + * + * @param amount + * Duration of a single event being measured by this timer. + */ + default void record(Duration amount) { + record(amount.toNanos(), TimeUnit.NANOSECONDS); + } + + /** Push updates to the associated timer. */ + void flush(); + } } diff --git a/spectator-api/src/main/java/com/netflix/spectator/api/TimerBatchUpdater.java b/spectator-api/src/main/java/com/netflix/spectator/api/TimerBatchUpdater.java new file mode 100644 index 000000000..f1d58bfb9 --- /dev/null +++ b/spectator-api/src/main/java/com/netflix/spectator/api/TimerBatchUpdater.java @@ -0,0 +1,55 @@ +/* + * Copyright 2014-2022 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.spectator.api; + +import java.util.concurrent.TimeUnit; + +final class TimerBatchUpdater implements Timer.BatchUpdater { + + private final Timer timer; + private final int batchSize; + + private int count; + private final long[] amounts; + + TimerBatchUpdater(Timer timer, int batchSize) { + this.timer = timer; + this.batchSize = batchSize; + this.count = 0; + this.amounts = new long[batchSize]; + } + + @Override + public void record(long amount, TimeUnit unit) { + if (amount >= 0L) { + amounts[count++] = unit.toNanos(amount); + if (count >= batchSize) { + flush(); + } + } + } + + @Override + public void flush() { + timer.record(amounts, count, TimeUnit.NANOSECONDS); + count = 0; + } + + @Override + public void close() throws Exception { + flush(); + } +} diff --git a/spectator-api/src/test/java/com/netflix/spectator/api/DefaultCounterTest.java b/spectator-api/src/test/java/com/netflix/spectator/api/DefaultCounterTest.java index 1fbff8832..1ced810cf 100644 --- a/spectator-api/src/test/java/com/netflix/spectator/api/DefaultCounterTest.java +++ b/spectator-api/src/test/java/com/netflix/spectator/api/DefaultCounterTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2019 Netflix, Inc. + * Copyright 2014-2022 Netflix, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,6 +38,20 @@ public void testIncrement() { Assertions.assertEquals(c.count(), 3L); } + @Test + public void testIncrementBatch() throws Exception { + Counter c = new DefaultCounter(clock, NoopId.INSTANCE); + try (Counter.BatchUpdater b = c.batchUpdater(2)) { + b.increment(); + Assertions.assertEquals(c.count(), 0L); + b.increment(); + Assertions.assertEquals(c.count(), 2L); + b.increment(); + Assertions.assertEquals(c.count(), 2L); + } + Assertions.assertEquals(c.count(), 3L); + } + @Test public void testIncrementAmount() { Counter c = new DefaultCounter(clock, NoopId.INSTANCE); @@ -45,6 +59,15 @@ public void testIncrementAmount() { Assertions.assertEquals(c.count(), 42L); } + @Test + public void testIncrementAmountBatch() throws Exception { + Counter c = new DefaultCounter(clock, NoopId.INSTANCE); + try (Counter.BatchUpdater b = c.batchUpdater(2)) { + b.increment(42); + } + Assertions.assertEquals(c.count(), 42L); + } + @Test public void testAddAmount() { Counter c = new DefaultCounter(clock, NoopId.INSTANCE); @@ -52,6 +75,15 @@ public void testAddAmount() { Assertions.assertEquals(c.actualCount(), 42.0, 1e-12); } + @Test + public void testAddAmountBatch() throws Exception { + Counter c = new DefaultCounter(clock, NoopId.INSTANCE); + try (Counter.BatchUpdater b = c.batchUpdater(2)) { + b.add(42.0); + } + Assertions.assertEquals(c.actualCount(), 42.0, 1e-12); + } + @Test public void testAddNegativeAmount() { Counter c = new DefaultCounter(clock, NoopId.INSTANCE); @@ -59,6 +91,15 @@ public void testAddNegativeAmount() { Assertions.assertEquals(c.actualCount(), 0.0, 1e-12); } + @Test + public void testAddNegativeAmountBatch() throws Exception { + Counter c = new DefaultCounter(clock, NoopId.INSTANCE); + try (Counter.BatchUpdater b = c.batchUpdater(2)) { + b.add(-42.0); + } + Assertions.assertEquals(c.actualCount(), 0.0, 1e-12); + } + @Test public void testAddNaN() { Counter c = new DefaultCounter(clock, NoopId.INSTANCE); @@ -67,6 +108,16 @@ public void testAddNaN() { Assertions.assertEquals(c.actualCount(), 1.0, 1e-12); } + @Test + public void testAddNaNBatch() throws Exception { + Counter c = new DefaultCounter(clock, NoopId.INSTANCE); + try (Counter.BatchUpdater b = c.batchUpdater(2)) { + b.add(1.0); + b.add(Double.NaN); + } + Assertions.assertEquals(c.actualCount(), 1.0, 1e-12); + } + @Test public void testAddInfinity() { Counter c = new DefaultCounter(clock, NoopId.INSTANCE); @@ -74,6 +125,15 @@ public void testAddInfinity() { Assertions.assertEquals(c.actualCount(), 0.0, 1e-12); } + @Test + public void testAddInfinityBatch() throws Exception { + Counter c = new DefaultCounter(clock, NoopId.INSTANCE); + try (Counter.BatchUpdater b = c.batchUpdater(2)) { + b.add(Double.POSITIVE_INFINITY); + } + Assertions.assertEquals(c.actualCount(), 0.0, 1e-12); + } + @Test public void testMeasure() { Counter c = new DefaultCounter(clock, NoopId.INSTANCE); diff --git a/spectator-api/src/test/java/com/netflix/spectator/api/DefaultDistributionSummaryTest.java b/spectator-api/src/test/java/com/netflix/spectator/api/DefaultDistributionSummaryTest.java index 92d4cd956..5d5c88400 100644 --- a/spectator-api/src/test/java/com/netflix/spectator/api/DefaultDistributionSummaryTest.java +++ b/spectator-api/src/test/java/com/netflix/spectator/api/DefaultDistributionSummaryTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2019 Netflix, Inc. + * Copyright 2014-2022 Netflix, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,6 +38,20 @@ public void testRecord() { Assertions.assertEquals(t.totalAmount(), 42L); } + @Test + public void testRecordBatch() throws Exception { + DistributionSummary t = new DefaultDistributionSummary(clock, NoopId.INSTANCE); + try (DistributionSummary.BatchUpdater b = t.batchUpdater(2)) { + b.record(42); + b.record(42); + Assertions.assertEquals(t.count(), 2L); + Assertions.assertEquals(t.totalAmount(), 84L); + b.record(1); + } + Assertions.assertEquals(t.count(), 3L); + Assertions.assertEquals(t.totalAmount(), 85L); + } + @Test public void testRecordNegative() { DistributionSummary t = new DefaultDistributionSummary(clock, NoopId.INSTANCE); @@ -46,6 +60,16 @@ public void testRecordNegative() { Assertions.assertEquals(t.totalAmount(), 0L); } + @Test + public void testRecordNegativeBatch() throws Exception { + DistributionSummary t = new DefaultDistributionSummary(clock, NoopId.INSTANCE); + try (DistributionSummary.BatchUpdater b = t.batchUpdater(2)) { + b.record(-42); + } + Assertions.assertEquals(t.count(), 0L); + Assertions.assertEquals(t.totalAmount(), 0L); + } + @Test public void testRecordZero() { DistributionSummary t = new DefaultDistributionSummary(clock, NoopId.INSTANCE); @@ -54,6 +78,16 @@ public void testRecordZero() { Assertions.assertEquals(t.totalAmount(), 0L); } + @Test + public void testRecordZeroBatch() throws Exception { + DistributionSummary t = new DefaultDistributionSummary(clock, NoopId.INSTANCE); + try (DistributionSummary.BatchUpdater b = t.batchUpdater(2)) { + b.record(0); + } + Assertions.assertEquals(t.count(), 1L); + Assertions.assertEquals(t.totalAmount(), 0L); + } + @Test public void testMeasure() { DistributionSummary t = new DefaultDistributionSummary(clock, new DefaultId("foo")); diff --git a/spectator-api/src/test/java/com/netflix/spectator/api/DefaultTimerTest.java b/spectator-api/src/test/java/com/netflix/spectator/api/DefaultTimerTest.java index 8507ca405..fde213a3c 100644 --- a/spectator-api/src/test/java/com/netflix/spectator/api/DefaultTimerTest.java +++ b/spectator-api/src/test/java/com/netflix/spectator/api/DefaultTimerTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2019 Netflix, Inc. + * Copyright 2014-2022 Netflix, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -41,6 +41,20 @@ public void testRecord() { Assertions.assertEquals(t.totalTime(), 42000000L); } + @Test + public void testRecordBatch() throws Exception { + Timer t = new DefaultTimer(clock, NoopId.INSTANCE); + try (Timer.BatchUpdater b = t.batchUpdater(2)) { + b.record(42, TimeUnit.MILLISECONDS); + b.record(42, TimeUnit.MILLISECONDS); + Assertions.assertEquals(t.count(), 2L); + Assertions.assertEquals(t.totalTime(), 84000000L); + b.record(1, TimeUnit.MILLISECONDS); + } + Assertions.assertEquals(t.count(), 3L); + Assertions.assertEquals(t.totalTime(), 85000000L); + } + @Test public void testRecordDuration() { Timer t = new DefaultTimer(clock, NoopId.INSTANCE); @@ -49,6 +63,16 @@ public void testRecordDuration() { Assertions.assertEquals(t.totalTime(), 42000000L); } + @Test + public void testRecordDurationBatch() throws Exception { + Timer t = new DefaultTimer(clock, NoopId.INSTANCE); + try (Timer.BatchUpdater b = t.batchUpdater(2)) { + b.record(Duration.ofMillis(42)); + } + Assertions.assertEquals(t.count(), 1L); + Assertions.assertEquals(t.totalTime(), 42000000L); + } + @Test public void testRecordNegative() { Timer t = new DefaultTimer(clock, NoopId.INSTANCE); @@ -57,6 +81,16 @@ public void testRecordNegative() { Assertions.assertEquals(t.totalTime(), 0L); } + @Test + public void testRecordNegativeBatch() throws Exception { + Timer t = new DefaultTimer(clock, NoopId.INSTANCE); + try (Timer.BatchUpdater b = t.batchUpdater(2)) { + b.record(-42, TimeUnit.MILLISECONDS); + } + Assertions.assertEquals(t.count(), 0L); + Assertions.assertEquals(t.totalTime(), 0L); + } + @Test public void testRecordZero() { Timer t = new DefaultTimer(clock, NoopId.INSTANCE); @@ -65,6 +99,16 @@ public void testRecordZero() { Assertions.assertEquals(t.totalTime(), 0L); } + @Test + public void testRecordZeroBatch() throws Exception { + Timer t = new DefaultTimer(clock, NoopId.INSTANCE); + try (Timer.BatchUpdater b = t.batchUpdater(2)) { + b.record(0, TimeUnit.MILLISECONDS); + } + Assertions.assertEquals(t.count(), 1L); + Assertions.assertEquals(t.totalTime(), 0L); + } + @Test public void testRecordCallable() throws Exception { Timer t = new DefaultTimer(clock, NoopId.INSTANCE); diff --git a/spectator-reg-atlas/src/jmh/java/com/netflix/spectator/atlas/BatchUpdates.java b/spectator-reg-atlas/src/jmh/java/com/netflix/spectator/atlas/BatchUpdates.java new file mode 100644 index 000000000..d3c3ad834 --- /dev/null +++ b/spectator-reg-atlas/src/jmh/java/com/netflix/spectator/atlas/BatchUpdates.java @@ -0,0 +1,143 @@ +/* + * Copyright 2014-2022 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.spectator.atlas; + +import com.netflix.spectator.api.Clock; +import com.netflix.spectator.api.Counter; +import com.netflix.spectator.api.DistributionSummary; +import com.netflix.spectator.api.Registry; +import com.netflix.spectator.api.Timer; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.infra.Blackhole; + +import java.util.concurrent.TimeUnit; + +/** + *
+ * Benchmark                          Mode  Cnt      Score      Error   Units
+ *
+ * noInstrumentation                 thrpt    5   3978.248 ±  136.863   ops/s
+ *
+ * counter                           thrpt    5     14.138 ±    0.229   ops/s
+ * counterBatch                      thrpt    5    464.445 ±    8.175   ops/s
+ *
+ * distSummary                       thrpt    5      9.383 ±    0.732   ops/s
+ * distSummaryBatch                  thrpt    5    353.769 ±   10.698   ops/s
+ *
+ * timer                             thrpt    5     10.505 ±    0.170   ops/s
+ * timerBatch                        thrpt    5    336.505 ±    3.538   ops/s
+ * 
+ */ +@State(Scope.Thread) +public class BatchUpdates { + + private Registry registry; + + @Setup + public void setup() { + registry = new AtlasRegistry(Clock.SYSTEM, System::getProperty); + } + + @TearDown + public void tearDown() { + registry = null; + } + + @Benchmark + public void noInstrumentation(Blackhole bh) { + long sum = 0L; + for (int i = 0; i < 1_000_000; ++i) { + sum += i; + } + bh.consume(sum); + } + + @Benchmark + public void counter(Blackhole bh) { + Counter c = registry.counter("test"); + long sum = 0L; + for (int i = 0; i < 1_000_000; ++i) { + sum += i; + c.increment(); + } + bh.consume(sum); + } + + @Benchmark + public void counterBatch(Blackhole bh) throws Exception { + Counter c = registry.counter("test"); + try (Counter.BatchUpdater b = c.batchUpdater(100_000)) { + long sum = 0L; + for (int i = 0; i < 1_000_000; ++i) { + sum += i; + b.increment(); + } + bh.consume(sum); + } + } + + @Benchmark + public void timer(Blackhole bh) { + Timer t = registry.timer("test"); + long sum = 0L; + for (int i = 0; i < 1_000_000; ++i) { + sum += i; + t.record(i, TimeUnit.MILLISECONDS); + } + bh.consume(sum); + } + + @Benchmark + public void timerBatch(Blackhole bh) throws Exception { + Timer t = registry.timer("test"); + try (Timer.BatchUpdater b = t.batchUpdater(100_000)) { + long sum = 0L; + for (int i = 0; i < 1_000_000; ++i) { + sum += i; + b.record(i, TimeUnit.MILLISECONDS); + } + bh.consume(sum); + } + } + + @Benchmark + public void distSummary(Blackhole bh) { + DistributionSummary d = registry.distributionSummary("test"); + long sum = 0L; + for (int i = 0; i < 1_000_000; ++i) { + sum += i; + d.record(i); + } + bh.consume(sum); + } + + @Benchmark + public void distSummaryBatch(Blackhole bh) throws Exception { + DistributionSummary d = registry.distributionSummary("test"); + try (DistributionSummary.BatchUpdater b = d.batchUpdater(100_000)) { + long sum = 0L; + for (int i = 0; i < 1_000_000; ++i) { + sum += i; + b.record(i); + } + bh.consume(sum); + } + } +} diff --git a/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasDistSummaryBatchUpdater.java b/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasDistSummaryBatchUpdater.java new file mode 100644 index 000000000..cee7e5353 --- /dev/null +++ b/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasDistSummaryBatchUpdater.java @@ -0,0 +1,70 @@ +/* + * Copyright 2014-2022 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.spectator.atlas; + +import com.netflix.spectator.api.DistributionSummary; + +final class AtlasDistSummaryBatchUpdater implements DistributionSummary.BatchUpdater { + + private AtlasDistributionSummary distSummary; + private final int batchSize; + + private int count; + private long total; + private double totalOfSquares; + private long max; + + AtlasDistSummaryBatchUpdater(AtlasDistributionSummary distSummary, int batchSize) { + this.distSummary = distSummary; + this.batchSize = batchSize; + distSummary.updateRefCount(1); + } + + @Override + public void record(long amount) { + ++count; + if (amount > 0L) { + total += amount; + totalOfSquares += (double) amount * amount; + if (amount > max) { + max = amount; + } + } + if (count >= batchSize) { + flush(); + } + } + + @Override + public void flush() { + if (distSummary != null) { + distSummary.update(count, total, totalOfSquares, max); + count = 0; + total = 0L; + totalOfSquares = 0.0; + max = 0L; + } + } + + @Override + public void close() throws Exception { + if (distSummary != null) { + flush(); + distSummary.updateRefCount(-1); + distSummary = null; + } + } +} diff --git a/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasDistributionSummary.java b/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasDistributionSummary.java index 639b3f057..092839cee 100644 --- a/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasDistributionSummary.java +++ b/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasDistributionSummary.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2019 Netflix, Inc. + * Copyright 2014-2022 Netflix, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -142,4 +142,19 @@ private void updateMax(AtomicLong maxValue, long v) { @Override public long totalAmount() { return total.poll(); } + + @Override public BatchUpdater batchUpdater(int batchSize) { + return new AtlasDistSummaryBatchUpdater(this, batchSize); + } + + /** + * Helper to allow the batch updater to directly update the individual stats. + */ + void update(long count, long total, double totalOfSquares, long max) { + long now = clock.wallTime(); + this.count.getCurrent(now).addAndGet(count); + this.total.getCurrent(now).addAndGet(total); + this.totalOfSquares.getCurrent(now).addAndGet(totalOfSquares); + updateMax(this.max.getCurrent(now), max); + } } diff --git a/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasMeter.java b/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasMeter.java index 4b40996a9..a51f30f2b 100644 --- a/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasMeter.java +++ b/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasMeter.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; /** Base class for core meter types used by AtlasRegistry. */ abstract class AtlasMeter implements Meter { @@ -38,12 +39,19 @@ abstract class AtlasMeter implements Meter { /** Last time this meter was updated. */ private volatile long lastUpdated; + /** + * Reference count for batch updaters. The meter cannot be expired while a batch updater + * instance is around. + */ + private AtomicInteger batchUpdaterRefCount; + /** Create a new instance. */ AtlasMeter(Id id, Clock clock, long ttl) { this.id = id; this.clock = clock; this.ttl = ttl; lastUpdated = clock.wallTime(); + batchUpdaterRefCount = new AtomicInteger(0); } /** @@ -54,12 +62,19 @@ void updateLastModTime(long now) { lastUpdated = now; } + /** + * Updates the reference count for the number of batch updaters. + */ + void updateRefCount(int amount) { + batchUpdaterRefCount.addAndGet(amount); + } + @Override public Id id() { return id; } @Override public boolean hasExpired() { - return clock.wallTime() - lastUpdated > ttl; + return clock.wallTime() - lastUpdated > ttl && batchUpdaterRefCount.get() <= 0; } @Override public Iterable measure() { diff --git a/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasTimer.java b/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasTimer.java index 7cfba4bea..21b37c9e0 100644 --- a/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasTimer.java +++ b/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasTimer.java @@ -194,4 +194,19 @@ private void updateMax(AtomicLong maxValue, long v) { // unit tests so it is rarely a problem in practice. API can be revisited in 2.0. return (long) total.poll(); } + + @Override public BatchUpdater batchUpdater(int batchSize) { + return new AtlasTimerBatchUpdater(this, batchSize); + } + + /** + * Helper to allow the batch updater to directly update the individual stats. + */ + void update(long count, double total, double totalOfSquares, long max) { + long now = clock.wallTime(); + this.count.getCurrent(now).addAndGet(count); + this.total.getCurrent(now).addAndGet(total); + this.totalOfSquares.getCurrent(now).addAndGet(totalOfSquares); + updateMax(this.max.getCurrent(now), max); + } } diff --git a/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasTimerBatchUpdater.java b/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasTimerBatchUpdater.java new file mode 100644 index 000000000..bae0446ef --- /dev/null +++ b/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasTimerBatchUpdater.java @@ -0,0 +1,73 @@ +/* + * Copyright 2014-2022 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.spectator.atlas; + +import com.netflix.spectator.api.Timer; + +import java.util.concurrent.TimeUnit; + +final class AtlasTimerBatchUpdater implements Timer.BatchUpdater { + + private AtlasTimer timer; + private final int batchSize; + + private int count; + private double total; + private double totalOfSquares; + private long max; + + AtlasTimerBatchUpdater(AtlasTimer timer, int batchSize) { + this.timer = timer; + this.batchSize = batchSize; + timer.updateRefCount(1); + } + + @Override + public void record(long amount, TimeUnit unit) { + ++count; + if (amount > 0L) { + final long nanos = unit.toNanos(amount); + total += nanos; + totalOfSquares += (double) nanos * nanos; + if (nanos > max) { + max = nanos; + } + } + if (count >= batchSize) { + flush(); + } + } + + @Override + public void flush() { + if (timer != null) { + timer.update(count, total, totalOfSquares, max); + count = 0; + total = 0.0; + totalOfSquares = 0.0; + max = 0L; + } + } + + @Override + public void close() throws Exception { + if (timer != null) { + flush(); + timer.updateRefCount(-1); + timer = null; + } + } +} diff --git a/spectator-reg-atlas/src/test/java/com/netflix/spectator/atlas/AtlasDistributionSummaryTest.java b/spectator-reg-atlas/src/test/java/com/netflix/spectator/atlas/AtlasDistributionSummaryTest.java index 6da965966..c4c2fb6db 100644 --- a/spectator-reg-atlas/src/test/java/com/netflix/spectator/atlas/AtlasDistributionSummaryTest.java +++ b/spectator-reg-atlas/src/test/java/com/netflix/spectator/atlas/AtlasDistributionSummaryTest.java @@ -18,6 +18,7 @@ import java.util.Arrays; import com.netflix.spectator.api.DefaultRegistry; +import com.netflix.spectator.api.DistributionSummary; import com.netflix.spectator.api.Id; import com.netflix.spectator.api.Measurement; import com.netflix.spectator.api.Registry; @@ -110,6 +111,26 @@ public void recordSeveralValues() { checkValue(4, 1 + 2 + 3 + 1, 1 + 4 + 9 + 1, 3); } + public void recordSeveralValuesBatch(int batchSize) throws Exception { + try (DistributionSummary.BatchUpdater b = dist.batchUpdater(batchSize)) { + b.record(1); + b.record(2); + b.record(3); + b.record(1); + } + clock.setWallTime(step + 1); + checkValue(4, 1 + 2 + 3 + 1, 1 + 4 + 9 + 1, 3); + } + + @Test + public void recordSeveralValuesBatch() throws Exception { + recordSeveralValuesBatch(1); + recordSeveralValuesBatch(2); + recordSeveralValuesBatch(3); + recordSeveralValuesBatch(4); + recordSeveralValuesBatch(5); + } + @Test public void recordBatchMismatchedLengths() { dist.record(new long[0], 1); diff --git a/spectator-reg-atlas/src/test/java/com/netflix/spectator/atlas/AtlasTimerTest.java b/spectator-reg-atlas/src/test/java/com/netflix/spectator/atlas/AtlasTimerTest.java index 72dd59a6f..977e51856 100644 --- a/spectator-reg-atlas/src/test/java/com/netflix/spectator/atlas/AtlasTimerTest.java +++ b/spectator-reg-atlas/src/test/java/com/netflix/spectator/atlas/AtlasTimerTest.java @@ -19,6 +19,7 @@ import com.netflix.spectator.api.Id; import com.netflix.spectator.api.Measurement; import com.netflix.spectator.api.Registry; +import com.netflix.spectator.api.Timer; import com.netflix.spectator.api.Utils; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -144,6 +145,26 @@ public void recordSeveralValues() { checkValue(4, 1 + 2 + 3 + 1, 1 + 4 + 9 + 1, 3); } + public void recordSeveralValuesBatch(int batchSize) throws Exception { + try (Timer.BatchUpdater b = dist.batchUpdater(batchSize)) { + b.record(1, TimeUnit.NANOSECONDS); + b.record(2, TimeUnit.NANOSECONDS); + b.record(3, TimeUnit.NANOSECONDS); + b.record(1, TimeUnit.NANOSECONDS); + } + clock.setWallTime(step + 1); + checkValue(4, 1 + 2 + 3 + 1, 1 + 4 + 9 + 1, 3); + } + + @Test + public void recordSeveralValuesBatch() throws Exception { + recordSeveralValuesBatch(1); + recordSeveralValuesBatch(2); + recordSeveralValuesBatch(3); + recordSeveralValuesBatch(4); + recordSeveralValuesBatch(5); + } + @Test public void recordBatchMismatchedLengths() { dist.record(new long[0], 1, TimeUnit.NANOSECONDS); From 363aeccaa545d7a29307b0353b4de19ab8af3bac Mon Sep 17 00:00:00 2001 From: Brian Harrington Date: Fri, 11 Nov 2022 09:22:08 -0600 Subject: [PATCH 2/3] avoid ref count Swap implementations will check if updater type can take a supplier to resolve the meter to avoid issues with holding on to one that has expired. --- .../netflix/spectator/api/SwapCounter.java | 12 ++- .../api/SwapDistributionSummary.java | 12 ++- .../com/netflix/spectator/api/SwapTimer.java | 12 ++- .../spectator/api/ExpiringRegistry.java | 76 +++++++++++++++++++ .../netflix/spectator/api/SwapMeterTest.java | 63 +++++++++++++++ .../atlas/AtlasDistSummaryBatchUpdater.java | 31 +++++--- .../atlas/AtlasDistributionSummary.java | 4 +- .../netflix/spectator/atlas/AtlasMeter.java | 17 +---- .../netflix/spectator/atlas/AtlasTimer.java | 4 +- .../atlas/AtlasTimerBatchUpdater.java | 29 ++++--- .../atlas/AtlasDistributionSummaryTest.java | 3 +- .../spectator/atlas/AtlasTimerTest.java | 5 +- 12 files changed, 221 insertions(+), 47 deletions(-) diff --git a/spectator-api/src/main/java/com/netflix/spectator/api/SwapCounter.java b/spectator-api/src/main/java/com/netflix/spectator/api/SwapCounter.java index 7ec984fa6..7ecfbf732 100644 --- a/spectator-api/src/main/java/com/netflix/spectator/api/SwapCounter.java +++ b/spectator-api/src/main/java/com/netflix/spectator/api/SwapCounter.java @@ -17,7 +17,9 @@ import com.netflix.spectator.impl.SwapMeter; +import java.util.function.Consumer; import java.util.function.LongSupplier; +import java.util.function.Supplier; /** Wraps another counter allowing the underlying type to be swapped. */ final class SwapCounter extends SwapMeter implements Counter { @@ -39,7 +41,15 @@ final class SwapCounter extends SwapMeter implements Counter { return get().actualCount(); } + @SuppressWarnings("unchecked") @Override public BatchUpdater batchUpdater(int batchSize) { - return get().batchUpdater(batchSize); + BatchUpdater updater = get().batchUpdater(batchSize); + // Registry implementations can implement `Consumer>` to allow the + // meter to be resolved when flushed and avoid needing to hold on to a particular + // instance of the meter that might have expired and been removed from the registry. + if (updater instanceof Consumer) { + ((Consumer>) updater).accept(this::get); + } + return updater; } } diff --git a/spectator-api/src/main/java/com/netflix/spectator/api/SwapDistributionSummary.java b/spectator-api/src/main/java/com/netflix/spectator/api/SwapDistributionSummary.java index f62360511..a1bb02b8c 100644 --- a/spectator-api/src/main/java/com/netflix/spectator/api/SwapDistributionSummary.java +++ b/spectator-api/src/main/java/com/netflix/spectator/api/SwapDistributionSummary.java @@ -17,7 +17,9 @@ import com.netflix.spectator.impl.SwapMeter; +import java.util.function.Consumer; import java.util.function.LongSupplier; +import java.util.function.Supplier; /** Wraps another distribution summary allowing the underlying type to be swapped. */ final class SwapDistributionSummary extends SwapMeter implements DistributionSummary { @@ -53,7 +55,15 @@ public long totalAmount() { return get().totalAmount(); } + @SuppressWarnings("unchecked") @Override public BatchUpdater batchUpdater(int batchSize) { - return get().batchUpdater(batchSize); + BatchUpdater updater = get().batchUpdater(batchSize); + // Registry implementations can implement `Consumer>` to + // allow the meter to be resolved when flushed and avoid needing to hold on to a particular + // instance of the meter that might have expired and been removed from the registry. + if (updater instanceof Consumer) { + ((Consumer>) updater).accept(this::get); + } + return updater; } } diff --git a/spectator-api/src/main/java/com/netflix/spectator/api/SwapTimer.java b/spectator-api/src/main/java/com/netflix/spectator/api/SwapTimer.java index f371deebb..1e5ff4823 100644 --- a/spectator-api/src/main/java/com/netflix/spectator/api/SwapTimer.java +++ b/spectator-api/src/main/java/com/netflix/spectator/api/SwapTimer.java @@ -19,7 +19,9 @@ import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import java.util.function.LongSupplier; +import java.util.function.Supplier; /** Wraps another timer allowing the underlying type to be swapped. */ final class SwapTimer extends SwapMeter implements Timer { @@ -65,7 +67,15 @@ final class SwapTimer extends SwapMeter implements Timer { return get().totalTime(); } + @SuppressWarnings("unchecked") @Override public BatchUpdater batchUpdater(int batchSize) { - return get().batchUpdater(batchSize); + BatchUpdater updater = get().batchUpdater(batchSize); + // Registry implementations can implement `Consumer>` to allow the + // meter to be resolved when flushed and avoid needing to hold on to a particular + // instance of the meter that might have expired and been removed from the registry. + if (updater instanceof Consumer) { + ((Consumer>) updater).accept(this::get); + } + return updater; } } diff --git a/spectator-api/src/test/java/com/netflix/spectator/api/ExpiringRegistry.java b/spectator-api/src/test/java/com/netflix/spectator/api/ExpiringRegistry.java index c916442e3..7aff950b0 100644 --- a/spectator-api/src/test/java/com/netflix/spectator/api/ExpiringRegistry.java +++ b/spectator-api/src/test/java/com/netflix/spectator/api/ExpiringRegistry.java @@ -17,6 +17,8 @@ import java.util.Collections; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Supplier; public class ExpiringRegistry extends AbstractRegistry { @@ -48,9 +50,36 @@ public ExpiringRegistry(Clock clock) { @Override public boolean hasExpired() { return clock().wallTime() > creationTime; } + + @Override public BatchUpdater batchUpdater(int batchSize) { + return new CounterUpdater(); + } }; } + private static class CounterUpdater implements Counter.BatchUpdater, Consumer> { + + private Supplier counterSupplier; + private double sum; + + @Override public void add(double amount) { + sum += amount; + } + + @Override public void flush() { + counterSupplier.get().add(sum); + sum = 0.0; + } + + @Override public void close() throws Exception { + flush(); + } + + @Override public void accept(Supplier counterSupplier) { + this.counterSupplier = counterSupplier; + } + } + @Override protected DistributionSummary newDistributionSummary(Id id) { return new DistributionSummary() { private final long creationTime = clock().wallTime(); @@ -79,9 +108,33 @@ public ExpiringRegistry(Clock clock) { @Override public boolean hasExpired() { return clock().wallTime() > creationTime; } + + @Override public BatchUpdater batchUpdater(int batchSize) { + return new DistributionSummaryUpdater(); + } }; } + private static class DistributionSummaryUpdater + implements DistributionSummary.BatchUpdater, Consumer> { + + private Supplier distSummarySupplier; + + @Override public void record(long amount) { + distSummarySupplier.get().record(amount); + } + + @Override public void flush() { + } + + @Override public void close() throws Exception { + } + + @Override public void accept(Supplier distSummarySupplier) { + this.distSummarySupplier = distSummarySupplier; + } + } + @Override protected Timer newTimer(Id id) { return new AbstractTimer(clock()) { private final long creationTime = clock().wallTime(); @@ -110,9 +163,32 @@ public ExpiringRegistry(Clock clock) { @Override public boolean hasExpired() { return clock().wallTime() > creationTime; } + + @Override public BatchUpdater batchUpdater(int batchSize) { + return new TimerUpdater(); + } }; } + private static class TimerUpdater implements Timer.BatchUpdater, Consumer> { + + private Supplier timerSupplier; + + @Override public void record(long amount, TimeUnit unit) { + timerSupplier.get().record(amount, unit); + } + + @Override public void flush() { + } + + @Override public void close() throws Exception { + } + + @Override public void accept(Supplier timerSupplier) { + this.timerSupplier = timerSupplier; + } + } + @Override protected Gauge newGauge(Id id) { return new Gauge() { private final long creationTime = clock().wallTime(); diff --git a/spectator-api/src/test/java/com/netflix/spectator/api/SwapMeterTest.java b/spectator-api/src/test/java/com/netflix/spectator/api/SwapMeterTest.java index 188f70c75..de26dd5c5 100644 --- a/spectator-api/src/test/java/com/netflix/spectator/api/SwapMeterTest.java +++ b/spectator-api/src/test/java/com/netflix/spectator/api/SwapMeterTest.java @@ -60,6 +60,69 @@ public void wrapExpiredCounter() { Assertions.assertEquals(1, s1.count()); } + @Test + public void wrappedCounterBatchUpdater() throws Exception { + Counter c = new DefaultCounter(clock, counterId); + SwapCounter sc = new SwapCounter(registry, VERSION, counterId, c); + try (Counter.BatchUpdater b = sc.batchUpdater(2)) { + b.increment(); + } + Assertions.assertEquals(1, c.count()); + Assertions.assertEquals(1, sc.count()); + } + + @Test + public void wrappedCounterBatchUpdaterCustom() throws Exception { + ExpiringRegistry registry = new ExpiringRegistry(clock); + Counter c = registry.counter(counterId); + try (Counter.BatchUpdater b = c.batchUpdater(2)) { + b.increment(); + } + Assertions.assertEquals(1, c.count()); + } + + @Test + public void wrappedTimerBatchUpdater() throws Exception { + Timer t = new DefaultTimer(clock, timerId); + SwapTimer st = new SwapTimer(registry, VERSION, timerId, t); + try (Timer.BatchUpdater b = st.batchUpdater(2)) { + b.record(1, TimeUnit.NANOSECONDS); + } + Assertions.assertEquals(1, t.count()); + Assertions.assertEquals(1, st.count()); + } + + @Test + public void wrappedTimerBatchUpdaterCustom() throws Exception { + ExpiringRegistry registry = new ExpiringRegistry(clock); + Timer t = registry.timer(timerId); + try (Timer.BatchUpdater b = t.batchUpdater(2)) { + b.record(1, TimeUnit.NANOSECONDS); + } + Assertions.assertEquals(1, t.count()); + } + + @Test + public void wrappedDistributionSummaryBatchUpdater() throws Exception { + DistributionSummary d = new DefaultDistributionSummary(clock, distSummaryId); + SwapDistributionSummary sd = new SwapDistributionSummary(registry, VERSION, distSummaryId, d); + try (DistributionSummary.BatchUpdater b = sd.batchUpdater(2)) { + b.record(1); + } + Assertions.assertEquals(1, d.count()); + Assertions.assertEquals(1, sd.count()); + } + + @Test + public void wrappedDistributionSummaryBatchUpdaterCustom() throws Exception { + ExpiringRegistry registry = new ExpiringRegistry(clock); + DistributionSummary d = registry.distributionSummary(distSummaryId); + try (DistributionSummary.BatchUpdater b = d.batchUpdater(2)) { + b.record(1); + } + Assertions.assertEquals(1, d.count()); + } + @Test public void wrapExpiredTimer() { ExpiringRegistry registry = new ExpiringRegistry(clock); diff --git a/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasDistSummaryBatchUpdater.java b/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasDistSummaryBatchUpdater.java index cee7e5353..52c20be57 100644 --- a/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasDistSummaryBatchUpdater.java +++ b/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasDistSummaryBatchUpdater.java @@ -17,9 +17,13 @@ import com.netflix.spectator.api.DistributionSummary; -final class AtlasDistSummaryBatchUpdater implements DistributionSummary.BatchUpdater { +import java.util.function.Consumer; +import java.util.function.Supplier; - private AtlasDistributionSummary distSummary; +final class AtlasDistSummaryBatchUpdater + implements DistributionSummary.BatchUpdater, Consumer> { + + private Supplier distSummarySupplier; private final int batchSize; private int count; @@ -27,10 +31,20 @@ final class AtlasDistSummaryBatchUpdater implements DistributionSummary.BatchUpd private double totalOfSquares; private long max; - AtlasDistSummaryBatchUpdater(AtlasDistributionSummary distSummary, int batchSize) { - this.distSummary = distSummary; + AtlasDistSummaryBatchUpdater(int batchSize) { this.batchSize = batchSize; - distSummary.updateRefCount(1); + } + + public void accept(Supplier distSummarySupplier) { + this.distSummarySupplier = distSummarySupplier; + } + + private AtlasDistributionSummary getDistributionSummary() { + if (distSummarySupplier != null) { + DistributionSummary d = distSummarySupplier.get(); + return (d instanceof AtlasDistributionSummary) ? (AtlasDistributionSummary) d : null; + } + return null; } @Override @@ -50,6 +64,7 @@ public void record(long amount) { @Override public void flush() { + AtlasDistributionSummary distSummary = getDistributionSummary(); if (distSummary != null) { distSummary.update(count, total, totalOfSquares, max); count = 0; @@ -61,10 +76,6 @@ public void flush() { @Override public void close() throws Exception { - if (distSummary != null) { - flush(); - distSummary.updateRefCount(-1); - distSummary = null; - } + flush(); } } diff --git a/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasDistributionSummary.java b/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasDistributionSummary.java index 092839cee..b67ae54ad 100644 --- a/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasDistributionSummary.java +++ b/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasDistributionSummary.java @@ -144,7 +144,9 @@ private void updateMax(AtomicLong maxValue, long v) { } @Override public BatchUpdater batchUpdater(int batchSize) { - return new AtlasDistSummaryBatchUpdater(this, batchSize); + AtlasDistSummaryBatchUpdater updater = new AtlasDistSummaryBatchUpdater(batchSize); + updater.accept(() -> this); + return updater; } /** diff --git a/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasMeter.java b/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasMeter.java index a51f30f2b..4b40996a9 100644 --- a/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasMeter.java +++ b/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasMeter.java @@ -22,7 +22,6 @@ import java.util.ArrayList; import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; /** Base class for core meter types used by AtlasRegistry. */ abstract class AtlasMeter implements Meter { @@ -39,19 +38,12 @@ abstract class AtlasMeter implements Meter { /** Last time this meter was updated. */ private volatile long lastUpdated; - /** - * Reference count for batch updaters. The meter cannot be expired while a batch updater - * instance is around. - */ - private AtomicInteger batchUpdaterRefCount; - /** Create a new instance. */ AtlasMeter(Id id, Clock clock, long ttl) { this.id = id; this.clock = clock; this.ttl = ttl; lastUpdated = clock.wallTime(); - batchUpdaterRefCount = new AtomicInteger(0); } /** @@ -62,19 +54,12 @@ void updateLastModTime(long now) { lastUpdated = now; } - /** - * Updates the reference count for the number of batch updaters. - */ - void updateRefCount(int amount) { - batchUpdaterRefCount.addAndGet(amount); - } - @Override public Id id() { return id; } @Override public boolean hasExpired() { - return clock.wallTime() - lastUpdated > ttl && batchUpdaterRefCount.get() <= 0; + return clock.wallTime() - lastUpdated > ttl; } @Override public Iterable measure() { diff --git a/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasTimer.java b/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasTimer.java index 21b37c9e0..a8eb0c952 100644 --- a/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasTimer.java +++ b/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasTimer.java @@ -196,7 +196,9 @@ private void updateMax(AtomicLong maxValue, long v) { } @Override public BatchUpdater batchUpdater(int batchSize) { - return new AtlasTimerBatchUpdater(this, batchSize); + AtlasTimerBatchUpdater updater = new AtlasTimerBatchUpdater(batchSize); + updater.accept(() -> this); + return updater; } /** diff --git a/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasTimerBatchUpdater.java b/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasTimerBatchUpdater.java index bae0446ef..53bc08802 100644 --- a/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasTimerBatchUpdater.java +++ b/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasTimerBatchUpdater.java @@ -18,10 +18,12 @@ import com.netflix.spectator.api.Timer; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Supplier; -final class AtlasTimerBatchUpdater implements Timer.BatchUpdater { +final class AtlasTimerBatchUpdater implements Timer.BatchUpdater, Consumer> { - private AtlasTimer timer; + private Supplier timerSupplier; private final int batchSize; private int count; @@ -29,10 +31,20 @@ final class AtlasTimerBatchUpdater implements Timer.BatchUpdater { private double totalOfSquares; private long max; - AtlasTimerBatchUpdater(AtlasTimer timer, int batchSize) { - this.timer = timer; + AtlasTimerBatchUpdater(int batchSize) { this.batchSize = batchSize; - timer.updateRefCount(1); + } + + public void accept(Supplier timerSupplier) { + this.timerSupplier = timerSupplier; + } + + private AtlasTimer getTimer() { + if (timerSupplier != null) { + Timer t = timerSupplier.get(); + return (t instanceof AtlasTimer) ? (AtlasTimer) t : null; + } + return null; } @Override @@ -53,6 +65,7 @@ public void record(long amount, TimeUnit unit) { @Override public void flush() { + AtlasTimer timer = getTimer(); if (timer != null) { timer.update(count, total, totalOfSquares, max); count = 0; @@ -64,10 +77,6 @@ public void flush() { @Override public void close() throws Exception { - if (timer != null) { - flush(); - timer.updateRefCount(-1); - timer = null; - } + flush(); } } diff --git a/spectator-reg-atlas/src/test/java/com/netflix/spectator/atlas/AtlasDistributionSummaryTest.java b/spectator-reg-atlas/src/test/java/com/netflix/spectator/atlas/AtlasDistributionSummaryTest.java index c4c2fb6db..e0d09e2a9 100644 --- a/spectator-reg-atlas/src/test/java/com/netflix/spectator/atlas/AtlasDistributionSummaryTest.java +++ b/spectator-reg-atlas/src/test/java/com/netflix/spectator/atlas/AtlasDistributionSummaryTest.java @@ -30,9 +30,8 @@ public class AtlasDistributionSummaryTest { private final CountingManualClock clock = new CountingManualClock(); - private final Registry registry = new DefaultRegistry(); private final long step = 10000L; - private final AtlasDistributionSummary dist = new AtlasDistributionSummary(registry.createId("test"), clock, step, step); + private final AtlasDistributionSummary dist = new AtlasDistributionSummary(Id.create("test"), clock, step, step); private void checkValue(long count, long amount, long square, long max) { int num = 0; diff --git a/spectator-reg-atlas/src/test/java/com/netflix/spectator/atlas/AtlasTimerTest.java b/spectator-reg-atlas/src/test/java/com/netflix/spectator/atlas/AtlasTimerTest.java index 977e51856..a13274ec2 100644 --- a/spectator-reg-atlas/src/test/java/com/netflix/spectator/atlas/AtlasTimerTest.java +++ b/spectator-reg-atlas/src/test/java/com/netflix/spectator/atlas/AtlasTimerTest.java @@ -15,10 +15,8 @@ */ package com.netflix.spectator.atlas; -import com.netflix.spectator.api.DefaultRegistry; import com.netflix.spectator.api.Id; import com.netflix.spectator.api.Measurement; -import com.netflix.spectator.api.Registry; import com.netflix.spectator.api.Timer; import com.netflix.spectator.api.Utils; import org.junit.jupiter.api.Assertions; @@ -31,9 +29,8 @@ public class AtlasTimerTest { private final CountingManualClock clock = new CountingManualClock(); - private final Registry registry = new DefaultRegistry(); private final long step = 10000L; - private final AtlasTimer dist = new AtlasTimer(registry.createId("test"), clock, step, step); + private final AtlasTimer dist = new AtlasTimer(Id.create("test"), clock, step, step); private void checkValue(long count, double amount, double square, long max) { int num = 0; From 725de96ea4dcd4db2a66e7591c79cd066ed7e425 Mon Sep 17 00:00:00 2001 From: Brian Harrington Date: Fri, 11 Nov 2022 09:31:43 -0600 Subject: [PATCH 3/3] fix pmd warnings --- .../netflix/spectator/atlas/AtlasDistSummaryBatchUpdater.java | 1 + .../java/com/netflix/spectator/atlas/AtlasTimerBatchUpdater.java | 1 + 2 files changed, 2 insertions(+) diff --git a/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasDistSummaryBatchUpdater.java b/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasDistSummaryBatchUpdater.java index 52c20be57..8eecc1e34 100644 --- a/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasDistSummaryBatchUpdater.java +++ b/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasDistSummaryBatchUpdater.java @@ -35,6 +35,7 @@ final class AtlasDistSummaryBatchUpdater this.batchSize = batchSize; } + @Override public void accept(Supplier distSummarySupplier) { this.distSummarySupplier = distSummarySupplier; } diff --git a/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasTimerBatchUpdater.java b/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasTimerBatchUpdater.java index 53bc08802..859f920be 100644 --- a/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasTimerBatchUpdater.java +++ b/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasTimerBatchUpdater.java @@ -35,6 +35,7 @@ final class AtlasTimerBatchUpdater implements Timer.BatchUpdater, Consumer timerSupplier) { this.timerSupplier = timerSupplier; }