diff --git a/server/filestore.go b/server/filestore.go index 58b8ca2aaa..2c37c43f83 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -315,6 +315,11 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim // Always track per subject information. fs.tms = true + // Recover our message state. + if err := fs.recoverMsgs(); err != nil { + return nil, err + } + // Write our meta data iff does not exist. meta := path.Join(fcfg.StoreDir, JetStreamMetaFile) if _, err := os.Stat(meta); err != nil && os.IsNotExist(err) { @@ -323,11 +328,6 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim } } - // Recover our message state. - if err := fs.recoverMsgs(); err != nil { - return nil, err - } - // If we expect to be encrypted check that what we are restoring is not plaintext. // This can happen on snapshot restores or conversions. if fs.prf != nil { @@ -4426,6 +4426,13 @@ func (mb *msgBlock) generatePerSubjectInfo() error { for seq := fseq; seq <= lseq; seq++ { sm, err := mb.cacheLookup(seq) if err != nil { + // Since we are walking by sequence we can ignore some errors that are benign to rebuilding our state. + if err == ErrStoreMsgNotFound || err == errDeletedMsg { + continue + } + if err == errNoCache { + return nil + } return err } if sm != nil && len(sm.subj) > 0 { diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 96b03da85b..c22cce1005 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -15209,6 +15209,51 @@ func TestJetStreamPullConsumerHeartBeats(t *testing.T) { } } +func TestJetStreamRecoverStreamWithDeletedMessagesNonCleanShutdown(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{Name: "T"}) + require_NoError(t, err) + + for i := 0; i < 100; i++ { + js.Publish("T", []byte("OK")) + } + + js.DeleteMsg("T", 22) + + // Now we need a non-clean shutdown. + // For this use case that means we do *not* write the fss file. + sd := s.JetStreamConfig().StoreDir + fss := path.Join(sd, "$G", "streams", "T", "msgs", "1.fss") + + // Stop current + nc.Close() + s.Shutdown() + + // Remove fss file to simulate a non-clean shutdown. + err = os.Remove(fss) + require_NoError(t, err) + + // Restart. + s = RunJetStreamServerOnPort(-1, sd) + defer s.Shutdown() + + nc, js = jsClientConnect(t, s) + defer nc.Close() + + // Make sure we recovered our stream + _, err = js.StreamInfo("T") + require_NoError(t, err) +} + /////////////////////////////////////////////////////////////////////////// // Simple JetStream Benchmarks ///////////////////////////////////////////////////////////////////////////