Skip to content

Commit

Permalink
chore(discoveryManager): expose Discoverer refresh function (promethe…
Browse files Browse the repository at this point in the history
…us#10531)



Signed-off-by: secustor <sebastian@poxhofer.at>
  • Loading branch information
secustor committed Jun 13, 2022
1 parent 5d1756c commit 3f9a9d1
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 16 deletions.
4 changes: 2 additions & 2 deletions discovery/http/http.go
Expand Up @@ -136,12 +136,12 @@ func NewDiscovery(conf *SDConfig, logger log.Logger, clientOpts []config.HTTPCli
logger,
"http",
time.Duration(conf.RefreshInterval),
d.refresh,
d.Refresh,
)
return d, nil
}

func (d *Discovery) refresh(ctx context.Context) ([]*targetgroup.Group, error) {
func (d *Discovery) Refresh(ctx context.Context) ([]*targetgroup.Group, error) {
req, err := http.NewRequest("GET", d.url, nil)
if err != nil {
return nil, err
Expand Down
8 changes: 4 additions & 4 deletions discovery/http/http_test.go
Expand Up @@ -45,7 +45,7 @@ func TestHTTPValidRefresh(t *testing.T) {
require.NoError(t, err)

ctx := context.Background()
tgs, err := d.refresh(ctx)
tgs, err := d.Refresh(ctx)
require.NoError(t, err)

expectedTargets := []*targetgroup.Group{
Expand Down Expand Up @@ -83,7 +83,7 @@ func TestHTTPInvalidCode(t *testing.T) {
require.NoError(t, err)

ctx := context.Background()
_, err = d.refresh(ctx)
_, err = d.Refresh(ctx)
require.EqualError(t, err, "server returned HTTP status 400 Bad Request")
require.Equal(t, 1.0, getFailureCount())
}
Expand All @@ -105,7 +105,7 @@ func TestHTTPInvalidFormat(t *testing.T) {
require.NoError(t, err)

ctx := context.Background()
_, err = d.refresh(ctx)
_, err = d.Refresh(ctx)
require.EqualError(t, err, `unsupported content type "text/plain; charset=utf-8"`)
require.Equal(t, 1.0, getFailureCount())
}
Expand Down Expand Up @@ -423,7 +423,7 @@ func TestSourceDisappeared(t *testing.T) {
ctx := context.Background()
for i, res := range test.responses {
stubResponse = res
tgs, err := d.refresh(ctx)
tgs, err := d.Refresh(ctx)
require.NoError(t, err)
require.Equal(t, test.expectedTargets[i], tgs)
}
Expand Down
34 changes: 24 additions & 10 deletions discovery/manager.go
Expand Up @@ -75,8 +75,8 @@ type poolKey struct {
provider string
}

// provider holds a Discoverer instance, its configuration, cancel func and its subscribers.
type provider struct {
// Provider holds a Discoverer instance, its configuration, cancel func and its subscribers.
type Provider struct {
name string
d Discoverer
config interface{}
Expand All @@ -92,11 +92,20 @@ type provider struct {
newSubs map[string]struct{}
}

// Discoverer return the Discoverer of the provider
func (p *Provider) Discoverer() Discoverer {
return p.d
}

// IsStarted return true if Discoverer is started.
func (p *provider) IsStarted() bool {
func (p *Provider) IsStarted() bool {
return p.cancel != nil
}

func (p *Provider) Config() interface{} {
return p.config
}

// NewManager is the Discovery Manager constructor.
func NewManager(ctx context.Context, logger log.Logger, options ...func(*Manager)) *Manager {
if logger == nil {
Expand Down Expand Up @@ -148,7 +157,7 @@ type Manager struct {
targetsMtx sync.Mutex

// providers keeps track of SD providers.
providers []*provider
providers []*Provider
// The sync channel sends the updates as a map where the key is the job value from the scrape config.
syncCh chan map[string][]*targetgroup.Group

Expand All @@ -163,6 +172,11 @@ type Manager struct {
lastProvider uint
}

// Providers returns the currently configured SD providers.
func (m *Manager) Providers() []*Provider {
return m.providers
}

// Run starts the background processing.
func (m *Manager) Run() error {
go m.sender()
Expand Down Expand Up @@ -194,7 +208,7 @@ func (m *Manager) ApplyConfig(cfg map[string]Configs) error {
wg sync.WaitGroup
// keep shows if we keep any providers after reload.
keep bool
newProviders []*provider
newProviders []*Provider
)
for _, prov := range m.providers {
// Cancel obsolete providers.
Expand Down Expand Up @@ -260,7 +274,7 @@ func (m *Manager) ApplyConfig(cfg map[string]Configs) error {

// StartCustomProvider is used for sdtool. Only use this if you know what you're doing.
func (m *Manager) StartCustomProvider(ctx context.Context, name string, worker Discoverer) {
p := &provider{
p := &Provider{
name: name,
d: worker,
subs: map[string]struct{}{
Expand All @@ -271,7 +285,7 @@ func (m *Manager) StartCustomProvider(ctx context.Context, name string, worker D
m.startProvider(ctx, p)
}

func (m *Manager) startProvider(ctx context.Context, p *provider) {
func (m *Manager) startProvider(ctx context.Context, p *Provider) {
level.Debug(m.logger).Log("msg", "Starting provider", "provider", p.name, "subs", fmt.Sprintf("%v", p.subs))
ctx, cancel := context.WithCancel(ctx)
updates := make(chan []*targetgroup.Group)
Expand All @@ -283,7 +297,7 @@ func (m *Manager) startProvider(ctx context.Context, p *provider) {
}

// cleaner cleans resources associated with provider.
func (m *Manager) cleaner(p *provider) {
func (m *Manager) cleaner(p *Provider) {
m.targetsMtx.Lock()
p.mu.RLock()
for s := range p.subs {
Expand All @@ -296,7 +310,7 @@ func (m *Manager) cleaner(p *provider) {
}
}

func (m *Manager) updater(ctx context.Context, p *provider, updates chan []*targetgroup.Group) {
func (m *Manager) updater(ctx context.Context, p *Provider, updates chan []*targetgroup.Group) {
// Ensure targets from this provider are cleaned up.
defer m.cleaner(p)
for {
Expand Down Expand Up @@ -422,7 +436,7 @@ func (m *Manager) registerProviders(cfgs Configs, setName string) int {
failed++
return
}
m.providers = append(m.providers, &provider{
m.providers = append(m.providers, &Provider{
name: fmt.Sprintf("%s/%d", typ, m.lastProvider),
d: d,
config: cfg,
Expand Down

0 comments on commit 3f9a9d1

Please sign in to comment.