Skip to content

Commit

Permalink
extend XPENDING and add support for `XGROUP {CREATECONSUMER,DESTROY…
Browse files Browse the repository at this point in the history
…,DELCONSUMER}`, `XINFO {CONSUMERS,GROUPS}`, `XCLAIM`
  • Loading branch information
Sandy Harvie committed Aug 9, 2022
1 parent 7879d48 commit e3776cb
Show file tree
Hide file tree
Showing 17 changed files with 1,835 additions and 75 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Expand Up @@ -12,7 +12,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
go: [ '1.14', '1.17', '1.18' ]
go: [ '1.14', '1.17', '1.18', '1.19' ]
name: Go ${{ matrix.go }}
steps:
- uses: actions/checkout@v2
Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Expand Up @@ -2,3 +2,5 @@
/integration/dump.rdb
*.swp
/integration/nodes.conf
.idea/
miniredis.iml
6 changes: 6 additions & 0 deletions README.md
Expand Up @@ -180,9 +180,15 @@ Implemented commands:
- XACK
- XADD
- XAUTOCLAIM
- XCLAIM
- XDEL
- XGROUP CREATE
- XGROUP CREATECONSUMER
- XGROUP DESTROY
- XGROUP DELCONSUMER
- XINFO STREAM -- partly
- XINFO GROUPS
- XINFO CONSUMERS -- partly
- XLEN
- XRANGE
- XREAD
Expand Down
156 changes: 150 additions & 6 deletions cmd_list.go
Expand Up @@ -23,6 +23,7 @@ func commandsList(m *Miniredis) {
m.srv.Register("BRPOP", m.cmdBrpop)
m.srv.Register("BRPOPLPUSH", m.cmdBrpoplpush)
m.srv.Register("LINDEX", m.cmdLindex)
m.srv.Register("LPOS", m.cmdLpos)
m.srv.Register("LINSERT", m.cmdLinsert)
m.srv.Register("LLEN", m.cmdLlen)
m.srv.Register("LPOP", m.cmdLpop)
Expand Down Expand Up @@ -165,6 +166,153 @@ func (m *Miniredis) cmdLindex(c *server.Peer, cmd string, args []string) {
})
}

// LPOS key element [RANK rank] [COUNT num-matches] [MAXLEN len]
func (m *Miniredis) cmdLpos(c *server.Peer, cmd string, args []string) {
if !m.handleAuth(c) {
return
}
if m.checkPubsub(c, cmd) {
return
}

if len(args) == 1 {
setDirty(c)
c.WriteError(errWrongNumber(cmd))
return
}

// Extract options from arguments if present.
//
// Redis allows duplicate options and uses the last specified.
// `LPOS key term RANK 1 RANK 2` is effectively the same as
// `LPOS key term RANK 2`
if len(args)%2 == 1 {
setDirty(c)
c.WriteError(msgSyntaxError)
return
}
rank, count := 1, 1 // Default values
var maxlen int // Default value is the list length (see below)
var countSpecified, maxlenSpecified bool
if len(args) > 2 {
for i := 2; i < len(args); i++ {
if i%2 == 0 {
val := args[i+1]
var err error
switch strings.ToLower(args[i]) {
case "rank":
if rank, err = strconv.Atoi(val); err != nil {
setDirty(c)
c.WriteError(msgInvalidInt)
return
}
if rank == 0 {
setDirty(c)
c.WriteError(msgRankIsZero)
return
}
case "count":
countSpecified = true
if count, err = strconv.Atoi(val); err != nil || count < 0 {
setDirty(c)
c.WriteError(msgCountIsNegative)
return
}
case "maxlen":
maxlenSpecified = true
if maxlen, err = strconv.Atoi(val); err != nil || maxlen < 0 {
setDirty(c)
c.WriteError(msgMaxLengthIsNegative)
return
}
default:
setDirty(c)
c.WriteError(msgSyntaxError)
return
}
}
}
}

withTx(m, c, func(c *server.Peer, ctx *connCtx) {
db := m.db(ctx.selectedDB)
key, element := args[0], args[1]
t, ok := db.keys[key]
if !ok {
// No such key
c.WriteNull()
return
}
if t != "list" {
c.WriteError(msgWrongType)
return
}
l := db.listKeys[key]

// RANK cannot be zero (see above).
// If RANK is positive search forward (left to right).
// If RANK is negative search backward (right to left).
// Iterator returns true to continue iterating.
iterate := func(iterator func(i int, e string) bool) {
comparisons := len(l)
// Only use max length if specified, not zero, and less than total length.
// When max length is specified, but is zero, this means "unlimited".
if maxlenSpecified && maxlen != 0 && maxlen < len(l) {
comparisons = maxlen
}
if rank > 0 {
for i := 0; i < comparisons; i++ {
if resume := iterator(i, l[i]); !resume {
return
}
}
} else if rank < 0 {
start := len(l) - 1
end := len(l) - comparisons
for i := start; i >= end; i-- {
if resume := iterator(i, l[i]); !resume {
return
}
}
}
}

var currentRank, currentCount int
vals := make([]int, 0, count)
iterate(func(i int, e string) bool {
if e == element {
currentRank++
// Only collect values only after surpassing the absolute value of rank.
if rank > 0 && currentRank < rank {
return true
}
if rank < 0 && currentRank < -rank {
return true
}
vals = append(vals, i)
currentCount++
if currentCount == count {
return false
}
}
return true
})

if !countSpecified && len(vals) == 0 {
c.WriteNull()
return
}
if !countSpecified && len(vals) == 1 {
c.WriteInt(vals[0])
return
}
c.WriteLen(len(vals))
for _, val := range vals {
c.WriteInt(val)
}
})
}

// LINSERT
func (m *Miniredis) cmdLinsert(c *server.Peer, cmd string, args []string) {
if len(args) != 4 {
Expand Down Expand Up @@ -297,18 +445,14 @@ func (m *Miniredis) cmdXpop(c *server.Peer, cmd string, args []string, lr leftri

opts.key, args = args[0], args[1:]
if len(args) > 0 {
v, err := strconv.Atoi(args[0])
if err != nil {
setDirty(c)
c.WriteError(msgInvalidInt)
if ok := optInt(c, args[0], &opts.count); !ok {
return
}
if v < 0 {
if opts.count < 0 {
setDirty(c)
c.WriteError(msgOutOfRange)
return
}
opts.count = v
opts.withCount = true
args = args[1:]
}
Expand Down

0 comments on commit e3776cb

Please sign in to comment.