Skip to content

Commit

Permalink
Merge pull request #276 from readams/autoclaim-minidle
Browse files Browse the repository at this point in the history
Adds support for XAUTOCLAIM min-idle parameter.
  • Loading branch information
alicebob committed Jun 29, 2022
2 parents 80cb163 + a784dd4 commit 529c1de
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 6 deletions.
15 changes: 9 additions & 6 deletions cmd_stream.go
Expand Up @@ -1083,11 +1083,6 @@ func (m *Miniredis) cmdXautoclaim(c *server.Peer, cmd string, args []string) {
c.WriteError("ERR Invalid min-idle-time argument for XAUTOCLAIM")
return
}
if minIdleTime != 0 {
setDirty(c)
c.WriteError("ERR IDLE is unsupported")
return
}

start_, err := formatStreamRangeBound(args[4], true, false)
if err != nil {
Expand Down Expand Up @@ -1142,14 +1137,16 @@ parsing:
return
}

nextCallId, entries := xautoclaim(m.effectiveNow(), *g, start, count, consumer)
nextCallId, entries := xautoclaim(m.effectiveNow(), *g, minIdleTime,
start, count, consumer)
writeXautoclaim(c, nextCallId, entries, justId)
})
}

func xautoclaim(
now time.Time,
g streamGroup,
minIdleTime int,
start string,
count int,
consumerID string,
Expand All @@ -1162,11 +1159,17 @@ func xautoclaim(
msgs := g.pendingAfter(start)
var res []StreamEntry
for i, p := range msgs {
if minIdleTime > 0 && now.Before(p.lastDelivery.Add(time.Duration(minIdleTime)*time.Millisecond)) {
continue
}
g.consumers[consumerID] = consumer{}
p.consumer = consumerID
_, entry := g.stream.get(p.id)
// not found. Weird?
if entry == nil {
// TODO: support third element of return from XAUTOCLAIM, which
// should delete entries not found in the PEL during XAUTOCLAIM.
// (Introduced in Redis 7.0)
continue
}
p.deliveryCount += 1
Expand Down
61 changes: 61 additions & 0 deletions cmd_stream_test.go
Expand Up @@ -838,6 +838,9 @@ func TestStreamAutoClaim(t *testing.T) {
ok(t, err)
defer c.Close()

now := time.Now()
s.SetTime(now)

mustDo(t, c,
"XAUTOCLAIM", "planets", "processing", "alice", "0", "0",
proto.Error("NOGROUP No such key 'planets' or consumer group 'processing'"),
Expand Down Expand Up @@ -879,4 +882,62 @@ func TestStreamAutoClaim(t *testing.T) {
proto.Array(proto.Array(proto.String("0-1"), proto.Strings("name", "Mercury"))),
),
)

// Add an additional item to pending
s.SetTime(now.Add(5000 * time.Millisecond))
mustDo(t, c,
"XADD", "planets", "0-2", "name", "Venus",
proto.String("0-2"),
)
mustDo(t, c,
"XREADGROUP", "GROUP", "processing", "alice", "STREAMS", "planets", ">",
proto.Array(
proto.Array(proto.String("planets"), proto.Array(proto.Array(proto.String("0-2"), proto.Strings("name", "Venus")))),
),
)

// Autoclaim with a min idle time that should not catch any items
s.SetTime(now.Add(10000 * time.Millisecond))
mustDo(t, c,
"XAUTOCLAIM", "planets", "processing", "alice", "15000", "0",
proto.Array(
proto.String("0-0"),
proto.Array(),
),
)

// Set time further in the future where autoclaim with min idle time should
// return only one result
s.SetTime(now.Add(15000 * time.Millisecond))
mustDo(t, c,
"XAUTOCLAIM", "planets", "processing", "alice", "15000", "0",
proto.Array(
proto.String("0-0"),
proto.Array(proto.Array(proto.String("0-1"), proto.Strings("name", "Mercury"))),
),
)

// Further in the future we should return Venus but not Mercury since it is
// claimed more recently
s.SetTime(now.Add(25000 * time.Millisecond))
mustDo(t, c,
"XAUTOCLAIM", "planets", "processing", "alice", "15000", "0",
proto.Array(
proto.String("0-0"),
proto.Array(proto.Array(proto.String("0-2"), proto.Strings("name", "Venus"))),
),
)

// Even further in the future we should return both
s.SetTime(now.Add(40000 * time.Millisecond))
mustDo(t, c,
"XAUTOCLAIM", "planets", "processing", "alice", "15000", "0",
proto.Array(
proto.String("0-0"),
proto.Array(
proto.Array(proto.String("0-1"), proto.Strings("name", "Mercury")),
proto.Array(proto.String("0-2"), proto.Strings("name", "Venus")),
),
),
)
}

0 comments on commit 529c1de

Please sign in to comment.