Skip to content

Commit

Permalink
Add support for consistent hashing in receivers
Browse files Browse the repository at this point in the history
This commit adds support for distributing series in Receivers using
consistent hashing based on the libketama algorithm.

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>
  • Loading branch information
fpetkovski committed Jun 8, 2022
1 parent 586449e commit 2654534
Show file tree
Hide file tree
Showing 14 changed files with 332 additions and 5,218 deletions.
250 changes: 0 additions & 250 deletions .bingo/alertmanager.sum

This file was deleted.

20 changes: 0 additions & 20 deletions .bingo/goimports.sum

This file was deleted.

1,500 changes: 0 additions & 1,500 deletions .bingo/minio.sum

This file was deleted.

1,055 changes: 0 additions & 1,055 deletions .bingo/prometheus.1.sum

This file was deleted.

1,290 changes: 0 additions & 1,290 deletions .bingo/prometheus.2.sum

This file was deleted.

974 changes: 0 additions & 974 deletions .bingo/prometheus.sum

This file was deleted.

86 changes: 0 additions & 86 deletions .bingo/promu.sum

This file was deleted.

32 changes: 0 additions & 32 deletions .bingo/shfmt.sum

This file was deleted.

1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -18,6 +18,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

- [#5352](https://github.com/thanos-io/thanos/pull/5352) Cache: Add cache metrics to groupcache.
- [#5391](https://github.com/thanos-io/thanos/pull/5391) Receive: Add relabeling support.
- [#5408](https://github.com/thanos-io/thanos/pull/5391) Receive: Add support for consistent hashrings.

### Changed

Expand Down
7 changes: 5 additions & 2 deletions cmd/thanos/receive.go
Expand Up @@ -426,7 +426,7 @@ func setupHashring(g *run.Group,
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
level.Info(logger).Log("msg", "the hashring initialized with config watcher.")
return receive.HashringFromConfigWatcher(ctx, updates, cw)
return receive.HashringFromConfigWatcher(ctx, receive.HashringAlgorithm(conf.hashringsAlgorithm), updates, cw)
}, func(error) {
cancel()
})
Expand All @@ -437,7 +437,7 @@ func setupHashring(g *run.Group,
)
// The Hashrings config file content given initialize configuration from content.
if len(conf.hashringsFileContent) > 0 {
ring, err = receive.HashringFromConfig(conf.hashringsFileContent)
ring, err = receive.HashringFromConfig(receive.HashringAlgorithm(conf.hashringsAlgorithm), conf.hashringsFileContent)
if err != nil {
close(updates)
return errors.Wrap(err, "failed to validate hashring configuration file")
Expand Down Expand Up @@ -718,6 +718,7 @@ type receiveConfig struct {

hashringsFilePath string
hashringsFileContent string
hashringsAlgorithm string

refreshInterval *model.Duration
endpoint string
Expand Down Expand Up @@ -779,6 +780,8 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {

cmd.Flag("receive.hashrings", "Alternative to 'receive.hashrings-file' flag (lower priority). Content of file that contains the hashring configuration.").PlaceHolder("<content>").StringVar(&rc.hashringsFileContent)

cmd.Flag("receive.hashrings-algorithm", "The algorithm used when distributing series in the hashrings.").Default(string(receive.AlgorithmHashmod)).EnumVar(&rc.hashringsAlgorithm, string(receive.AlgorithmHashmod), string(receive.AlgorithmConsistent))

rc.refreshInterval = extkingpin.ModelDuration(cmd.Flag("receive.hashrings-file-refresh-interval", "Refresh interval to re-read the hashring configuration file. (used as a fallback)").
Default("5m"))

Expand Down
3 changes: 3 additions & 0 deletions docs/components/receive.md
Expand Up @@ -126,6 +126,9 @@ Flags:
Alternative to 'receive.hashrings-file' flag
(lower priority). Content of file that contains
the hashring configuration.
--receive.hashrings-algorithm=hashmod
The algorithm used when distributing series in
the hashrings.
--receive.hashrings-file=<path>
Path to file that contains the hashring
configuration. A watcher is initialized to
Expand Down
2 changes: 1 addition & 1 deletion pkg/receive/handler_test.go
Expand Up @@ -324,7 +324,7 @@ func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uin
cfg[0].Endpoints = append(cfg[0].Endpoints, h.options.Endpoint)
peers.cache[addr] = &fakeRemoteWriteGRPCServer{h: h}
}
hashring := newMultiHashring(cfg)
hashring := newMultiHashring(AlgorithmHashmod, cfg)
for _, h := range handlers {
h.Hashring(hashring)
}
Expand Down

0 comments on commit 2654534

Please sign in to comment.