Skip to content

Commit

Permalink
Add XAutoClaim command (#1780)
Browse files Browse the repository at this point in the history
  • Loading branch information
ericmillin committed Jun 4, 2021
1 parent 6e4eb2e commit 237bad5
Show file tree
Hide file tree
Showing 3 changed files with 186 additions and 0 deletions.
116 changes: 116 additions & 0 deletions command.go
Expand Up @@ -1501,6 +1501,122 @@ func (cmd *XPendingExtCmd) readReply(rd *proto.Reader) error {

//------------------------------------------------------------------------------

type XAutoClaimCmd struct {
baseCmd

start string
val []XMessage
}

var _ Cmder = (*XAutoClaimCmd)(nil)

func NewXAutoClaimCmd(ctx context.Context, args ...interface{}) *XAutoClaimCmd {
return &XAutoClaimCmd{
baseCmd: baseCmd{
ctx: ctx,
args: args,
},
}
}

func (cmd *XAutoClaimCmd) Val() (messages []XMessage, start string) {
return cmd.val, cmd.start
}

func (cmd *XAutoClaimCmd) Result() (messages []XMessage, start string, err error) {
return cmd.val, cmd.start, cmd.err
}

func (cmd *XAutoClaimCmd) String() string {
return cmdString(cmd, cmd.val)
}

func (cmd *XAutoClaimCmd) readReply(rd *proto.Reader) error {
_, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
if n != 2 {
return nil, fmt.Errorf("got %d, wanted 2", n)
}
var err error

cmd.start, err = rd.ReadString()
if err != nil {
return nil, err
}

cmd.val, err = readXMessageSlice(rd)
if err != nil {
return nil, err
}

return nil, nil
})
return err
}

//------------------------------------------------------------------------------

type XAutoClaimJustIDCmd struct {
baseCmd

start string
val []string
}

var _ Cmder = (*XAutoClaimJustIDCmd)(nil)

func NewXAutoClaimJustIDCmd(ctx context.Context, args ...interface{}) *XAutoClaimJustIDCmd {
return &XAutoClaimJustIDCmd{
baseCmd: baseCmd{
ctx: ctx,
args: args,
},
}
}

func (cmd *XAutoClaimJustIDCmd) Val() (ids []string, start string) {
return cmd.val, cmd.start
}

func (cmd *XAutoClaimJustIDCmd) Result() (ids []string, start string, err error) {
return cmd.val, cmd.start, cmd.err
}

func (cmd *XAutoClaimJustIDCmd) String() string {
return cmdString(cmd, cmd.val)
}

func (cmd *XAutoClaimJustIDCmd) readReply(rd *proto.Reader) error {
_, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
if n != 2 {
return nil, fmt.Errorf("got %d, wanted 2", n)
}
var err error

cmd.start, err = rd.ReadString()
if err != nil {
return nil, err
}

nn, err := rd.ReadArrayLen()
if err != nil {
return nil, err
}

cmd.val = make([]string, nn)
for i := 0; i < nn; i++ {
cmd.val[i], err = rd.ReadString()
if err != nil {
return nil, err
}
}

return nil, nil
})
return err
}

//------------------------------------------------------------------------------

type XInfoConsumersCmd struct {
baseCmd
val []XInfoConsumer
Expand Down
33 changes: 33 additions & 0 deletions commands.go
Expand Up @@ -1845,6 +1845,39 @@ func (c cmdable) XPendingExt(ctx context.Context, a *XPendingExtArgs) *XPendingE
return cmd
}

type XAutoClaimArgs struct {
Stream string
Group string
MinIdle time.Duration
Start string
Count int64
Consumer string
}

func (c cmdable) XAutoClaim(ctx context.Context, a *XAutoClaimArgs) *XAutoClaimCmd {
args := xAutoClaimArgs(ctx, a)
cmd := NewXAutoClaimCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}

func (c cmdable) XAutoClaimJustID(ctx context.Context, a *XAutoClaimArgs) *XAutoClaimJustIDCmd {
args := xAutoClaimArgs(ctx, a)
args = append(args, "justid")
cmd := NewXAutoClaimJustIDCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}

func xAutoClaimArgs(ctx context.Context, a *XAutoClaimArgs) []interface{} {
args := make([]interface{}, 0, 9)
args = append(args, "xautoclaim", a.Stream, a.Group, a.Consumer, formatMs(ctx, a.MinIdle), a.Start)
if a.Count > 0 {
args = append(args, "count", a.Count)
}
return args
}

type XClaimArgs struct {
Stream string
Group string
Expand Down
37 changes: 37 additions & 0 deletions commands_test.go
Expand Up @@ -4386,6 +4386,43 @@ var _ = Describe("Commands", func() {
Expect(n).To(Equal(int64(3)))
})

It("should XAutoClaim", func() {
xca := &redis.XAutoClaimArgs{
Stream: "stream",
Group: "group",
Consumer: "consumer",
Start: "-",
Count: 2,
}
msgs, start, err := client.XAutoClaim(ctx, xca).Result()
Expect(err).NotTo(HaveOccurred())
Expect(start).To(Equal("2-0"))
Expect(msgs).To(Equal([]redis.XMessage{{
ID: "1-0",
Values: map[string]interface{}{"uno": "un"},
}, {
ID: "2-0",
Values: map[string]interface{}{"dos": "deux"},
}}))

xca.Start = start
msgs, start, err = client.XAutoClaim(ctx, xca).Result()
Expect(err).NotTo(HaveOccurred())
Expect(start).To(Equal("3-0"))
Expect(msgs).To(Equal([]redis.XMessage{{
ID: "2-0",
Values: map[string]interface{}{"dos": "deux"},
}, {
ID: "3-0",
Values: map[string]interface{}{"tres": "troix"},
}}))

ids, start, err := client.XAutoClaimJustID(ctx, xca).Result()
Expect(err).NotTo(HaveOccurred())
Expect(start).To(Equal("3-0"))
Expect(ids).To(Equal([]string{"2-0", "3-0"}))
})

It("should XClaim", func() {
msgs, err := client.XClaim(ctx, &redis.XClaimArgs{
Stream: "stream",
Expand Down

0 comments on commit 237bad5

Please sign in to comment.