From bc5307de955b19b0fcd513d4d7270038de6391ea Mon Sep 17 00:00:00 2001 From: ahrtr Date: Tue, 12 Apr 2022 14:01:14 +0800 Subject: [PATCH] support linearizable renew lease When etcdserver receives a LeaseRenew request, it may still be processing the LeaseGrantRequest on exactly the same leaseID. Accordingly it may return a TTL=0 to the client due to a leaseID-not-found error. So the leader should wait for its applied index to catch up with the committed index before processing client requests. --- api/v3rpc/rpctypes/error.go | 2 ++ server/etcdserver/api/v3rpc/util.go | 1 + server/etcdserver/errors.go | 1 + server/etcdserver/server_test.go | 56 +++++++++++++++++++++++++++++ server/etcdserver/v3_server.go | 43 +++++++++++++++++----- tests/integration/cluster.go | 2 +- tests/integration/v3_lease_test.go | 41 ++++++++++++++++----- 7 files changed, 128 insertions(+), 18 deletions(-) diff --git a/api/v3rpc/rpctypes/error.go b/api/v3rpc/rpctypes/error.go index b9f4842da25..ae112ae131b 100644 --- a/api/v3rpc/rpctypes/error.go +++ b/api/v3rpc/rpctypes/error.go @@ -75,6 +75,7 @@ var ( ErrGRPCTimeout = status.New(codes.Unavailable, "etcdserver: request timed out").Err() ErrGRPCTimeoutDueToLeaderFail = status.New(codes.Unavailable, "etcdserver: request timed out, possibly due to previous leader failure").Err() ErrGRPCTimeoutDueToConnectionLost = status.New(codes.Unavailable, "etcdserver: request timed out, possibly due to connection lost").Err() + ErrGRPCTimeoutWaitAppliedIndex = status.New(codes.Unavailable, "etcdserver: request timed out, waiting for the applied index took too long").Err() ErrGRPCUnhealthy = status.New(codes.Unavailable, "etcdserver: unhealthy cluster").Err() ErrGRPCCorrupt = status.New(codes.DataLoss, "etcdserver: corrupt cluster").Err() ErrGPRCNotSupportedForLearner = status.New(codes.Unavailable, "etcdserver: rpc not supported for learner").Err() @@ -208,6 +209,7 @@ var ( ErrTimeout = Error(ErrGRPCTimeout) ErrTimeoutDueToLeaderFail = Error(ErrGRPCTimeoutDueToLeaderFail) 
ErrTimeoutDueToConnectionLost = Error(ErrGRPCTimeoutDueToConnectionLost) + ErrTimeoutWaitAppliedIndex = Error(ErrGRPCTimeoutWaitAppliedIndex) ErrUnhealthy = Error(ErrGRPCUnhealthy) ErrCorrupt = Error(ErrGRPCCorrupt) ErrBadLeaderTransferee = Error(ErrGRPCBadLeaderTransferee) diff --git a/server/etcdserver/api/v3rpc/util.go b/server/etcdserver/api/v3rpc/util.go index 91072b70358..b2389342686 100644 --- a/server/etcdserver/api/v3rpc/util.go +++ b/server/etcdserver/api/v3rpc/util.go @@ -53,6 +53,7 @@ var toGRPCErrorMap = map[error]error{ etcdserver.ErrTimeout: rpctypes.ErrGRPCTimeout, etcdserver.ErrTimeoutDueToLeaderFail: rpctypes.ErrGRPCTimeoutDueToLeaderFail, etcdserver.ErrTimeoutDueToConnectionLost: rpctypes.ErrGRPCTimeoutDueToConnectionLost, + etcdserver.ErrTimeoutWaitAppliedIndex: rpctypes.ErrGRPCTimeoutWaitAppliedIndex, etcdserver.ErrUnhealthy: rpctypes.ErrGRPCUnhealthy, etcdserver.ErrKeyNotFound: rpctypes.ErrGRPCKeyNotFound, etcdserver.ErrCorrupt: rpctypes.ErrGRPCCorrupt, diff --git a/server/etcdserver/errors.go b/server/etcdserver/errors.go index dc2a85fdd47..f6d77fe72a3 100644 --- a/server/etcdserver/errors.go +++ b/server/etcdserver/errors.go @@ -27,6 +27,7 @@ var ( ErrTimeoutDueToLeaderFail = errors.New("etcdserver: request timed out, possibly due to previous leader failure") ErrTimeoutDueToConnectionLost = errors.New("etcdserver: request timed out, possibly due to connection lost") ErrTimeoutLeaderTransfer = errors.New("etcdserver: request timed out, leader transfer took too long") + ErrTimeoutWaitAppliedIndex = errors.New("etcdserver: request timed out, waiting for the applied index took too long") ErrLeaderChanged = errors.New("etcdserver: leader changed") ErrNotEnoughStartedMembers = errors.New("etcdserver: re-configuration failed due to not enough started members") ErrLearnerNotReady = errors.New("etcdserver: can only promote a learner member which is in sync with leader") diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go 
index 5127e5c71b5..70c2956dbd2 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -2036,3 +2036,59 @@ func (s *sendMsgAppRespTransporter) Send(m []raftpb.Message) { } s.sendC <- send } + +func TestWaitAppliedIndex(t *testing.T) { + cases := []struct { + name string + appliedIndex uint64 + committedIndex uint64 + action func(s *EtcdServer) + ExpectedError error + }{ + { + name: "The applied Id is already equal to the commitId", + appliedIndex: 10, + committedIndex: 10, + action: func(s *EtcdServer) { + s.applyWait.Trigger(10) + }, + ExpectedError: nil, + }, + { + name: "The etcd server has already stopped", + appliedIndex: 10, + committedIndex: 12, + action: func(s *EtcdServer) { + s.stopping <- struct{}{} + }, + ExpectedError: ErrStopped, + }, + { + name: "Timed out waiting for the applied index", + appliedIndex: 10, + committedIndex: 12, + action: nil, + ExpectedError: ErrTimeoutWaitAppliedIndex, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + s := &EtcdServer{ + appliedIndex: tc.appliedIndex, + committedIndex: tc.committedIndex, + stopping: make(chan struct{}, 1), + applyWait: wait.NewTimeList(), + } + + if tc.action != nil { + go tc.action(s) + } + + err := s.waitAppliedIndex() + + if err != tc.ExpectedError { + t.Errorf("Unexpected error, want (%v), got (%v)", tc.ExpectedError, err) + } + }) + } +} diff --git a/server/etcdserver/v3_server.go b/server/etcdserver/v3_server.go index 442288a6ee2..0184b8d18b8 100644 --- a/server/etcdserver/v3_server.go +++ b/server/etcdserver/v3_server.go @@ -45,6 +45,10 @@ const ( maxGapBetweenApplyAndCommitIndex = 5000 traceThreshold = 100 * time.Millisecond readIndexRetryTime = 500 * time.Millisecond + + // The timeout for the node to catch up its applied index, and is used in + // lease related operations, such as LeaseRenew and LeaseTimeToLive. 
+ applyTimeout = time.Second ) type RaftKV interface { @@ -271,6 +275,18 @@ func (s *EtcdServer) LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (* return resp.(*pb.LeaseGrantResponse), nil } +func (s *EtcdServer) waitAppliedIndex() error { + select { + case <-s.ApplyWait(): + case <-s.stopping: + return ErrStopped + case <-time.After(applyTimeout): + return ErrTimeoutWaitAppliedIndex + } + + return nil +} + func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) { resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseRevoke: r}) if err != nil { @@ -280,26 +296,32 @@ func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) } func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) { - ttl, err := s.lessor.Renew(id) - if err == nil { // already requested to primary lessor(leader) - return ttl, nil - } - if err != lease.ErrNotPrimary { - return -1, err + if s.isLeader() { + if err := s.waitAppliedIndex(); err != nil { + return 0, err + } + + ttl, err := s.lessor.Renew(id) + if err == nil { // already requested to primary lessor(leader) + return ttl, nil + } + if err != lease.ErrNotPrimary { + return -1, err + } } cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout()) defer cancel() // renewals don't go through raft; forward to leader manually - for cctx.Err() == nil && err != nil { + for cctx.Err() == nil { leader, lerr := s.waitLeader(cctx) if lerr != nil { return -1, lerr } for _, url := range leader.PeerURLs { lurl := url + leasehttp.LeasePrefix - ttl, err = leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt) + ttl, err := leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt) if err == nil || err == lease.ErrLeaseNotFound { return ttl, err } @@ -315,7 +337,10 @@ func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, e } func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) 
(*pb.LeaseTimeToLiveResponse, error) { - if s.Leader() == s.ID() { + if s.isLeader() { + if err := s.waitAppliedIndex(); err != nil { + return nil, err + } // primary; timetolive directly from leader le := s.lessor.Lookup(lease.LeaseID(r.ID)) if le == nil { diff --git a/tests/integration/cluster.go b/tests/integration/cluster.go index 8b2cb53f3e9..fedad797ae9 100644 --- a/tests/integration/cluster.go +++ b/tests/integration/cluster.go @@ -1454,7 +1454,7 @@ func (c *ClusterV3) Client(i int) *clientv3.Client { func (c *ClusterV3) ClusterClient() (client *clientv3.Client, err error) { if c.clusterClient == nil { - endpoints := []string{} + var endpoints []string for _, m := range c.Members { endpoints = append(endpoints, m.grpcURL) } diff --git a/tests/integration/v3_lease_test.go b/tests/integration/v3_lease_test.go index 29b612e2320..72a44fd9018 100644 --- a/tests/integration/v3_lease_test.go +++ b/tests/integration/v3_lease_test.go @@ -16,6 +16,7 @@ package integration import ( "context" + "errors" "fmt" "math" "testing" @@ -498,17 +499,31 @@ func TestV3LeaseLeases(t *testing.T) { // it was oberserved that the immediate lease renewal after granting a lease from follower resulted lease not found. // related issue https://github.com/etcd-io/etcd/issues/6978 func TestV3LeaseRenewStress(t *testing.T) { - testLeaseStress(t, stressLeaseRenew) + testLeaseStress(t, stressLeaseRenew, false) +} + +// TestV3LeaseRenewStressWithClusterClient is similar to TestV3LeaseRenewStress, +// but it uses a cluster client instead of a specific member's client. +// The related issue is https://github.com/etcd-io/etcd/issues/13675. +func TestV3LeaseRenewStressWithClusterClient(t *testing.T) { + testLeaseStress(t, stressLeaseRenew, true) } // TestV3LeaseTimeToLiveStress keeps creating lease and retrieving it immediately to ensure the lease can be retrieved. // it was oberserved that the immediate lease retrieval after granting a lease from follower resulted lease not found. 
// related issue https://github.com/etcd-io/etcd/issues/6978 func TestV3LeaseTimeToLiveStress(t *testing.T) { - testLeaseStress(t, stressLeaseTimeToLive) + testLeaseStress(t, stressLeaseTimeToLive, false) } -func testLeaseStress(t *testing.T, stresser func(context.Context, pb.LeaseClient) error) { +// TestV3LeaseTimeToLiveStressWithClusterClient is similar to TestV3LeaseTimeToLiveStress, +// but it uses a cluster client instead of a specific member's client. +// The related issue is https://github.com/etcd-io/etcd/issues/13675. +func TestV3LeaseTimeToLiveStressWithClusterClient(t *testing.T) { + testLeaseStress(t, stressLeaseTimeToLive, true) +} + +func testLeaseStress(t *testing.T, stresser func(context.Context, pb.LeaseClient) error, useClusterClient bool) { BeforeTest(t) clus := NewClusterV3(t, &ClusterConfig{Size: 3}) defer clus.Terminate(t) @@ -517,13 +532,23 @@ func testLeaseStress(t *testing.T, stresser func(context.Context, pb.LeaseClient defer cancel() errc := make(chan error) - for i := 0; i < 30; i++ { - for j := 0; j < 3; j++ { - go func(i int) { errc <- stresser(ctx, toGRPC(clus.Client(i)).Lease) }(j) + if useClusterClient { + for i := 0; i < 300; i++ { + clusterClient, err := clus.ClusterClient() + if err != nil { + t.Fatal(err) + } + go func(i int) { errc <- stresser(ctx, toGRPC(clusterClient).Lease) }(i) + } + } else { + for i := 0; i < 100; i++ { + for j := 0; j < 3; j++ { + go func(i int) { errc <- stresser(ctx, toGRPC(clus.Client(i)).Lease) }(j) + } } } - for i := 0; i < 90; i++ { + for i := 0; i < 300; i++ { if err := <-errc; err != nil { t.Fatal(err) } @@ -554,7 +579,7 @@ func stressLeaseRenew(tctx context.Context, lc pb.LeaseClient) (reterr error) { continue } if rresp.TTL == 0 { - return fmt.Errorf("TTL shouldn't be 0 so soon") + return errors.New("TTL shouldn't be 0 so soon") } } return nil