Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3.5] server: don't panic in readonly serializable txn #14178

Merged
merged 1 commit into from Sep 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
33 changes: 24 additions & 9 deletions server/etcdserver/apply.go
Expand Up @@ -428,6 +428,7 @@ func (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.Ra
}

func (a *applierV3backend) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
lg := a.s.Logger()
trace := traceutil.Get(ctx)
if trace.IsEmpty() {
trace = traceutil.New("transaction", a.s.Logger())
Expand Down Expand Up @@ -474,7 +475,18 @@ func (a *applierV3backend) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnR
txn.End()
txn = a.s.KV().Write(trace)
}
a.applyTxn(ctx, txn, rt, txnPath, txnResp)
_, err := a.applyTxn(ctx, txn, rt, txnPath, txnResp)
if err != nil {
if isWrite {
// end txn to release locks before panic
txn.End()
// When txn with write operations starts it has to be successful
// We don't have a way to recover state in case of write failure
lg.Panic("unexpected error during txn with writes", zap.Error(err))
} else {
lg.Error("unexpected error during readonly txn", zap.Error(err))
}
}
rev := txn.Rev()
if len(txn.Changes()) != 0 {
rev++
Expand All @@ -486,7 +498,7 @@ func (a *applierV3backend) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnR
traceutil.Field{Key: "number_of_response", Value: len(txnResp.Responses)},
traceutil.Field{Key: "response_revision", Value: txnResp.Header.Revision},
)
return txnResp, trace, nil
return txnResp, trace, err
}

// newTxnResp allocates a txn response for a txn request given a path.
Expand Down Expand Up @@ -617,14 +629,13 @@ func compareKV(c *pb.Compare, ckv mvccpb.KeyValue) bool {
return true
}

func (a *applierV3backend) applyTxn(ctx context.Context, txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPath []bool, tresp *pb.TxnResponse) (txns int) {
func (a *applierV3backend) applyTxn(ctx context.Context, txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPath []bool, tresp *pb.TxnResponse) (txns int, err error) {
trace := traceutil.Get(ctx)
reqs := rt.Success
if !txnPath[0] {
reqs = rt.Failure
}

lg := a.s.Logger()
for i, req := range reqs {
respi := tresp.Responses[i].Response
switch tv := req.Request.(type) {
Expand All @@ -635,7 +646,7 @@ func (a *applierV3backend) applyTxn(ctx context.Context, txn mvcc.TxnWrite, rt *
traceutil.Field{Key: "range_end", Value: string(tv.RequestRange.RangeEnd)})
resp, err := a.Range(ctx, txn, tv.RequestRange)
if err != nil {
lg.Panic("unexpected error during txn", zap.Error(err))
return 0, fmt.Errorf("applyTxn: failed Range: %w", err)
}
respi.(*pb.ResponseOp_ResponseRange).ResponseRange = resp
trace.StopSubTrace()
Expand All @@ -646,26 +657,30 @@ func (a *applierV3backend) applyTxn(ctx context.Context, txn mvcc.TxnWrite, rt *
traceutil.Field{Key: "req_size", Value: tv.RequestPut.Size()})
resp, _, err := a.Put(ctx, txn, tv.RequestPut)
if err != nil {
lg.Panic("unexpected error during txn", zap.Error(err))
return 0, fmt.Errorf("applyTxn: failed Put: %w", err)
}
respi.(*pb.ResponseOp_ResponsePut).ResponsePut = resp
trace.StopSubTrace()
case *pb.RequestOp_RequestDeleteRange:
resp, err := a.DeleteRange(txn, tv.RequestDeleteRange)
if err != nil {
lg.Panic("unexpected error during txn", zap.Error(err))
return 0, fmt.Errorf("applyTxn: failed DeleteRange: %w", err)
}
respi.(*pb.ResponseOp_ResponseDeleteRange).ResponseDeleteRange = resp
case *pb.RequestOp_RequestTxn:
resp := respi.(*pb.ResponseOp_ResponseTxn).ResponseTxn
applyTxns := a.applyTxn(ctx, txn, tv.RequestTxn, txnPath[1:], resp)
applyTxns, err := a.applyTxn(ctx, txn, tv.RequestTxn, txnPath[1:], resp)
if err != nil {
// don't wrap the error. It's a recursive call and err should be already wrapped
return 0, err
}
txns += applyTxns + 1
txnPath = txnPath[applyTxns+1:]
default:
// empty union
}
}
return txns
return txns, nil
}

func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) {
Expand Down
96 changes: 96 additions & 0 deletions server/etcdserver/apply_test.go
@@ -0,0 +1,96 @@
package etcdserver

import (
"context"
"strings"
"sync"
"testing"

"github.com/stretchr/testify/assert"
"go.uber.org/zap"

pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/server/v3/lease"
"go.etcd.io/etcd/server/v3/mvcc"
betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
)

func TestReadonlyTxnError(t *testing.T) {
b, _ := betesting.NewDefaultTmpBackend(t)
defer betesting.Close(t, b)
s := mvcc.New(zap.NewExample(), b, &lease.FakeLessor{}, mvcc.StoreConfig{})
defer s.Close()

// setup minimal server to get access to applier
srv := &EtcdServer{lgMu: new(sync.RWMutex), lg: zap.NewExample(), r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: newNodeRecorder()})}
srv.kv = s
srv.be = b

