storage/raft: Fix memory allocation issue and Metadata tracking issues with snapshots (#8793)

* storage/raft: Split snapshot restore disk write into batches

* Work on snapshot consistency

* make sure tests send a snapshot

* Fix comment

* Don't remove metrics

* Fix comment
briankassouf committed Apr 23, 2020
1 parent 7807d45 commit 5299537
Showing 5 changed files with 255 additions and 40 deletions.
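
As a companion to the diff below, here is a minimal, self-contained sketch of the batching pattern this commit applies in FSM.Restore (physical/raft/fsm.go): instead of writing the whole snapshot in one bolt transaction, entries are committed in fixed-size batches so bolt never has to hold the entire data set in memory at once. This is not the commit's code; the package name, batchSize constant, dataBucket name, and readEntry callback are illustrative assumptions, and it assumes the bbolt library (go.etcd.io/bbolt). The real implementation streams protobuf-delimited StorageEntry messages and commits every 50,000 keys.

package sketch

import (
	"io"

	bolt "go.etcd.io/bbolt"
)

// batchSize bounds how many keys are written per bolt transaction. Bolt keeps
// a write transaction's dirty pages in memory until commit, so bounding the
// transaction size bounds peak memory during a large restore.
const batchSize = 50000

var dataBucket = []byte("data")

// restoreBatched drains readEntry (which should return io.EOF once the
// snapshot stream is exhausted) into the data bucket, committing a new
// transaction every batchSize entries.
func restoreBatched(db *bolt.DB, readEntry func() (key, value []byte, err error)) error {
	done := false
	for !done {
		err := db.Update(func(tx *bolt.Tx) error {
			b, err := tx.CreateBucketIfNotExists(dataBucket)
			if err != nil {
				return err
			}
			for i := 0; i < batchSize; i++ {
				k, v, err := readEntry()
				if err == io.EOF {
					done = true
					return nil
				}
				if err != nil {
					return err
				}
				if err := b.Put(k, v); err != nil {
					return err
				}
			}
			return nil
		})
		if err != nil {
			return err
		}
	}
	return nil
}

Each db.Update call is its own transaction, so a failure mid-restore leaves only fully committed batches behind; the commit pairs this with writing the raft metadata (witnessSnapshot) only after all batches have been applied.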
1 change: 1 addition & 0 deletions go.sum
@@ -150,6 +150,7 @@ github.com/coreos/go-oidc v2.1.0+incompatible h1:sdJrfw8akMnCuUlaZU3tE/uYXFgfqom
github.com/coreos/go-oidc v2.1.0+incompatible/go.mod h1:CgnwVTmzoESiwO9qyAFEMiHoZ1nMCKZlZ9V6mm3/LKc=
github.com/coreos/go-semver v0.2.0 h1:3Jm3tLmsgAYcjC+4Up7hJrFBPr+n7rAqYeSw/SZazuY=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7 h1:u9SHYsPQNyt5tgDm3YN7+9dYrpK96E5wFilTFWIDZOM=
github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e h1:Wf6HqHfScWJN9/ZjdUKyjop4mf3Qdd+1TvvltAvM3m8=
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
124 changes: 96 additions & 28 deletions physical/raft/fsm.go
@@ -88,6 +88,10 @@ type FSM struct {
storeLatestState bool

chunker *raftchunking.ChunkingBatchingFSM

// testSnapshotRestoreError is used in tests to simulate an error while
// restoring a snapshot.
testSnapshotRestoreError bool
}

// NewFSM constructs a FSM using the given directory
@@ -193,20 +197,20 @@ func (f *FSM) witnessIndex(i *IndexValue) {
}
}

func (f *FSM) witnessSnapshot(index, term, configurationIndex uint64, configuration raft.Configuration) error {
func (f *FSM) witnessSnapshot(metadata *raft.SnapshotMeta) error {
var indexBytes []byte
latestIndex, _ := f.LatestState()

latestIndex.Index = index
latestIndex.Term = term
latestIndex.Index = metadata.Index
latestIndex.Term = metadata.Term

var err error
indexBytes, err = proto.Marshal(latestIndex)
if err != nil {
return err
}

protoConfig := raftConfigurationToProtoConfiguration(configurationIndex, configuration)
protoConfig := raftConfigurationToProtoConfiguration(metadata.ConfigurationIndex, metadata.Configuration)
configBytes, err := proto.Marshal(protoConfig)
if err != nil {
return err
@@ -232,16 +236,16 @@ func (f *FSM) witnessSnapshot(index, term, configurationIndex uint64, configurat
}
}

atomic.StoreUint64(f.latestIndex, index)
atomic.StoreUint64(f.latestTerm, term)
atomic.StoreUint64(f.latestIndex, metadata.Index)
atomic.StoreUint64(f.latestTerm, metadata.Term)
f.latestConfig.Store(protoConfig)

return nil
}

// Delete deletes the given key from the bolt file.
func (f *FSM) Delete(ctx context.Context, path string) error {
defer metrics.MeasureSince([]string{"raft", "delete"}, time.Now())
defer metrics.MeasureSince([]string{"raft_storage", "fsm", "delete"}, time.Now())

f.l.RLock()
defer f.l.RUnlock()
@@ -253,7 +257,7 @@ func (f *FSM) Delete(ctx context.Context, path string) error {

// Delete deletes the given key from the bolt file.
func (f *FSM) DeletePrefix(ctx context.Context, prefix string) error {
defer metrics.MeasureSince([]string{"raft", "delete_prefix"}, time.Now())
defer metrics.MeasureSince([]string{"raft_storage", "fsm", "delete_prefix"}, time.Now())

f.l.RLock()
defer f.l.RUnlock()
@@ -277,7 +281,9 @@ func (f *FSM) DeletePrefix(ctx context.Context, prefix string) error {

// Get retrieves the value at the given path from the bolt file.
func (f *FSM) Get(ctx context.Context, path string) (*physical.Entry, error) {
// TODO: Remove this outdated metric name in a future release
defer metrics.MeasureSince([]string{"raft", "get"}, time.Now())
defer metrics.MeasureSince([]string{"raft_storage", "fsm", "get"}, time.Now())

f.l.RLock()
defer f.l.RUnlock()
@@ -311,7 +317,7 @@ func (f *FSM) Get(ctx context.Context, path string) (*physical.Entry, error) {

// Put writes the given entry to the bolt file.
func (f *FSM) Put(ctx context.Context, entry *physical.Entry) error {
defer metrics.MeasureSince([]string{"raft", "put"}, time.Now())
defer metrics.MeasureSince([]string{"raft_storage", "fsm", "put"}, time.Now())

f.l.RLock()
defer f.l.RUnlock()
@@ -324,7 +330,9 @@ func (f *FSM) Put(ctx context.Context, entry *physical.Entry) error {

// List retrieves the set of keys with the given prefix from the bolt file.
func (f *FSM) List(ctx context.Context, prefix string) ([]string, error) {
// TODO: Remove this outdated metric name in a future release
defer metrics.MeasureSince([]string{"raft", "list"}, time.Now())
defer metrics.MeasureSince([]string{"raft_storage", "fsm", "list"}, time.Now())

f.l.RLock()
defer f.l.RUnlock()
@@ -531,6 +539,8 @@ type writeErrorCloser interface {
// (size, checksum, etc) and a second for the sink of the data. We also use a
// proto delimited writer so we can stream proto messages to the sink.
func (f *FSM) writeTo(ctx context.Context, metaSink writeErrorCloser, sink writeErrorCloser) {
defer metrics.MeasureSince([]string{"raft_storage", "fsm", "write_snapshot"}, time.Now())

protoWriter := protoio.NewDelimitedWriter(sink)
metadataProtoWriter := protoio.NewDelimitedWriter(metaSink)

@@ -573,7 +583,9 @@ func (f *FSM) writeTo(ctx context.Context, metaSink writeErrorCloser, sink write

// Snapshot implements the FSM interface. It returns a noop snapshot object.
func (f *FSM) Snapshot() (raft.FSMSnapshot, error) {
return &noopSnapshotter{}, nil
return &noopSnapshotter{
fsm: f,
}, nil
}

// SetNoopRestore is used to disable restore operations on raft startup. Because
@@ -589,48 +601,91 @@ func (f *FSM) SetNoopRestore(enabled bool) {
// first deletes the existing bucket to clear all existing data, then recreates
// it so we can copy in the snapshot.
func (f *FSM) Restore(r io.ReadCloser) error {
defer metrics.MeasureSince([]string{"raft_storage", "fsm", "restore_snapshot"}, time.Now())

if f.noopRestore == true {
return nil
}

snapMeta := r.(*boltSnapshotMetadataReader).Metadata()

protoReader := protoio.NewDelimitedReader(r, math.MaxInt32)
defer protoReader.Close()

f.l.Lock()
defer f.l.Unlock()

// Start a write transaction.
// Delete the existing data bucket and create a new one.
f.logger.Debug("snapshot restore: deleting bucket")
err := f.db.Update(func(tx *bolt.Tx) error {
err := tx.DeleteBucket(dataBucketName)
if err != nil {
return err
}

b, err := tx.CreateBucket(dataBucketName)
_, err = tx.CreateBucket(dataBucketName)
if err != nil {
return err
}

for {
return nil
})
if err != nil {
f.logger.Error("could not restore snapshot: could not clear existing bucket", "error", err)
return err
}

// If we are testing a failed snapshot, return an error here.
if f.testSnapshotRestoreError {
return errors.New("Test error")
}

f.logger.Debug("snapshot restore: deleting bucket done")
f.logger.Debug("snapshot restore: writing keys")

var done bool
var keys int
for !done {
err := f.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(dataBucketName)
s := new(pb.StorageEntry)
err := protoReader.ReadMsg(s)
if err != nil {
if err == io.EOF {
return nil

// Commit in batches of 50k. Bolt holds all the data in memory and
// doesn't split the pages until commit so we do incremental writes.
// This is safe since we have a write lock on the fsm's lock.
for i := 0; i < 50000; i++ {
err := protoReader.ReadMsg(s)
if err != nil {
if err == io.EOF {
done = true
return nil
}
return err
}
return err
}

err = b.Put([]byte(s.Key), s.Value)
if err != nil {
return err
err = b.Put([]byte(s.Key), s.Value)
if err != nil {
return err
}
keys += 1
}

return nil
})
if err != nil {
f.logger.Error("could not restore snapshot", "error", err)
return err
}

return nil
})
if err != nil {
f.logger.Error("could not restore snapshot", "error", err)
f.logger.Trace("snapshot restore: writing keys", "num_written", keys)
}

f.logger.Debug("snapshot restore: writing keys done")

// Write the metadata after we have applied all the snapshot data
f.logger.Debug("snapshot restore: writing metadata")
if err := f.witnessSnapshot(snapMeta); err != nil {
f.logger.Error("could not write metadata", "error", err)
return err
}

@@ -639,10 +694,23 @@ func (f *FSM) Restore(r io.ReadCloser) error {

// noopSnapshotter implements the fsm.Snapshot interface. It doesn't do anything
// since our SnapshotStore reads data out of the FSM on Open().
type noopSnapshotter struct{}
type noopSnapshotter struct {
fsm *FSM
}

// Persist doesn't do anything.
// Persist implements the fsm.Snapshot interface. It doesn't need to persist any
// state data, but it does persist the raft metadata. This is necessary so we
// can be sure to capture indexes for operation types that are not sent to the
// FSM.
func (s *noopSnapshotter) Persist(sink raft.SnapshotSink) error {
boltSnapshotSink := sink.(*BoltSnapshotSink)

// We are processing a snapshot; fast-forward the index, term, and
// configuration to the latest seen by the raft system.
if err := s.fsm.witnessSnapshot(&boltSnapshotSink.meta); err != nil {
return err
}

return nil
}

67 changes: 62 additions & 5 deletions physical/raft/raft_test.go
@@ -76,22 +76,77 @@ func getRaftWithDir(t testing.TB, bootstrap bool, noStoreState bool, raftDir str
return backend, raftDir
}

func connectPeers(nodes ...*RaftBackend) {
for _, node := range nodes {
for _, peer := range nodes {
if node == peer {
continue
}

node.raftTransport.(*raft.InmemTransport).Connect(raft.ServerAddress(peer.NodeID()), peer.raftTransport)
peer.raftTransport.(*raft.InmemTransport).Connect(raft.ServerAddress(node.NodeID()), node.raftTransport)
}
}
}

func stepDownLeader(t *testing.T, node *RaftBackend) {
t.Helper()

if err := node.raft.LeadershipTransfer().Error(); err != nil {
t.Fatal(err)
}

timeout := time.Now().Add(time.Second * 10)
for !time.Now().After(timeout) {
if err := node.raft.VerifyLeader().Error(); err != nil {
return
}
time.Sleep(100 * time.Millisecond)
}

t.Fatal("still leader")
}

func waitForLeader(t *testing.T, nodes ...*RaftBackend) *RaftBackend {
t.Helper()
timeout := time.Now().Add(time.Second * 10)
for !time.Now().After(timeout) {
for _, node := range nodes {
if node.raft.Leader() == raft.ServerAddress(node.NodeID()) {
return node
}
}
time.Sleep(100 * time.Millisecond)
}

t.Fatal("no leader")
return nil
}

func compareFSMs(t *testing.T, fsm1, fsm2 *FSM) {
t.Helper()
if err := compareFSMsWithErr(t, fsm1, fsm2); err != nil {
t.Fatal(err)
}
}

func compareFSMsWithErr(t *testing.T, fsm1, fsm2 *FSM) error {
t.Helper()
index1, config1 := fsm1.LatestState()
index2, config2 := fsm2.LatestState()

if !proto.Equal(index1, index2) {
t.Fatalf("indexes did not match: %+v != %+v", index1, index2)
return fmt.Errorf("indexes did not match: %+v != %+v", index1, index2)
}
if !proto.Equal(config1, config2) {
t.Fatalf("configs did not match: %+v != %+v", config1, config2)
return fmt.Errorf("configs did not match: %+v != %+v", config1, config2)
}

compareDBs(t, fsm1.db, fsm2.db)
return compareDBs(t, fsm1.db, fsm2.db)
}

func compareDBs(t *testing.T, boltDB1, boltDB2 *bolt.DB) {
func compareDBs(t *testing.T, boltDB1, boltDB2 *bolt.DB) error {
t.Helper()
db1 := make(map[string]string)
db2 := make(map[string]string)

@@ -135,8 +190,10 @@ func compareDBs(t *testing.T, boltDB1, boltDB2 *bolt.DB) {
}

if diff := deep.Equal(db1, db2); diff != nil {
t.Fatal(diff)
return fmt.Errorf("%+v", diff)
}

return nil
}

func TestRaft_Backend(t *testing.T) {
21 changes: 14 additions & 7 deletions physical/raft/snapshot.go
@@ -104,13 +104,6 @@ func (f *BoltSnapshotStore) Create(version raft.SnapshotVersion, index, term uin
return nil, fmt.Errorf("unsupported snapshot version %d", version)
}

// We are processing a snapshot, fastforward the index, term, and
// configuration to the latest seen by the raft system. This could include
// log indexes for operation types that are never sent to the FSM.
if err := f.fsm.witnessSnapshot(index, term, configurationIndex, configuration); err != nil {
return nil, err
}

// Create the sink
sink := &BoltSnapshotSink{
store: f,
@@ -208,6 +201,11 @@ func (f *BoltSnapshotStore) Open(id string) (*raft.SnapshotMeta, io.ReadCloser,
if err != nil {
return nil, nil, err
}

readCloser = &boltSnapshotMetadataReader{
meta: meta,
ReadCloser: readCloser,
}
}

return meta, readCloser, nil
@@ -286,3 +284,12 @@ func (s *BoltSnapshotSink) Cancel() error {

return nil
}

type boltSnapshotMetadataReader struct {
io.ReadCloser
meta *raft.SnapshotMeta
}

func (r *boltSnapshotMetadataReader) Metadata() *raft.SnapshotMeta {
return r.meta
}
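
For context, a standalone sketch of the reader-wrapping pattern that boltSnapshotMetadataReader implements above: the snapshot's io.ReadCloser is decorated with its metadata so FSM.Restore can recover the raft index, term, and configuration from the same handle it streams the data from (the type assertion near the top of Restore in fsm.go). Everything except io.ReadCloser is an illustrative assumption; the real code embeds *raft.SnapshotMeta.

package main

import (
	"fmt"
	"io"
	"strings"
)

// snapshotMeta stands in for raft.SnapshotMeta in this sketch.
type snapshotMeta struct {
	Index uint64
	Term  uint64
}

// metadataReader decorates a snapshot reader with the metadata describing it,
// mirroring boltSnapshotMetadataReader.
type metadataReader struct {
	io.ReadCloser
	meta *snapshotMeta
}

func (r *metadataReader) Metadata() *snapshotMeta { return r.meta }

// restore shows the consumer side: recover the metadata with a type assertion
// before streaming the snapshot entries out of the same reader.
func restore(r io.ReadCloser) error {
	mr, ok := r.(*metadataReader)
	if !ok {
		return fmt.Errorf("reader does not carry snapshot metadata")
	}
	fmt.Printf("restoring snapshot at index=%d term=%d\n", mr.Metadata().Index, mr.Metadata().Term)
	// ... read and apply the snapshot data from r here ...
	return r.Close()
}

func main() {
	rc := io.NopCloser(strings.NewReader("snapshot bytes"))
	if err := restore(&metadataReader{ReadCloser: rc, meta: &snapshotMeta{Index: 42, Term: 3}}); err != nil {
		panic(err)
	}
}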
