diff --git a/sd/consul/client_test.go b/sd/consul/client_test.go index bc988b3e2..feeec36f3 100644 --- a/sd/consul/client_test.go +++ b/sd/consul/client_test.go @@ -90,7 +90,7 @@ func (c *testClient) Service(service, tag string, _ bool, opts *stdconsul.QueryO results = append(results, entry) } - return results, &stdconsul.QueryMeta{}, nil + return results, &stdconsul.QueryMeta{LastIndex: opts.WaitIndex}, nil } func (c *testClient) Register(r *stdconsul.AgentServiceRegistration) error { diff --git a/sd/consul/instancer.go b/sd/consul/instancer.go index fc992dfe0..75f4fc642 100644 --- a/sd/consul/instancer.go +++ b/sd/consul/instancer.go @@ -67,7 +67,8 @@ func (s *Instancer) loop(lastIndex uint64) { d time.Duration = 10 * time.Millisecond ) for { - instances, lastIndex, err = s.getInstances(lastIndex, s.quitc) + index := lastIndex + instances, index, err = s.getInstances(lastIndex, s.quitc) switch { case err == errStopped: return // stopped via quitc @@ -76,6 +77,15 @@ func (s *Instancer) loop(lastIndex uint64) { time.Sleep(d) d = conn.Exponential(d) s.cache.Update(sd.Event{Err: err}) + case index == defaultIndex: + s.logger.Log("err", "index is not sane") + time.Sleep(d) + d = conn.Exponential(d) + case index < lastIndex: + s.logger.Log("err", "index is less than previous; reseting to default") + lastIndex = defaultIndex + time.Sleep(d) + d = conn.Exponential(d) default: s.cache.Update(sd.Event{Instances: instances}) d = 10 * time.Millisecond diff --git a/sd/consul/instancer_test.go b/sd/consul/instancer_test.go index 745e1e25e..52b055970 100644 --- a/sd/consul/instancer_test.go +++ b/sd/consul/instancer_test.go @@ -202,3 +202,61 @@ func TestInstancerWithEOF(t *testing.T) { t.Error("failed, to receive call in time") } } + +type badIndexTestClient struct { + client *testClient + called chan struct{} +} + +func newBadIndexTestClient(client *testClient, called chan struct{}) Client { + return &badIndexTestClient{client: client, called: called} +} + +func (c *badIndexTestClient) Register(r *consul.AgentServiceRegistration) error { + return c.client.Register(r) +} + +func (c *badIndexTestClient) Deregister(r *consul.AgentServiceRegistration) error { + return c.client.Deregister(r) +} + +func (c *badIndexTestClient) Service(service, tag string, passingOnly bool, queryOpts *consul.QueryOptions) ([]*consul.ServiceEntry, *consul.QueryMeta, error) { + switch { + case queryOpts.WaitIndex == 0: + queryOpts.WaitIndex = 100 + case queryOpts.WaitIndex == 100: + queryOpts.WaitIndex = 99 + default: + } + c.called <- struct{}{} + return c.client.Service(service, tag, passingOnly, queryOpts) +} + +func TestInstancerWithInvalidIndex(t *testing.T) { + var ( + called = make(chan struct{}, 1) + logger = log.NewNopLogger() + client = newBadIndexTestClient(newTestClient(consulState), called) + ) + + s := NewInstancer(client, logger, "search", []string{"api"}, true) + defer s.Stop() + + select { + case <-called: + case <-time.Tick(time.Millisecond * 500): + t.Error("failed, to receive call") + } + + state := s.cache.State() + if want, have := 2, len(state.Instances); want != have { + t.Errorf("want %d, have %d", want, have) + } + + // loop should continue + select { + case <-called: + case <-time.Tick(time.Millisecond * 500): + t.Error("failed, to receive call in time") + } +}