Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xgroup/xadd/xtrim supports new options #1787

Merged
merged 2 commits into from Jun 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
118 changes: 103 additions & 15 deletions commands.go
Expand Up @@ -226,15 +226,22 @@ type Cmdable interface {
XGroupCreateMkStream(ctx context.Context, stream, group, start string) *StatusCmd
XGroupSetID(ctx context.Context, stream, group, start string) *StatusCmd
XGroupDestroy(ctx context.Context, stream, group string) *IntCmd
XGroupCreateConsumer(ctx context.Context, stream, group, consumer string) *IntCmd
XGroupDelConsumer(ctx context.Context, stream, group, consumer string) *IntCmd
XReadGroup(ctx context.Context, a *XReadGroupArgs) *XStreamSliceCmd
XAck(ctx context.Context, stream, group string, ids ...string) *IntCmd
XPending(ctx context.Context, stream, group string) *XPendingCmd
XPendingExt(ctx context.Context, a *XPendingExtArgs) *XPendingExtCmd
XClaim(ctx context.Context, a *XClaimArgs) *XMessageSliceCmd
XClaimJustID(ctx context.Context, a *XClaimArgs) *StringSliceCmd

// TODO: XTrim and XTrimApprox remove in v9.
XTrim(ctx context.Context, key string, maxLen int64) *IntCmd
XTrimApprox(ctx context.Context, key string, maxLen int64) *IntCmd
XTrimMaxLen(ctx context.Context, key string, maxLen int64) *IntCmd
XTrimMaxLenApprox(ctx context.Context, key string, maxLen, limit int64) *IntCmd
XTrimMinID(ctx context.Context, key string, minID string) *IntCmd
XTrimMinIDApprox(ctx context.Context, key string, minID string, limit int64) *IntCmd
XInfoGroups(ctx context.Context, key string) *XInfoGroupsCmd
XInfoStream(ctx context.Context, key string) *XInfoStreamCmd
XInfoConsumers(ctx context.Context, key string, group string) *XInfoConsumersCmd
Expand Down Expand Up @@ -1621,22 +1628,50 @@ func (c cmdable) SUnionStore(ctx context.Context, destination string, keys ...st
// - XAddArgs.Values = map[string]interface{}{"key1": "value1", "key2": "value2"}
//
// Note that map will not preserve the order of key-value pairs.
// MaxLen/MaxLenApprox and MinID are in conflict, only one of them can be used.
type XAddArgs struct {
Stream string
MaxLen int64 // MAXLEN N
Stream string
NoMkStream bool
MaxLen int64 // MAXLEN N

// Deprecated: use MaxLen+Approx, remove in v9.
MaxLenApprox int64 // MAXLEN ~ N
ID string
Values interface{}

MinID string
// Approx causes MaxLen and MinID to use "~" matcher (instead of "=").
Approx bool
Limit int64
ID string
Values interface{}
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps group MaxLen, MinID, and Approx:

    MaxLen int64
    MinID string
    // Approx causes MaxLen and MinID to use "~" matcher (instead of "=").
    Approx bool


// XAdd a.Limit has a bug, please confirm it and use it.
// issue: https://github.com/redis/redis/issues/9046
func (c cmdable) XAdd(ctx context.Context, a *XAddArgs) *StringCmd {
args := make([]interface{}, 0, 8)
args = append(args, "xadd")
args = append(args, a.Stream)
if a.MaxLen > 0 {
args = append(args, "maxlen", a.MaxLen)
} else if a.MaxLenApprox > 0 {
args := make([]interface{}, 0, 11)
args = append(args, "xadd", a.Stream)
if a.NoMkStream {
args = append(args, "nomkstream")
}
switch {
case a.MaxLen > 0:
if a.Approx {
args = append(args, "maxlen", "~", a.MaxLen)
} else {
args = append(args, "maxlen", a.MaxLen)
}
case a.MaxLenApprox > 0:
// TODO remove in v9.
args = append(args, "maxlen", "~", a.MaxLenApprox)
case a.MinID != "":
if a.Approx {
args = append(args, "minid", "~", a.MinID)
} else {
args = append(args, "minid", a.MinID)
}
}
if a.Limit > 0 {
args = append(args, "limit", a.Limit)
}
if a.ID != "" {
args = append(args, a.ID)
Expand Down Expand Up @@ -1757,6 +1792,12 @@ func (c cmdable) XGroupDestroy(ctx context.Context, stream, group string) *IntCm
return cmd
}

func (c cmdable) XGroupCreateConsumer(ctx context.Context, stream, group, consumer string) *IntCmd {
cmd := NewIntCmd(ctx, "xgroup", "createconsumer", stream, group, consumer)
_ = c(ctx, cmd)
return cmd
}

func (c cmdable) XGroupDelConsumer(ctx context.Context, stream, group, consumer string) *IntCmd {
cmd := NewIntCmd(ctx, "xgroup", "delconsumer", stream, group, consumer)
_ = c(ctx, cmd)
Expand Down Expand Up @@ -1881,16 +1922,63 @@ func xClaimArgs(a *XClaimArgs) []interface{} {
return args
}

func (c cmdable) XTrim(ctx context.Context, key string, maxLen int64) *IntCmd {
cmd := NewIntCmd(ctx, "xtrim", key, "maxlen", maxLen)
// xTrim If approx is true, add the "~" parameter, otherwise it is the default "=" (redis default).
// example:
// XTRIM key MAXLEN/MINID threshold LIMIT limit.
// XTRIM key MAXLEN/MINID ~ threshold LIMIT limit.
// The redis-server version is lower than 6.2, please set limit to 0.
func (c cmdable) xTrim(
ctx context.Context, key, strategy string,
approx bool, threshold interface{}, limit int64,
) *IntCmd {
args := make([]interface{}, 0, 7)
args = append(args, "xtrim", key, strategy)
if approx {
args = append(args, "~")
}
args = append(args, threshold)
if limit > 0 {
args = append(args, "limit", limit)
}
cmd := NewIntCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}

// Deprecated: use XTrimMaxLen, remove in v9.
func (c cmdable) XTrim(ctx context.Context, key string, maxLen int64) *IntCmd {
return c.xTrim(ctx, key, "maxlen", false, maxLen, 0)
}

// Deprecated: use XTrimMaxLenApprox, remove in v9.
func (c cmdable) XTrimApprox(ctx context.Context, key string, maxLen int64) *IntCmd {
cmd := NewIntCmd(ctx, "xtrim", key, "maxlen", "~", maxLen)
_ = c(ctx, cmd)
return cmd
return c.xTrim(ctx, key, "maxlen", true, maxLen, 0)
}

// XTrimMaxLen No `~` rules are used, `limit` cannot be used.
// cmd: XTRIM key MAXLEN maxLen
func (c cmdable) XTrimMaxLen(ctx context.Context, key string, maxLen int64) *IntCmd {
return c.xTrim(ctx, key, "maxlen", false, maxLen, 0)
}

// XTrimMaxLenApprox LIMIT has a bug, please confirm it and use it.
// issue: https://github.com/redis/redis/issues/9046
// cmd: XTRIM key MAXLEN ~ maxLen LIMIT limit
func (c cmdable) XTrimMaxLenApprox(ctx context.Context, key string, maxLen, limit int64) *IntCmd {
return c.xTrim(ctx, key, "maxlen", true, maxLen, limit)
}

// XTrimMinID No `~` rules are used, `limit` cannot be used.
// cmd: XTRIM key MINID minID
func (c cmdable) XTrimMinID(ctx context.Context, key string, minID string) *IntCmd {
return c.xTrim(ctx, key, "minid", false, minID, 0)
}

// XTrimMinIDApprox LIMIT has a bug, please confirm it and use it.
// issue: https://github.com/redis/redis/issues/9046
// cmd: XTRIM key MINID ~ minID LIMIT limit
func (c cmdable) XTrimMinIDApprox(ctx context.Context, key string, minID string, limit int64) *IntCmd {
return c.xTrim(ctx, key, "minid", true, minID, limit)
}

func (c cmdable) XInfoConsumers(ctx context.Context, key string, group string) *XInfoConsumersCmd {
Expand Down
55 changes: 54 additions & 1 deletion commands_test.go
Expand Up @@ -4104,18 +4104,47 @@ var _ = Describe("Commands", func() {
Expect(id).To(Equal("3-0"))
})

// TODO remove in v9.
It("should XTrim", func() {
n, err := client.XTrim(ctx, "stream", 0).Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(3)))
})

// TODO remove in v9.
It("should XTrimApprox", func() {
n, err := client.XTrimApprox(ctx, "stream", 0).Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(3)))
})

// TODO XTrimMaxLenApprox/XTrimMinIDApprox There is a bug in the limit parameter.
// TODO Don't test it for now.
// TODO link: https://github.com/redis/redis/issues/9046
It("should XTrimMaxLen", func() {
n, err := client.XTrimMaxLen(ctx, "stream", 0).Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(3)))
})

It("should XTrimMaxLenApprox", func() {
n, err := client.XTrimMaxLenApprox(ctx, "stream", 0, 0).Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(3)))
})

