[3.5] Support linearizable renew lease #13932

Merged: 1 commit, Apr 12, 2022
2 changes: 2 additions & 0 deletions api/v3rpc/rpctypes/error.go
@@ -75,6 +75,7 @@ var (
ErrGRPCTimeout = status.New(codes.Unavailable, "etcdserver: request timed out").Err()
ErrGRPCTimeoutDueToLeaderFail = status.New(codes.Unavailable, "etcdserver: request timed out, possibly due to previous leader failure").Err()
ErrGRPCTimeoutDueToConnectionLost = status.New(codes.Unavailable, "etcdserver: request timed out, possibly due to connection lost").Err()
ErrGRPCTimeoutWaitAppliedIndex = status.New(codes.Unavailable, "etcdserver: request timed out, waiting for the applied index took too long").Err()
ErrGRPCUnhealthy = status.New(codes.Unavailable, "etcdserver: unhealthy cluster").Err()
ErrGRPCCorrupt = status.New(codes.DataLoss, "etcdserver: corrupt cluster").Err()
ErrGPRCNotSupportedForLearner = status.New(codes.Unavailable, "etcdserver: rpc not supported for learner").Err()
@@ -208,6 +209,7 @@ var (
ErrTimeout = Error(ErrGRPCTimeout)
ErrTimeoutDueToLeaderFail = Error(ErrGRPCTimeoutDueToLeaderFail)
ErrTimeoutDueToConnectionLost = Error(ErrGRPCTimeoutDueToConnectionLost)
ErrTimeoutWaitAppliedIndex = Error(ErrGRPCTimeoutWaitAppliedIndex)
ErrUnhealthy = Error(ErrGRPCUnhealthy)
ErrCorrupt = Error(ErrGRPCCorrupt)
ErrBadLeaderTransferee = Error(ErrGRPCBadLeaderTransferee)
1 change: 1 addition & 0 deletions server/etcdserver/api/v3rpc/util.go
@@ -53,6 +53,7 @@ var toGRPCErrorMap = map[error]error{
etcdserver.ErrTimeout: rpctypes.ErrGRPCTimeout,
etcdserver.ErrTimeoutDueToLeaderFail: rpctypes.ErrGRPCTimeoutDueToLeaderFail,
etcdserver.ErrTimeoutDueToConnectionLost: rpctypes.ErrGRPCTimeoutDueToConnectionLost,
etcdserver.ErrTimeoutWaitAppliedIndex: rpctypes.ErrGRPCTimeoutWaitAppliedIndex,
etcdserver.ErrUnhealthy: rpctypes.ErrGRPCUnhealthy,
etcdserver.ErrKeyNotFound: rpctypes.ErrGRPCKeyNotFound,
etcdserver.ErrCorrupt: rpctypes.ErrGRPCCorrupt,
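The two hunks above, together with the errors.go change below, wire the new timeout through etcd's usual error path: the server-internal etcdserver.ErrTimeoutWaitAppliedIndex is translated to rpctypes.ErrGRPCTimeoutWaitAppliedIndex, which was constructed with codes.Unavailable. A hypothetical snippet (not part of this diff; the demo package and function name are made up) shows what that mapping means for callers:

```go
package demo

import (
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
)

// codeOfNewTimeout shows that the new error surfaces to gRPC clients as
// codes.Unavailable, i.e. a retryable condition, not a hard failure.
func codeOfNewTimeout() codes.Code {
	st, _ := status.FromError(rpctypes.ErrGRPCTimeoutWaitAppliedIndex)
	return st.Code() // codes.Unavailable
}
```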
1 change: 1 addition & 0 deletions server/etcdserver/errors.go
@@ -27,6 +27,7 @@ var (
ErrTimeoutDueToLeaderFail = errors.New("etcdserver: request timed out, possibly due to previous leader failure")
ErrTimeoutDueToConnectionLost = errors.New("etcdserver: request timed out, possibly due to connection lost")
ErrTimeoutLeaderTransfer = errors.New("etcdserver: request timed out, leader transfer took too long")
ErrTimeoutWaitAppliedIndex = errors.New("etcdserver: request timed out, waiting for the applied index took too long")
ErrLeaderChanged = errors.New("etcdserver: leader changed")
ErrNotEnoughStartedMembers = errors.New("etcdserver: re-configuration failed due to not enough started members")
ErrLearnerNotReady = errors.New("etcdserver: can only promote a learner member which is in sync with leader")
56 changes: 56 additions & 0 deletions server/etcdserver/server_test.go
@@ -2036,3 +2036,59 @@ func (s *sendMsgAppRespTransporter) Send(m []raftpb.Message) {
}
s.sendC <- send
}

func TestWaitAppliedIndex(t *testing.T) {
cases := []struct {
name string
appliedIndex uint64
committedIndex uint64
action func(s *EtcdServer)
ExpectedError error
}{
{
name: "The applied Id is already equal to the commitId",
appliedIndex: 10,
committedIndex: 10,
action: func(s *EtcdServer) {
s.applyWait.Trigger(10)
},
ExpectedError: nil,
},
{
name: "The etcd server has already stopped",
appliedIndex: 10,
committedIndex: 12,
action: func(s *EtcdServer) {
s.stopping <- struct{}{}
},
ExpectedError: ErrStopped,
},
{
name: "Timed out waiting for the applied index",
appliedIndex: 10,
committedIndex: 12,
action: nil,
ExpectedError: ErrTimeoutWaitAppliedIndex,
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
s := &EtcdServer{
appliedIndex: tc.appliedIndex,
committedIndex: tc.committedIndex,
stopping: make(chan struct{}, 1),
applyWait: wait.NewTimeList(),
}

if tc.action != nil {
go tc.action(s)
}

err := s.waitAppliedIndex()

if err != tc.ExpectedError {
t.Errorf("Unexpected error, want (%v), got (%v)", tc.ExpectedError, err)
}
})
}
}
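For context on the applyWait field exercised above: it is a wait.WaitTime, whose Wait(deadline) returns a channel that is closed once Trigger is called with an equal or higher deadline. A minimal standalone sketch of those semantics (assuming the go.etcd.io/etcd/pkg/v3/wait import path, which the diff does not show):

```go
package main

import (
	"fmt"

	"go.etcd.io/etcd/pkg/v3/wait"
)

func main() {
	w := wait.NewTimeList()
	done := w.Wait(10) // closed once Trigger is called with deadline >= 10

	// This is what the server's apply loop does after applying entry 10,
	// and what the test's "action" func simulates with s.applyWait.Trigger(10).
	go w.Trigger(10)

	<-done
	fmt.Println("applied index reached 10")
}
```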
43 changes: 34 additions & 9 deletions server/etcdserver/v3_server.go
@@ -45,6 +45,10 @@ const (
maxGapBetweenApplyAndCommitIndex = 5000
traceThreshold = 100 * time.Millisecond
readIndexRetryTime = 500 * time.Millisecond

// applyTimeout is the timeout for the node to catch up its applied index,
// used in lease-related operations such as LeaseRenew and LeaseTimeToLive.
applyTimeout = time.Second
)

type RaftKV interface {
@@ -271,6 +275,18 @@ func (s *EtcdServer) LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (*
return resp.(*pb.LeaseGrantResponse), nil
}

func (s *EtcdServer) waitAppliedIndex() error {
select {
case <-s.ApplyWait():
case <-s.stopping:
return ErrStopped
case <-time.After(applyTimeout):
return ErrTimeoutWaitAppliedIndex
}

return nil
}

func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseRevoke: r})
if err != nil {
@@ -280,26 +296,32 @@ func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest)
}

func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) {
ttl, err := s.lessor.Renew(id)
if err == nil { // already requested to the primary lessor (leader)
return ttl, nil
}
if err != lease.ErrNotPrimary {
return -1, err
if s.isLeader() {
if err := s.waitAppliedIndex(); err != nil {
return 0, err
}

ttl, err := s.lessor.Renew(id)
if err == nil { // already requested to the primary lessor (leader)
return ttl, nil
}
if err != lease.ErrNotPrimary {
return -1, err
}
}

cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
defer cancel()

// renewals don't go through raft; forward to leader manually
for cctx.Err() == nil && err != nil {
for cctx.Err() == nil {
leader, lerr := s.waitLeader(cctx)
if lerr != nil {
return -1, lerr
}
for _, url := range leader.PeerURLs {
lurl := url + leasehttp.LeasePrefix
ttl, err = leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt)
ttl, err := leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt)
if err == nil || err == lease.ErrLeaseNotFound {
return ttl, err
}
Expand All @@ -315,7 +337,10 @@ func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, e
}

func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) {
if s.Leader() == s.ID() {
if s.isLeader() {
if err := s.waitAppliedIndex(); err != nil {
return nil, err
}
// primary; timetolive directly from leader
le := s.lessor.Lookup(lease.LeaseID(r.ID))
if le == nil {
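A practical consequence of the LeaseRenew and LeaseTimeToLive changes above: a request served by the leader can now fail with the new Unavailable timeout while the apply loop catches up, and callers are expected to retry rather than receive a possibly stale answer. A hypothetical caller-side sketch (not part of this PR; timeToLiveWithRetry and its fixed backoff are illustrative), using the raw gRPC lease API the way the integration tests below do:

```go
package demo

import (
	"context"
	"time"

	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// timeToLiveWithRetry retries a LeaseTimeToLive call while it fails with a
// retryable Unavailable status, such as the new wait-applied-index timeout.
func timeToLiveWithRetry(ctx context.Context, lc pb.LeaseClient, id int64) (*pb.LeaseTimeToLiveResponse, error) {
	var lastErr error
	for attempt := 0; attempt < 3; attempt++ {
		resp, err := lc.LeaseTimeToLive(ctx, &pb.LeaseTimeToLiveRequest{ID: id})
		if err == nil {
			return resp, nil
		}
		if st, ok := status.FromError(err); !ok || st.Code() != codes.Unavailable {
			return nil, err // not a retryable condition
		}
		lastErr = err
		time.Sleep(100 * time.Millisecond) // crude, illustrative backoff
	}
	return nil, lastErr
}
```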
2 changes: 1 addition & 1 deletion tests/integration/cluster.go
@@ -1454,7 +1454,7 @@ func (c *ClusterV3) Client(i int) *clientv3.Client {

func (c *ClusterV3) ClusterClient() (client *clientv3.Client, err error) {
if c.clusterClient == nil {
endpoints := []string{}
var endpoints []string
for _, m := range c.Members {
endpoints = append(endpoints, m.grpcURL)
}
41 changes: 33 additions & 8 deletions tests/integration/v3_lease_test.go
@@ -16,6 +16,7 @@ package integration

import (
"context"
"errors"
"fmt"
"math"
"testing"
@@ -498,17 +499,31 @@ func TestV3LeaseLeases(t *testing.T) {
// it was observed that an immediate lease renewal after granting a lease from a follower resulted in lease not found.
// related issue https://github.com/etcd-io/etcd/issues/6978
func TestV3LeaseRenewStress(t *testing.T) {
testLeaseStress(t, stressLeaseRenew)
testLeaseStress(t, stressLeaseRenew, false)
}

// TestV3LeaseRenewStressWithClusterClient is similar to TestV3LeaseRenewStress,
// but it uses a cluster client instead of a specific member's client.
// The related issue is https://github.com/etcd-io/etcd/issues/13675.
func TestV3LeaseRenewStressWithClusterClient(t *testing.T) {
testLeaseStress(t, stressLeaseRenew, true)
}

// TestV3LeaseTimeToLiveStress keeps creating a lease and retrieving it immediately to ensure the lease can be retrieved.
// it was observed that an immediate lease retrieval after granting a lease from a follower resulted in lease not found.
// related issue https://github.com/etcd-io/etcd/issues/6978
func TestV3LeaseTimeToLiveStress(t *testing.T) {
testLeaseStress(t, stressLeaseTimeToLive)
testLeaseStress(t, stressLeaseTimeToLive, false)
}

func testLeaseStress(t *testing.T, stresser func(context.Context, pb.LeaseClient) error) {
// TestV3LeaseTimeToLiveStressWithClusterClient is similar to TestV3LeaseTimeToLiveStress,
// but it uses a cluster client instead of a specific member's client.
// The related issue is https://github.com/etcd-io/etcd/issues/13675.
func TestV3LeaseTimeToLiveStressWithClusterClient(t *testing.T) {
testLeaseStress(t, stressLeaseTimeToLive, true)
}

func testLeaseStress(t *testing.T, stresser func(context.Context, pb.LeaseClient) error, useClusterClient bool) {
BeforeTest(t)
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)
@@ -517,13 +532,23 @@ func testLeaseStress(t *testing.T, stresser func(context.Context, pb.LeaseClient
defer cancel()
errc := make(chan error)

for i := 0; i < 30; i++ {
for j := 0; j < 3; j++ {
go func(i int) { errc <- stresser(ctx, toGRPC(clus.Client(i)).Lease) }(j)
if useClusterClient {
for i := 0; i < 300; i++ {
clusterClient, err := clus.ClusterClient()
if err != nil {
t.Fatal(err)
}
go func(i int) { errc <- stresser(ctx, toGRPC(clusterClient).Lease) }(i)
}
} else {
for i := 0; i < 100; i++ {
for j := 0; j < 3; j++ {
go func(i int) { errc <- stresser(ctx, toGRPC(clus.Client(i)).Lease) }(j)
}
}
}

for i := 0; i < 90; i++ {
for i := 0; i < 300; i++ {
if err := <-errc; err != nil {
t.Fatal(err)
}
@@ -554,7 +579,7 @@ func stressLeaseRenew(tctx context.Context, lc pb.LeaseClient) (reterr error) {
continue
}
if rresp.TTL == 0 {
return fmt.Errorf("TTL shouldn't be 0 so soon")
return errors.New("TTL shouldn't be 0 so soon")
}
}
return nil