-
Notifications
You must be signed in to change notification settings - Fork 552
/
seek.go
297 lines (265 loc) · 8.94 KB
/
seek.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
// Copyright 2021 Vectorized, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0
package group
import (
"bufio"
"context"
"fmt"
"strconv"
"strings"
"github.com/spf13/afero"
"github.com/spf13/cobra"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/vectorizedio/redpanda/src/go/rpk/pkg/config"
"github.com/vectorizedio/redpanda/src/go/rpk/pkg/kafka"
"github.com/vectorizedio/redpanda/src/go/rpk/pkg/out"
)
// newSeekCommand builds the "seek" subcommand, which rewrites a consumer
// group's committed offsets.
//
// Exactly one of --to, --to-group, or --to-file must be given; the command
// dies with a usage message otherwise. The optional --topics filter is turned
// into a set and handed, together with the chosen target, to seek.
//
// fs is the filesystem used to load the rpk configuration and, for --to-file,
// to read the offsets file.
func newSeekCommand(fs afero.Fs) *cobra.Command {
	var (
		to             string
		toGroup        string
		toFile         string
		topics         []string
		allowNewTopics bool
	)
	cmd := &cobra.Command{
		Use:   "seek [GROUP] --to (start|end|timestamp) --to-group ... --topics ...",
		Short: "Modify a group's current offsets.",
		Long: `Modify a group's current offsets.
This command allows you to modify a group's offsets. Sometimes, you may need to
rewind a group if you had a mistaken deploy, or fast-forward a group if it is
falling behind on messages that can be skipped.
The --to option allows you to seek to the start of partitions, end of
partitions, or after a specific timestamp. The default is to seek any topic
previously committed. Using --topics allows to you set commits for only the
specified topics; all other commits will remain untouched. Topics with no
commits will not be committed unless allowed with --allow-new-topics.
The --to-group option allows you to seek to commits that are in another group.
This is a merging operation: if g1 is consuming topics A and B, and g2 is
consuming only topic B, "rpk group seek g1 --to-group g2" will update g1's
commits for topic B only. The --topics flag can be used to further narrow which
topics are updated. Unlike --to, all non-filtered topics are committed, even
topics not yet being consumed, meaning --allow-new-topics is not needed.
The --to-file option allows to seek to offsets specified in a text file with
the following format:
[TOPIC] [PARTITION] [OFFSET]
[TOPIC] [PARTITION] [OFFSET]
...
Each line contains the topic, the partition, and the offset to seek to. As with
the prior options, --topics allows filtering which topics are updated. Similar
to --to-group, all non-filtered topics are committed, even topics not yet being
consumed, meaning --allow-new-topics is not needed.
The --to, --to-group, and --to-file options are mutually exclusive. If you are
not authorized to describe or read some topics used in a group, you will not be
able to modify offsets for those topics.
EXAMPLES
Seek group G to June 1st, 2021:
rpk group seek g --to 1622505600
or, rpk group seek g --to 1622505600000
or, rpk group seek g --to 1622505600000000000
Seek group X to the commits of group Y topic foo:
rpk group seek X --to-group Y --topics foo
Seek group G's topics foo, bar, and biz to the end:
rpk group seek G --to end --topics foo,bar,biz
Seek group G to the beginning of a topic it was not previously consuming:
rpk group seek G --to start --topics foo --allow-new-topics
`,
		Args: cobra.ExactArgs(1),
		Run: func(cmd *cobra.Command, args []string) {
			p := config.ParamsFromCommand(cmd)
			cfg, err := p.Load(fs)
			out.MaybeDie(err, "unable to load config: %v", err)

			adm, err := kafka.NewAdmin(fs, p, cfg)
			out.MaybeDie(err, "unable to initialize kafka client: %v", err)
			defer adm.Close()

			// Count how many of the mutually exclusive target flags were set.
			var n int
			for _, f := range []string{to, toGroup, toFile} {
				if f != "" {
					n++
				}
			}
			switch {
			case n == 0:
				out.Die("Must specify one --to flag.")
			case n == 1:
				// Exactly one target: proceed.
			default:
				out.Die("Cannot specify multiple --to flags.")
			}

			// An empty set means "no filter".
			tset := make(map[string]bool, len(topics))
			for _, topic := range topics {
				tset[topic] = true
			}

			group := args[0]
			seek(fs, adm, group, to, toGroup, toFile, tset, allowNewTopics)
		},
	}

	cmd.Flags().StringVar(&to, "to", "", "Where to seek (start, end, unix second | millisecond | nanosecond)")
	cmd.Flags().StringVar(&toGroup, "to-group", "", "Seek to the commits of another group")
	cmd.Flags().StringVar(&toFile, "to-file", "", "Seek to offsets as specified in the file")
	// A string-slice flag splits comma-separated values, which the documented
	// "--topics foo,bar,biz" form relies on; a string-array flag would treat
	// that as one topic. Repeating --topics keeps working.
	cmd.Flags().StringSliceVar(&topics, "topics", nil, "Only seek these topics, if any are specified")
	cmd.Flags().BoolVar(&allowNewTopics, "allow-new-topics", false, "Allow seeking to new topics not currently consumed (implied with --to-group or --to-file)")

	return cmd
}
// parseSeekFile reads seek targets from file on fs and returns them as
// offsets to commit.
//
// Each non-blank line must hold three whitespace-separated fields:
//
//	TOPIC PARTITION OFFSET
//
// PARTITION must parse as a base-10 int32 and OFFSET as a base-10 int64.
// Every parsed offset is recorded with a leader epoch of -1. If topics is
// non-empty, only offsets whose topic is in the set are returned.
//
// An error is returned if the file cannot be opened or scanned, or if any
// line is malformed; no partial result is returned in that case.
func parseSeekFile(
	fs afero.Fs, file string, topics map[string]bool,
) (kadm.Offsets, error) {
	f, err := fs.Open(file)
	if err != nil {
		return nil, fmt.Errorf("unable to open %q: %w", file, err)
	}
	// The file is only read, so a Close error carries nothing actionable.
	defer f.Close()

	o := make(kadm.Offsets)
	s := bufio.NewScanner(f)
	for s.Scan() {
		line := s.Text()
		// Fields splits on any run of spaces and tabs, so aligned columns
		// and trailing whitespace are accepted.
		fields := strings.Fields(line)
		if len(fields) == 0 {
			continue // blank or whitespace-only line
		}
		if len(fields) != 3 {
			return nil, fmt.Errorf("unable to split line %q by space or tab into three fields", line)
		}

		topic := fields[0]
		partition, err := strconv.ParseInt(fields[1], 10, 32)
		if err != nil {
			return nil, fmt.Errorf("unable to parse partition on line %q: %w", line, err)
		}
		offset, err := strconv.ParseInt(fields[2], 10, 64)
		if err != nil {
			return nil, fmt.Errorf("unable to parse offset on line %q: %w", line, err)
		}
		o.Add(kadm.Offset{
			Topic:       topic,
			Partition:   int32(partition), // safe: parsed with bitSize 32
			At:          offset,
			LeaderEpoch: -1,
		})
	}
	if err := s.Err(); err != nil {
		return nil, fmt.Errorf("file %q scan error: %w", file, err)
	}

	// If we have a filter, keep only what is in the filter.
	if len(topics) > 0 {
		o.KeepFunc(func(o kadm.Offset) bool { return topics[o.Topic] })
	}
	return o, nil
}
// seekFetch fetches the offsets committed for group and returns them as
// kadm.Offsets. When topics is non-empty, responses for topics outside the
// set are dropped before conversion.
//
// A failed request, or an error reported by the fetched responses, is passed
// to out.MaybeDie (presumably terminating the command; confirm against the
// out package).
func seekFetch(
	adm *kadm.Client, group string, topics map[string]bool,
) kadm.Offsets {
	fetched, err := adm.FetchOffsets(context.Background(), group)
	if err == nil {
		// The request went through; surface any error carried by the responses.
		err = fetched.Error()
	}
	out.MaybeDie(err, "unable to fetch offsets for %q: %v", group, err)

	// No filter requested: hand back everything that was fetched.
	if len(topics) == 0 {
		return fetched.Into()
	}

	inFilter := func(r kadm.OffsetResponse) bool { return topics[r.Topic] }
	fetched.KeepFunc(inFilter)
	return fetched.Into()
}
// seek computes the offsets that group should be moved to, commits them, and
// prints a table of the prior and new offset for every committed partition.
//
// Exactly one of to, toGroup, and toFile is expected to be non-empty (the
// caller validates this):
//
//   - toFile: offsets are read from the file via parseSeekFile.
//   - toGroup: offsets are copied from that group's current commits.
//   - to: offsets are listed from the cluster for the topics the group has
//     already committed plus any topic named in topics. to is "start", "end",
//     or a unix timestamp whose unit is inferred from its digit count
//     (10 = seconds, 13 = milliseconds, 19 = nanoseconds).
//
// topics optionally restricts which topics are touched. allowNewTopics only
// matters for to: without it, naming a topic the group has no commits for is
// fatal.
//
// Failures are reported through out.Die / out.MaybeDie (presumably exiting
// the process; confirm against the out package). If any individual commit
// fails, the table gains an "error" column.
func seek(
	fs afero.Fs,
	adm *kadm.Client,
	group string,
	to string,
	toGroup string,
	toFile string,
	topics map[string]bool,
	allowNewTopics bool,
) {
	current := seekFetch(adm, group, topics)

	var commitTo kadm.Offsets
	if toFile != "" {
		var err error
		commitTo, err = parseSeekFile(fs, toFile, topics)
		out.MaybeDieErr(err)
	} else if toGroup != "" {
		commitTo = seekFetch(adm, toGroup, topics)
	} else { // --to, we need to list offsets currently used, as well as any extra
		tps := current.TopicsSet()
		for topic := range topics {
			if _, exists := tps[topic]; !exists && !allowNewTopics {
				out.Die("Cannot commit new topic %q without --allow-new-topics.", topic)
			}
			// Only the topic names are used below, so an empty partition
			// set is enough to make the topic part of the listing.
			tps[topic] = map[int32]struct{}{} // ensure exists
		}
		listTopics := tps.Topics()

		var listed kadm.ListedOffsets
		var err error
		switch to {
		case "start":
			listed, err = adm.ListStartOffsets(context.Background(), listTopics...)
		case "end":
			listed, err = adm.ListEndOffsets(context.Background(), listTopics...)
		default:
			var milli int64
			milli, err = strconv.ParseInt(to, 10, 64)
			out.MaybeDie(err, "unable to parse millisecond %q: %v", to, err)
			switch len(to) {
			case 10: // e.g. "1622505600"; sec to milli
				milli *= 1000
			case 13: // e.g. "1622505600000", already in milli
			case 19: // e.g. "1622505600000000000"; nano to milli
				milli /= 1e6
			default:
				out.Die("--to timestamp %q is not a second, nor a millisecond, nor a nanosecond", to)
			}
			listed, err = adm.ListOffsetsAfterMilli(context.Background(), milli, listTopics...)
		}
		if err == nil { // ListOffsets can return ShardErrors, but we want to be entirely successful
			err = listed.Error()
		}
		out.MaybeDie(err, "unable to list all offsets successfully: %v", err)
		commitTo = listed.Into()
	}

	// Finally, we commit.
	committed, err := adm.CommitOffsets(context.Background(), group, commitTo)
	out.MaybeDie(err, "unable to commit offsets: %v", err)

	// Only add the error column when at least one commit failed.
	useErr := committed.Error() != nil
	headers := []string{"topic", "partition", "prior-offset", "current-offset"}
	if useErr {
		headers = append(headers, "error")
	}
	tw := out.NewTable(headers...)
	defer tw.Flush()

	for _, c := range committed.Sorted() {
		// -1 marks an offset that could not be looked up.
		s := seekCommit{c.Topic, c.Partition, -1, -1}
		if o, exists := current.Lookup(c.Topic, c.Partition); exists {
			s.Prior = o.At
		}
		if o, exists := commitTo.Lookup(c.Topic, c.Partition); exists {
			s.Current = o.At
		}
		if !useErr {
			tw.PrintStructFields(s)
			continue
		}

		// Build the error row from the looked-up offsets so the prior and
		// current columns stay populated alongside the error.
		se := seekCommitErr{s.Topic, s.Partition, s.Prior, s.Current, ""}
		if c.Err != nil {
			se.Error = c.Err.Error()
		}
		tw.PrintStructFields(se)
	}
}
// seekCommit is one row of the seek result table when no commit failed.
//
// The field order matches the table headers (topic, partition, prior-offset,
// current-offset) and must not change: seek builds this struct positionally.
type seekCommit struct {
	Topic          string
	Partition      int32
	Prior, Current int64 // offsets before and after the seek; seek starts both at -1
}
// seekCommitErr is one row of the seek result table when at least one commit
// failed; it carries the same columns as seekCommit plus the error text.
//
// The field order matches the table headers (topic, partition, prior-offset,
// current-offset, error) and must not change: seek builds this struct
// positionally.
type seekCommitErr struct {
	Topic          string
	Partition      int32
	Prior, Current int64  // offsets before and after the seek; seek starts both at -1
	Error          string // empty when this partition's commit succeeded
}