Skip to content

Commit

Permalink
Merge pull request #2900 from nats-io/fss-recover-bug
Browse files Browse the repository at this point in the history
Can't recover stream on restart with deleted msg error.
  • Loading branch information
derekcollison committed Mar 4, 2022
2 parents 77bce19 + b759ff4 commit adfcfdb
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 5 deletions.
17 changes: 12 additions & 5 deletions server/filestore.go
Expand Up @@ -315,6 +315,11 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
// Always track per subject information.
fs.tms = true

// Recover our message state.
if err := fs.recoverMsgs(); err != nil {
return nil, err
}

// Write our meta data iff does not exist.
meta := path.Join(fcfg.StoreDir, JetStreamMetaFile)
if _, err := os.Stat(meta); err != nil && os.IsNotExist(err) {
Expand All @@ -323,11 +328,6 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
}
}

// Recover our message state.
if err := fs.recoverMsgs(); err != nil {
return nil, err
}

// If we expect to be encrypted check that what we are restoring is not plaintext.
// This can happen on snapshot restores or conversions.
if fs.prf != nil {
Expand Down Expand Up @@ -4426,6 +4426,13 @@ func (mb *msgBlock) generatePerSubjectInfo() error {
for seq := fseq; seq <= lseq; seq++ {
sm, err := mb.cacheLookup(seq)
if err != nil {
// Since we are walking by sequence we can ignore some errors that are benign to rebuilding our state.
if err == ErrStoreMsgNotFound || err == errDeletedMsg {
continue
}
if err == errNoCache {
return nil
}
return err
}
if sm != nil && len(sm.subj) > 0 {
Expand Down
45 changes: 45 additions & 0 deletions server/jetstream_test.go
Expand Up @@ -15209,6 +15209,51 @@ func TestJetStreamPullConsumerHeartBeats(t *testing.T) {
}
}

func TestJetStreamRecoverStreamWithDeletedMessagesNonCleanShutdown(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()

if config := s.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}

nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{Name: "T"})
require_NoError(t, err)

for i := 0; i < 100; i++ {
js.Publish("T", []byte("OK"))
}

js.DeleteMsg("T", 22)

// Now we need a non-clean shutdown.
// For this use case that means we do *not* write the fss file.
sd := s.JetStreamConfig().StoreDir
fss := path.Join(sd, "$G", "streams", "T", "msgs", "1.fss")

// Stop current
nc.Close()
s.Shutdown()

// Remove fss file to simulate a non-clean shutdown.
err = os.Remove(fss)
require_NoError(t, err)

// Restart.
s = RunJetStreamServerOnPort(-1, sd)
defer s.Shutdown()

nc, js = jsClientConnect(t, s)
defer nc.Close()

// Make sure we recovered our stream
_, err = js.StreamInfo("T")
require_NoError(t, err)
}

///////////////////////////////////////////////////////////////////////////
// Simple JetStream Benchmarks
///////////////////////////////////////////////////////////////////////////
Expand Down

0 comments on commit adfcfdb

Please sign in to comment.