Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Can't recover stream on restart with deleted msg error. #2900

Merged
merged 2 commits into from Mar 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
17 changes: 12 additions & 5 deletions server/filestore.go
Expand Up @@ -315,6 +315,11 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
// Always track per subject information.
fs.tms = true

// Recover our message state.
if err := fs.recoverMsgs(); err != nil {
return nil, err
}

// Write our meta data iff does not exist.
meta := path.Join(fcfg.StoreDir, JetStreamMetaFile)
if _, err := os.Stat(meta); err != nil && os.IsNotExist(err) {
Expand All @@ -323,11 +328,6 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
}
}

// Recover our message state.
if err := fs.recoverMsgs(); err != nil {
return nil, err
}

// If we expect to be encrypted check that what we are restoring is not plaintext.
// This can happen on snapshot restores or conversions.
if fs.prf != nil {
Expand Down Expand Up @@ -4426,6 +4426,13 @@ func (mb *msgBlock) generatePerSubjectInfo() error {
for seq := fseq; seq <= lseq; seq++ {
sm, err := mb.cacheLookup(seq)
if err != nil {
// Since we are walking by sequence we can ignore some errors that are benign to rebuilding our state.
kozlovic marked this conversation as resolved.
Show resolved Hide resolved
if err == ErrStoreMsgNotFound || err == errDeletedMsg {
continue
}
if err == errNoCache {
return nil
}
return err
}
if sm != nil && len(sm.subj) > 0 {
Expand Down
45 changes: 45 additions & 0 deletions server/jetstream_test.go
Expand Up @@ -15209,6 +15209,51 @@ func TestJetStreamPullConsumerHeartBeats(t *testing.T) {
}
}

func TestJetStreamRecoverStreamWithDeletedMessagesNonCleanShutdown(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()

if config := s.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}

nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{Name: "T"})
require_NoError(t, err)

for i := 0; i < 100; i++ {
js.Publish("T", []byte("OK"))
}

js.DeleteMsg("T", 22)

// Now we need a non-clean shutdown.
// For this use case that means we do *not* write the fss file.
sd := s.JetStreamConfig().StoreDir
fss := path.Join(sd, "$G", "streams", "T", "msgs", "1.fss")

// Stop current
nc.Close()
s.Shutdown()

// Remove fss file to simulate a non-clean shutdown.
err = os.Remove(fss)
require_NoError(t, err)

// Restart.
s = RunJetStreamServerOnPort(-1, sd)
defer s.Shutdown()

nc, js = jsClientConnect(t, s)
defer nc.Close()

// Make sure we recovered our stream
_, err = js.StreamInfo("T")
require_NoError(t, err)
Comment on lines +15253 to +15254
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check for 99 messages and that js.GetMsg("T", 22) fails?

Am I right to assume that that upon recovery the missing fss file should exist again?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have hundreds of tests that already do this. I understand where you are coming from and actually started to write that and was like, the stream info message count deleted messages have tests everywhere for over a year. So I say no.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Meaning I do not believe it buys us anything over the current tests that are in place.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The bug was in rebuilding the fss state. That file does not actually exists during the server running, only on exit as a short cut on restart.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also just as an fyi, I always write the test to fail first and it did with "stream not found", which is what was reported, so as long as the fix had the stream being found I think we are good, IMO.

}

///////////////////////////////////////////////////////////////////////////
// Simple JetStream Benchmarks
///////////////////////////////////////////////////////////////////////////
Expand Down