a := srv.newApplierV3Backend()

// setup cancelled context
ctx, cancel := context.WithCancel(context.TODO())
cancel()

// put some data to prevent early termination in rangeKeys
// we are expecting failure on cancelled context check
s.Put([]byte("foo"), []byte("bar"), lease.NoLease)

txn := &pb.TxnRequest{
Success: []*pb.RequestOp{
{
Request: &pb.RequestOp_RequestRange{
RequestRange: &pb.RangeRequest{
Key: []byte("foo"),
},
},
},
},
}

_, _, err := a.Txn(ctx, txn)
if err == nil || !strings.Contains(err.Error(), "applyTxn: failed Range: rangeKeys: context cancelled: context canceled") {
t.Fatalf("Expected context canceled error, got %v", err)
}
}

func TestWriteTxnPanic(t *testing.T) {
b, _ := betesting.NewDefaultTmpBackend(t)
defer betesting.Close(t, b)
s := mvcc.New(zap.NewExample(), b, &lease.FakeLessor{}, mvcc.StoreConfig{})
defer s.Close()

// setup minimal server to get access to applier
srv := &EtcdServer{lgMu: new(sync.RWMutex), lg: zap.NewExample(), r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: newNodeRecorder()})}
srv.kv = s
srv.be = b

a := srv.newApplierV3Backend()

// setup cancelled context
ctx, cancel := context.WithCancel(context.TODO())
cancel()

// write txn that puts some data and then fails in range due to cancelled context
txn := &pb.TxnRequest{
Success: []*pb.RequestOp{
{
Request: &pb.RequestOp_RequestPut{
RequestPut: &pb.PutRequest{
Key: []byte("foo"),
Value: []byte("bar"),
},
},
},
{
Request: &pb.RequestOp_RequestRange{
RequestRange: &pb.RangeRequest{
Key: []byte("foo"),
},
},
},
},
}

assert.Panics(t, func() { a.Txn(ctx, txn) }, "Expected panic in Txn with writes")
}
3 changes: 2 additions & 1 deletion server/mvcc/kvstore_txn.go
Expand Up @@ -16,6 +16,7 @@ package mvcc

import (
"context"
"fmt"

"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/pkg/v3/traceutil"
Expand Down Expand Up @@ -156,7 +157,7 @@ func (tr *storeTxnRead) rangeKeys(ctx context.Context, key, end []byte, curRev i
for i, revpair := range revpairs[:len(kvs)] {
select {
case <-ctx.Done():
return nil, ctx.Err()
return nil, fmt.Errorf("rangeKeys: context cancelled: %w", ctx.Err())
default:
}
revToBytes(revpair, revBytes)
Expand Down