It("should XTrimMinID", func() {
n, err := client.XTrimMinID(ctx, "stream", "4-0").Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(3)))
})

It("should XTrimMinIDApprox", func() {
n, err := client.XTrimMinIDApprox(ctx, "stream", "4-0", 0).Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(3)))
})

It("should XAdd", func() {
id, err := client.XAdd(ctx, &redis.XAddArgs{
Stream: "stream",
Expand All @@ -4133,6 +4162,9 @@ var _ = Describe("Commands", func() {
}))
})

// TODO XAdd There is a bug in the limit parameter.
// TODO Don't test it for now.
// TODO link: https://github.com/redis/redis/issues/9046
It("should XAdd with MaxLen", func() {
id, err := client.XAdd(ctx, &redis.XAddArgs{
Stream: "stream",
Expand All @@ -4148,6 +4180,21 @@ var _ = Describe("Commands", func() {
}))
})

It("should XAdd with MinID", func() {
id, err := client.XAdd(ctx, &redis.XAddArgs{
Stream: "stream",
MinID: "5-0",
ID: "4-0",
Values: map[string]interface{}{"quatro": "quatre"},
}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(id).To(Equal("4-0"))

vals, err := client.XRange(ctx, "stream", "-", "+").Result()
Expect(err).NotTo(HaveOccurred())
Expect(vals).To(HaveLen(0))
})

It("should XDel", func() {
n, err := client.XDel(ctx, "stream", "1-0", "2-0", "3-0").Result()
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -4380,8 +4427,14 @@ var _ = Describe("Commands", func() {
infoExt, err = client.XPendingExt(ctx, args).Result()
Expect(err).NotTo(HaveOccurred())
Expect(infoExt).To(HaveLen(0))
})

It("should XGroup Create Delete Consumer", func() {
n, err := client.XGroupCreateConsumer(ctx, "stream", "group", "c1").Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(1)))

n, err := client.XGroupDelConsumer(ctx, "stream", "group", "consumer").Result()
n, err = client.XGroupDelConsumer(ctx, "stream", "group", "consumer").Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(3)))
})
Expand Down