Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds support for XAUTOCLAIM min-idle parameter. #276

Merged
merged 1 commit into from Jun 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 9 additions & 6 deletions cmd_stream.go
Expand Up @@ -1083,11 +1083,6 @@ func (m *Miniredis) cmdXautoclaim(c *server.Peer, cmd string, args []string) {
c.WriteError("ERR Invalid min-idle-time argument for XAUTOCLAIM")
return
}
if minIdleTime != 0 {
setDirty(c)
c.WriteError("ERR IDLE is unsupported")
return
}

start_, err := formatStreamRangeBound(args[4], true, false)
if err != nil {
Expand Down Expand Up @@ -1142,14 +1137,16 @@ parsing:
return
}

nextCallId, entries := xautoclaim(m.effectiveNow(), *g, start, count, consumer)
nextCallId, entries := xautoclaim(m.effectiveNow(), *g, minIdleTime,
start, count, consumer)
writeXautoclaim(c, nextCallId, entries, justId)
})
}

func xautoclaim(
now time.Time,
g streamGroup,
minIdleTime int,
start string,
count int,
consumerID string,
Expand All @@ -1162,11 +1159,17 @@ func xautoclaim(
msgs := g.pendingAfter(start)
var res []StreamEntry
for i, p := range msgs {
if minIdleTime > 0 && now.Before(p.lastDelivery.Add(time.Duration(minIdleTime)*time.Millisecond)) {
continue
}
g.consumers[consumerID] = consumer{}
p.consumer = consumerID
_, entry := g.stream.get(p.id)
// not found. Weird?
if entry == nil {
// TODO: support third element of return from XAUTOCLAIM, which
// should delete entries not found in the PEL during XAUTOCLAIM.
// (Introduced in Redis 7.0)
continue
}
p.deliveryCount += 1
Expand Down
61 changes: 61 additions & 0 deletions cmd_stream_test.go
Expand Up @@ -838,6 +838,9 @@ func TestStreamAutoClaim(t *testing.T) {
ok(t, err)
defer c.Close()

now := time.Now()
s.SetTime(now)

mustDo(t, c,
"XAUTOCLAIM", "planets", "processing", "alice", "0", "0",
proto.Error("NOGROUP No such key 'planets' or consumer group 'processing'"),
Expand Down Expand Up @@ -879,4 +882,62 @@ func TestStreamAutoClaim(t *testing.T) {
proto.Array(proto.Array(proto.String("0-1"), proto.Strings("name", "Mercury"))),
),
)

// Add an additional item to pending
s.SetTime(now.Add(5000 * time.Millisecond))
mustDo(t, c,
"XADD", "planets", "0-2", "name", "Venus",
proto.String("0-2"),
)
mustDo(t, c,
"XREADGROUP", "GROUP", "processing", "alice", "STREAMS", "planets", ">",
proto.Array(
proto.Array(proto.String("planets"), proto.Array(proto.Array(proto.String("0-2"), proto.Strings("name", "Venus")))),
),
)

// Autoclaim with a min idle time that should not catch any items
s.SetTime(now.Add(10000 * time.Millisecond))
mustDo(t, c,
"XAUTOCLAIM", "planets", "processing", "alice", "15000", "0",
proto.Array(
proto.String("0-0"),
proto.Array(),
),
)

// Set time further in the future where autoclaim with min idle time should
// return only one result
s.SetTime(now.Add(15000 * time.Millisecond))
mustDo(t, c,
"XAUTOCLAIM", "planets", "processing", "alice", "15000", "0",
proto.Array(
proto.String("0-0"),
proto.Array(proto.Array(proto.String("0-1"), proto.Strings("name", "Mercury"))),
),
)

// Further in the future we should return Venus but not Mercury since it is
// claimed more recently
s.SetTime(now.Add(25000 * time.Millisecond))
mustDo(t, c,
"XAUTOCLAIM", "planets", "processing", "alice", "15000", "0",
proto.Array(
proto.String("0-0"),
proto.Array(proto.Array(proto.String("0-2"), proto.Strings("name", "Venus"))),
),
)

// Even further in the future we should return both
s.SetTime(now.Add(40000 * time.Millisecond))
mustDo(t, c,
"XAUTOCLAIM", "planets", "processing", "alice", "15000", "0",
proto.Array(
proto.String("0-0"),
proto.Array(
proto.Array(proto.String("0-1"), proto.Strings("name", "Mercury")),
proto.Array(proto.String("0-2"), proto.Strings("name", "Venus")),
),
),
)
}