diff --git a/README.md b/README.md index dd3cf5d0..cf9bde44 100644 --- a/README.md +++ b/README.md @@ -180,9 +180,15 @@ Implemented commands: - XACK - XADD - XAUTOCLAIM + - XCLAIM - XDEL - XGROUP CREATE + - XGROUP CREATECONSUMER + - XGROUP DESTROY + - XGROUP DELCONSUMER - XINFO STREAM -- partly + - XINFO GROUPS + - XINFO CONSUMERS -- partly - XLEN - XRANGE - XREAD diff --git a/cmd_stream.go b/cmd_stream.go index 487d750a..dea316cc 100644 --- a/cmd_stream.go +++ b/cmd_stream.go @@ -28,6 +28,7 @@ func commandsStream(m *Miniredis) { m.srv.Register("XPENDING", m.cmdXpending) m.srv.Register("XTRIM", m.cmdXtrim) m.srv.Register("XAUTOCLAIM", m.cmdXautoclaim) + m.srv.Register("XCLAIM", m.cmdXclaim) } // XADD @@ -300,19 +301,44 @@ func (m *Miniredis) makeCmdXrange(reverse bool) server.Cmd { // XGROUP func (m *Miniredis) cmdXgroup(c *server.Peer, cmd string, args []string) { - if (len(args) == 4 || len(args) == 5) && strings.ToUpper(args[0]) == "CREATE" { + if len(args) == 0 { + setDirty(c) + c.WriteError(errWrongNumber(cmd)) + return + } + + subCmd, args := strings.ToUpper(args[0]), args[1:] + switch subCmd { + case "CREATE": m.cmdXgroupCreate(c, cmd, args) - } else { - j := strings.Join(args, " ") - err := fmt.Sprintf("ERR 'XGROUP %s' not supported", j) + case "DESTROY": + m.cmdXgroupDestroy(c, cmd, args) + case "CREATECONSUMER": + m.cmdXgroupCreateconsumer(c, cmd, args) + case "DELCONSUMER": + m.cmdXgroupDelconsumer(c, cmd, args) + case "HELP", + "SETID": + err := fmt.Sprintf("ERR 'XGROUP %s' not supported", subCmd) setDirty(c) c.WriteError(err) + default: + setDirty(c) + c.WriteError(fmt.Sprintf( + "ERR Unknown subcommand or wrong number of arguments for '%s'. Try XGROUP HELP.", + subCmd, + )) } } // XGROUP CREATE func (m *Miniredis) cmdXgroupCreate(c *server.Peer, cmd string, args []string) { - stream, group, id := args[1], args[2], args[3] + if len(args) != 3 && len(args) != 4 { + setDirty(c) + c.WriteError(errWrongNumber("CREATE")) + return + } + stream, group, id := args[0], args[1], args[2] withTx(m, c, func(c *server.Peer, ctx *connCtx) { db := m.db(ctx.selectedDB) @@ -322,7 +348,7 @@ func (m *Miniredis) cmdXgroupCreate(c *server.Peer, cmd string, args []string) { c.WriteError(err.Error()) return } - if s == nil && len(args) == 5 && strings.ToUpper(args[4]) == "MKSTREAM" { + if s == nil && len(args) == 4 && strings.ToUpper(args[3]) == "MKSTREAM" { if s, err = db.newStream(stream); err != nil { c.WriteError(err.Error()) return @@ -342,6 +368,124 @@ func (m *Miniredis) cmdXgroupCreate(c *server.Peer, cmd string, args []string) { }) } +// XGROUP DESTROY +func (m *Miniredis) cmdXgroupDestroy(c *server.Peer, cmd string, args []string) { + if len(args) != 2 { + setDirty(c) + c.WriteError(errWrongNumber("DESTROY")) + return + } + stream, groupName := args[0], args[1] + + withTx(m, c, func(c *server.Peer, ctx *connCtx) { + db := m.db(ctx.selectedDB) + + s, err := db.stream(stream) + if err != nil { + c.WriteError(err.Error()) + return + } + if s == nil { + c.WriteError(msgXgroupKeyNotFound) + return + } + + if _, ok := s.groups[groupName]; !ok { + c.WriteInt(0) + return + } + delete(s.groups, groupName) + c.WriteInt(1) + }) +} + +// XGROUP CREATECONSUMER +func (m *Miniredis) cmdXgroupCreateconsumer(c *server.Peer, cmd string, args []string) { + if len(args) != 3 { + setDirty(c) + c.WriteError(errWrongNumber("CREATECONSUMER")) + return + } + key, groupName, consumerName := args[0], args[1], args[2] + + withTx(m, c, func(c *server.Peer, ctx *connCtx) { + db := m.db(ctx.selectedDB) + + s, err := db.stream(key) + if err != nil { + c.WriteError(err.Error()) + return + } + if s == nil { + c.WriteError(msgXgroupKeyNotFound) + return + } + + g, ok := s.groups[groupName] + if !ok { + err := fmt.Sprintf("NOGROUP No such consumer group '%s' for key name '%s'", groupName, key) + c.WriteError(err) + return + } + + if _, ok = g.consumers[consumerName]; ok { + c.WriteInt(0) + return + } + g.consumers[consumerName] = &consumer{} + c.WriteInt(1) + }) +} + +// XGROUP DELCONSUMER +func (m *Miniredis) cmdXgroupDelconsumer(c *server.Peer, cmd string, args []string) { + if len(args) != 3 { + setDirty(c) + c.WriteError(errWrongNumber("DELCONSUMER")) + return + } + key, groupName, consumerName := args[0], args[1], args[2] + + withTx(m, c, func(c *server.Peer, ctx *connCtx) { + db := m.db(ctx.selectedDB) + + s, err := db.stream(key) + if err != nil { + c.WriteError(err.Error()) + return + } + if s == nil { + c.WriteError(msgXgroupKeyNotFound) + return + } + + g, ok := s.groups[groupName] + if !ok { + err := fmt.Sprintf("NOGROUP No such consumer group '%s' for key name '%s'", groupName, key) + c.WriteError(err) + return + } + + consumer, ok := g.consumers[consumerName] + if !ok { + c.WriteInt(0) + return + } + defer delete(g.consumers, consumerName) + + if consumer.numPendingEntries > 0 { + newPending := make([]pendingEntry, 0) + for _, entry := range g.pending { + if entry.consumer != consumerName { + newPending = append(newPending, entry) + } + } + g.pending = newPending + } + c.WriteInt(consumer.numPendingEntries) + }) +} + // XINFO func (m *Miniredis) cmdXinfo(c *server.Peer, cmd string, args []string) { if len(args) < 1 { @@ -353,7 +497,11 @@ func (m *Miniredis) cmdXinfo(c *server.Peer, cmd string, args []string) { switch subCmd { case "STREAM": m.cmdXinfoStream(c, args) - case "CONSUMERS", "GROUPS", "HELP": + case "CONSUMERS": + m.cmdXinfoConsumers(c, args) + case "GROUPS": + m.cmdXinfoGroups(c, args) + case "HELP": err := fmt.Sprintf("'XINFO %s' not supported", strings.Join(args, " ")) setDirty(c) c.WriteError(err) @@ -371,7 +519,7 @@ func (m *Miniredis) cmdXinfo(c *server.Peer, cmd string, args []string) { func (m *Miniredis) cmdXinfoStream(c *server.Peer, args []string) { if len(args) < 1 { setDirty(c) - c.WriteError(errWrongNumber("XINFO")) + c.WriteError(errWrongNumber("STREAM")) return } key := args[0] @@ -395,6 +543,100 @@ func (m *Miniredis) cmdXinfoStream(c *server.Peer, args []string) { }) } +// XINFO GROUPS +func (m *Miniredis) cmdXinfoGroups(c *server.Peer, args []string) { + if len(args) != 1 { + setDirty(c) + c.WriteError(errWrongNumber("GROUPS")) + return + } + key := args[0] + + withTx(m, c, func(c *server.Peer, ctx *connCtx) { + db := m.db(ctx.selectedDB) + + s, err := db.stream(key) + if err != nil { + c.WriteError(err.Error()) + return + } + if s == nil { + c.WriteError(msgKeyNotFound) + return + } + + c.WriteLen(len(s.groups)) + for name, g := range s.groups { + c.WriteMapLen(4) + + c.WriteBulk("name") + c.WriteBulk(name) + + c.WriteBulk("consumers") + c.WriteInt(len(g.consumers)) + + c.WriteBulk("pending") + c.WriteInt(len(g.pending)) + + c.WriteBulk("last-delivered-id") + c.WriteBulk(g.lastID) + } + }) +} + +// XINFO CONSUMERS +// Please note that this is only a partial implementation, for it does not +// return each consumer's "idle" value, which indicates "the number of +// milliseconds that have passed since the consumer last interacted with the +// server." +func (m *Miniredis) cmdXinfoConsumers(c *server.Peer, args []string) { + if len(args) != 2 { + setDirty(c) + c.WriteError(errWrongNumber("CONSUMERS")) + return + } + key := args[0] + groupName := args[1] + + withTx(m, c, func(c *server.Peer, ctx *connCtx) { + db := m.db(ctx.selectedDB) + + s, err := db.stream(key) + if err != nil { + c.WriteError(err.Error()) + return + } + if s == nil { + c.WriteError(msgKeyNotFound) + return + } + + g, ok := s.groups[groupName] + if !ok { + err := fmt.Sprintf("NOGROUP No such consumer group '%s' for key name '%s'", groupName, key) + c.WriteError(err) + return + } + + consumerNames := make([]string, 0) + for name := range g.consumers { + consumerNames = append(consumerNames, name) + } + sort.Strings(consumerNames) + + c.WriteLen(len(consumerNames)) + for _, name := range consumerNames { + c.WriteMapLen(2) + + c.WriteBulk("name") + c.WriteBulk(name) + + c.WriteBulk("pending") + c.WriteInt(g.consumers[name].numPendingEntries) + } + }) +} + // XREADGROUP func (m *Miniredis) cmdXreadgroup(c *server.Peer, cmd string, args []string) { // XREADGROUP GROUP group consumer STREAMS key ID @@ -807,6 +1049,7 @@ func (m *Miniredis) cmdXpending(c *server.Peer, cmd string, args []string) { key string group string summary bool + idle time.Duration start, end string count int consumer *string @@ -814,38 +1057,49 @@ func (m *Miniredis) cmdXpending(c *server.Peer, cmd string, args []string) { opts.key, opts.group, args = args[0], args[1], args[2:] opts.summary = true - if len(args) > 0 && strings.ToUpper(args[0]) == "IDLE" { - setDirty(c) - c.WriteError("ERR IDLE is unsupported") - return - } if len(args) >= 3 { opts.summary = false - start_, err := formatStreamRangeBound(args[0], true, false) + if strings.ToUpper(args[0]) == "IDLE" { + idleMs, err := strconv.ParseInt(args[1], 10, 64) + if err != nil { + setDirty(c) + c.WriteError(msgInvalidInt) + return + } + opts.idle = time.Duration(idleMs) * time.Millisecond + + args = args[2:] + if len(args) < 3 { + setDirty(c) + c.WriteError(msgSyntaxError) + return + } + } + + var err error + opts.start, err = formatStreamRangeBound(args[0], true, false) if err != nil { + setDirty(c) c.WriteError(msgInvalidStreamID) return } - opts.start = start_ - end_, err := formatStreamRangeBound(args[1], false, false) + opts.end, err = formatStreamRangeBound(args[1], false, false) if err != nil { + setDirty(c) c.WriteError(msgInvalidStreamID) return } - opts.end = end_ - n, err := strconv.Atoi(args[2]) // negative is allowed + opts.count, err = strconv.Atoi(args[2]) // negative is allowed if err != nil { + setDirty(c) c.WriteError(msgInvalidInt) return } - opts.count = n args = args[3:] if len(args) == 1 { - var c string - c, args = args[0], args[1:] - opts.consumer = &c + opts.consumer, args = &args[0], args[1:] } } if len(args) != 0 { @@ -870,7 +1124,7 @@ func (m *Miniredis) cmdXpending(c *server.Peer, cmd string, args []string) { writeXpendingSummary(c, *g) return } - writeXpending(m.effectiveNow(), c, *g, opts.start, opts.end, opts.count, opts.consumer) + writeXpending(m.effectiveNow(), c, *g, opts.idle, opts.start, opts.end, opts.count, opts.consumer) }) } @@ -917,6 +1171,7 @@ func writeXpending( now time.Time, c *server.Peer, g streamGroup, + idle time.Duration, start, end string, count int, @@ -952,12 +1207,19 @@ func writeXpending( if streamCmp(p.id, end) > 0 { continue } - res = append(res, entry{ - id: p.id, - consumer: p.consumer, - millis: int(now.Sub(p.lastDelivery).Milliseconds()), - count: p.deliveryCount, - }) + timeSinceLastDelivery := now.Sub(p.lastDelivery) + if timeSinceLastDelivery >= idle { + res = append(res, entry{ + id: p.id, + consumer: p.consumer, + millis: int(timeSinceLastDelivery.Milliseconds()), + count: p.deliveryCount, + }) + } + } + if len(res) == 0 { + c.WriteLen(-1) + return } c.WriteLen(len(res)) for _, e := range res { @@ -1180,8 +1442,13 @@ func xautoclaim( if minIdleTime > 0 && now.Before(p.lastDelivery.Add(minIdleTime)) { continue } - g.consumers[consumerID] = consumer{} + + prevConsumerID := p.consumer + if _, ok := g.consumers[consumerID]; !ok { + g.consumers[consumerID] = &consumer{} + } p.consumer = consumerID + _, entry := g.stream.get(p.id) // not found. Weird? if entry == nil { @@ -1190,8 +1457,13 @@ func xautoclaim( // (Introduced in Redis 7.0) continue } + p.deliveryCount += 1 p.lastDelivery = now + + g.consumers[prevConsumerID].numPendingEntries-- + g.consumers[consumerID].numPendingEntries++ + msgs[i] = p res = append(res, *entry) @@ -1224,6 +1496,192 @@ func writeXautoclaim(c *server.Peer, nextCallId string, res []StreamEntry, justI } } +// XCLAIM +func (m *Miniredis) cmdXclaim(c *server.Peer, cmd string, args []string) { + if len(args) < 5 { + setDirty(c) + c.WriteError(errWrongNumber(cmd)) + return + } + + var opts struct { + key string + groupName string + consumerName string + minIdleTime time.Duration + newLastDelivery time.Time + ids []string + retryCount *int + force bool + justId bool + } + + opts.key, opts.groupName, opts.consumerName = args[0], args[1], args[2] + + minIdleTimeMillis, err := strconv.Atoi(args[3]) + if err != nil { + setDirty(c) + c.WriteError("ERR Invalid min-idle-time argument for XCLAIM") + return + } + opts.minIdleTime = time.Millisecond * time.Duration(minIdleTimeMillis) + + opts.newLastDelivery = m.effectiveNow() + opts.ids = append(opts.ids, args[4]) + + args = args[5:] + for len(args) > 0 { + arg := strings.ToUpper(args[0]) + if arg == "IDLE" || + arg == "TIME" || + arg == "RETRYCOUNT" || + arg == "FORCE" || + arg == "JUSTID" { + break + } + opts.ids = append(opts.ids, arg) + args = args[1:] + } + + for len(args) > 0 { + arg := strings.ToUpper(args[0]) + switch arg { + case "IDLE": + idleMs, err := strconv.ParseInt(args[1], 10, 64) + if err != nil { + setDirty(c) + c.WriteError("ERR Invalid IDLE option argument for XCLAIM") + return + } + if idleMs < 0 { + idleMs = 0 + } + opts.newLastDelivery = m.effectiveNow().Add(time.Millisecond * time.Duration(-idleMs)) + args = args[2:] + case "TIME": + timeMs, err := strconv.ParseInt(args[1], 10, 64) + if err != nil { + setDirty(c) + c.WriteError("ERR Invalid TIME option argument for XCLAIM") + return + } + opts.newLastDelivery = unixMilli(timeMs) + args = args[2:] + case "RETRYCOUNT": + retryCount, err := strconv.Atoi(args[1]) + if err != nil { + setDirty(c) + c.WriteError("ERR Invalid RETRYCOUNT option argument for XCLAIM") + return + } + opts.retryCount = &retryCount + args = args[2:] + case "FORCE": + opts.force = true + args = args[1:] + case "JUSTID": + opts.justId = true + args = args[1:] + default: + setDirty(c) + c.WriteError(fmt.Sprintf("ERR Unrecognized XCLAIM option '%s'", args[0])) + return + } + } + + withTx(m, c, func(c *server.Peer, ctx *connCtx) { + db := m.db(ctx.selectedDB) + + g, err := db.streamGroup(opts.key, opts.groupName) + if err != nil { + c.WriteError(err.Error()) + return + } + if g == nil { + c.WriteError(errReadgroup(opts.key, opts.groupName).Error()) + return + } + + claimedEntryIDs := m.xclaim(g, opts.consumerName, opts.minIdleTime, opts.newLastDelivery, opts.ids, opts.retryCount, opts.force) + writeXclaim(c, g.stream, claimedEntryIDs, opts.justId) + }) +} + +func (m *Miniredis) xclaim( + group *streamGroup, + consumerName string, + minIdleTime time.Duration, + newLastDelivery time.Time, + ids []string, + retryCount *int, + force bool, +) (claimedEntryIDs []string) { + for _, id := range ids { + pelPos, pelEntry := group.searchPending(id) + if pelEntry == nil { + if !force { + continue + } + + if pelPos < len(group.pending) { + group.pending = append(group.pending[:pelPos+1], group.pending[pelPos:]...) + } else { + group.pending = append(group.pending, pendingEntry{}) + } + pelEntry = &group.pending[pelPos] + + *pelEntry = pendingEntry{ + id: id, + consumer: consumerName, + deliveryCount: 1, + } + } else { + group.consumers[pelEntry.consumer].numPendingEntries-- + pelEntry.consumer = consumerName + } + + if retryCount != nil { + pelEntry.deliveryCount = *retryCount + } else { + pelEntry.deliveryCount++ + } + pelEntry.lastDelivery = newLastDelivery + + claimedEntryIDs = append(claimedEntryIDs, id) + } + if len(claimedEntryIDs) == 0 { + return + } + + if _, ok := group.consumers[consumerName]; !ok { + group.consumers[consumerName] = &consumer{} + } + consumer := group.consumers[consumerName] + consumer.numPendingEntries += len(claimedEntryIDs) + + return +} + +func writeXclaim(c *server.Peer, stream *streamKey, claimedEntryIDs []string, justId bool) { + c.WriteLen(len(claimedEntryIDs)) + for _, id := range claimedEntryIDs { + if justId { + c.WriteBulk(id) + continue + } + + _, entry := stream.get(id) + if entry == nil { + c.WriteNull() + continue + } + + c.WriteLen(2) + c.WriteBulk(entry.ID) + c.WriteStrings(entry.Values) + } +} + func parseBlock(cmd string, args []string, block *bool, timeout *time.Duration) error { if len(args) < 2 { return errors.New(errWrongNumber(cmd)) @@ -1239,3 +1697,8 @@ func parseBlock(cmd string, args []string, block *bool, timeout *time.Duration) (*timeout) = time.Millisecond * time.Duration(ms) return nil } + +// taken from Go's time package. Can be dropped if miniredis supports >= 1.17 +func unixMilli(msec int64) time.Time { + return time.Unix(msec/1e3, (msec%1e3)*1e6) +} diff --git a/cmd_stream_test.go b/cmd_stream_test.go index 048ae460..d0dfb69d 100644 --- a/cmd_stream_test.go +++ b/cmd_stream_test.go @@ -4,6 +4,7 @@ import ( "fmt" "math" "regexp" + "strconv" "testing" "time" @@ -482,6 +483,28 @@ func TestStreamInfo(t *testing.T) { "XINFO", "STREAM", "planets", proto.Array(proto.String("length"), proto.Int(1)), ) + + mustDo(t, c, + "XINFO", "GROUPS", "planets", "foo", "bar", + proto.Error("ERR wrong number of arguments for 'groups' command"), + ) + mustDo(t, c, + "XINFO", "GROUPS", "foo", + proto.Error("ERR no such key"), + ) + mustDo(t, c, + "XINFO", "GROUPS", "planets", + proto.Array(), + ) + + mustDo(t, c, + "XINFO", "CONSUMERS", "foo", "bar", + proto.Error("ERR no such key"), + ) + mustDo(t, c, + "XINFO", "CONSUMERS", "planets", "processing", + proto.Error("NOGROUP No such consumer group 'processing' for key name 'planets'"), + ) } // Test XGROUP @@ -497,14 +520,118 @@ func TestStreamGroup(t *testing.T) { "XGROUP", "CREATE", "s", "processing", "$", proto.Error(msgXgroupKeyNotFound), ) + mustDo(t, c, + "XGROUP", "DESTROY", "s", "processing", + proto.Error(msgXgroupKeyNotFound), + ) + mustDo(t, c, + "XGROUP", "DELCONSUMER", "s", "processing", "foo", + proto.Error(msgXgroupKeyNotFound), + ) mustOK(t, c, "XGROUP", "CREATE", "s", "processing", "$", "MKSTREAM", ) + mustDo(t, c, + "XGROUP", "DESTROY", "s", "foo", + proto.Int(0), + ) + + mustDo(t, c, + "XINFO", "GROUPS", "s", + proto.Array( + proto.Array( + proto.String("name"), proto.String("processing"), + proto.String("consumers"), proto.Int(0), + proto.String("pending"), proto.Int(0), + proto.String("last-delivered-id"), proto.String("0-0"), + ), + ), + ) + mustDo(t, c, + "XINFO", "CONSUMERS", "s", "processing", + proto.Array(), + ) + + mustDo(t, c, + "XGROUP", "CREATECONSUMER", "s", "processing", "alice", + proto.Int(1), + ) + mustDo(t, c, + "XINFO", "GROUPS", "s", + proto.Array( + proto.Array( + proto.String("name"), proto.String("processing"), + proto.String("consumers"), proto.Int(1), + proto.String("pending"), proto.Int(0), + proto.String("last-delivered-id"), proto.String("0-0"), + ), + ), + ) + mustDo(t, c, + "XINFO", "CONSUMERS", "s", "processing", + proto.Array( + proto.Array( + proto.String("name"), proto.String("alice"), + proto.String("pending"), proto.Int(0), + ), + ), + ) + + mustDo(t, c, + "XGROUP", "DELCONSUMER", "s", "processing", "foo", + proto.Int(0), + ) + mustDo(t, c, + "XGROUP", "DELCONSUMER", "s", "processing", "alice", + proto.Int(0), + ) + mustDo(t, c, + "XINFO", "GROUPS", "s", + proto.Array( + proto.Array( + proto.String("name"), proto.String("processing"), + proto.String("consumers"), proto.Int(0), + proto.String("pending"), proto.Int(0), + proto.String("last-delivered-id"), proto.String("0-0"), + ), + ), + ) + mustDo(t, c, + "XINFO", "CONSUMERS", "s", "processing", + proto.Array(), + ) + mustDo(t, c, + "XGROUP", "DESTROY", "s", "processing", + proto.Int(1), + ) must0(t, c, "XLEN", "s", ) + mustDo(t, c, + "XINFO", "GROUPS", "s", + proto.Array(), + ) + + t.Run("errors", func(t *testing.T) { + mustDo(t, c, + "XGROUP", + proto.Error("ERR wrong number of arguments for 'xgroup' command"), + ) + mustDo(t, c, + "XGROUP", "HELP", + proto.Error("ERR 'XGROUP HELP' not supported"), + ) + mustDo(t, c, + "XGROUP", "foo", + proto.Error("ERR Unknown subcommand or wrong number of arguments for 'FOO'. Try XGROUP HELP."), + ) + mustDo(t, c, + "XGROUP", "SETID", + proto.Error("ERR 'XGROUP SETID' not supported"), + ) + }) } // Test XREADGROUP @@ -529,6 +656,23 @@ func TestStreamReadGroup(t *testing.T) { "XREADGROUP", "GROUP", "processing", "alice", "STREAMS", "planets", ">", ) + mustDo(t, c, + "XINFO", "GROUPS", "planets", + proto.Array( + proto.Array( + proto.String("name"), proto.String("processing"), + proto.String("consumers"), proto.Int(0), + proto.String("pending"), proto.Int(0), + proto.String("last-delivered-id"), proto.String("0-0"), + ), + ), + ) + + mustDo(t, c, + "XINFO", "CONSUMERS", "planets", "processing", + proto.Array(), + ) + mustDo(t, c, "XADD", "planets", "0-1", "name", "Mercury", proto.String("0-1"), @@ -545,6 +689,28 @@ func TestStreamReadGroup(t *testing.T) { ), ) + mustDo(t, c, + "XINFO", "GROUPS", "planets", + proto.Array( + proto.Array( + proto.String("name"), proto.String("processing"), + proto.String("consumers"), proto.Int(1), + proto.String("pending"), proto.Int(1), + proto.String("last-delivered-id"), proto.String("0-1"), + ), + ), + ) + + mustDo(t, c, + "XINFO", "CONSUMERS", "planets", "processing", + proto.Array( + proto.Array( + proto.String("name"), proto.String("alice"), + proto.String("pending"), proto.Int(1), + ), + ), + ) + mustNilList(t, c, "XREADGROUP", "GROUP", "processing", "alice", "STREAMS", "planets", ">", ) @@ -652,6 +818,32 @@ func TestStreamAck(t *testing.T) { ), ), ) + mustDo(t, c, + "XINFO", "GROUPS", "planets", + proto.Array( + proto.Array( + proto.String("name"), proto.String("processing"), + proto.String("consumers"), proto.Int(1), + proto.String("pending"), proto.Int(0), + proto.String("last-delivered-id"), proto.String("0-1"), + ), + ), + ) + + mustDo(t, c, + "XINFO", "CONSUMERS", "planets", "processing", + proto.Array( + proto.Array( + proto.String("name"), proto.String("alice"), + proto.String("pending"), proto.Int(0), + ), + ), + ) + + mustDo(t, c, + "XGROUP", "DELCONSUMER", "planets", "processing", "alice", + proto.Int(0), + ) } // Test XPENDING @@ -739,6 +931,35 @@ func TestStreamXpending(t *testing.T) { ), ), ) + + mustDo(t, c, + "XPENDING", "planets", "processing", "IDLE", "5000", "-", "+", "999", + proto.NilList, + ) + mustDo(t, c, + "XPENDING", "planets", "processing", "-", "+", "999", "bob", + proto.NilList, + ) + mustDo(t, c, + "XPENDING", "planets", "processing", "IDLE", "4000", "-", "+", "999", "alice", + proto.Array( + proto.Array( + proto.String("99-1"), + proto.String("alice"), + proto.Int(4000), + proto.Int(2), + ), + ), + ) + + mustDo(t, c, + "XGROUP", "DELCONSUMER", "planets", "processing", "alice", + proto.Int(1), + ) + mustDo(t, c, + "XPENDING", "planets", "processing", "-", "+", "999", + proto.NilList, + ) }) t.Run("errors", func(t *testing.T) { @@ -754,6 +975,10 @@ func TestStreamXpending(t *testing.T) { "XPENDING", "planets", "processing", "toomany", proto.Error("ERR syntax error"), ) + mustDo(t, c, + "XPENDING", "planets", "processing", "IDLE", "1000", + proto.Error("ERR syntax error"), + ) mustDo(t, c, "XPENDING", "planets", "processing", "-", "+", "cons", "nine", proto.Error("ERR value is not an integer or out of range"), @@ -857,6 +1082,10 @@ func TestStreamAutoClaim(t *testing.T) { proto.Array(), ), ) + mustDo(t, c, + "XINFO", "CONSUMERS", "planets", "processing", + proto.Array(), + ) mustDo(t, c, "XADD", "planets", "0-1", "name", "Mercury", @@ -882,6 +1111,15 @@ func TestStreamAutoClaim(t *testing.T) { proto.Array(proto.Array(proto.String("0-1"), proto.Strings("name", "Mercury"))), ), ) + mustDo(t, c, + "XINFO", "CONSUMERS", "planets", "processing", + proto.Array( + proto.Array( + proto.String("name"), proto.String("alice"), + proto.String("pending"), proto.Int(1), + ), + ), + ) // Add an additional item to pending s.SetTime(now.Add(5000 * time.Millisecond)) @@ -895,6 +1133,15 @@ func TestStreamAutoClaim(t *testing.T) { proto.Array(proto.String("planets"), proto.Array(proto.Array(proto.String("0-2"), proto.Strings("name", "Venus")))), ), ) + mustDo(t, c, + "XINFO", "CONSUMERS", "planets", "processing", + proto.Array( + proto.Array( + proto.String("name"), proto.String("alice"), + proto.String("pending"), proto.Int(2), + ), + ), + ) // Autoclaim with a min idle time that should not catch any items s.SetTime(now.Add(10000 * time.Millisecond)) @@ -940,4 +1187,396 @@ func TestStreamAutoClaim(t *testing.T) { ), ), ) + + mustDo(t, c, + "XINFO", "CONSUMERS", "planets", "processing", + proto.Array( + proto.Array( + proto.String("name"), proto.String("alice"), + proto.String("pending"), proto.Int(2), + ), + ), + ) + + s.SetTime(now.Add(60000 * time.Millisecond)) + mustDo(t, c, + "XAUTOCLAIM", "planets", "processing", "bob", "15000", "0", + proto.Array( + proto.String("0-0"), + proto.Array( + proto.Array(proto.String("0-1"), proto.Strings("name", "Mercury")), + proto.Array(proto.String("0-2"), proto.Strings("name", "Venus")), + ), + ), + ) + mustDo(t, c, + "XINFO", "CONSUMERS", "planets", "processing", + proto.Array( + proto.Array( + proto.String("name"), proto.String("alice"), + proto.String("pending"), proto.Int(0), + ), + proto.Array( + proto.String("name"), proto.String("bob"), + proto.String("pending"), proto.Int(2), + ), + ), + ) + + s.SetTime(now.Add(80000 * time.Millisecond)) + mustDo(t, c, + "XAUTOCLAIM", "planets", "processing", "alice", "15000", "0", "COUNT", "1", + proto.Array( + proto.String("0-2"), + proto.Array( + proto.Array(proto.String("0-1"), proto.Strings("name", "Mercury")), + ), + ), + ) + mustDo(t, c, + "XINFO", "CONSUMERS", "planets", "processing", + proto.Array( + proto.Array( + proto.String("name"), proto.String("alice"), + proto.String("pending"), proto.Int(1), + ), + proto.Array( + proto.String("name"), proto.String("bob"), + proto.String("pending"), proto.Int(1), + ), + ), + ) + + mustDo(t, c, + "XGROUP", "DELCONSUMER", "planets", "processing", "alice", + proto.Int(1), + ) + mustDo(t, c, + "XPENDING", "planets", "processing", "-", "+", "999", + proto.Array( + proto.Array( + proto.String("0-2"), + proto.String("bob"), + proto.Int(20000), + proto.Int(4), + ), + ), + ) + + mustDo(t, c, + "XGROUP", "DELCONSUMER", "planets", "processing", "bob", + proto.Int(1), + ) + mustDo(t, c, + "XINFO", "CONSUMERS", "planets", "processing", + proto.Array(), + ) + mustDo(t, c, + "XINFO", "GROUPS", "planets", + proto.Array( + proto.Array( + proto.String("name"), proto.String("processing"), + proto.String("consumers"), proto.Int(0), + proto.String("pending"), proto.Int(0), + proto.String("last-delivered-id"), proto.String("0-2"), + ), + ), + ) +} + +func TestStreamClaim(t *testing.T) { + s, err := Run() + ok(t, err) + defer s.Close() + c, err := proto.Dial(s.Addr()) + ok(t, err) + defer c.Close() + + now := time.Now() + s.SetTime(now) + + mustDo(t, c, + "XCLAIM", "planets", "processing", "alice", "0", "0-0", + proto.Error("NOGROUP No such key 'planets' or consumer group 'processing'"), + ) + + mustOK(t, c, + "XGROUP", "CREATE", "planets", "processing", "$", "MKSTREAM", + ) + mustDo(t, c, + "XINFO", "CONSUMERS", "planets", "processing", + proto.Array(), + ) + + mustDo(t, c, + "XCLAIM", "planets", "processing", "alice", "0", "0-0", + proto.Array(), + ) + mustDo(t, c, + "XINFO", "CONSUMERS", "planets", "processing", + proto.Array(), + ) + + mustDo(t, c, + "XADD", "planets", "0-1", "name", "Mercury", + proto.String("0-1"), + ) + mustDo(t, c, + "XADD", "planets", "0-2", "name", "Venus", + proto.String("0-2"), + ) + + mustDo(t, c, + "XCLAIM", "planets", "processing", "alice", "0", "0-1", + proto.Array(), + ) + mustDo(t, c, + "XPENDING", "planets", "processing", "-", "+", "999", + proto.NilList, + ) + + mustDo(t, c, + "XCLAIM", "planets", "processing", "alice", "0", "0-1", "0-2", "FORCE", + proto.Array( + proto.Array(proto.String("0-1"), proto.Strings("name", "Mercury")), + proto.Array(proto.String("0-2"), proto.Strings("name", "Venus")), + ), + ) + mustDo(t, c, + "XINFO", "GROUPS", "planets", + proto.Array( + proto.Array( + proto.String("name"), proto.String("processing"), + proto.String("consumers"), proto.Int(1), + proto.String("pending"), proto.Int(2), + proto.String("last-delivered-id"), proto.String("0-0"), + ), + ), + ) + mustDo(t, c, + "XINFO", "CONSUMERS", "planets", "processing", + proto.Array( + proto.Array( + proto.String("name"), proto.String("alice"), + proto.String("pending"), proto.Int(2), + ), + ), + ) + mustDo(t, c, + "XPENDING", "planets", "processing", "-", "+", "999", + proto.Array( + proto.Array( + proto.String("0-1"), + proto.String("alice"), + proto.Int(0), + proto.Int(2), + ), + proto.Array( + proto.String("0-2"), + proto.String("alice"), + proto.Int(0), + proto.Int(2), + ), + ), + ) + + s.SetTime(now.Add(20000 * time.Millisecond)) + mustDo(t, c, + "XDEL", "planets", "0-1", + proto.Int(1), + ) + mustDo(t, c, + "XCLAIM", "planets", "processing", "bob", "0", "0-1", + proto.Array(proto.Nil), + ) + mustDo(t, c, + "XINFO", "GROUPS", "planets", + proto.Array( + proto.Array( + proto.String("name"), proto.String("processing"), + proto.String("consumers"), proto.Int(2), + proto.String("pending"), proto.Int(2), + proto.String("last-delivered-id"), proto.String("0-0"), + ), + ), + ) + mustDo(t, c, + "XINFO", "CONSUMERS", "planets", "processing", + proto.Array( + proto.Array( + proto.String("name"), proto.String("alice"), + proto.String("pending"), proto.Int(1), + ), + proto.Array( + proto.String("name"), proto.String("bob"), + proto.String("pending"), proto.Int(1), + ), + ), + ) + mustDo(t, c, + "XPENDING", "planets", "processing", "-", "+", "999", + proto.Array( + proto.Array( + proto.String("0-1"), + proto.String("bob"), + proto.Int(0), + proto.Int(3), + ), + proto.Array( + proto.String("0-2"), + proto.String("alice"), + proto.Int(20000), + proto.Int(2), + ), + ), + ) + + mustDo(t, c, + "XADD", "planets", "0-3", "name", "Earth", + proto.String("0-3"), + ) + mustDo(t, c, + "XADD", "planets", "0-4", "name", "Mars", + proto.String("0-4"), + ) + mustDo(t, c, + "XCLAIM", "planets", "processing", "bob", "0", "0-4", "FORCE", + proto.Array( + proto.Array(proto.String("0-4"), proto.Strings("name", "Mars")), + ), + ) + mustDo(t, c, + "XCLAIM", "planets", "processing", "bob", "0", "0-4", + proto.Array( + proto.Array(proto.String("0-4"), proto.Strings("name", "Mars")), + ), + ) + mustDo(t, c, + "XPENDING", "planets", "processing", "-", "+", "999", + proto.Array( + proto.Array( + proto.String("0-1"), + proto.String("bob"), + proto.Int(0), + proto.Int(3), + ), + proto.Array( + proto.String("0-2"), + proto.String("alice"), + proto.Int(20000), + proto.Int(2), + ), + proto.Array( + proto.String("0-4"), + proto.String("bob"), + proto.Int(0), + proto.Int(3), + ), + ), + ) + + mustDo(t, c, + "XREADGROUP", "GROUP", "processing", "alice", "STREAMS", "planets", ">", + proto.Array( + proto.Array( + proto.String("planets"), + proto.Array( + proto.Array(proto.String("0-2"), proto.Strings("name", "Venus")), + proto.Array(proto.String("0-3"), proto.Strings("name", "Earth")), + proto.Array(proto.String("0-4"), proto.Strings("name", "Mars")), + ), + ), + ), + ) + mustDo(t, c, + "XPENDING", "planets", "processing", "-", "+", "999", + proto.Array( + proto.Array( + proto.String("0-1"), + proto.String("bob"), + proto.Int(0), + proto.Int(3), + ), + proto.Array( + proto.String("0-2"), + proto.String("alice"), + proto.Int(0), + proto.Int(1), + ), + proto.Array( + proto.String("0-3"), + proto.String("alice"), + proto.Int(0), + proto.Int(1), + ), + proto.Array( + proto.String("0-4"), + proto.String("alice"), + proto.Int(0), + proto.Int(1), + ), + ), + ) + mustDo(t, c, + "XINFO", "CONSUMERS", "planets", "processing", + proto.Array( + proto.Array( + proto.String("name"), proto.String("alice"), + proto.String("pending"), proto.Int(3), + ), + proto.Array( + proto.String("name"), proto.String("bob"), + proto.String("pending"), proto.Int(1), + ), + ), + ) + + mustDo(t, c, + "XCLAIM", "planets", "processing", "alice", "0", "0-3", "RETRYCOUNT", "10", "IDLE", "5000", "JUSTID", + proto.Array(proto.String("0-3")), + ) + newTime := s.effectiveNow().Add(time.Millisecond * time.Duration(-10000)) + newTimeString := strconv.FormatInt(newTime.UnixNano()/time.Millisecond.Nanoseconds(), 10) + mustDo(t, c, + "XCLAIM", "planets", "processing", "alice", "0", "0-1", "RETRYCOUNT", "1", "TIME", newTimeString, "JUSTID", + proto.Array(proto.String("0-1")), + ) + mustDo(t, c, + "XPENDING", "planets", "processing", "-", "+", "999", + proto.Array( + proto.Array( + proto.String("0-1"), + proto.String("alice"), + proto.Int(10000), + proto.Int(1), + ), + proto.Array( + proto.String("0-2"), + proto.String("alice"), + proto.Int(0), + proto.Int(1), + ), + proto.Array( + proto.String("0-3"), + proto.String("alice"), + proto.Int(5000), + proto.Int(10), + ), + proto.Array( + proto.String("0-4"), + proto.String("alice"), + proto.Int(0), + proto.Int(1), + ), + ), + ) + + mustDo(t, c, + "XACK", "planets", "processing", "0-1", "0-2", "0-3", "0-4", + proto.Int(4), + ) + mustDo(t, c, + "XPENDING", "planets", "processing", "-", "+", "999", + proto.NilList, + ) } diff --git a/integration/stream_test.go b/integration/stream_test.go index 2a7f577b..8967b4ed 100644 --- a/integration/stream_test.go +++ b/integration/stream_test.go @@ -159,6 +159,11 @@ func TestStream(t *testing.T) { c.Error("wrong number", "XINFO") c.Do("SET", "scalar", "foo") c.Error("wrong kind", "XINFO", "STREAM", "scalar") + + c.Error("no such key", "XINFO", "GROUPS", "foo") + c.Do("XINFO", "GROUPS", "planets") + + c.Error("no such key", "XINFO", "CONSUMERS", "foo", "bar") }) }) @@ -374,7 +379,20 @@ func TestStreamGroup(t *testing.T) { c.Error("to exist", "XGROUP", "CREATE", "planets", "processing", "$") c.Do("XADD", "planets", "123-500", "foo", "bar") c.Do("XGROUP", "CREATE", "planets", "processing", "$") + c.Do("XINFO", "GROUPS", "planets") c.Error("already exist", "XGROUP", "CREATE", "planets", "processing", "$") + c.Error("to exist", "XGROUP", "DESTROY", "foo", "bar") + c.Do("XGROUP", "DESTROY", "planets", "bar") + c.Error("No such consumer group", "XGROUP", "DELCONSUMER", "planets", "foo", "bar") + c.Do("XGROUP", "CREATECONSUMER", "planets", "processing", "alice") + c.Do("XINFO", "GROUPS", "planets") + c.Do("XGROUP", "DELCONSUMER", "planets", "processing", "foo") + c.Do("XGROUP", "DELCONSUMER", "planets", "processing", "alice") + c.Do("XINFO", "CONSUMERS", "planets", "processing") + c.Do("XGROUP", "DESTROY", "planets", "processing") + c.Do("XINFO", "GROUPS", "planets") + c.Error("wrong number of arguments", "XGROUP") + c.Error("wrong number of arguments", "XGROUP", "foo") }) }) @@ -521,6 +539,9 @@ func TestStreamGroup(t *testing.T) { c.Do("XREADGROUP", "GROUP", "processing", "eve", "COUNT", "1", "STREAMS", "planets", ">") c.Do("XPENDING", "planets", "processing") + c.Do("XGROUP", "DELCONSUMER", "planets", "processing", "alice") + c.Do("XPENDING", "planets", "processing") + c.Error("consumer group", "XPENDING", "foo", "processing") c.Error("consumer group", "XPENDING", "planets", "foo") @@ -528,6 +549,7 @@ func TestStreamGroup(t *testing.T) { c.Error("wrong number", "XPENDING") c.Error("wrong number", "XPENDING", "planets") c.Error("syntax", "XPENDING", "planets", "processing", "too many") + c.Error("syntax", "XPENDING", "planets", "processing", "IDLE", "10") }) // full mode @@ -545,12 +567,14 @@ func TestStreamGroup(t *testing.T) { c.DoLoosely("XPENDING", "planets", "processing", "-", "+", "1") c.DoLoosely("XPENDING", "planets", "processing", "-", "+", "0") c.DoLoosely("XPENDING", "planets", "processing", "-", "+", "-1") + c.DoLoosely("XPENDING", "planets", "processing", "IDLE", "10", "-", "+", "999") c.Do("XADD", "planets", "4000-5", "name", "Earth") c.Do("XREADGROUP", "GROUP", "processing", "bob", "STREAMS", "planets", ">") c.DoLoosely("XPENDING", "planets", "processing", "-", "+", "999") c.DoLoosely("XPENDING", "planets", "processing", "-", "+", "999", "bob") c.DoLoosely("XPENDING", "planets", "processing", "-", "+", "999", "eve") + c.DoLoosely("XPENDING", "planets", "processing", "IDLE", "10", "-", "+", "999", "eve") // update delivery counts (which we can't test thanks to the time field) c.Do("XREADGROUP", "GROUP", "processing", "bob", "STREAMS", "planets", "99") @@ -559,6 +583,7 @@ func TestStreamGroup(t *testing.T) { c.Error("Invalid", "XPENDING", "planets", "processing", "foo", "+", "999") c.Error("Invalid", "XPENDING", "planets", "processing", "-", "foo", "999") c.Error("not an integer", "XPENDING", "planets", "processing", "-", "+", "foo") + c.Error("not an integer", "XPENDING", "planets", "processing", "IDLE", "abc", "-", "+", "999") }) }) @@ -604,6 +629,58 @@ func TestStreamGroup(t *testing.T) { }) }) + t.Run("XCLAIM", func(t *testing.T) { + testRaw(t, func(c *client) { + c.Error("No such key", "XCLAIM", "planets", "processing", "alice", "0", "0-0") + c.Do("XGROUP", "CREATE", "planets", "processing", "$", "MKSTREAM") + c.Error("No such key", "XCLAIM", "planets", "foo", "alice", "0", "0-0") + c.Do("XCLAIM", "planets", "processing", "alice", "0", "0-0") + c.Do("XINFO", "CONSUMERS", "planets", "processing") + + c.Do("XADD", "planets", "0-1", "name", "Mercury") + c.Do("XADD", "planets", "0-2", "name", "Venus") + + c.Do("XCLAIM", "planets", "processing", "alice", "0", "0-1") + c.Do("XINFO", "CONSUMERS", "planets", "processing") + + c.Do("XCLAIM", "planets", "processing", "alice", "0", "0-1", "0-2", "FORCE") + c.Do("XINFO", "GROUPS", "planets") + c.Do("XPENDING", "planets", "processing") + + c.Do("XDEL", "planets", "0-1") + c.Do("XCLAIM", "planets", "processing", "bob", "0", "0-1") + c.Do("XINFO", "GROUPS", "planets") + c.Do("XPENDING", "planets", "processing") + + c.Do("XADD", "planets", "0-3", "name", "Mercury") + c.Do("XADD", "planets", "0-4", "name", "Venus") + + c.Do("XCLAIM", "planets", "processing", "bob", "0", "0-4", "FORCE") + c.Do("XCLAIM", "planets", "processing", "bob", "0", "0-4") + c.Do("XPENDING", "planets", "processing") + + c.Do("XREADGROUP", "GROUP", "processing", "alice", "COUNT", "1", "STREAMS", "planets", ">") + c.Do("XPENDING", "planets", "processing") + c.Do("XREADGROUP", "GROUP", "processing", "alice", "STREAMS", "planets", ">") + c.Do("XPENDING", "planets", "processing") + + c.Do("XCLAIM", "planets", "processing", "alice", "0", "0-3", "RETRYCOUNT", "10", "IDLE", "5000", "JUSTID") + c.Do("XCLAIM", "planets", "processing", "alice", "0", "0-1", "0-2", "RETRYCOUNT", "1", "TIME", "1", "JUSTID") + c.Do("XCLAIM", "planets", "processing", "alice", "0", "0-1", "0-4", "RETRYCOUNT", "1", "TIME", "1", "justid") + c.Do("XPENDING", "planets", "processing") + + c.Do("XACK", "planets", "processing", "0-1", "0-2", "0-3", "0-4") + c.Do("XPENDING", "planets", "processing") + + c.Error("Unrecognized XCLAIM option", "XCLAIM", "planets", "processing", "alice", "0", "0-3", "RETRYCOUNT", "10", "0-4", "IDLE", "0") + c.Error("Unrecognized XCLAIM option", "XCLAIM", "planets", "processing", "alice", "0", "0-3", "RETRYCOUNT", "10", "IDLE", "0", "0-4") + c.Error("Invalid min-idle-time", "XCLAIM", "planets", "processing", "alice", "foo", "0-1", "JUSTID") + c.Error("Invalid IDLE", "XCLAIM", "planets", "processing", "alice", "0", "0-1", "JUSTID", "IDLE", "foo") + c.Error("Invalid TIME", "XCLAIM", "planets", "processing", "alice", "0", "0-1", "JUSTID", "TIME", "foo") + c.Error("Invalid RETRYCOUNT", "XCLAIM", "planets", "processing", "alice", "0", "0-1", "JUSTID", "RETRYCOUNT", "foo") + }) + }) + testRESP3(t, func(c *client) { c.DoLoosely("XINFO", "STREAM", "foo") }) diff --git a/stream.go b/stream.go index 574f9016..c09051a2 100644 --- a/stream.go +++ b/stream.go @@ -31,10 +31,11 @@ type streamGroup struct { stream *streamKey lastID string pending []pendingEntry - consumers map[string]consumer + consumers map[string]*consumer } type consumer struct { + numPendingEntries int // TODO: "last seen" timestamp } @@ -203,7 +204,7 @@ func (s *streamKey) createGroup(group, id string) error { s.groups[group] = &streamGroup{ stream: s, lastID: id, - consumers: map[string]consumer{}, + consumers: map[string]*consumer{}, } return nil } @@ -279,16 +280,39 @@ func (g *streamGroup) readGroup( } if !noack { + shouldAppend := len(g.pending) == 0 for _, msg := range msgs { - g.pending = append(g.pending, pendingEntry{ + if !shouldAppend { + shouldAppend = streamCmp(msg.ID, g.pending[len(g.pending)-1].id) == 1 + } + + var entry *pendingEntry + if shouldAppend { + g.pending = append(g.pending, pendingEntry{}) + entry = &g.pending[len(g.pending)-1] + } else { + var pos int + pos, entry = g.searchPending(msg.ID) + if entry == nil { + g.pending = append(g.pending[:pos+1], g.pending[pos:]...) + entry = &g.pending[pos] + } else { + g.consumers[entry.consumer].numPendingEntries-- + } + } + + *entry = pendingEntry{ id: msg.ID, consumer: consumerID, deliveryCount: 1, lastDelivery: now, - }) + } } } - g.consumers[consumerID] = consumer{} + if _, ok := g.consumers[consumerID]; !ok { + g.consumers[consumerID] = &consumer{} + } + g.consumers[consumerID].numPendingEntries += len(msgs) g.lastID = msgs[len(msgs)-1].ID return msgs } @@ -314,6 +338,16 @@ func (g *streamGroup) readGroup( return res } +func (g *streamGroup) searchPending(id string) (int, *pendingEntry) { + pos := sort.Search(len(g.pending), func(i int) bool { + return streamCmp(id, g.pending[i].id) <= 0 + }) + if pos >= len(g.pending) || g.pending[pos].id != id { + return pos, nil + } + return pos, &g.pending[pos] +} + func (g *streamGroup) ack(ids []string) (int, error) { count := 0 for _, id := range ids { @@ -321,13 +355,14 @@ func (g *streamGroup) ack(ids []string) (int, error) { return 0, errors.New(msgInvalidStreamID) } - pos := sort.Search(len(g.pending), func(i int) bool { - return streamCmp(id, g.pending[i].id) <= 0 - }) - if len(g.pending) <= pos || g.pending[pos].id != id { + pos, entry := g.searchPending(id) + if entry == nil { continue } + consumer := g.consumers[entry.consumer] + consumer.numPendingEntries-- + g.pending = append(g.pending[:pos], g.pending[pos+1:]...) count++ } @@ -370,9 +405,10 @@ func (g *streamGroup) pendingCount(consumer string) int { } func (g *streamGroup) copy() *streamGroup { - cns := map[string]consumer{} + cns := map[string]*consumer{} for k, v := range g.consumers { - cns[k] = v + c := *v + cns[k] = &c } return &streamGroup{ // don't copy stream