From 5299537f69dd33faefca721f866cda3166c241be Mon Sep 17 00:00:00 2001 From: Brian Kassouf Date: Thu, 23 Apr 2020 11:11:08 -0700 Subject: [PATCH] storage/raft: Fix memory allocation issue and Metadata tracking issues with snapshots (#8793) * storage/raft: Split snapshot restore disk write into batches * Work on snapshot consistency * make sure tests send a snapshot * Fix comment * Don't remove metrics * Fix comment --- go.sum | 1 + physical/raft/fsm.go | 124 +++++++++++++++++++++++++-------- physical/raft/raft_test.go | 67 ++++++++++++++++-- physical/raft/snapshot.go | 21 ++++-- physical/raft/snapshot_test.go | 82 ++++++++++++++++++++++ 5 files changed, 255 insertions(+), 40 deletions(-) diff --git a/go.sum b/go.sum index 06af0e1f304bc..eed22a25adbc8 100644 --- a/go.sum +++ b/go.sum @@ -150,6 +150,7 @@ github.com/coreos/go-oidc v2.1.0+incompatible h1:sdJrfw8akMnCuUlaZU3tE/uYXFgfqom github.com/coreos/go-oidc v2.1.0+incompatible/go.mod h1:CgnwVTmzoESiwO9qyAFEMiHoZ1nMCKZlZ9V6mm3/LKc= github.com/coreos/go-semver v0.2.0 h1:3Jm3tLmsgAYcjC+4Up7hJrFBPr+n7rAqYeSw/SZazuY= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= +github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7 h1:u9SHYsPQNyt5tgDm3YN7+9dYrpK96E5wFilTFWIDZOM= github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e h1:Wf6HqHfScWJN9/ZjdUKyjop4mf3Qdd+1TvvltAvM3m8= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= diff --git a/physical/raft/fsm.go b/physical/raft/fsm.go index 22f525624f15f..7c49fbadd0546 100644 --- a/physical/raft/fsm.go +++ b/physical/raft/fsm.go @@ -88,6 +88,10 @@ type FSM struct { storeLatestState bool chunker *raftchunking.ChunkingBatchingFSM + + // testSnapshotRestoreError is used in tests to simulate an error while + // restoring a snapshot. + testSnapshotRestoreError bool } // NewFSM constructs a FSM using the given directory @@ -193,12 +197,12 @@ func (f *FSM) witnessIndex(i *IndexValue) { } } -func (f *FSM) witnessSnapshot(index, term, configurationIndex uint64, configuration raft.Configuration) error { +func (f *FSM) witnessSnapshot(metadata *raft.SnapshotMeta) error { var indexBytes []byte latestIndex, _ := f.LatestState() - latestIndex.Index = index - latestIndex.Term = term + latestIndex.Index = metadata.Index + latestIndex.Term = metadata.Term var err error indexBytes, err = proto.Marshal(latestIndex) @@ -206,7 +210,7 @@ func (f *FSM) witnessSnapshot(index, term, configurationIndex uint64, configurat return err } - protoConfig := raftConfigurationToProtoConfiguration(configurationIndex, configuration) + protoConfig := raftConfigurationToProtoConfiguration(metadata.ConfigurationIndex, metadata.Configuration) configBytes, err := proto.Marshal(protoConfig) if err != nil { return err @@ -232,8 +236,8 @@ func (f *FSM) witnessSnapshot(index, term, configurationIndex uint64, configurat } } - atomic.StoreUint64(f.latestIndex, index) - atomic.StoreUint64(f.latestTerm, term) + atomic.StoreUint64(f.latestIndex, metadata.Index) + atomic.StoreUint64(f.latestTerm, metadata.Term) f.latestConfig.Store(protoConfig) return nil @@ -241,7 +245,7 @@ func (f *FSM) witnessSnapshot(index, term, configurationIndex uint64, configurat // Delete deletes the given key from the bolt file. 
 func (f *FSM) Delete(ctx context.Context, path string) error {
-	defer metrics.MeasureSince([]string{"raft", "delete"}, time.Now())
+	defer metrics.MeasureSince([]string{"raft_storage", "fsm", "delete"}, time.Now())
 
 	f.l.RLock()
 	defer f.l.RUnlock()
@@ -253,7 +257,7 @@ func (f *FSM) Delete(ctx context.Context, path string) error {
 
 // Delete deletes the given key from the bolt file.
 func (f *FSM) DeletePrefix(ctx context.Context, prefix string) error {
-	defer metrics.MeasureSince([]string{"raft", "delete_prefix"}, time.Now())
+	defer metrics.MeasureSince([]string{"raft_storage", "fsm", "delete_prefix"}, time.Now())
 
 	f.l.RLock()
 	defer f.l.RUnlock()
@@ -277,7 +281,9 @@ func (f *FSM) DeletePrefix(ctx context.Context, prefix string) error {
 
 // Get retrieves the value at the given path from the bolt file.
 func (f *FSM) Get(ctx context.Context, path string) (*physical.Entry, error) {
+	// TODO: Remove this outdated metric name in a future release
 	defer metrics.MeasureSince([]string{"raft", "get"}, time.Now())
+	defer metrics.MeasureSince([]string{"raft_storage", "fsm", "get"}, time.Now())
 
 	f.l.RLock()
 	defer f.l.RUnlock()
@@ -311,7 +317,7 @@ func (f *FSM) Get(ctx context.Context, path string) (*physical.Entry, error) {
 
 // Put writes the given entry to the bolt file.
 func (f *FSM) Put(ctx context.Context, entry *physical.Entry) error {
-	defer metrics.MeasureSince([]string{"raft", "put"}, time.Now())
+	defer metrics.MeasureSince([]string{"raft_storage", "fsm", "put"}, time.Now())
 
 	f.l.RLock()
 	defer f.l.RUnlock()
@@ -324,7 +330,9 @@ func (f *FSM) Put(ctx context.Context, entry *physical.Entry) error {
 
 // List retrieves the set of keys with the given prefix from the bolt file.
 func (f *FSM) List(ctx context.Context, prefix string) ([]string, error) {
+	// TODO: Remove this outdated metric name in a future release
 	defer metrics.MeasureSince([]string{"raft", "list"}, time.Now())
+	defer metrics.MeasureSince([]string{"raft_storage", "fsm", "list"}, time.Now())
 
 	f.l.RLock()
 	defer f.l.RUnlock()
@@ -531,6 +539,8 @@ type writeErrorCloser interface {
 // (size, checksum, etc) and a second for the sink of the data. We also use a
 // proto delimited writer so we can stream proto messages to the sink.
 func (f *FSM) writeTo(ctx context.Context, metaSink writeErrorCloser, sink writeErrorCloser) {
+	defer metrics.MeasureSince([]string{"raft_storage", "fsm", "write_snapshot"}, time.Now())
+
 	protoWriter := protoio.NewDelimitedWriter(sink)
 	metadataProtoWriter := protoio.NewDelimitedWriter(metaSink)
@@ -573,7 +583,9 @@ func (f *FSM) writeTo(ctx context.Context, metaSink writeErrorCloser, sink write
 
 // Snapshot implements the FSM interface. It returns a noop snapshot object.
 func (f *FSM) Snapshot() (raft.FSMSnapshot, error) {
-	return &noopSnapshotter{}, nil
+	return &noopSnapshotter{
+		fsm: f,
+	}, nil
 }
 
 // SetNoopRestore is used to disable restore operations on raft startup. Because
@@ -589,48 +601,91 @@ func (f *FSM) SetNoopRestore(enabled bool) {
 // first deletes the existing bucket to clear all existing data, then recreates
 // it so we can copy in the snapshot.
 func (f *FSM) Restore(r io.ReadCloser) error {
+	defer metrics.MeasureSince([]string{"raft_storage", "fsm", "restore_snapshot"}, time.Now())
+
 	if f.noopRestore == true {
 		return nil
 	}
 
+	snapMeta := r.(*boltSnapshotMetadataReader).Metadata()
+
 	protoReader := protoio.NewDelimitedReader(r, math.MaxInt32)
 	defer protoReader.Close()
 
 	f.l.Lock()
 	defer f.l.Unlock()
 
-	// Start a write transaction.
+	// Delete the existing data bucket and create a new one.
+ f.logger.Debug("snapshot restore: deleting bucket") err := f.db.Update(func(tx *bolt.Tx) error { err := tx.DeleteBucket(dataBucketName) if err != nil { return err } - b, err := tx.CreateBucket(dataBucketName) + _, err = tx.CreateBucket(dataBucketName) if err != nil { return err } - for { + return nil + }) + if err != nil { + f.logger.Error("could not restore snapshot: could not clear existing bucket", "error", err) + return err + } + + // If we are testing a failed snapshot error here. + if f.testSnapshotRestoreError { + return errors.New("Test error") + } + + f.logger.Debug("snapshot restore: deleting bucket done") + f.logger.Debug("snapshot restore: writing keys") + + var done bool + var keys int + for !done { + err := f.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket(dataBucketName) s := new(pb.StorageEntry) - err := protoReader.ReadMsg(s) - if err != nil { - if err == io.EOF { - return nil + + // Commit in batches of 50k. Bolt holds all the data in memory and + // doesn't split the pages until commit so we do incremental writes. + // This is safe since we have a write lock on the fsm's lock. + for i := 0; i < 50000; i++ { + err := protoReader.ReadMsg(s) + if err != nil { + if err == io.EOF { + done = true + return nil + } + return err } - return err - } - err = b.Put([]byte(s.Key), s.Value) - if err != nil { - return err + err = b.Put([]byte(s.Key), s.Value) + if err != nil { + return err + } + keys += 1 } + + return nil + }) + if err != nil { + f.logger.Error("could not restore snapshot", "error", err) + return err } - return nil - }) - if err != nil { - f.logger.Error("could not restore snapshot", "error", err) + f.logger.Trace("snapshot restore: writing keys", "num_written", keys) + } + + f.logger.Debug("snapshot restore: writing keys done") + + // Write the metadata after we have applied all the snapshot data + f.logger.Debug("snapshot restore: writing metadata") + if err := f.witnessSnapshot(snapMeta); err != nil { + f.logger.Error("could not write metadata", "error", err) return err } @@ -639,10 +694,23 @@ func (f *FSM) Restore(r io.ReadCloser) error { // noopSnapshotter implements the fsm.Snapshot interface. It doesn't do anything // since our SnapshotStore reads data out of the FSM on Open(). -type noopSnapshotter struct{} +type noopSnapshotter struct { + fsm *FSM +} -// Persist doesn't do anything. +// Persist implements the fsm.Snapshot interface. It doesn't need to persist any +// state data, but it does persist the raft metadata. This is necessary so we +// can be sure to capture indexes for operation types that are not sent to the +// FSM. func (s *noopSnapshotter) Persist(sink raft.SnapshotSink) error { + boltSnapshotSink := sink.(*BoltSnapshotSink) + + // We are processing a snapshot, fastforward the index, term, and + // configuration to the latest seen by the raft system. 
+ if err := s.fsm.witnessSnapshot(&boltSnapshotSink.meta); err != nil { + return err + } + return nil } diff --git a/physical/raft/raft_test.go b/physical/raft/raft_test.go index 40faa09b18f32..930e46a40d07f 100644 --- a/physical/raft/raft_test.go +++ b/physical/raft/raft_test.go @@ -76,22 +76,77 @@ func getRaftWithDir(t testing.TB, bootstrap bool, noStoreState bool, raftDir str return backend, raftDir } +func connectPeers(nodes ...*RaftBackend) { + for _, node := range nodes { + for _, peer := range nodes { + if node == peer { + continue + } + + node.raftTransport.(*raft.InmemTransport).Connect(raft.ServerAddress(peer.NodeID()), peer.raftTransport) + peer.raftTransport.(*raft.InmemTransport).Connect(raft.ServerAddress(node.NodeID()), node.raftTransport) + } + } +} + +func stepDownLeader(t *testing.T, node *RaftBackend) { + t.Helper() + + if err := node.raft.LeadershipTransfer().Error(); err != nil { + t.Fatal(err) + } + + timeout := time.Now().Add(time.Second * 10) + for !time.Now().After(timeout) { + if err := node.raft.VerifyLeader().Error(); err != nil { + return + } + time.Sleep(100 * time.Millisecond) + } + + t.Fatal("still leader") +} + +func waitForLeader(t *testing.T, nodes ...*RaftBackend) *RaftBackend { + t.Helper() + timeout := time.Now().Add(time.Second * 10) + for !time.Now().After(timeout) { + for _, node := range nodes { + if node.raft.Leader() == raft.ServerAddress(node.NodeID()) { + return node + } + } + time.Sleep(100 * time.Millisecond) + } + + t.Fatal("no leader") + return nil +} + func compareFSMs(t *testing.T, fsm1, fsm2 *FSM) { + t.Helper() + if err := compareFSMsWithErr(t, fsm1, fsm2); err != nil { + t.Fatal(err) + } +} + +func compareFSMsWithErr(t *testing.T, fsm1, fsm2 *FSM) error { t.Helper() index1, config1 := fsm1.LatestState() index2, config2 := fsm2.LatestState() if !proto.Equal(index1, index2) { - t.Fatalf("indexes did not match: %+v != %+v", index1, index2) + return fmt.Errorf("indexes did not match: %+v != %+v", index1, index2) } if !proto.Equal(config1, config2) { - t.Fatalf("configs did not match: %+v != %+v", config1, config2) + return fmt.Errorf("configs did not match: %+v != %+v", config1, config2) } - compareDBs(t, fsm1.db, fsm2.db) + return compareDBs(t, fsm1.db, fsm2.db) } -func compareDBs(t *testing.T, boltDB1, boltDB2 *bolt.DB) { +func compareDBs(t *testing.T, boltDB1, boltDB2 *bolt.DB) error { + t.Helper() db1 := make(map[string]string) db2 := make(map[string]string) @@ -135,8 +190,10 @@ func compareDBs(t *testing.T, boltDB1, boltDB2 *bolt.DB) { } if diff := deep.Equal(db1, db2); diff != nil { - t.Fatal(diff) + return fmt.Errorf("%+v", diff) } + + return nil } func TestRaft_Backend(t *testing.T) { diff --git a/physical/raft/snapshot.go b/physical/raft/snapshot.go index 8538778b5d06a..7139cce7d9d7b 100644 --- a/physical/raft/snapshot.go +++ b/physical/raft/snapshot.go @@ -104,13 +104,6 @@ func (f *BoltSnapshotStore) Create(version raft.SnapshotVersion, index, term uin return nil, fmt.Errorf("unsupported snapshot version %d", version) } - // We are processing a snapshot, fastforward the index, term, and - // configuration to the latest seen by the raft system. This could include - // log indexes for operation types that are never sent to the FSM. 
-	if err := f.fsm.witnessSnapshot(index, term, configurationIndex, configuration); err != nil {
-		return nil, err
-	}
-
 	// Create the sink
 	sink := &BoltSnapshotSink{
 		store: f,
@@ -208,6 +201,11 @@ func (f *BoltSnapshotStore) Open(id string) (*raft.SnapshotMeta, io.ReadCloser,
 		if err != nil {
 			return nil, nil, err
 		}
+
+		readCloser = &boltSnapshotMetadataReader{
+			meta:       meta,
+			ReadCloser: readCloser,
+		}
 	}
 
 	return meta, readCloser, nil
@@ -286,3 +284,12 @@ func (s *BoltSnapshotSink) Cancel() error {
 
 	return nil
 }
+
+type boltSnapshotMetadataReader struct {
+	io.ReadCloser
+	meta *raft.SnapshotMeta
+}
+
+func (r *boltSnapshotMetadataReader) Metadata() *raft.SnapshotMeta {
+	return r.meta
+}
diff --git a/physical/raft/snapshot_test.go b/physical/raft/snapshot_test.go
index 5851a2c0e9185..33b1d1b22e821 100644
--- a/physical/raft/snapshot_test.go
+++ b/physical/raft/snapshot_test.go
@@ -345,6 +345,88 @@ func TestRaft_Snapshot_Restart(t *testing.T) {
 	compareFSMs(t, raft1.fsm, raft2.fsm)
 }
 
+func TestRaft_Snapshot_ErrorRecovery(t *testing.T) {
+	raft1, dir := getRaft(t, true, false)
+	raft2, dir2 := getRaft(t, false, false)
+	raft3, dir3 := getRaft(t, false, false)
+	defer os.RemoveAll(dir)
+	defer os.RemoveAll(dir2)
+	defer os.RemoveAll(dir3)
+
+	// Add raft2 to the cluster
+	addPeer(t, raft1, raft2)
+
+	// Write some data
+	for i := 0; i < 100; i++ {
+		err := raft1.Put(context.Background(), &physical.Entry{
+			Key:   fmt.Sprintf("key-%d", i),
+			Value: []byte(fmt.Sprintf("value-%d", i)),
+		})
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	// Take a snapshot on each node to ensure we no longer have older logs
+	snapFuture := raft1.raft.Snapshot()
+	if err := snapFuture.Error(); err != nil {
+		t.Fatal(err)
+	}
+
+	stepDownLeader(t, raft1)
+	leader := waitForLeader(t, raft1, raft2)
+
+	snapFuture = leader.raft.Snapshot()
+	if err := snapFuture.Error(); err != nil {
+		t.Fatal(err)
+	}
+
+	// Advance the FSM's index past the snapshot index
+	leader.Put(context.Background(), &physical.Entry{
+		Key:   "key",
+		Value: []byte("value"),
+	})
+
+	// Inject an error into raft3's next snapshot restore
+	raft3.fsm.testSnapshotRestoreError = true
+
+	// Add raft3 to the cluster
+	addPeer(t, leader, raft3)
+
+	time.Sleep(2 * time.Second)
+
+	// Restart the failing node to make sure a fresh start does not carry
+	// over invalid state.
+	if err := raft3.TeardownCluster(nil); err != nil {
+		t.Fatal(err)
+	}
+
+	// Ensure the databases are not equal
+	if err := compareFSMsWithErr(t, leader.fsm, raft3.fsm); err == nil {
+		t.Fatal("expected the FSMs to differ after the failed restore")
+	}
+
+	// Remove the error and make sure we can reconcile state
+	raft3.fsm.testSnapshotRestoreError = false
+
+	// Step down the leader node
+	stepDownLeader(t, leader)
+	leader = waitForLeader(t, raft1, raft2)
+
+	// Start raft3 back up
+	if err := raft3.SetupCluster(context.Background(), SetupOpts{}); err != nil {
+		t.Fatal(err)
+	}
+
+	connectPeers(raft1, raft2, raft3)
+	waitForLeader(t, raft1, raft2)
+
+	time.Sleep(5 * time.Second)
+
+	// Make sure state gets re-replicated.
+	compareFSMs(t, raft1.fsm, raft3.fsm)
+}
+
 func TestRaft_Snapshot_Take_Restore(t *testing.T) {
 	raft1, dir := getRaft(t, true, false)
 	defer os.RemoveAll(dir)
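
The batched restore loop in fsm.go above is the heart of the memory fix: instead of applying the whole snapshot inside a single Bolt transaction, which forces Bolt to keep every dirty page in memory until commit, the stream is applied in bounded batches of 50k entries. Below is a minimal standalone sketch of the same pattern. It assumes the go.etcd.io/bbolt API; the bucket name, batch size, and the next() entry source are hypothetical stand-ins for illustration, not Vault code.

package main

import (
	"fmt"
	"io"

	bolt "go.etcd.io/bbolt" // assumption: bbolt, the Bolt fork this code path uses
)

// applyInBatches drains key/value pairs from next and writes them into the
// named bucket, committing a transaction every batchSize entries so Bolt
// never holds the whole stream's dirty pages in memory at once. next must
// return io.EOF once the stream is exhausted.
func applyInBatches(db *bolt.DB, bucket []byte, batchSize int, next func() (key, value []byte, err error)) error {
	done := false
	for !done {
		err := db.Update(func(tx *bolt.Tx) error {
			b, err := tx.CreateBucketIfNotExists(bucket)
			if err != nil {
				return err
			}
			for i := 0; i < batchSize; i++ {
				k, v, err := next()
				if err == io.EOF {
					// Stream exhausted: commit what we have and stop.
					done = true
					return nil
				}
				if err != nil {
					return err
				}
				if err := b.Put(k, v); err != nil {
					return err
				}
			}
			// Batch full: returning nil commits this transaction and the
			// outer loop opens the next one.
			return nil
		})
		if err != nil {
			return fmt.Errorf("batched restore failed: %w", err)
		}
	}
	return nil
}

func main() {
	db, err := bolt.Open("restore-demo.db", 0600, nil)
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// A tiny in-memory stand-in for the delimited protobuf snapshot stream.
	entries := [][2]string{{"a", "1"}, {"b", "2"}, {"c", "3"}}
	i := 0
	next := func() ([]byte, []byte, error) {
		if i >= len(entries) {
			return nil, nil, io.EOF
		}
		k, v := []byte(entries[i][0]), []byte(entries[i][1])
		i++
		return k, v, nil
	}

	if err := applyInBatches(db, []byte("data"), 2, next); err != nil {
		panic(err)
	}
	fmt.Println("restored", len(entries), "entries in batches")
}

Memory use is then bounded by the batch size rather than the snapshot size. As the patch's own comment notes, splitting one logical restore across several transactions is only safe because the caller holds an exclusive write lock for the whole operation, as Restore does with f.l.Lock().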