Skip to content

Commit

Permalink
Allow multi-instrument callbacks to be unregistered (#3522)
Browse files Browse the repository at this point in the history
* Update Meter RegisterCallback method

Return a Registration from the method that can be used by the caller to
unregister their callback.

Update documentation of the method to better explain expectations of
use and implementation.

* Update noop impl

* Update global impl

* Test global Unregister concurrent safe

* Use a map to track reg in global impl

* Update sdk impl

* Use a list for global impl

* Fix prom example

* Lint metric/meter.go

* Fix metric example

* Placeholder for changelog

* Update PR number in changelog

* Update sdk/metric/pipeline.go

Co-authored-by: Aaron Clawson <3766680+MadVikingGod@users.noreply.github.com>

* Add test unregistered callback is not called

Co-authored-by: Aaron Clawson <3766680+MadVikingGod@users.noreply.github.com>
  • Loading branch information
MrAlias and MadVikingGod committed Dec 16, 2022
1 parent ca4cdfe commit 4014204
Show file tree
Hide file tree
Showing 11 changed files with 350 additions and 66 deletions.
10 changes: 6 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

## [Unreleased]

### Removed

- The deprecated `go.opentelemetry.io/otel/sdk/metric/view` package is removed. (#3520)

### Added

- Return a `Registration` from the `RegisterCallback` method of a `Meter` in the `go.opentelemetry.io/otel/metric` package.
This `Registration` can be used to unregister callbacks. (#3522)
- Add `Producer` interface and `Reader.RegisterProducer(Producer)` to `go.opentelemetry.io/otel/sdk/metric` to enable external metric Producers. (#3524)

### Removed

- The deprecated `go.opentelemetry.io/otel/sdk/metric/view` package is removed. (#3520)

### Changed

- Global error handler uses an atomic value instead of a mutex. (#3543)
Expand Down
2 changes: 1 addition & 1 deletion example/prometheus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func main() {
if err != nil {
log.Fatal(err)
}
err = meter.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) {
_, err = meter.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) {
n := -10. + rand.Float64()*(90.) // [-10, 100)
gauge.Observe(ctx, n, attrs...)
})
Expand Down
4 changes: 2 additions & 2 deletions metric/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func ExampleMeter_asynchronous_single() {
panic(err)
}

err = meter.RegisterCallback([]instrument.Asynchronous{memoryUsage},
_, err = meter.RegisterCallback([]instrument.Asynchronous{memoryUsage},
func(ctx context.Context) {
// instrument.WithCallbackFunc(func(ctx context.Context) {
//Do Work to get the real memoryUsage
Expand All @@ -86,7 +86,7 @@ func ExampleMeter_asynchronous_multiple() {
gcCount, _ := meter.AsyncInt64().Counter("gcCount")
gcPause, _ := meter.SyncFloat64().Histogram("gcPause")

err := meter.RegisterCallback([]instrument.Asynchronous{
_, err := meter.RegisterCallback([]instrument.Asynchronous{
heapAlloc,
gcCount,
},
Expand Down
63 changes: 49 additions & 14 deletions metric/internal/global/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package global // import "go.opentelemetry.io/otel/metric/internal/global"

import (
"container/list"
"context"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -109,7 +110,8 @@ type meter struct {

mtx sync.Mutex
instruments []delegatedInstrument
callbacks []delegatedCallback

registry list.List

delegate atomic.Value // metric.Meter
}
Expand All @@ -135,12 +137,14 @@ func (m *meter) setDelegate(provider metric.MeterProvider) {
inst.setDelegate(meter)
}

for _, callback := range m.callbacks {
callback.setDelegate(meter)
for e := m.registry.Front(); e != nil; e = e.Next() {
r := e.Value.(*registration)
r.setDelegate(meter)
m.registry.Remove(e)
}

m.instruments = nil
m.callbacks = nil
m.registry.Init()
}

// AsyncInt64 is the namespace for the Asynchronous Integer instruments.
Expand All @@ -167,20 +171,24 @@ func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider {
//
// It is only valid to call Observe within the scope of the passed function,
// and only on the instruments that were registered with this call.
func (m *meter) RegisterCallback(insts []instrument.Asynchronous, function func(context.Context)) error {
func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context.Context)) (metric.Registration, error) {
if del, ok := m.delegate.Load().(metric.Meter); ok {
insts = unwrapInstruments(insts)
return del.RegisterCallback(insts, function)
return del.RegisterCallback(insts, f)
}

m.mtx.Lock()
defer m.mtx.Unlock()
m.callbacks = append(m.callbacks, delegatedCallback{
instruments: insts,
function: function,
})

return nil
reg := &registration{instruments: insts, function: f}
e := m.registry.PushBack(reg)
reg.unreg = func() error {
m.mtx.Lock()
_ = m.registry.Remove(e)
m.mtx.Unlock()
return nil
}
return reg, nil
}

type wrapped interface {
Expand Down Expand Up @@ -217,17 +225,44 @@ func (m *meter) SyncFloat64() syncfloat64.InstrumentProvider {
return (*sfInstProvider)(m)
}

type delegatedCallback struct {
type registration struct {
instruments []instrument.Asynchronous
function func(context.Context)

unreg func() error
unregMu sync.Mutex
}

func (c *delegatedCallback) setDelegate(m metric.Meter) {
func (c *registration) setDelegate(m metric.Meter) {
insts := unwrapInstruments(c.instruments)
err := m.RegisterCallback(insts, c.function)

c.unregMu.Lock()
defer c.unregMu.Unlock()

if c.unreg == nil {
// Unregister already called.
return
}

reg, err := m.RegisterCallback(insts, c.function)
if err != nil {
otel.Handle(err)
}

c.unreg = reg.Unregister
}

func (c *registration) Unregister() error {
c.unregMu.Lock()
defer c.unregMu.Unlock()
if c.unreg == nil {
// Unregister already called.
return nil
}

var err error
err, c.unreg = c.unreg(), nil
return err
}

type afInstProvider meter
Expand Down
84 changes: 81 additions & 3 deletions metric/internal/global/meter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestMeterRace(t *testing.T) {
_, _ = mtr.SyncInt64().Counter(name)
_, _ = mtr.SyncInt64().UpDownCounter(name)
_, _ = mtr.SyncInt64().Histogram(name)
_ = mtr.RegisterCallback(nil, func(ctx context.Context) {})
_, _ = mtr.RegisterCallback(nil, func(ctx context.Context) {})
if !once {
wg.Done()
once = true
Expand All @@ -86,6 +86,35 @@ func TestMeterRace(t *testing.T) {
close(finish)
}

func TestUnregisterRace(t *testing.T) {
mtr := &meter{}
reg, err := mtr.RegisterCallback(nil, func(ctx context.Context) {})
require.NoError(t, err)

wg := &sync.WaitGroup{}
wg.Add(1)
finish := make(chan struct{})
go func() {
for i, once := 0, false; ; i++ {
_ = reg.Unregister()
if !once {
wg.Done()
once = true
}
select {
case <-finish:
return
default:
}
}
}()
_ = reg.Unregister()

wg.Wait()
mtr.setDelegate(metric.NewNoopMeterProvider())
close(finish)
}

func testSetupAllInstrumentTypes(t *testing.T, m metric.Meter) (syncfloat64.Counter, asyncfloat64.Counter) {
afcounter, err := m.AsyncFloat64().Counter("test_Async_Counter")
require.NoError(t, err)
Expand All @@ -101,9 +130,10 @@ func testSetupAllInstrumentTypes(t *testing.T, m metric.Meter) (syncfloat64.Coun
_, err = m.AsyncInt64().Gauge("test_Async_Gauge")
assert.NoError(t, err)

require.NoError(t, m.RegisterCallback([]instrument.Asynchronous{afcounter}, func(ctx context.Context) {
_, err = m.RegisterCallback([]instrument.Asynchronous{afcounter}, func(ctx context.Context) {
afcounter.Observe(ctx, 3)
}))
})
require.NoError(t, err)

sfcounter, err := m.SyncFloat64().Counter("test_Async_Counter")
require.NoError(t, err)
Expand Down Expand Up @@ -257,3 +287,51 @@ func TestMeterDefersDelegations(t *testing.T) {
assert.IsType(t, &afCounter{}, actr)
assert.Equal(t, 1, mp.count)
}

func TestRegistrationDelegation(t *testing.T) {
// globalMeterProvider := otel.GetMeterProvider
globalMeterProvider := &meterProvider{}

m := globalMeterProvider.Meter("go.opentelemetry.io/otel/metric/internal/global/meter_test")
require.IsType(t, &meter{}, m)
mImpl := m.(*meter)

actr, err := m.AsyncFloat64().Counter("test_Async_Counter")
require.NoError(t, err)

var called0 bool
reg0, err := m.RegisterCallback([]instrument.Asynchronous{actr}, func(context.Context) {
called0 = true
})
require.NoError(t, err)
require.Equal(t, 1, mImpl.registry.Len(), "callback not registered")
// This means reg0 should not be delegated.
assert.NoError(t, reg0.Unregister())
assert.Equal(t, 0, mImpl.registry.Len(), "callback not unregistered")

var called1 bool
reg1, err := m.RegisterCallback([]instrument.Asynchronous{actr}, func(context.Context) {
called1 = true
})
require.NoError(t, err)
require.Equal(t, 1, mImpl.registry.Len(), "second callback not registered")

mp := &testMeterProvider{}

// otel.SetMeterProvider(mp)
globalMeterProvider.setDelegate(mp)

testCollect(t, m) // This is a hacky way to emulate a read from an exporter
require.False(t, called0, "pre-delegation unregistered callback called")
require.True(t, called1, "callback not called")

called1 = false
assert.NoError(t, reg1.Unregister(), "unregister second callback")

testCollect(t, m) // This is a hacky way to emulate a read from an exporter
assert.False(t, called1, "unregistered callback called")

assert.NotPanics(t, func() {
assert.NoError(t, reg1.Unregister(), "duplicate unregister calls")
})
}
21 changes: 19 additions & 2 deletions metric/internal/global/meter_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,21 @@ func (m *testMeter) AsyncFloat64() asyncfloat64.InstrumentProvider {
//
// It is only valid to call Observe within the scope of the passed function,
// and only on the instruments that were registered with this call.
func (m *testMeter) RegisterCallback(insts []instrument.Asynchronous, function func(context.Context)) error {
m.callbacks = append(m.callbacks, function)
func (m *testMeter) RegisterCallback(i []instrument.Asynchronous, f func(context.Context)) (metric.Registration, error) {
m.callbacks = append(m.callbacks, f)
return testReg{
f: func(idx int) func() {
return func() { m.callbacks[idx] = nil }
}(len(m.callbacks) - 1),
}, nil
}

type testReg struct {
f func()
}

func (r testReg) Unregister() error {
r.f()
return nil
}

Expand All @@ -85,6 +98,10 @@ func (m *testMeter) SyncFloat64() syncfloat64.InstrumentProvider {
func (m *testMeter) collect() {
ctx := context.Background()
for _, f := range m.callbacks {
if f == nil {
// Unregister.
continue
}
f(ctx)
}
}
Expand Down
28 changes: 22 additions & 6 deletions metric/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,30 @@ type Meter interface {
// To Observe data with instruments it must be registered in a callback.
AsyncFloat64() asyncfloat64.InstrumentProvider

// RegisterCallback captures the function that will be called during Collect.
//
// It is only valid to call Observe within the scope of the passed function,
// and only on the instruments that were registered with this call.
RegisterCallback(insts []instrument.Asynchronous, function func(context.Context)) error

// SyncInt64 is the namespace for the Synchronous Integer instruments
SyncInt64() syncint64.InstrumentProvider
// SyncFloat64 is the namespace for the Synchronous Float instruments
SyncFloat64() syncfloat64.InstrumentProvider

// RegisterCallback registers f to be called during the collection of a
// measurement cycle.
//
// If Unregister of the returned Registration is called, f needs to be
// unregistered and not called during collection.
//
// The instruments f is registered with are the only instruments that f may
// observe values for.
//
// If no instruments are passed, f should not be registered nor called
// during collection.
RegisterCallback(instruments []instrument.Asynchronous, f func(context.Context)) (Registration, error)
}

// Registration is an token representing the unique registration of a callback
// for a set of instruments with a Meter.
type Registration interface {
// Unregister removes the callback registration from a Meter.
//
// This method needs to be idempotent and concurrent safe.
Unregister() error
}
8 changes: 6 additions & 2 deletions metric/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,14 @@ func (noopMeter) SyncFloat64() syncfloat64.InstrumentProvider {
}

// RegisterCallback creates a register callback that does not record any metrics.
func (noopMeter) RegisterCallback([]instrument.Asynchronous, func(context.Context)) error {
return nil
func (noopMeter) RegisterCallback([]instrument.Asynchronous, func(context.Context)) (Registration, error) {
return noopReg{}, nil
}

type noopReg struct{}

func (noopReg) Unregister() error { return nil }

type nonrecordingAsyncFloat64Instrument struct {
instrument.Asynchronous
}
Expand Down
15 changes: 11 additions & 4 deletions sdk/metric/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider {

// RegisterCallback registers the function f to be called when any of the
// insts Collect method is called.
func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context.Context)) error {
func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context.Context)) (metric.Registration, error) {
for _, inst := range insts {
// Only register if at least one instrument has a non-drop aggregation.
// Otherwise, calling f during collection will be wasted computation.
Expand All @@ -91,14 +91,21 @@ func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context
}
}
// All insts use drop aggregation.
return nil
return noopRegister{}, nil
}

func (m *meter) registerCallback(f func(context.Context)) error {
m.pipes.registerCallback(f)
type noopRegister struct{}

func (noopRegister) Unregister() error {
return nil
}

type callback func(context.Context)

func (m *meter) registerCallback(c callback) (metric.Registration, error) {
return m.pipes.registerCallback(c), nil
}

// SyncInt64 returns the synchronous integer instrument provider.
func (m *meter) SyncInt64() syncint64.InstrumentProvider {
return syncInt64Provider{m.instProviderInt64}
Expand Down

0 comments on commit 4014204

Please sign in to comment.