diff --git a/etcdserver/api/v3rpc/rpctypes/error.go b/etcdserver/api/v3rpc/rpctypes/error.go index b7570ff0bfa..4047f015e87 100644 --- a/etcdserver/api/v3rpc/rpctypes/error.go +++ b/etcdserver/api/v3rpc/rpctypes/error.go @@ -74,6 +74,7 @@ var ( ErrGRPCTimeout = status.New(codes.Unavailable, "etcdserver: request timed out").Err() ErrGRPCTimeoutDueToLeaderFail = status.New(codes.Unavailable, "etcdserver: request timed out, possibly due to previous leader failure").Err() ErrGRPCTimeoutDueToConnectionLost = status.New(codes.Unavailable, "etcdserver: request timed out, possibly due to connection lost").Err() + ErrGRPCTimeoutWaitAppliedIndex = status.New(codes.Unavailable, "etcdserver: request timed out, waiting for the applied index took too long").Err() ErrGRPCUnhealthy = status.New(codes.Unavailable, "etcdserver: unhealthy cluster").Err() ErrGRPCCorrupt = status.New(codes.DataLoss, "etcdserver: corrupt cluster").Err() ErrGPRCNotSupportedForLearner = status.New(codes.Unavailable, "etcdserver: rpc not supported for learner").Err() @@ -132,6 +133,7 @@ var ( ErrorDesc(ErrGRPCTimeout): ErrGRPCTimeout, ErrorDesc(ErrGRPCTimeoutDueToLeaderFail): ErrGRPCTimeoutDueToLeaderFail, ErrorDesc(ErrGRPCTimeoutDueToConnectionLost): ErrGRPCTimeoutDueToConnectionLost, + ErrorDesc(ErrGRPCTimeoutWaitAppliedIndex): ErrGRPCTimeoutWaitAppliedIndex, ErrorDesc(ErrGRPCUnhealthy): ErrGRPCUnhealthy, ErrorDesc(ErrGRPCCorrupt): ErrGRPCCorrupt, ErrorDesc(ErrGPRCNotSupportedForLearner): ErrGPRCNotSupportedForLearner, @@ -192,6 +194,7 @@ var ( ErrTimeout = Error(ErrGRPCTimeout) ErrTimeoutDueToLeaderFail = Error(ErrGRPCTimeoutDueToLeaderFail) ErrTimeoutDueToConnectionLost = Error(ErrGRPCTimeoutDueToConnectionLost) + ErrTimeoutWaitAppliedIndex = Error(ErrGRPCTimeoutWaitAppliedIndex) ErrUnhealthy = Error(ErrGRPCUnhealthy) ErrCorrupt = Error(ErrGRPCCorrupt) ErrBadLeaderTransferee = Error(ErrGRPCBadLeaderTransferee) diff --git a/etcdserver/api/v3rpc/util.go b/etcdserver/api/v3rpc/util.go index cae348edb88..de78011f8e2 100644 --- a/etcdserver/api/v3rpc/util.go +++ b/etcdserver/api/v3rpc/util.go @@ -53,6 +53,7 @@ var toGRPCErrorMap = map[error]error{ etcdserver.ErrTimeout: rpctypes.ErrGRPCTimeout, etcdserver.ErrTimeoutDueToLeaderFail: rpctypes.ErrGRPCTimeoutDueToLeaderFail, etcdserver.ErrTimeoutDueToConnectionLost: rpctypes.ErrGRPCTimeoutDueToConnectionLost, + etcdserver.ErrTimeoutWaitAppliedIndex: rpctypes.ErrGRPCTimeoutWaitAppliedIndex, etcdserver.ErrUnhealthy: rpctypes.ErrGRPCUnhealthy, etcdserver.ErrKeyNotFound: rpctypes.ErrGRPCKeyNotFound, etcdserver.ErrCorrupt: rpctypes.ErrGRPCCorrupt, diff --git a/etcdserver/errors.go b/etcdserver/errors.go index d0fe28970d1..5a4cd5a87a7 100644 --- a/etcdserver/errors.go +++ b/etcdserver/errors.go @@ -26,6 +26,7 @@ var ( ErrTimeout = errors.New("etcdserver: request timed out") ErrTimeoutDueToLeaderFail = errors.New("etcdserver: request timed out, possibly due to previous leader failure") ErrTimeoutDueToConnectionLost = errors.New("etcdserver: request timed out, possibly due to connection lost") + ErrTimeoutWaitAppliedIndex = errors.New("etcdserver: request timed out, waiting for the applied index took too long") ErrTimeoutLeaderTransfer = errors.New("etcdserver: request timed out, leader transfer took too long") ErrLeaderChanged = errors.New("etcdserver: leader changed") ErrNotEnoughStartedMembers = errors.New("etcdserver: re-configuration failed due to not enough started members") diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 2d10b9b7b4f..0d94b796c14 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -1850,3 +1850,59 @@ func (s *sendMsgAppRespTransporter) Send(m []raftpb.Message) { } s.sendC <- send } + +func TestWaitAppliedIndex(t *testing.T) { + cases := []struct { + name string + appliedIndex uint64 + committedIndex uint64 + action func(s *EtcdServer) + ExpectedError error + }{ + { + name: "The applied Id is already equal to the commitId", + appliedIndex: 10, + committedIndex: 10, + action: func(s *EtcdServer) { + s.applyWait.Trigger(10) + }, + ExpectedError: nil, + }, + { + name: "The etcd server has already stopped", + appliedIndex: 10, + committedIndex: 12, + action: func(s *EtcdServer) { + s.stopping <- struct{}{} + }, + ExpectedError: ErrStopped, + }, + { + name: "Timed out waiting for the applied index", + appliedIndex: 10, + committedIndex: 12, + action: nil, + ExpectedError: ErrTimeoutWaitAppliedIndex, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + s := &EtcdServer{ + appliedIndex: tc.appliedIndex, + committedIndex: tc.committedIndex, + stopping: make(chan struct{}, 1), + applyWait: wait.NewTimeList(), + } + + if tc.action != nil { + go tc.action(s) + } + + err := s.waitAppliedIndex() + + if err != tc.ExpectedError { + t.Errorf("Unexpected error, want (%v), got (%v)", tc.ExpectedError, err) + } + }) + } +} diff --git a/etcdserver/v3_server.go b/etcdserver/v3_server.go index c16e050b70d..6487fd0e909 100644 --- a/etcdserver/v3_server.go +++ b/etcdserver/v3_server.go @@ -40,6 +40,10 @@ const ( // We should stop accepting new proposals if the gap growing to a certain point. maxGapBetweenApplyAndCommitIndex = 5000 traceThreshold = 100 * time.Millisecond + + // The timeout for the node to catch up its applied index, and is used in + // lease related operations, such as LeaseRenew and LeaseTimeToLive. + applyTimeout = time.Second ) type RaftKV interface { @@ -257,6 +261,18 @@ func (s *EtcdServer) LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (* return resp.(*pb.LeaseGrantResponse), nil } +func (s *EtcdServer) waitAppliedIndex() error { + select { + case <-s.ApplyWait(): + case <-s.stopping: + return ErrStopped + case <-time.After(applyTimeout): + return ErrTimeoutWaitAppliedIndex + } + + return nil +} + func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) { resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseRevoke: r}) if err != nil { @@ -266,26 +282,32 @@ func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) } func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) { - ttl, err := s.lessor.Renew(id) - if err == nil { // already requested to primary lessor(leader) - return ttl, nil - } - if err != lease.ErrNotPrimary { - return -1, err + if s.isLeader() { + if err := s.waitAppliedIndex(); err != nil { + return 0, err + } + + ttl, err := s.lessor.Renew(id) + if err == nil { // already requested to primary lessor(leader) + return ttl, nil + } + if err != lease.ErrNotPrimary { + return -1, err + } } cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout()) defer cancel() // renewals don't go through raft; forward to leader manually - for cctx.Err() == nil && err != nil { + for cctx.Err() == nil { leader, lerr := s.waitLeader(cctx) if lerr != nil { return -1, lerr } for _, url := range leader.PeerURLs { lurl := url + leasehttp.LeasePrefix - ttl, err = leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt) + ttl, err := leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt) if err == nil || err == lease.ErrLeaseNotFound { return ttl, err } @@ -299,7 +321,10 @@ func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, e } func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) { - if s.Leader() == s.ID() { + if s.isLeader() { + if err := s.waitAppliedIndex(); err != nil { + return nil, err + } // primary; timetolive directly from leader le := s.lessor.Lookup(lease.LeaseID(r.ID)) if le == nil { diff --git a/integration/v3_lease_test.go b/integration/v3_lease_test.go index 35e705ce1bf..659ebd92459 100644 --- a/integration/v3_lease_test.go +++ b/integration/v3_lease_test.go @@ -362,13 +362,13 @@ func testLeaseStress(t *testing.T, stresser func(context.Context, pb.LeaseClient defer cancel() errc := make(chan error) - for i := 0; i < 30; i++ { + for i := 0; i < 100; i++ { for j := 0; j < 3; j++ { go func(i int) { errc <- stresser(ctx, toGRPC(clus.Client(i)).Lease) }(j) } } - for i := 0; i < 90; i++ { + for i := 0; i < 300; i++ { if err := <-errc; err != nil { t.Fatal(err) }