From 3a069916b9168b5616948e0ea7ad7eec90dfe8c6 Mon Sep 17 00:00:00 2001 From: Nick Cabatoff Date: Thu, 19 Aug 2021 18:03:56 +0200 Subject: [PATCH] Check to make sure context isn't expired before doing a raft operation. (#12162) --- changelog/12162.txt | 3 +++ physical/raft/raft.go | 54 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+) create mode 100644 changelog/12162.txt diff --git a/changelog/12162.txt b/changelog/12162.txt new file mode 100644 index 0000000000000..70c943ac86b14 --- /dev/null +++ b/changelog/12162.txt @@ -0,0 +1,3 @@ +```release-note:improvement +storage/raft: Best-effort handling of cancelled contexts. +``` diff --git a/physical/raft/raft.go b/physical/raft/raft.go index b865697bbd435..47b96d7c0dcbe 100644 --- a/physical/raft/raft.go +++ b/physical/raft/raft.go @@ -978,6 +978,10 @@ func (b *RaftBackend) RemovePeer(ctx context.Context, peerID string) error { b.l.RLock() defer b.l.RUnlock() + if err := ctx.Err(); err != nil { + return err + } + if b.disableAutopilot { if b.raft == nil { return errors.New("raft storage is not initialized") @@ -1034,6 +1038,10 @@ func (b *RaftBackend) GetConfigurationOffline() (*RaftConfigurationResponse, err } func (b *RaftBackend) GetConfiguration(ctx context.Context) (*RaftConfigurationResponse, error) { + if err := ctx.Err(); err != nil { + return nil, err + } + b.l.RLock() defer b.l.RUnlock() @@ -1068,6 +1076,10 @@ func (b *RaftBackend) GetConfiguration(ctx context.Context) (*RaftConfigurationR // AddPeer adds a new server to the raft cluster func (b *RaftBackend) AddPeer(ctx context.Context, peerID, clusterAddr string) error { + if err := ctx.Err(); err != nil { + return err + } + b.l.RLock() defer b.l.RUnlock() @@ -1096,6 +1108,10 @@ func (b *RaftBackend) AddPeer(ctx context.Context, peerID, clusterAddr string) e // Peers returns all the servers present in the raft cluster func (b *RaftBackend) Peers(ctx context.Context) ([]Peer, error) { + if err := ctx.Err(); err != nil { + return nil, err + } + b.l.RLock() defer b.l.RUnlock() @@ -1180,6 +1196,10 @@ func (b *RaftBackend) WriteSnapshotToTemp(in io.ReadCloser, access *seal.Access) // RestoreSnapshot applies the provided snapshot metadata and snapshot data to // raft. func (b *RaftBackend) RestoreSnapshot(ctx context.Context, metadata raft.SnapshotMeta, snap io.Reader) error { + if err := ctx.Err(); err != nil { + return err + } + b.l.RLock() defer b.l.RUnlock() @@ -1214,6 +1234,11 @@ func (b *RaftBackend) RestoreSnapshot(ctx context.Context, metadata raft.Snapsho // Delete inserts an entry in the log to delete the given path func (b *RaftBackend) Delete(ctx context.Context, path string) error { defer metrics.MeasureSince([]string{"raft-storage", "delete"}, time.Now()) + + if err := ctx.Err(); err != nil { + return err + } + command := &LogData{ Operations: []*LogOperation{ { @@ -1238,9 +1263,17 @@ func (b *RaftBackend) Get(ctx context.Context, path string) (*physical.Entry, er return nil, errors.New("raft: fsm not configured") } + if err := ctx.Err(); err != nil { + return nil, err + } + b.permitPool.Acquire() defer b.permitPool.Release() + if err := ctx.Err(); err != nil { + return nil, err + } + entry, err := b.fsm.Get(ctx, path) if entry != nil { valueLen := len(entry.Value) @@ -1258,6 +1291,11 @@ func (b *RaftBackend) Get(ctx context.Context, path string) (*physical.Entry, er // or if the call to applyLog fails. func (b *RaftBackend) Put(ctx context.Context, entry *physical.Entry) error { defer metrics.MeasureSince([]string{"raft-storage", "put"}, time.Now()) + + if err := ctx.Err(); err != nil { + return err + } + command := &LogData{ Operations: []*LogOperation{ { @@ -1284,9 +1322,17 @@ func (b *RaftBackend) List(ctx context.Context, prefix string) ([]string, error) return nil, errors.New("raft: fsm not configured") } + if err := ctx.Err(); err != nil { + return nil, err + } + b.permitPool.Acquire() defer b.permitPool.Release() + if err := ctx.Err(); err != nil { + return nil, err + } + return b.fsm.List(ctx, prefix) } @@ -1294,6 +1340,11 @@ func (b *RaftBackend) List(ctx context.Context, prefix string) ([]string, error) // applies it. func (b *RaftBackend) Transaction(ctx context.Context, txns []*physical.TxnEntry) error { defer metrics.MeasureSince([]string{"raft-storage", "transaction"}, time.Now()) + + if err := ctx.Err(); err != nil { + return err + } + command := &LogData{ Operations: make([]*LogOperation, len(txns)), } @@ -1330,6 +1381,9 @@ func (b *RaftBackend) applyLog(ctx context.Context, command *LogData) error { if b.raft == nil { return errors.New("raft storage is not initialized") } + if err := ctx.Err(); err != nil { + return err + } commandBytes, err := proto.Marshal(command) if err != nil {