Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add registry for external sinks #65

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 13 additions & 2 deletions circonus/circonus.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
// Circonus Metrics Sink

// Circonus Metrics Sink provides an interface to forward metrics to Circonus.
//
// It also registers a factory that can be invoked by using
// `metrics.NewMetricSinkFromURL` addressed by a URL with scheme `circonus://`.
// The rest of the URL is ignored.
package circonus

import (
"net/url"
"strings"

"github.com/armon/go-metrics"
Expand Down Expand Up @@ -46,6 +50,13 @@ func NewCirconusSink(cc *Config) (*CirconusSink, error) {
}, nil
}

func init() {
metrics.RegisterSinkURLFactory("circonus",
func(u *url.URL) (metrics.MetricSink, error) {
return NewCirconusSink(nil)
})
}

// Start submitting metrics to Circonus (flush every SubmitInterval)
func (s *CirconusSink) Start() {
s.metrics.Start()
Expand Down
14 changes: 14 additions & 0 deletions datadog/dogstatsd.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
// This package implements a go-metrics sink that that can be used
// with a dogstatsd server
//
// It also registers a factory that can be invoked by using
// `metrics.NewMetricSinkFromURL` addressed by a URL like:
// dogstatsds://<dogstatsd-host>:<dogstatsd-port>[?hostname=<hostname-to-report>]`.
package datadog

import (
"fmt"
"net/url"
"strings"

"github.com/DataDog/datadog-go/statsd"
Expand Down Expand Up @@ -30,6 +37,13 @@ func NewDogStatsdSink(addr string, hostName string) (*DogStatsdSink, error) {
return sink, nil
}

func init() {
metrics.RegisterSinkURLFactory("dogstatsd",
func(u *url.URL) (metrics.MetricSink, error) {
return NewDogStatsdSink(u.Host, u.Query().Get("hostname"))
})
}

// SetTags sets common tags on the Dogstatsd Client that will be sent
// along with all dogstatsd packets.
// Ref: http://docs.datadoghq.com/guides/dogstatsd/#tags
Expand Down
16 changes: 16 additions & 0 deletions datadog/dogstatsd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,3 +148,19 @@ func assertServerMatchesExpected(t *testing.T, server *net.UDPConn, buf []byte,
t.Fatalf("Line %s does not match expected: %s", string(msg), expected)
}
}

func TestFactory(t *testing.T) {
sink, err := metrics.NewMetricSinkFromURL("dogstatsd://example.com:8888?hostname=foo")
if err != nil {
t.Fatalf("Factory failed: %s", err)
}
dogStatsDSink, ok := sink.(*DogStatsdSink)
if !ok {
t.Fatalf("Factory returned wrong sink type: %s", reflect.TypeOf(sink))
}
if dogStatsDSink.hostName != "foo" {
t.Fatalf("Factory set hostName = '%s', want 'foo'", dogStatsDSink.hostName)
}
// No real way to peak inside the statsd client to assert it got the right hostname/port...
// I sanity checked it at least
}
9 changes: 9 additions & 0 deletions prometheus/prometheus.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
// +build go1.3

// This package implements a go-metrics sink that Prometheus can scrape.
//
// It also registers a factory that can be invoked by using
// `metrics.NewMetricSinkFromURL` addressed by a URL scheme of `prometheus://`.
// The rest of the URL is ignored.
package prometheus

import (
Expand Down Expand Up @@ -38,6 +43,10 @@ type PrometheusSink struct {
expiration time.Duration
}

func init() {
metrics.RegisterSinkURLFactory("prometheus", NewPrometheusSink)
}

// NewPrometheusSink creates a new PrometheusSink using the default options.
func NewPrometheusSink() (*PrometheusSink, error) {
return NewPrometheusSinkFrom(DefaultPrometheusOpts)
Expand Down
22 changes: 18 additions & 4 deletions sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,18 +74,27 @@ func (fh FanoutSink) AddSampleWithLabels(key []string, val float32, labels []Lab
}
}

// sinkURLFactoryFunc is an generic interface around the *SinkFromURL() function provided
// by each sink type
type sinkURLFactoryFunc func(*url.URL) (MetricSink, error)
// SinkURLFactoryFunc is a generic interface around the *SinkFromURL() function
// provided by each sink type
type SinkURLFactoryFunc func(*url.URL) (MetricSink, error)

// sinkRegistry supports the generic NewMetricSink function by mapping URL
// schemes to metric sink factory functions
var sinkRegistry = map[string]sinkURLFactoryFunc{
var sinkRegistry = map[string]SinkURLFactoryFunc{
"statsd": NewStatsdSinkFromURL,
"statsite": NewStatsiteSinkFromURL,
"inmem": NewInmemSinkFromURL,
}

// RegisterSinkURLFactory adds the given factory to the global registry used by
// NewMetricSinkFromURL. It is **not thread safe** and assumes the caller will
// serialize calls to this and NewMetricSinkFromURL. Note that the Go spec
// requires that init functions for all packages are executed in serial so it is
// always safe to call this in your package's init function.
func RegisterSinkURLFactory(scheme string, factory SinkURLFactoryFunc) {
sinkRegistry[scheme] = factory
}

// NewMetricSinkFromURL allows a generic URL input to configure any of the
// supported sinks. The scheme of the URL identifies the type of the sink, the
// and query parameters are used to set options.
Expand All @@ -99,6 +108,11 @@ var sinkRegistry = map[string]sinkURLFactoryFunc{
// "inmem://" - Initializes an InmemSink. The host and port are ignored. The
// "interval" and "duration" query parameters must be specified with valid
// durations, see NewInmemSink for details.
//
// Custom sink implementations in other packages can hook into this using
// RegisterSinkURLFactory. The additional sink sub-packages in this repository
// all register factories on custom schemes and can be used by default. See
// sub-package docs for details.
func NewMetricSinkFromURL(urlStr string) (MetricSink, error) {
u, err := url.Parse(urlStr)
if err != nil {
Expand Down
50 changes: 50 additions & 0 deletions sink_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package metrics

import (
"net/url"
"reflect"
"strings"
"testing"
Expand Down Expand Up @@ -270,3 +271,52 @@ func TestNewMetricSinkFromURL(t *testing.T) {
})
}
}

func TestRegisterSinkURLFactory(t *testing.T) {
for _, tc := range []struct {
desc string
scheme string
factory SinkURLFactoryFunc
input string
expect reflect.Type
expectErr string
}{
{
desc: "custom scheme yields a Custom Sink",
scheme: "test",
factory: func(*url.URL) (MetricSink, error) {
return &MockSink{}, nil
},
input: "test://someserver:123",
expect: reflect.TypeOf(&MockSink{}),
},
{
desc: "statsd scheme (still) yields a StatsdSink",
scheme: "test",
factory: func(*url.URL) (MetricSink, error) {
return &MockSink{}, nil
},
input: "statsd://someserver:123",
expect: reflect.TypeOf(&StatsdSink{}),
},
} {
t.Run(tc.desc, func(t *testing.T) {
RegisterSinkURLFactory(tc.scheme, tc.factory)

ms, err := NewMetricSinkFromURL(tc.input)
if tc.expectErr != "" {
if !strings.Contains(err.Error(), tc.expectErr) {
t.Fatalf("expected err: %q to contain: %q", err, tc.expectErr)
}
} else {
if err != nil {
t.Fatalf("unexpected err: %s", err)
}
got := reflect.TypeOf(ms)
if got != tc.expect {
t.Fatalf("expected return type to be %v, got: %v", tc.expect, got)
}
}
})
}
}