Skip to content

Commit

Permalink
Improve inconsistent Consul SD index handling. (#999)
Browse files Browse the repository at this point in the history
Consul has guidelines for how the index movement should be handled.

https://www.consul.io/api-docs/features/blocking#implementation-details

Generally the index should always increase, but when it does not, this
patch implements the suggested solutions for recovery.
  • Loading branch information
schmidtw committed Jul 20, 2020
1 parent 02c7c01 commit 5561005
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 2 deletions.
2 changes: 1 addition & 1 deletion sd/consul/client_test.go
Expand Up @@ -90,7 +90,7 @@ func (c *testClient) Service(service, tag string, _ bool, opts *stdconsul.QueryO
results = append(results, entry)
}

return results, &stdconsul.QueryMeta{}, nil
return results, &stdconsul.QueryMeta{LastIndex: opts.WaitIndex}, nil
}

func (c *testClient) Register(r *stdconsul.AgentServiceRegistration) error {
Expand Down
12 changes: 11 additions & 1 deletion sd/consul/instancer.go
Expand Up @@ -67,7 +67,8 @@ func (s *Instancer) loop(lastIndex uint64) {
d time.Duration = 10 * time.Millisecond
)
for {
instances, lastIndex, err = s.getInstances(lastIndex, s.quitc)
index := lastIndex
instances, index, err = s.getInstances(lastIndex, s.quitc)
switch {
case err == errStopped:
return // stopped via quitc
Expand All @@ -76,6 +77,15 @@ func (s *Instancer) loop(lastIndex uint64) {
time.Sleep(d)
d = conn.Exponential(d)
s.cache.Update(sd.Event{Err: err})
case index == defaultIndex:
s.logger.Log("err", "index is not sane")
time.Sleep(d)
d = conn.Exponential(d)
case index < lastIndex:
s.logger.Log("err", "index is less than previous; reseting to default")
lastIndex = defaultIndex
time.Sleep(d)
d = conn.Exponential(d)
default:
s.cache.Update(sd.Event{Instances: instances})
d = 10 * time.Millisecond
Expand Down
58 changes: 58 additions & 0 deletions sd/consul/instancer_test.go
Expand Up @@ -202,3 +202,61 @@ func TestInstancerWithEOF(t *testing.T) {
t.Error("failed, to receive call in time")
}
}

// badIndexTestClient is a Client test double that wraps a testClient.
// Register and Deregister are forwarded untouched; Service rewrites the
// query's wait index before forwarding and signals every call on called.
type badIndexTestClient struct {
// client is the wrapped test client that actually serves the requests.
client *testClient
// called receives one value for each Service invocation.
called chan struct{}
}

// newBadIndexTestClient builds a badIndexTestClient around client. Every
// Service call made through the returned Client sends a value on called.
func newBadIndexTestClient(client *testClient, called chan struct{}) Client {
	c := new(badIndexTestClient)
	c.client = client
	c.called = called
	return c
}

// Register forwards the registration to the wrapped test client unchanged.
func (c *badIndexTestClient) Register(r *consul.AgentServiceRegistration) error {
	inner := c.client
	return inner.Register(r)
}

// Deregister forwards the deregistration to the wrapped test client unchanged.
func (c *badIndexTestClient) Deregister(r *consul.AgentServiceRegistration) error {
	inner := c.client
	return inner.Deregister(r)
}

// Service rewrites the wait index on queryOpts before delegating to the
// wrapped client: a wait index of 0 becomes 100, a wait index of 100 becomes
// 99, and any other value is passed through as is. The options are modified
// in place, so the caller's struct observes the rewritten index.
//
// Each invocation sends one value on c.called before delegating; the send
// blocks while the channel's buffer is full.
func (c *badIndexTestClient) Service(service, tag string, passingOnly bool, queryOpts *consul.QueryOptions) ([]*consul.ServiceEntry, *consul.QueryMeta, error) {
	if idx := queryOpts.WaitIndex; idx == 0 {
		queryOpts.WaitIndex = 100
	} else if idx == 100 {
		// Step backwards so a client echoing the wait index reports a
		// lower index than the one that was asked for.
		queryOpts.WaitIndex = 99
	}

	c.called <- struct{}{}
	return c.client.Service(service, tag, passingOnly, queryOpts)
}

// TestInstancerWithInvalidIndex drives an Instancer through a client whose
// Service method rewrites the wait index (0 -> 100, 100 -> 99). It checks
// that the instancer still publishes the expected instances and that it keeps
// issuing Service calls afterwards instead of stalling.
func TestInstancerWithInvalidIndex(t *testing.T) {
	// Upper bound on how long the test waits for each Service call.
	const timeout = 500 * time.Millisecond

	var (
		called = make(chan struct{}, 1)
		logger = log.NewNopLogger()
		client = newBadIndexTestClient(newTestClient(consulState), called)
	)

	s := NewInstancer(client, logger, "search", []string{"api"}, true)
	defer s.Stop()

	// time.After is the one-shot timeout primitive; time.Tick would create a
	// ticker that can never be stopped.
	select {
	case <-called:
	case <-time.After(timeout):
		t.Error("failed, to receive call")
	}

	state := s.cache.State()
	if want, have := 2, len(state.Instances); want != have {
		t.Errorf("want %d, have %d", want, have)
	}

	// loop should continue
	select {
	case <-called:
	case <-time.After(timeout):
		t.Error("failed, to receive call in time")
	}
}

0 comments on commit 5561005

Please sign in to comment.