Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve inconsistent Consul SD index handling. #999

Merged
merged 1 commit into from Jul 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion sd/consul/client_test.go
Expand Up @@ -90,7 +90,7 @@ func (c *testClient) Service(service, tag string, _ bool, opts *stdconsul.QueryO
results = append(results, entry)
}

return results, &stdconsul.QueryMeta{}, nil
return results, &stdconsul.QueryMeta{LastIndex: opts.WaitIndex}, nil
}

func (c *testClient) Register(r *stdconsul.AgentServiceRegistration) error {
Expand Down
12 changes: 11 additions & 1 deletion sd/consul/instancer.go
Expand Up @@ -67,7 +67,8 @@ func (s *Instancer) loop(lastIndex uint64) {
d time.Duration = 10 * time.Millisecond
)
for {
instances, lastIndex, err = s.getInstances(lastIndex, s.quitc)
index := lastIndex
instances, index, err = s.getInstances(lastIndex, s.quitc)
switch {
case err == errStopped:
return // stopped via quitc
Expand All @@ -76,6 +77,15 @@ func (s *Instancer) loop(lastIndex uint64) {
time.Sleep(d)
d = conn.Exponential(d)
s.cache.Update(sd.Event{Err: err})
case index == defaultIndex:
s.logger.Log("err", "index is not sane")
time.Sleep(d)
d = conn.Exponential(d)
case index < lastIndex:
s.logger.Log("err", "index is less than previous; reseting to default")
lastIndex = defaultIndex
time.Sleep(d)
d = conn.Exponential(d)
default:
s.cache.Update(sd.Event{Instances: instances})
d = 10 * time.Millisecond
Expand Down
58 changes: 58 additions & 0 deletions sd/consul/instancer_test.go
Expand Up @@ -202,3 +202,61 @@ func TestInstancerWithEOF(t *testing.T) {
t.Error("failed, to receive call in time")
}
}

type badIndexTestClient struct {
client *testClient
called chan struct{}
}

func newBadIndexTestClient(client *testClient, called chan struct{}) Client {
return &badIndexTestClient{client: client, called: called}
}

func (c *badIndexTestClient) Register(r *consul.AgentServiceRegistration) error {
return c.client.Register(r)
}

func (c *badIndexTestClient) Deregister(r *consul.AgentServiceRegistration) error {
return c.client.Deregister(r)
}

func (c *badIndexTestClient) Service(service, tag string, passingOnly bool, queryOpts *consul.QueryOptions) ([]*consul.ServiceEntry, *consul.QueryMeta, error) {
switch {
case queryOpts.WaitIndex == 0:
queryOpts.WaitIndex = 100
case queryOpts.WaitIndex == 100:
queryOpts.WaitIndex = 99
default:
}
c.called <- struct{}{}
return c.client.Service(service, tag, passingOnly, queryOpts)
}

func TestInstancerWithInvalidIndex(t *testing.T) {
var (
called = make(chan struct{}, 1)
logger = log.NewNopLogger()
client = newBadIndexTestClient(newTestClient(consulState), called)
)

s := NewInstancer(client, logger, "search", []string{"api"}, true)
defer s.Stop()

select {
case <-called:
case <-time.Tick(time.Millisecond * 500):
t.Error("failed, to receive call")
}

state := s.cache.State()
if want, have := 2, len(state.Instances); want != have {
t.Errorf("want %d, have %d", want, have)
}

// loop should continue
select {
case <-called:
case <-time.Tick(time.Millisecond * 500):
t.Error("failed, to receive call in time")
}
}