diff --git a/command.go b/command.go index 9a3018d3b..77d0bada4 100644 --- a/command.go +++ b/command.go @@ -1501,6 +1501,122 @@ func (cmd *XPendingExtCmd) readReply(rd *proto.Reader) error { //------------------------------------------------------------------------------ +type XAutoClaimCmd struct { + baseCmd + + start string + val []XMessage +} + +var _ Cmder = (*XAutoClaimCmd)(nil) + +func NewXAutoClaimCmd(ctx context.Context, args ...interface{}) *XAutoClaimCmd { + return &XAutoClaimCmd{ + baseCmd: baseCmd{ + ctx: ctx, + args: args, + }, + } +} + +func (cmd *XAutoClaimCmd) Val() (messages []XMessage, start string) { + return cmd.val, cmd.start +} + +func (cmd *XAutoClaimCmd) Result() (messages []XMessage, start string, err error) { + return cmd.val, cmd.start, cmd.err +} + +func (cmd *XAutoClaimCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *XAutoClaimCmd) readReply(rd *proto.Reader) error { + _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { + if n != 2 { + return nil, fmt.Errorf("got %d, wanted 2", n) + } + var err error + + cmd.start, err = rd.ReadString() + if err != nil { + return nil, err + } + + cmd.val, err = readXMessageSlice(rd) + if err != nil { + return nil, err + } + + return nil, nil + }) + return err +} + +//------------------------------------------------------------------------------ + +type XAutoClaimJustIDCmd struct { + baseCmd + + start string + val []string +} + +var _ Cmder = (*XAutoClaimJustIDCmd)(nil) + +func NewXAutoClaimJustIDCmd(ctx context.Context, args ...interface{}) *XAutoClaimJustIDCmd { + return &XAutoClaimJustIDCmd{ + baseCmd: baseCmd{ + ctx: ctx, + args: args, + }, + } +} + +func (cmd *XAutoClaimJustIDCmd) Val() (ids []string, start string) { + return cmd.val, cmd.start +} + +func (cmd *XAutoClaimJustIDCmd) Result() (ids []string, start string, err error) { + return cmd.val, cmd.start, cmd.err +} + +func (cmd *XAutoClaimJustIDCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *XAutoClaimJustIDCmd) readReply(rd *proto.Reader) error { + _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { + if n != 2 { + return nil, fmt.Errorf("got %d, wanted 2", n) + } + var err error + + cmd.start, err = rd.ReadString() + if err != nil { + return nil, err + } + + nn, err := rd.ReadArrayLen() + if err != nil { + return nil, err + } + + cmd.val = make([]string, nn) + for i := 0; i < nn; i++ { + cmd.val[i], err = rd.ReadString() + if err != nil { + return nil, err + } + } + + return nil, nil + }) + return err +} + +//------------------------------------------------------------------------------ + type XInfoConsumersCmd struct { baseCmd val []XInfoConsumer diff --git a/commands.go b/commands.go index 4ac42ecdb..b3b15c75d 100644 --- a/commands.go +++ b/commands.go @@ -1845,6 +1845,39 @@ func (c cmdable) XPendingExt(ctx context.Context, a *XPendingExtArgs) *XPendingE return cmd } +type XAutoClaimArgs struct { + Stream string + Group string + MinIdle time.Duration + Start string + Count int64 + Consumer string +} + +func (c cmdable) XAutoClaim(ctx context.Context, a *XAutoClaimArgs) *XAutoClaimCmd { + args := xAutoClaimArgs(ctx, a) + cmd := NewXAutoClaimCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +func (c cmdable) XAutoClaimJustID(ctx context.Context, a *XAutoClaimArgs) *XAutoClaimJustIDCmd { + args := xAutoClaimArgs(ctx, a) + args = append(args, "justid") + cmd := NewXAutoClaimJustIDCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +func xAutoClaimArgs(ctx context.Context, a *XAutoClaimArgs) []interface{} { + args := make([]interface{}, 0, 9) + args = append(args, "xautoclaim", a.Stream, a.Group, a.Consumer, formatMs(ctx, a.MinIdle), a.Start) + if a.Count > 0 { + args = append(args, "count", a.Count) + } + return args +} + type XClaimArgs struct { Stream string Group string diff --git a/commands_test.go b/commands_test.go index 427fec798..11b8592dc 100644 --- a/commands_test.go +++ b/commands_test.go @@ -4386,6 +4386,43 @@ var _ = Describe("Commands", func() { Expect(n).To(Equal(int64(3))) }) + It("should XAutoClaim", func() { + xca := &redis.XAutoClaimArgs{ + Stream: "stream", + Group: "group", + Consumer: "consumer", + Start: "-", + Count: 2, + } + msgs, start, err := client.XAutoClaim(ctx, xca).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(start).To(Equal("2-0")) + Expect(msgs).To(Equal([]redis.XMessage{{ + ID: "1-0", + Values: map[string]interface{}{"uno": "un"}, + }, { + ID: "2-0", + Values: map[string]interface{}{"dos": "deux"}, + }})) + + xca.Start = start + msgs, start, err = client.XAutoClaim(ctx, xca).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(start).To(Equal("3-0")) + Expect(msgs).To(Equal([]redis.XMessage{{ + ID: "2-0", + Values: map[string]interface{}{"dos": "deux"}, + }, { + ID: "3-0", + Values: map[string]interface{}{"tres": "troix"}, + }})) + + ids, start, err := client.XAutoClaimJustID(ctx, xca).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(start).To(Equal("3-0")) + Expect(ids).To(Equal([]string{"2-0", "3-0"})) + }) + It("should XClaim", func() { msgs, err := client.XClaim(ctx, &redis.XClaimArgs{ Stream: "stream",