Skip to content

Commit

Permalink
server: don't panic in readonly serializable txn
Browse files Browse the repository at this point in the history
Problem: We pass grpc context down to applier in readonly serializable txn.
This context can be cancelled for example due to timeout.
This will trigger panic inside applyTxn

Solution: Only panic for transactions with write operations

fixes etcd-io#14110

Signed-off-by: Bogdan Kanivets <bkanivets@apple.com>
  • Loading branch information
Bogdan Kanivets committed Aug 3, 2022
1 parent 4f0e92d commit f41515b
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 9 deletions.
32 changes: 24 additions & 8 deletions server/etcdserver/txn/txn.go
Expand Up @@ -17,6 +17,7 @@ package txn
import (
"bytes"
"context"
"fmt"
"sort"

pb "go.etcd.io/etcd/api/v3/etcdserverpb"
Expand Down Expand Up @@ -265,7 +266,18 @@ func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWit
txnWrite.End()
txnWrite = kv.Write(trace)
}
applyTxn(ctx, lg, kv, lessor, txnWrite, rt, txnPath, txnResp)
_, err := applyTxn(ctx, lg, kv, lessor, txnWrite, rt, txnPath, txnResp)
if err != nil {
if isWrite {
// end txn to release locks before panic
txnWrite.End()
// When txn with write operations starts it has to be successful
// We don't have a way to recover state in case of write failure
lg.Panic("unexpected error during txn with writes", zap.Error(err))
} else {
lg.Error("unexpected error during readonly txn", zap.Error(err))
}
}
rev := txnWrite.Rev()
if len(txnWrite.Changes()) != 0 {
rev++
Expand All @@ -277,7 +289,7 @@ func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWit
traceutil.Field{Key: "number_of_response", Value: len(txnResp.Responses)},
traceutil.Field{Key: "response_revision", Value: txnResp.Header.Revision},
)
return txnResp, trace, nil
return txnResp, trace, err
}

// newTxnResp allocates a txn response for a txn request given a path.
Expand Down Expand Up @@ -311,7 +323,7 @@ func newTxnResp(rt *pb.TxnRequest, txnPath []bool) (txnResp *pb.TxnResponse, txn
return txnResp, txnCount
}

func applyTxn(ctx context.Context, lg *zap.Logger, kv mvcc.KV, lessor lease.Lessor, txnWrite mvcc.TxnWrite, rt *pb.TxnRequest, txnPath []bool, tresp *pb.TxnResponse) (txns int) {
func applyTxn(ctx context.Context, lg *zap.Logger, kv mvcc.KV, lessor lease.Lessor, txnWrite mvcc.TxnWrite, rt *pb.TxnRequest, txnPath []bool, tresp *pb.TxnResponse) (txns int, err error) {
trace := traceutil.Get(ctx)
reqs := rt.Success
if !txnPath[0] {
Expand All @@ -328,7 +340,7 @@ func applyTxn(ctx context.Context, lg *zap.Logger, kv mvcc.KV, lessor lease.Less
traceutil.Field{Key: "range_end", Value: string(tv.RequestRange.RangeEnd)})
resp, err := Range(ctx, lg, kv, txnWrite, tv.RequestRange)
if err != nil {
lg.Panic("unexpected error during txnWrite", zap.Error(err))
return 0, fmt.Errorf("applyTxn: failed Range: %w", err)
}
respi.(*pb.ResponseOp_ResponseRange).ResponseRange = resp
trace.StopSubTrace()
Expand All @@ -339,26 +351,30 @@ func applyTxn(ctx context.Context, lg *zap.Logger, kv mvcc.KV, lessor lease.Less
traceutil.Field{Key: "req_size", Value: tv.RequestPut.Size()})
resp, _, err := Put(ctx, lg, lessor, kv, txnWrite, tv.RequestPut)
if err != nil {
lg.Panic("unexpected error during txnWrite", zap.Error(err))
return 0, fmt.Errorf("applyTxn: failed Put: %w", err)
}
respi.(*pb.ResponseOp_ResponsePut).ResponsePut = resp
trace.StopSubTrace()
case *pb.RequestOp_RequestDeleteRange:
resp, err := DeleteRange(kv, txnWrite, tv.RequestDeleteRange)
if err != nil {
lg.Panic("unexpected error during txnWrite", zap.Error(err))
return 0, fmt.Errorf("applyTxn: failed DeleteRange: %w", err)
}
respi.(*pb.ResponseOp_ResponseDeleteRange).ResponseDeleteRange = resp
case *pb.RequestOp_RequestTxn:
resp := respi.(*pb.ResponseOp_ResponseTxn).ResponseTxn
applyTxns := applyTxn(ctx, lg, kv, lessor, txnWrite, tv.RequestTxn, txnPath[1:], resp)
applyTxns, err := applyTxn(ctx, lg, kv, lessor, txnWrite, tv.RequestTxn, txnPath[1:], resp)
if err != nil {
// don't wrap the error. It's a recursive call and err should be already wrapped
return 0, err
}
txns += applyTxns + 1
txnPath = txnPath[applyTxns+1:]
default:
// empty union
}
}
return txns
return txns, nil
}

