Skip to content

Commit

Permalink
metrics: make gauge_float64 and counter_float64 lock free (#27025)
Browse files Browse the repository at this point in the history
Makes the float-gauges lock-free

name                      old time/op  new time/op  delta
CounterFloat64Parallel-8  1.45µs ±10%  0.85µs ± 6%  -41.65%  (p=0.008 n=5+5)

---------

Co-authored-by: Exca-DK <dev@DESKTOP-RI45P4J.localdomain>
Co-authored-by: Martin Holst Swende <martin@swende.se>
  • Loading branch information
3 people committed Apr 4, 2023
1 parent ab1a404 commit b4dcd1a
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 33 deletions.
38 changes: 20 additions & 18 deletions metrics/counter_float64.go
@@ -1,7 +1,8 @@
package metrics

import (
"sync"
"math"
"sync/atomic"
)

// CounterFloat64 holds a float64 value that can be incremented and decremented.
Expand Down Expand Up @@ -38,13 +39,13 @@ func NewCounterFloat64() CounterFloat64 {
if !Enabled {
return NilCounterFloat64{}
}
return &StandardCounterFloat64{count: 0.0}
return &StandardCounterFloat64{}
}

// NewCounterFloat64Forced constructs a new StandardCounterFloat64 and returns it no matter if
// the global switch is enabled or not.
func NewCounterFloat64Forced() CounterFloat64 {
return &StandardCounterFloat64{count: 0.0}
return &StandardCounterFloat64{}
}

// NewRegisteredCounterFloat64 constructs and registers a new StandardCounterFloat64.
Expand Down Expand Up @@ -113,41 +114,42 @@ func (NilCounterFloat64) Inc(i float64) {}
func (NilCounterFloat64) Snapshot() CounterFloat64 { return NilCounterFloat64{} }

// StandardCounterFloat64 is the standard implementation of a CounterFloat64 and uses the
// sync.Mutex package to manage a single float64 value.
// atomic to manage a single float64 value.
type StandardCounterFloat64 struct {
mutex sync.Mutex
count float64
floatBits atomic.Uint64
}

// Clear sets the counter to zero.
func (c *StandardCounterFloat64) Clear() {
c.mutex.Lock()
defer c.mutex.Unlock()
c.count = 0.0
c.floatBits.Store(0)
}

// Count returns the current value.
func (c *StandardCounterFloat64) Count() float64 {
c.mutex.Lock()
defer c.mutex.Unlock()
return c.count
return math.Float64frombits(c.floatBits.Load())
}

// Dec decrements the counter by the given amount.
func (c *StandardCounterFloat64) Dec(v float64) {
c.mutex.Lock()
defer c.mutex.Unlock()
c.count -= v
atomicAddFloat(&c.floatBits, -v)
}

// Inc increments the counter by the given amount.
func (c *StandardCounterFloat64) Inc(v float64) {
c.mutex.Lock()
defer c.mutex.Unlock()
c.count += v
atomicAddFloat(&c.floatBits, v)
}

// Snapshot returns a read-only copy of the counter.
func (c *StandardCounterFloat64) Snapshot() CounterFloat64 {
return CounterFloat64Snapshot(c.Count())
}

func atomicAddFloat(fbits *atomic.Uint64, v float64) {
for {
loadedBits := fbits.Load()
newBits := math.Float64bits(math.Float64frombits(loadedBits) + v)
if fbits.CompareAndSwap(loadedBits, newBits) {
break
}
}
}
24 changes: 23 additions & 1 deletion metrics/counter_float_64_test.go
@@ -1,6 +1,9 @@
package metrics

import "testing"
import (
"sync"
"testing"
)

func BenchmarkCounterFloat64(b *testing.B) {
c := NewCounterFloat64()
Expand All @@ -10,6 +13,25 @@ func BenchmarkCounterFloat64(b *testing.B) {
}
}

func BenchmarkCounterFloat64Parallel(b *testing.B) {
c := NewCounterFloat64()
b.ResetTimer()
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
for i := 0; i < b.N; i++ {
c.Inc(1.0)
}
wg.Done()
}()
}
wg.Wait()
if have, want := c.Count(), 10.0*float64(b.N); have != want {
b.Fatalf("have %f want %f", have, want)
}
}

func TestCounterFloat64Clear(t *testing.T) {
c := NewCounterFloat64()
c.Inc(1.0)
Expand Down
22 changes: 9 additions & 13 deletions metrics/gauge_float64.go
@@ -1,6 +1,9 @@
package metrics

import "sync"
import (
"math"
"sync/atomic"
)

// GaugeFloat64s hold a float64 value that can be set arbitrarily.
type GaugeFloat64 interface {
Expand All @@ -23,9 +26,7 @@ func NewGaugeFloat64() GaugeFloat64 {
if !Enabled {
return NilGaugeFloat64{}
}
return &StandardGaugeFloat64{
value: 0.0,
}
return &StandardGaugeFloat64{}
}

// NewRegisteredGaugeFloat64 constructs and registers a new StandardGaugeFloat64.
Expand Down Expand Up @@ -83,10 +84,9 @@ func (NilGaugeFloat64) Update(v float64) {}
func (NilGaugeFloat64) Value() float64 { return 0.0 }

// StandardGaugeFloat64 is the standard implementation of a GaugeFloat64 and uses
// sync.Mutex to manage a single float64 value.
// atomic to manage a single float64 value.
type StandardGaugeFloat64 struct {
mutex sync.Mutex
value float64
floatBits atomic.Uint64
}

// Snapshot returns a read-only copy of the gauge.
Expand All @@ -96,16 +96,12 @@ func (g *StandardGaugeFloat64) Snapshot() GaugeFloat64 {

// Update updates the gauge's value.
func (g *StandardGaugeFloat64) Update(v float64) {
g.mutex.Lock()
defer g.mutex.Unlock()
g.value = v
g.floatBits.Store(math.Float64bits(v))
}

// Value returns the gauge's current value.
func (g *StandardGaugeFloat64) Value() float64 {
g.mutex.Lock()
defer g.mutex.Unlock()
return g.value
return math.Float64frombits(g.floatBits.Load())
}

// FunctionalGaugeFloat64 returns value from given function
Expand Down
23 changes: 22 additions & 1 deletion metrics/gauge_float64_test.go
@@ -1,6 +1,9 @@
package metrics

import "testing"
import (
"sync"
"testing"
)

func BenchmarkGaugeFloat64(b *testing.B) {
g := NewGaugeFloat64()
Expand All @@ -10,6 +13,24 @@ func BenchmarkGaugeFloat64(b *testing.B) {
}
}

func BenchmarkGaugeFloat64Parallel(b *testing.B) {
c := NewGaugeFloat64()
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
for i := 0; i < b.N; i++ {
c.Update(float64(i))
}
wg.Done()
}()
}
wg.Wait()
if have, want := c.Value(), float64(b.N-1); have != want {
b.Fatalf("have %f want %f", have, want)
}
}

func TestGaugeFloat64(t *testing.T) {
g := NewGaugeFloat64()
g.Update(47.0)
Expand Down

0 comments on commit b4dcd1a

Please sign in to comment.