Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow shutdown during network snapshot transfer #588

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
58 changes: 38 additions & 20 deletions raft.go
Expand Up @@ -1750,27 +1750,46 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
return
}

// Separately track the progress of streaming a snapshot over the network
// because this too can take a long time.
countingRPCReader := newCountingReader(rpc.Reader)

// Spill the remote snapshot to disk
transferMonitor := startSnapshotRestoreMonitor(r.logger, countingRPCReader, req.Size, true)
n, err := io.Copy(sink, countingRPCReader)
transferMonitor.StopAndWait()
if err != nil {
sink.Cancel()
r.logger.Error("failed to copy snapshot", "error", err)
rpcErr = err
return
}
// Spill the remote snapshot to disk.
// Spawn a goroutine to copy the snapshot, so we can handle a
// shutdown signal as well.
diskCopyErrCh := make(chan error, 1)
go func() {
// Separately track the progress of streaming a snapshot over the network
// because this too can take a long time.
countingRPCReader := newCountingReader(rpc.Reader)
transferMonitor := startSnapshotRestoreMonitor(r.logger, countingRPCReader, req.Size, true)
n, err := io.Copy(sink, countingRPCReader)
transferMonitor.StopAndWait()
if err != nil {
r.logger.Error("failed to copy snapshot", "error", err)
diskCopyErrCh <- err
return
}

// Check that we received it all
if n != req.Size {
// Check that we received it all
if n != req.Size {
r.logger.Error("failed to receive whole snapshot",
"received", hclog.Fmt("%d / %d", n, req.Size))
diskCopyErrCh <- fmt.Errorf("short read")
return
}

r.logger.Info("copied to local snapshot", "bytes", n)
diskCopyErrCh <- nil
}()

// Wait for snapshot transfer or shutdown
select {
case err := <-diskCopyErrCh:
if err != nil {
sink.Cancel()
rpcErr = err
return
}
case <-r.shutdownCh:
sink.Cancel()
r.logger.Error("failed to receive whole snapshot",
"received", hclog.Fmt("%d / %d", n, req.Size))
rpcErr = fmt.Errorf("short read")
rpcErr = ErrRaftShutdown
return
}

Expand All @@ -1780,7 +1799,6 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
rpcErr = err
return
}
r.logger.Info("copied to local snapshot", "bytes", n)

// Restore snapshot
future := &restoreFuture{ID: sink.ID()}
Expand Down