//---------------------------------------------------------
Expand Down
87 changes: 87 additions & 0 deletions server/etcdserver/txn/txn_test.go
@@ -0,0 +1,87 @@
package txn

import (
"context"
"os"
"strings"
"testing"

pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/server/v3/lease"
"go.etcd.io/etcd/server/v3/storage/backend"
betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
"go.etcd.io/etcd/server/v3/storage/mvcc"
"go.uber.org/zap/zaptest"

"github.com/stretchr/testify/assert"
)

func TestReadonlyTxnError(t *testing.T) {
b, tmpPath := betesting.NewDefaultTmpBackend(t)
s := mvcc.NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, mvcc.StoreConfig{})
defer cleanupStore(s, b, tmpPath)

// setup cancelled context
ctx, cancel := context.WithCancel(context.TODO())
cancel()

// put some data to prevent early termination in rangeKeys
// we are expecting failure on cancelled context check
s.Put([]byte("foo"), []byte("bar"), lease.NoLease)

txn := &pb.TxnRequest{
Success: []*pb.RequestOp{
{
Request: &pb.RequestOp_RequestRange{
RequestRange: &pb.RangeRequest{
Key: []byte("foo"),
},
},
},
},
}

_, _, err := Txn(ctx, zaptest.NewLogger(t), txn, false, s, &lease.FakeLessor{})
if err == nil || !strings.Contains(err.Error(), "applyTxn: failed Range: rangeKeys: context cancelled: context canceled") {
t.Fatalf("Expected context canceled error, got %v", err)
}
}

func TestWriteTxnPanic(t *testing.T) {
b, tmpPath := betesting.NewDefaultTmpBackend(t)
s := mvcc.NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, mvcc.StoreConfig{})
defer cleanupStore(s, b, tmpPath)

// setup cancelled context
ctx, cancel := context.WithCancel(context.TODO())
cancel()

// write txn that puts some data and then fails in range due to cancelled context
txn := &pb.TxnRequest{
Success: []*pb.RequestOp{
{
Request: &pb.RequestOp_RequestPut{
RequestPut: &pb.PutRequest{
Key: []byte("foo"),
Value: []byte("bar"),
},
},
},
{
Request: &pb.RequestOp_RequestRange{
RequestRange: &pb.RangeRequest{
Key: []byte("foo"),
},
},
},
},
}

assert.Panics(t, func() { Txn(ctx, zaptest.NewLogger(t), txn, false, s, &lease.FakeLessor{}) }, "Expected panic in applyTxn")
}

func cleanupStore(s mvcc.KV, b backend.Backend, path string) {
s.Close()
b.Close()
os.Remove(path)
}
3 changes: 2 additions & 1 deletion server/storage/mvcc/kvstore_txn.go
Expand Up @@ -16,6 +16,7 @@ package mvcc

import (
"context"
"fmt"

"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/pkg/v3/traceutil"
Expand Down Expand Up @@ -94,7 +95,7 @@ func (tr *storeTxnRead) rangeKeys(ctx context.Context, key, end []byte, curRev i
for i, revpair := range revpairs[:len(kvs)] {
select {
case <-ctx.Done():
return nil, ctx.Err()
return nil, fmt.Errorf("rangeKeys: context cancelled: %w", ctx.Err())
default:
}
revToBytes(revpair, revBytes)
Expand Down

0 comments on commit f41515b

Please sign in to comment.