From 2384c4c71d11df1305fee631599e898987e1f41b Mon Sep 17 00:00:00 2001 From: Rob Adams Date: Mon, 27 Jun 2022 23:23:14 -0700 Subject: [PATCH] Fix possibility to generate duplicate IDs in XADD The existing code is attempting to avoid duplicate IDs by checking the last ID in the queue. But if the queue is emptied and a new event added fast enough, it's possible to end up with duplicate IDs. This adds a variable to streamKey that keeps track of the last ID even when the stream itself is empty, which likely solves most cases where this could cause problems. --- stream.go | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/stream.go b/stream.go index d748cfa6..574f9016 100644 --- a/stream.go +++ b/stream.go @@ -14,8 +14,9 @@ import ( // a Stream is a list of entries, lowest ID (oldest) first, and all "groups". type streamKey struct { - entries []StreamEntry - groups map[string]*streamGroup + entries []StreamEntry + groups map[string]*streamGroup + lastAllocatedID string } // a StreamEntry is an entry in a stream. The ID is always of the form @@ -53,14 +54,20 @@ func newStreamKey() *streamKey { func (s *streamKey) generateID(now time.Time) string { ts := uint64(now.UnixNano()) / 1_000_000 - lastID := s.lastID() - next := fmt.Sprintf("%d-%d", ts, 0) - if streamCmp(lastID, next) == -1 { - return next + if s.lastAllocatedID != "" && streamCmp(s.lastAllocatedID, next) >= 0 { + last, _ := parseStreamID(s.lastAllocatedID) + next = fmt.Sprintf("%d-%d", last[0], last[1]+1) + } + + lastID := s.lastID() + if streamCmp(lastID, next) >= 0 { + last, _ := parseStreamID(lastID) + next = fmt.Sprintf("%d-%d", last[0], last[1]+1) } - last, _ := parseStreamID(lastID) - return fmt.Sprintf("%d-%d", last[0], last[1]+1) + + s.lastAllocatedID = next + return next } func (s *streamKey) lastID() string {