From 2a07e3505aacb06a004522383726c32a1ccea2ab Mon Sep 17 00:00:00 2001 From: Praveen Upadhyay Date: Wed, 9 May 2018 17:49:55 +0530 Subject: [PATCH] Updating with the upstream master version (#1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Return error instead of returning nil * Make Close idempotent * CI update * Auto-discover topics * fix test due to sarama's change in sync_producer's config * fix the doc for Consumer.Notifications * Bump deps * Add HighWaterMarks support ref: HighWaterMarks returns the current high water marks for each topic and partition https://godoc.org/github.com/Shopify/sarama#Consumer * Bump Sarama dependency, use latest Kafka version * Allow to stash offsets when processing batches * Added support method * Subscribe to new topics * Add troubleshooting information regarding consumer not receiving messages * Allow to subscribe to topics via regular expressions * Formatting * Fix troubleshooting formatting. * Added example * Handle authz limited to specific topics * express it a bit simpler * move metadata refresh to consumer method * Fix comparison operator * Allow custom group member UserData to be included when joining the group * Commit every time, even if not dirty. Updated README and CI config. * Updated example * Fix off by one error when marking offset * Updated README * Test with Kafka 0.9 * Partially revert #117. Force-commit when Consumer.Offsets.Retention is set * Remove retention time workaround. Offset retention must be > than log retention. * Allow sarama-cluster offset synchronization dwell time to be configured separately from Sarama MaxProcessingTime * Fix typo and dwell time validation error handling * Remove sleep and replace with select statement. Switch from dwell time as pointer to value. * Switch sleep to select with speedy bailout on close * Better config validation * Ported from glide to dep * Fix CI * Fix CI * Ensure notifications are issued (and consumed) before rebalancing is completed * Fix CI * Bump patch version# * Simpler tests * Fix seeding * Test with Go 1.9 * Remove delay * Reduce memory requirements * Add consumer test * Force rebalance when partition consumers exit with an error * Adding notifications on rebalance start * Stop sharing clients as this is not something Kafka allows us to do * Expose individual partitions * Respond to feedback * Adding whitelist example * Addressing feedback. 
Always multiplex errors * Updated README * Allow to re-use clients * Adding comment to clarify usage * Update CI settings * Make Close thread-safe * More robust * Fix race, added clarifying comment to CommitOffsets * Fix deps * Fix the kafka source - 0.9.0.1 testing broken * Support Kafka 1.0 * Drain channels on Close * Allow marking of earlier offsets This commit is in alignment with the sarama library commits https://github.com/Shopify/sarama/commit/b966238f31778386bce80c28a12fde729e01cab4#diff-3ca6d659defd100fe2de43adf2b8f41e https://github.com/Shopify/sarama/commit/96fa1c837819761fc5c706010a9677aab15ca46d#diff-3ca6d659defd100fe2de43adf2b8f41e * Address comments and add tests * Ensure we have random topic name and reduce time * Fix test timing for kafka ≥ 0.11.0 * Remove unncessary expect clause * Address comments + gofmt * Unify PartitionConsumer interface * New release * Fix NPE on ResetOffsets and MarkOffsets * Use correct request proto version * Expose offset methods on partition consumers * Fix/remove test * Bump CI config * Fewer builds * add Offset() to PartitionConsumer interface (#221) * add Offset() to PartitionConsumer interface * rename Offset to InitialOffset --- .travis.yml | 19 +- Gopkg.lock | 154 +++++++++ Gopkg.toml | 26 ++ LICENSE | 22 ++ Makefile | 16 +- README.md | 156 +++++++-- README.md.tpl | 67 ++++ balancer.go | 56 +++- balancer_test.go | 16 +- client.go | 26 +- client_test.go | 31 ++ cluster.go | 41 --- cluster_test.go | 52 +-- cmd/sarama-cluster-cli/main.go | 58 ++-- config.go | 54 +++- config_test.go | 1 + consumer.go | 564 ++++++++++++++++++++++++--------- consumer_test.go | 267 ++++++++++++---- examples_test.go | 123 +++++++ glide.lock | 56 ---- glide.yaml | 4 - offsets.go | 69 ++++ offsets_test.go | 87 +++++ partitions.go | 180 ++++++++--- partitions_test.go | 63 ++-- testdata/server.properties | 1 + util.go | 75 +++++ 27 files changed, 1780 insertions(+), 504 deletions(-) create mode 100644 Gopkg.lock create mode 100644 Gopkg.toml create mode 100644 LICENSE create mode 100644 README.md.tpl create mode 100644 client_test.go create mode 100644 examples_test.go delete mode 100644 glide.lock delete mode 100644 glide.yaml create mode 100644 offsets.go create mode 100644 offsets_test.go create mode 100644 util.go diff --git a/.travis.yml b/.travis.yml index 781ca83..ab8c2f4 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,14 +1,19 @@ sudo: false language: go go: - - 1.7 - - 1.6.3 - - 1.5.4 + - 1.10.x + - 1.9.x + - 1.8.x install: - - go get -u github.com/Masterminds/glide - - glide install + - go get -u github.com/golang/dep/cmd/dep + - dep ensure env: - - SCALA_VERSION=2.11 KAFKA_VERSION=0.9.0.1 GO15VENDOREXPERIMENT=1 - - SCALA_VERSION=2.11 KAFKA_VERSION=0.10.0.0 GO15VENDOREXPERIMENT=1 + - SCALA_VERSION=2.12 KAFKA_VERSION=0.10.2.1 + - SCALA_VERSION=2.12 KAFKA_VERSION=0.11.0.1 + - SCALA_VERSION=2.12 KAFKA_VERSION=1.0.1 script: - make default test-race +addons: + apt: + packages: + - oracle-java8-set-default diff --git a/Gopkg.lock b/Gopkg.lock new file mode 100644 index 0000000..3ab8b6a --- /dev/null +++ b/Gopkg.lock @@ -0,0 +1,154 @@ +# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'. 
+ + +[[projects]] + name = "github.com/Shopify/sarama" + packages = ["."] + revision = "3b1b38866a79f06deddf0487d5c27ba0697ccd65" + version = "v1.15.0" + +[[projects]] + name = "github.com/davecgh/go-spew" + packages = ["spew"] + revision = "346938d642f2ec3594ed81d874461961cd0faa76" + version = "v1.1.0" + +[[projects]] + name = "github.com/eapache/go-resiliency" + packages = ["breaker"] + revision = "6800482f2c813e689c88b7ed3282262385011890" + version = "v1.0.0" + +[[projects]] + branch = "master" + name = "github.com/eapache/go-xerial-snappy" + packages = ["."] + revision = "bb955e01b9346ac19dc29eb16586c90ded99a98c" + +[[projects]] + name = "github.com/eapache/queue" + packages = ["."] + revision = "44cc805cf13205b55f69e14bcb69867d1ae92f98" + version = "v1.1.0" + +[[projects]] + branch = "master" + name = "github.com/golang/snappy" + packages = ["."] + revision = "553a641470496b2327abcac10b36396bd98e45c9" + +[[projects]] + name = "github.com/onsi/ginkgo" + packages = [ + ".", + "config", + "extensions/table", + "internal/codelocation", + "internal/containernode", + "internal/failer", + "internal/leafnodes", + "internal/remote", + "internal/spec", + "internal/spec_iterator", + "internal/specrunner", + "internal/suite", + "internal/testingtproxy", + "internal/writer", + "reporters", + "reporters/stenographer", + "reporters/stenographer/support/go-colorable", + "reporters/stenographer/support/go-isatty", + "types" + ] + revision = "9eda700730cba42af70d53180f9dcce9266bc2bc" + version = "v1.4.0" + +[[projects]] + name = "github.com/onsi/gomega" + packages = [ + ".", + "format", + "internal/assertion", + "internal/asyncassertion", + "internal/oraclematcher", + "internal/testingtsupport", + "matchers", + "matchers/support/goraph/bipartitegraph", + "matchers/support/goraph/edge", + "matchers/support/goraph/node", + "matchers/support/goraph/util", + "types" + ] + revision = "003f63b7f4cff3fc95357005358af2de0f5fe152" + version = "v1.3.0" + +[[projects]] + name = "github.com/pierrec/lz4" + packages = ["."] + revision = "2fcda4cb7018ce05a25959d2fe08c83e3329f169" + version = "v1.1" + +[[projects]] + name = "github.com/pierrec/xxHash" + packages = ["xxHash32"] + revision = "f051bb7f1d1aaf1b5a665d74fb6b0217712c69f7" + version = "v0.1.1" + +[[projects]] + branch = "master" + name = "github.com/rcrowley/go-metrics" + packages = ["."] + revision = "8732c616f52954686704c8645fe1a9d59e9df7c1" + +[[projects]] + branch = "master" + name = "golang.org/x/net" + packages = [ + "html", + "html/atom", + "html/charset" + ] + revision = "0ed95abb35c445290478a5348a7b38bb154135fd" + +[[projects]] + branch = "master" + name = "golang.org/x/sys" + packages = ["unix"] + revision = "3dbebcf8efb6a5011a60c2b4591c1022a759af8a" + +[[projects]] + branch = "master" + name = "golang.org/x/text" + packages = [ + "encoding", + "encoding/charmap", + "encoding/htmlindex", + "encoding/internal", + "encoding/internal/identifier", + "encoding/japanese", + "encoding/korean", + "encoding/simplifiedchinese", + "encoding/traditionalchinese", + "encoding/unicode", + "internal/gen", + "internal/tag", + "internal/utf8internal", + "language", + "runes", + "transform", + "unicode/cldr" + ] + revision = "e19ae1496984b1c655b8044a65c0300a3c878dd3" + +[[projects]] + branch = "v2" + name = "gopkg.in/yaml.v2" + packages = ["."] + revision = "d670f9405373e636a5a2765eea47fac0c9bc91a4" + +[solve-meta] + analyzer-name = "dep" + analyzer-version = 1 + inputs-digest = "2fa33a2d1ae87e0905ef09332bb4b3fda29179f6bcd48fd3b94070774b9e458b" + solver-name = 
"gps-cdcl" + solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml new file mode 100644 index 0000000..1eecfef --- /dev/null +++ b/Gopkg.toml @@ -0,0 +1,26 @@ + +# Gopkg.toml example +# +# Refer to https://github.com/golang/dep/blob/master/docs/Gopkg.toml.md +# for detailed Gopkg.toml documentation. +# +# required = ["github.com/user/thing/cmd/thing"] +# ignored = ["github.com/user/project/pkgX", "bitbucket.org/user/project/pkgA/pkgY"] +# +# [[constraint]] +# name = "github.com/user/project" +# version = "1.0.0" +# +# [[constraint]] +# name = "github.com/user/project2" +# branch = "dev" +# source = "github.com/myfork/project2" +# +# [[override]] +# name = "github.com/x/y" +# version = "2.4.0" + + +[[constraint]] + name = "github.com/Shopify/sarama" + version = "^1.14.0" diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..127751c --- /dev/null +++ b/LICENSE @@ -0,0 +1,22 @@ +(The MIT License) + +Copyright (c) 2017 Black Square Media Ltd + +Permission is hereby granted, free of charge, to any person obtaining +a copy of this software and associated documentation files (the +'Software'), to deal in the Software without restriction, including +without limitation the rights to use, copy, modify, merge, publish, +distribute, sublicense, and/or sell copies of the Software, and to +permit persons to whom the Software is furnished to do so, subject to +the following conditions: + +The above copyright notice and this permission notice shall be +included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, +TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE +SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/Makefile b/Makefile index a54a5cb..0e5192b 100644 --- a/Makefile +++ b/Makefile @@ -1,10 +1,9 @@ -SCALA_VERSION?= 2.11 -KAFKA_VERSION?= 0.10.0.0 +SCALA_VERSION?= 2.12 +KAFKA_VERSION?= 1.0.1 KAFKA_DIR= kafka_$(SCALA_VERSION)-$(KAFKA_VERSION) -KAFKA_SRC= http://www.mirrorservice.org/sites/ftp.apache.org/kafka/$(KAFKA_VERSION)/$(KAFKA_DIR).tgz +KAFKA_SRC= https://archive.apache.org/dist/kafka/$(KAFKA_VERSION)/$(KAFKA_DIR).tgz KAFKA_ROOT= testdata/$(KAFKA_DIR) - -PKG:=$(shell glide nv) +PKG=$(shell go list ./... 
| grep -v vendor) default: vet test @@ -22,10 +21,15 @@ test-race: testdeps testdeps: $(KAFKA_ROOT) -.PHONY: test testdeps vet +doc: README.md + +.PHONY: test testdeps vet doc # --------------------------------------------------------------------- $(KAFKA_ROOT): @mkdir -p $(dir $@) cd $(dir $@) && curl -sSL $(KAFKA_SRC) | tar xz + +README.md: README.md.tpl $(wildcard *.go) + becca -package $(subst $(GOPATH)/src/,,$(PWD)) diff --git a/README.md b/README.md index ec114ed..ebcd755 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,9 @@ -# Sarama Cluster [![Build Status](https://travis-ci.org/bsm/sarama-cluster.png)](https://travis-ci.org/bsm/sarama-cluster) +# Sarama Cluster + +[![GoDoc](https://godoc.org/github.com/bsm/sarama-cluster?status.svg)](https://godoc.org/github.com/bsm/sarama-cluster) +[![Build Status](https://travis-ci.org/bsm/sarama-cluster.svg?branch=master)](https://travis-ci.org/bsm/sarama-cluster) +[![Go Report Card](https://goreportcard.com/badge/github.com/bsm/sarama-cluster)](https://goreportcard.com/report/github.com/bsm/sarama-cluster) +[![License](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT) Cluster extensions for [Sarama](https://github.com/Shopify/sarama), the Go client library for Apache Kafka 0.9 (and later). @@ -6,6 +11,128 @@ Cluster extensions for [Sarama](https://github.com/Shopify/sarama), the Go clien Documentation and example are available via godoc at http://godoc.org/github.com/bsm/sarama-cluster +## Examples + +Consumers have two modes of operation. In the default multiplexed mode messages (and errors) of multiple +topics and partitions are all passed to the single channel: + +```go +package main + +import ( + "fmt" + "log" + "os" + "os/signal" + + cluster "github.com/bsm/sarama-cluster" +) + +func main() { + + // init (custom) config, enable errors and notifications + config := cluster.NewConfig() + config.Consumer.Return.Errors = true + config.Group.Return.Notifications = true + + // init consumer + brokers := []string{"127.0.0.1:9092"} + topics := []string{"my_topic", "other_topic"} + consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config) + if err != nil { + panic(err) + } + defer consumer.Close() + + // trap SIGINT to trigger a shutdown. 
+ signals := make(chan os.Signal, 1) + signal.Notify(signals, os.Interrupt) + + // consume errors + go func() { + for err := range consumer.Errors() { + log.Printf("Error: %s\n", err.Error()) + } + }() + + // consume notifications + go func() { + for ntf := range consumer.Notifications() { + log.Printf("Rebalanced: %+v\n", ntf) + } + }() + + // consume messages, watch signals + for { + select { + case msg, ok := <-consumer.Messages(): + if ok { + fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value) + consumer.MarkOffset(msg, "") // mark message as processed + } + case <-signals: + return + } + } +} +``` + +Users who require access to individual partitions can use the partitioned mode which exposes access to partition-level +consumers: + +```go +package main + +import ( + "fmt" + "log" + "os" + "os/signal" + + cluster "github.com/bsm/sarama-cluster" +) + +func main() { + + // init (custom) config, set mode to ConsumerModePartitions + config := cluster.NewConfig() + config.Group.Mode = cluster.ConsumerModePartitions + + // init consumer + brokers := []string{"127.0.0.1:9092"} + topics := []string{"my_topic", "other_topic"} + consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config) + if err != nil { + panic(err) + } + defer consumer.Close() + + // trap SIGINT to trigger a shutdown. + signals := make(chan os.Signal, 1) + signal.Notify(signals, os.Interrupt) + + // consume partitions + for { + select { + case part, ok := <-consumer.Partitions(): + if !ok { + return + } + + // start a separate goroutine to consume messages + go func(pc cluster.PartitionConsumer) { + for msg := range pc.Messages() { + fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value) + consumer.MarkOffset(msg, "") // mark message as processed + } + }(part) + case <-signals: + return + } + } +} +``` + ## Running tests You need to install Ginkgo & Gomega to run tests. Please see @@ -13,29 +140,12 @@ http://onsi.github.io/ginkgo for more details. To run tests, call: - $ make test - -## Licence - - (The MIT License) + $ make test - Copyright (c) 2016 Black Square Media Ltd +## Troubleshooting - Permission is hereby granted, free of charge, to any person obtaining - a copy of this software and associated documentation files (the - 'Software'), to deal in the Software without restriction, including - without limitation the rights to use, copy, modify, merge, publish, - distribute, sublicense, and/or sell copies of the Software, and to - permit persons to whom the Software is furnished to do so, subject to - the following conditions: +### Consumer not receiving any messages? - The above copyright notice and this permission notice shall be - included in all copies or substantial portions of the Software. +By default, sarama's `Config.Consumer.Offsets.Initial` is set to `sarama.OffsetNewest`. This means that in the event that a brand new consumer is created, and it has never committed any offsets to kafka, it will only receive messages starting from the message after the current one that was written. - THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, - EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF - MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
- IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY - CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, - TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE - SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +If you wish to receive all messages (from the start of all messages in the topic) in the event that a consumer does not have any offsets committed to kafka, you need to set `Config.Consumer.Offsets.Initial` to `sarama.OffsetOldest`. diff --git a/README.md.tpl b/README.md.tpl new file mode 100644 index 0000000..5f63a69 --- /dev/null +++ b/README.md.tpl @@ -0,0 +1,67 @@ +# Sarama Cluster + +[![GoDoc](https://godoc.org/github.com/bsm/sarama-cluster?status.svg)](https://godoc.org/github.com/bsm/sarama-cluster) +[![Build Status](https://travis-ci.org/bsm/sarama-cluster.svg?branch=master)](https://travis-ci.org/bsm/sarama-cluster) +[![Go Report Card](https://goreportcard.com/badge/github.com/bsm/sarama-cluster)](https://goreportcard.com/report/github.com/bsm/sarama-cluster) +[![License](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT) + +Cluster extensions for [Sarama](https://github.com/Shopify/sarama), the Go client library for Apache Kafka 0.9 (and later). + +## Documentation + +Documentation and example are available via godoc at http://godoc.org/github.com/bsm/sarama-cluster + +## Examples + +Consumers have two modes of operation. In the default multiplexed mode messages (and errors) of multiple +topics and partitions are all passed to the single channel: + +```go +package main + +import ( + "fmt" + "log" + "os" + "os/signal" + + cluster "github.com/bsm/sarama-cluster" +) + +func main() {{ "ExampleConsumer" | code }} +``` + +Users who require access to individual partitions can use the partitioned mode which exposes access to partition-level +consumers: + +```go +package main + +import ( + "fmt" + "log" + "os" + "os/signal" + + cluster "github.com/bsm/sarama-cluster" +) + +func main() {{ "ExampleConsumer_Partitions" | code }} +``` + +## Running tests + +You need to install Ginkgo & Gomega to run tests. Please see +http://onsi.github.io/ginkgo for more details. + +To run tests, call: + + $ make test + +## Troubleshooting + +### Consumer not receiving any messages? + +By default, sarama's `Config.Consumer.Offsets.Initial` is set to `sarama.OffsetNewest`. This means that in the event that a brand new consumer is created, and it has never committed any offsets to kafka, it will only receive messages starting from the message after the current one that was written. + +If you wish to receive all messages (from the start of all messages in the topic) in the event that a consumer does not have any offsets committed to kafka, you need to set `Config.Consumer.Offsets.Initial` to `sarama.OffsetOldest`. 
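The troubleshooting advice above boils down to a single config override. As a minimal, hedged sketch (broker address, group and topic names are placeholders), it mirrors the `config.Consumer.Offsets.Initial = sarama.OffsetOldest` setting that the test suite in this patch also applies:

```go
package main

import (
	"log"

	"github.com/Shopify/sarama"
	cluster "github.com/bsm/sarama-cluster"
)

func main() {
	// Start from the oldest available offset when the group has no committed
	// offsets yet; sarama's default (OffsetNewest) would skip existing messages.
	config := cluster.NewConfig()
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	consumer, err := cluster.NewConsumer([]string{"127.0.0.1:9092"}, "my-consumer-group", []string{"my_topic"}, config)
	if err != nil {
		log.Fatalln(err)
	}
	defer consumer.Close()

	for msg := range consumer.Messages() {
		consumer.MarkOffset(msg, "") // mark message as processed
	}
}
```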
diff --git a/balancer.go b/balancer.go index d66ef71..3aeaece 100644 --- a/balancer.go +++ b/balancer.go @@ -7,8 +7,34 @@ import ( "github.com/Shopify/sarama" ) -// Notification events are emitted by the consumers on rebalancing +// NotificationType defines the type of notification +type NotificationType uint8 + +// String describes the notification type +func (t NotificationType) String() string { + switch t { + case RebalanceStart: + return "rebalance start" + case RebalanceOK: + return "rebalance OK" + case RebalanceError: + return "rebalance error" + } + return "unknown" +} + +const ( + UnknownNotification NotificationType = iota + RebalanceStart + RebalanceOK + RebalanceError +) + +// Notification are state events emitted by the consumers on rebalance type Notification struct { + // Type exposes the notification type + Type NotificationType + // Claimed contains topic/partitions that were claimed by this rebalance cycle Claimed map[string][]int32 @@ -19,23 +45,27 @@ type Notification struct { Current map[string][]int32 } -func newNotification(released map[string][]int32) *Notification { +func newNotification(current map[string][]int32) *Notification { return &Notification{ - Claimed: make(map[string][]int32), - Released: released, - Current: make(map[string][]int32), + Type: RebalanceStart, + Current: current, } } -func (n *Notification) claim(current map[string][]int32) { - previous := n.Released +func (n *Notification) success(current map[string][]int32) *Notification { + o := &Notification{ + Type: RebalanceOK, + Claimed: make(map[string][]int32), + Released: make(map[string][]int32), + Current: current, + } for topic, partitions := range current { - n.Claimed[topic] = int32Slice(partitions).Diff(int32Slice(previous[topic])) + o.Claimed[topic] = int32Slice(partitions).Diff(int32Slice(n.Current[topic])) } - for topic, partitions := range previous { - n.Released[topic] = int32Slice(partitions).Diff(int32Slice(current[topic])) + for topic, partitions := range n.Current { + o.Released[topic] = int32Slice(partitions).Diff(int32Slice(current[topic])) } - n.Current = current + return o } // -------------------------------------------------------------------- @@ -127,10 +157,6 @@ func (r *balancer) Topic(name string, memberID string) error { } func (r *balancer) Perform(s Strategy) map[string]map[string][]int32 { - if r == nil { - return nil - } - res := make(map[string]map[string][]int32, 1) for topic, info := range r.topics { for memberID, partitions := range info.Perform(s) { diff --git a/balancer_test.go b/balancer_test.go index eb474e8..0334c18 100644 --- a/balancer_test.go +++ b/balancer_test.go @@ -9,18 +9,24 @@ import ( var _ = Describe("Notification", func() { - It("should init and update", func() { + It("should init and convert", func() { n := newNotification(map[string][]int32{ "a": {1, 2, 3}, "b": {4, 5}, "c": {1, 2}, }) - n.claim(map[string][]int32{ + Expect(n).To(Equal(&Notification{ + Type: RebalanceStart, + Current: map[string][]int32{"a": {1, 2, 3}, "b": {4, 5}, "c": {1, 2}}, + })) + + o := n.success(map[string][]int32{ "a": {3, 4}, "b": {1, 2, 3, 4}, "d": {3, 4}, }) - Expect(n).To(Equal(&Notification{ + Expect(o).To(Equal(&Notification{ + Type: RebalanceOK, Claimed: map[string][]int32{"a": {4}, "b": {1, 2, 3}, "d": {3, 4}}, Released: map[string][]int32{"a": {1, 2}, "b": {5}, "c": {1, 2}}, Current: map[string][]int32{"a": {3, 4}, "b": {1, 2, 3, 4}, "d": {3, 4}}, @@ -43,8 +49,8 @@ var _ = Describe("balancer", func() { var err error subject, err = newBalancerFromMeta(client, 
map[string]sarama.ConsumerGroupMemberMetadata{ - "b": sarama.ConsumerGroupMemberMetadata{Topics: []string{"three", "one"}}, - "a": sarama.ConsumerGroupMemberMetadata{Topics: []string{"one", "two"}}, + "b": {Topics: []string{"three", "one"}}, + "a": {Topics: []string{"one", "two"}}, }) Expect(err).NotTo(HaveOccurred()) }) diff --git a/client.go b/client.go index 2cfac5d..42ffb30 100644 --- a/client.go +++ b/client.go @@ -1,12 +1,20 @@ package cluster -import "github.com/Shopify/sarama" +import ( + "errors" + "sync/atomic" + + "github.com/Shopify/sarama" +) + +var errClientInUse = errors.New("cluster: client is already used by another consumer") // Client is a group client type Client struct { sarama.Client config Config - own bool + + inUse uint32 } // NewClient creates a new client instance @@ -26,3 +34,17 @@ func NewClient(addrs []string, config *Config) (*Client, error) { return &Client{Client: client, config: *config}, nil } + +// ClusterConfig returns the cluster configuration. +func (c *Client) ClusterConfig() *Config { + cfg := c.config + return &cfg +} + +func (c *Client) claim() bool { + return atomic.CompareAndSwapUint32(&c.inUse, 0, 1) +} + +func (c *Client) release() { + atomic.CompareAndSwapUint32(&c.inUse, 1, 0) +} diff --git a/client_test.go b/client_test.go new file mode 100644 index 0000000..5a2597d --- /dev/null +++ b/client_test.go @@ -0,0 +1,31 @@ +package cluster + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = Describe("Client", func() { + var subject *Client + + BeforeEach(func() { + var err error + subject, err = NewClient(testKafkaAddrs, nil) + Expect(err).NotTo(HaveOccurred()) + }) + + It("should not allow to share clients across multiple consumers", func() { + c1, err := NewConsumerFromClient(subject, testGroup, testTopics) + Expect(err).NotTo(HaveOccurred()) + defer c1.Close() + + _, err = NewConsumerFromClient(subject, testGroup, testTopics) + Expect(err).To(MatchError("cluster: client is already used by another consumer")) + + Expect(c1.Close()).To(Succeed()) + c2, err := NewConsumerFromClient(subject, testGroup, testTopics) + Expect(err).NotTo(HaveOccurred()) + Expect(c2.Close()).To(Succeed()) + }) + +}) diff --git a/cluster.go b/cluster.go index b82d52b..adcf0e9 100644 --- a/cluster.go +++ b/cluster.go @@ -1,7 +1,5 @@ package cluster -import "sort" - // Strategy for partition to consumer assignement type Strategy string @@ -25,42 +23,3 @@ type Error struct { Ctx string error } - -// -------------------------------------------------------------------- - -type none struct{} - -type topicPartition struct { - Topic string - Partition int32 -} - -type offsetInfo struct { - Offset int64 - Metadata string -} - -func (i offsetInfo) NextOffset(fallback int64) int64 { - if i.Offset > -1 { - return i.Offset - } - return fallback -} - -type int32Slice []int32 - -func (p int32Slice) Len() int { return len(p) } -func (p int32Slice) Less(i, j int) bool { return p[i] < p[j] } -func (p int32Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } - -func (p int32Slice) Diff(o int32Slice) (res []int32) { - on := len(o) - for _, x := range p { - n := sort.Search(on, func(i int) bool { return o[i] >= x }) - if n < on && o[n] == x { - continue - } - res = append(res, x) - } - return -} diff --git a/cluster_test.go b/cluster_test.go index 11b860e..753fa6c 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -18,7 +18,7 @@ const ( ) var ( - testKafkaRoot = "kafka_2.11-0.10.0.0" + testKafkaRoot = "kafka_2.12-1.0.0" testKafkaAddrs = 
[]string{"127.0.0.1:29092"} testTopics = []string{"topic-a", "topic-b"} @@ -62,48 +62,50 @@ var _ = Describe("int32Slice", func() { // -------------------------------------------------------------------- var _ = BeforeSuite(func() { - testZkCmd = exec.Command( + testZkCmd = testCmd( testDataDir(testKafkaRoot, "bin", "kafka-run-class.sh"), "org.apache.zookeeper.server.quorum.QuorumPeerMain", testDataDir("zookeeper.properties"), ) - testZkCmd.Env = []string{"KAFKA_HEAP_OPTS=-Xmx512M -Xms512M"} - // testZkCmd.Stderr = os.Stderr - // testZkCmd.Stdout = os.Stdout - testKafkaCmd = exec.Command( + testKafkaCmd = testCmd( testDataDir(testKafkaRoot, "bin", "kafka-run-class.sh"), "-name", "kafkaServer", "kafka.Kafka", testDataDir("server.properties"), ) - testKafkaCmd.Env = []string{"KAFKA_HEAP_OPTS=-Xmx1G -Xms1G"} - // testKafkaCmd.Stderr = os.Stderr - // testKafkaCmd.Stdout = os.Stdout - Expect(os.MkdirAll(testKafkaData, 0777)).NotTo(HaveOccurred()) - Expect(testZkCmd.Start()).NotTo(HaveOccurred()) - Expect(testKafkaCmd.Start()).NotTo(HaveOccurred()) + // Remove old test data before starting + Expect(os.RemoveAll(testKafkaData)).NotTo(HaveOccurred()) + + Expect(os.MkdirAll(testKafkaData, 0777)).To(Succeed()) + Expect(testZkCmd.Start()).To(Succeed()) + Expect(testKafkaCmd.Start()).To(Succeed()) // Wait for client Eventually(func() error { var err error - testClient, err = sarama.NewClient(testKafkaAddrs, nil) + // sync-producer requires Return.Successes set to true + testConf := sarama.NewConfig() + testConf.Producer.Return.Successes = true + testClient, err = sarama.NewClient(testKafkaAddrs, testConf) return err - }, "10s", "1s").ShouldNot(HaveOccurred()) + }, "30s", "1s").Should(Succeed()) // Ensure we can retrieve partition info Eventually(func() error { _, err := testClient.Partitions(testTopics[0]) return err - }, "10s", "500ms").ShouldNot(HaveOccurred()) + }, "30s", "1s").Should(Succeed()) // Seed a few messages - Expect(testSeed(1000)).NotTo(HaveOccurred()) + Expect(testSeed(1000, testTopics)).To(Succeed()) }) var _ = AfterSuite(func() { - _ = testClient.Close() + if testClient != nil { + _ = testClient.Close() + } _ = testKafkaCmd.Process.Kill() _ = testZkCmd.Process.Kill() @@ -124,12 +126,12 @@ func testDataDir(tokens ...string) string { return filepath.Join(tokens...) } -// Seed messages -func testSeed(n int) error { +func testSeed(n int, testTopics []string) error { producer, err := sarama.NewSyncProducerFromClient(testClient) if err != nil { return err } + defer producer.Close() for i := 0; i < n; i++ { kv := sarama.StringEncoder(fmt.Sprintf("PLAINDATA-%08d", i)) @@ -140,7 +142,17 @@ func testSeed(n int) error { } } } - return producer.Close() + return nil +} + +func testCmd(name string, arg ...string) *exec.Cmd { + cmd := exec.Command(name, arg...) 
+ if testing.Verbose() || os.Getenv("CI") != "" { + cmd.Stderr = os.Stderr + cmd.Stdout = os.Stdout + } + cmd.Env = []string{"KAFKA_HEAP_OPTS=-Xmx1G -Xms1G"} + return cmd } type testConsumerMessage struct { diff --git a/cmd/sarama-cluster-cli/main.go b/cmd/sarama-cluster-cli/main.go index 66ed986..59a55fb 100644 --- a/cmd/sarama-cluster-cli/main.go +++ b/cmd/sarama-cluster-cli/main.go @@ -57,45 +57,41 @@ func main() { if err != nil { printErrorAndExit(69, "Failed to start consumer: %s", err) } - - go func() { - for err := range consumer.Errors() { - logger.Printf("Error: %s\n", err.Error()) - } - }() - - go func() { - for note := range consumer.Notifications() { - logger.Printf("Rebalanced: %+v\n", note) + defer consumer.Close() + + // Create signal channel + sigchan := make(chan os.Signal, 1) + signal.Notify(sigchan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM) + + // Consume all channels, wait for signal to exit + for { + select { + case msg, more := <-consumer.Messages(): + if more { + fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Value) + consumer.MarkOffset(msg, "") + } + case ntf, more := <-consumer.Notifications(): + if more { + logger.Printf("Rebalanced: %+v\n", ntf) + } + case err, more := <-consumer.Errors(): + if more { + logger.Printf("Error: %s\n", err.Error()) + } + case <-sigchan: + return } - }() - - go func() { - for msg := range consumer.Messages() { - fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Value) - consumer.MarkOffset(msg, "") - } - }() - - wait := make(chan os.Signal, 1) - signal.Notify(wait, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM) - <-wait - - if err := consumer.Close(); err != nil { - logger.Println("Failed to close consumer: ", err) } } func printErrorAndExit(code int, format string, values ...interface{}) { - fmt.Fprintf(os.Stderr, "ERROR: %s\n", fmt.Sprintf(format, values...)) - fmt.Fprintln(os.Stderr) + fmt.Fprintf(os.Stderr, "ERROR: "+format+"\n\n", values...) os.Exit(code) } func printUsageErrorAndExit(format string, values ...interface{}) { - fmt.Fprintf(os.Stderr, "ERROR: %s\n", fmt.Sprintf(format, values...)) - fmt.Fprintln(os.Stderr) - fmt.Fprintln(os.Stderr, "Available command line options:") - flag.PrintDefaults() + fmt.Fprintf(os.Stderr, "ERROR: "+format+"\n\n", values...) + flag.Usage() os.Exit(64) } diff --git a/config.go b/config.go index 9d84b5a..084b835 100644 --- a/config.go +++ b/config.go @@ -1,6 +1,7 @@ package cluster import ( + "regexp" "time" "github.com/Shopify/sarama" @@ -8,30 +9,55 @@ import ( var minVersion = sarama.V0_9_0_0 +type ConsumerMode uint8 + +const ( + ConsumerModeMultiplex ConsumerMode = iota + ConsumerModePartitions +) + // Config extends sarama.Config with Group specific namespace type Config struct { sarama.Config // Group is the namespace for group management properties Group struct { + // The strategy to use for the allocation of partitions to consumers (defaults to StrategyRange) PartitionStrategy Strategy - Offsets struct { + + // By default, messages and errors from the subscribed topics and partitions are all multiplexed and + // made available through the consumer's Messages() and Errors() channels. + // + // Users who require low-level access can enable ConsumerModePartitions where individual partitions + // are exposed on the Partitions() channel. Messages and errors must then be consumed on the partitions + // themselves. 
+ Mode ConsumerMode + + Offsets struct { Retry struct { - // The numer retries when comitting offsets (defaults to 3). + // The numer retries when committing offsets (defaults to 3). Max int } + Synchronization struct { + // The duration allowed for other clients to commit their offsets before resumption in this client, e.g. during a rebalance + // NewConfig sets this to the Consumer.MaxProcessingTime duration of the Sarama configuration + DwellTime time.Duration + } } + Session struct { // The allowed session timeout for registered consumers (defaults to 30s). // Must be within the allowed server range. Timeout time.Duration } + Heartbeat struct { // Interval between each heartbeat (defaults to 3s). It should be no more // than 1/3rd of the Group.Session.Timout setting Interval time.Duration } + // Return specifies which group channels will be populated. If they are set to true, // you must read from the respective channels to prevent deadlock. Return struct { @@ -39,6 +65,21 @@ type Config struct { // Notifications channel (default disabled). Notifications bool } + + Topics struct { + // An additional whitelist of topics to subscribe to. + Whitelist *regexp.Regexp + // An additional blacklist of topics to avoid. If set, this will precede over + // the Whitelist setting. + Blacklist *regexp.Regexp + } + + Member struct { + // Custom metadata to include when joining the group. The user data for all joined members + // can be retrieved by sending a DescribeGroupRequest to the broker that is the + // coordinator for the group. + UserData []byte + } } } @@ -49,6 +90,7 @@ func NewConfig() *Config { } c.Group.PartitionStrategy = StrategyRange c.Group.Offsets.Retry.Max = 3 + c.Group.Offsets.Synchronization.DwellTime = c.Consumer.MaxProcessingTime c.Group.Session.Timeout = 30 * time.Second c.Group.Heartbeat.Interval = 3 * time.Second c.Config.Version = minVersion @@ -79,10 +121,18 @@ func (c *Config) Validate() error { switch { case c.Group.Offsets.Retry.Max < 0: return sarama.ConfigurationError("Group.Offsets.Retry.Max must be >= 0") + case c.Group.Offsets.Synchronization.DwellTime <= 0: + return sarama.ConfigurationError("Group.Offsets.Synchronization.DwellTime must be > 0") + case c.Group.Offsets.Synchronization.DwellTime > 10*time.Minute: + return sarama.ConfigurationError("Group.Offsets.Synchronization.DwellTime must be <= 10m") case c.Group.Heartbeat.Interval <= 0: return sarama.ConfigurationError("Group.Heartbeat.Interval must be > 0") case c.Group.Session.Timeout <= 0: return sarama.ConfigurationError("Group.Session.Timeout must be > 0") + case !c.Metadata.Full && c.Group.Topics.Whitelist != nil: + return sarama.ConfigurationError("Metadata.Full must be enabled when Group.Topics.Whitelist is used") + case !c.Metadata.Full && c.Group.Topics.Blacklist != nil: + return sarama.ConfigurationError("Metadata.Full must be enabled when Group.Topics.Blacklist is used") } // ensure offset is correct diff --git a/config_test.go b/config_test.go index 558cd90..dba525b 100644 --- a/config_test.go +++ b/config_test.go @@ -19,6 +19,7 @@ var _ = Describe("Config", func() { Expect(subject.Group.Heartbeat.Interval).To(Equal(3 * time.Second)) Expect(subject.Group.Return.Notifications).To(BeFalse()) Expect(subject.Metadata.Retry.Max).To(Equal(3)) + Expect(subject.Group.Offsets.Synchronization.DwellTime).NotTo(BeZero()) // Expect(subject.Config.Version).To(Equal(sarama.V0_9_0_0)) }) diff --git a/consumer.go b/consumer.go index 199f590..e7a67da 100644 --- a/consumer.go +++ b/consumer.go @@ -11,51 +11,85 @@ 
import ( // Consumer is a cluster group consumer type Consumer struct { - client *Client + client *Client + ownClient bool - csmr sarama.Consumer - subs *partitionMap + consumer sarama.Consumer + subs *partitionMap + + consumerID string + groupID string - consumerID string - generationID int32 - groupID string memberID string - topics []string + generationID int32 + membershipMu sync.RWMutex + + coreTopics []string + extraTopics []string dying, dead chan none + closeOnce sync.Once consuming int32 - errors chan error messages chan *sarama.ConsumerMessage + errors chan error + partitions chan PartitionConsumer notifications chan *Notification commitMu sync.Mutex } -// NewConsumerFromClient initializes a new consumer from an existing client +// NewConsumer initializes a new consumer +func NewConsumer(addrs []string, groupID string, topics []string, config *Config) (*Consumer, error) { + client, err := NewClient(addrs, config) + if err != nil { + return nil, err + } + + consumer, err := NewConsumerFromClient(client, groupID, topics) + if err != nil { + return nil, err + } + consumer.ownClient = true + return consumer, nil +} + +// NewConsumerFromClient initializes a new consumer from an existing client. +// +// Please note that clients cannot be shared between consumers (due to Kafka internals), +// they can only be re-used which requires the user to call Close() on the first consumer +// before using this method again to initialize another one. Attempts to use a client with +// more than one consumer at a time will return errors. func NewConsumerFromClient(client *Client, groupID string, topics []string) (*Consumer, error) { - csmr, err := sarama.NewConsumerFromClient(client.Client) + if !client.claim() { + return nil, errClientInUse + } + + consumer, err := sarama.NewConsumerFromClient(client.Client) if err != nil { + client.release() return nil, err } + sort.Strings(topics) c := &Consumer{ - client: client, + client: client, + consumer: consumer, + subs: newPartitionMap(), + groupID: groupID, - csmr: csmr, - subs: newPartitionMap(), - - groupID: groupID, - topics: topics, + coreTopics: topics, dying: make(chan none), dead: make(chan none), - errors: make(chan error, client.config.ChannelBufferSize), messages: make(chan *sarama.ConsumerMessage), - notifications: make(chan *Notification, 1), + errors: make(chan error, client.config.ChannelBufferSize), + partitions: make(chan PartitionConsumer, 1), + notifications: make(chan *Notification), } if err := c.client.RefreshCoordinator(groupID); err != nil { + client.release() return nil, err } @@ -63,26 +97,24 @@ func NewConsumerFromClient(client *Client, groupID string, topics []string) (*Co return c, nil } -// NewConsumer initializes a new consumer -func NewConsumer(addrs []string, groupID string, topics []string, config *Config) (*Consumer, error) { - client, err := NewClient(addrs, config) - if err != nil { - return nil, err - } - - consumer, err := NewConsumerFromClient(client, groupID, topics) - if err != nil { - _ = client.Close() - return nil, err - } - consumer.client.own = true - return consumer, nil -} - // Messages returns the read channel for the messages that are returned by // the broker. +// +// This channel will only return if Config.Group.Mode option is set to +// ConsumerModeMultiplex (default). func (c *Consumer) Messages() <-chan *sarama.ConsumerMessage { return c.messages } +// Partitions returns the read channels for individual partitions of this broker. 
+// +// This channel will only return if Config.Group.Mode option is set to +// ConsumerModePartitions. +// +// The Partitions() channel must be listened to for the life of this consumer; +// when a rebalance happens old partitions will be closed (naturally come to +// completion) and new ones will be emitted. The returned channel will only close +// when the consumer is completely shut down. +func (c *Consumer) Partitions() <-chan PartitionConsumer { return c.partitions } + // Errors returns a read channel of errors that occur during offset management, if // enabled. By default, errors are logged and not returned over this channel. If // you want to implement any custom error handling, set your config's @@ -91,9 +123,13 @@ func (c *Consumer) Errors() <-chan error { return c.errors } // Notifications returns a channel of Notifications that occur during consumer // rebalancing. Notifications will only be emitted over this channel, if your config's -// Cluster.Return.Notifications setting to true. +// Group.Return.Notifications is set to true. func (c *Consumer) Notifications() <-chan *Notification { return c.notifications } +// HighWaterMarks returns the current high water marks for each topic and partition +// Consistency between partitions is not guaranteed since high water marks are updated separately. +func (c *Consumer) HighWaterMarks() map[string]map[int32]int64 { return c.consumer.HighWaterMarks() } + // MarkOffset marks the provided message as processed, alongside a metadata string // that represents the state of the partition consumer at that point in time. The // metadata string can be used by another consumer to restore that state, so it // can resume consumption. @@ -104,13 +140,66 @@ func (c *Consumer) Notifications() <-chan *Notification { return c.notifications // your application crashes. This means that you may end up processing the same // message twice, and your processing should ideally be idempotent. func (c *Consumer) MarkOffset(msg *sarama.ConsumerMessage, metadata string) { - c.subs.Fetch(msg.Topic, msg.Partition).MarkOffset(msg.Offset+1, metadata) + if sub := c.subs.Fetch(msg.Topic, msg.Partition); sub != nil { + sub.MarkOffset(msg.Offset, metadata) + } } // MarkPartitionOffset marks an offset of the provided topic/partition as processed. // See MarkOffset for additional explanation. func (c *Consumer) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) { - c.subs.Fetch(topic, partition).MarkOffset(offset+1, metadata) + if sub := c.subs.Fetch(topic, partition); sub != nil { + sub.MarkOffset(offset, metadata) + } +} + +// MarkOffsets marks stashed offsets as processed. +// See MarkOffset for additional explanation. +func (c *Consumer) MarkOffsets(s *OffsetStash) { + s.mu.Lock() + defer s.mu.Unlock() + + for tp, info := range s.offsets { + if sub := c.subs.Fetch(tp.Topic, tp.Partition); sub != nil { + sub.MarkOffset(info.Offset, info.Metadata) + } + delete(s.offsets, tp) + } +} + +// ResetOffset marks the provided message as processed, alongside a metadata string +// that represents the state of the partition consumer at that point in time. The +// metadata string can be used by another consumer to restore that state, so it +// can resume consumption. 
+// +// Difference between ResetOffset and MarkOffset is that it allows to rewind to an earlier offset +func (c *Consumer) ResetOffset(msg *sarama.ConsumerMessage, metadata string) { + if sub := c.subs.Fetch(msg.Topic, msg.Partition); sub != nil { + sub.ResetOffset(msg.Offset, metadata) + } +} + +// ResetPartitionOffset marks an offset of the provided topic/partition as processed. +// See ResetOffset for additional explanation. +func (c *Consumer) ResetPartitionOffset(topic string, partition int32, offset int64, metadata string) { + sub := c.subs.Fetch(topic, partition) + if sub != nil { + sub.ResetOffset(offset, metadata) + } +} + +// ResetOffsets marks stashed offsets as processed. +// See ResetOffset for additional explanation. +func (c *Consumer) ResetOffsets(s *OffsetStash) { + s.mu.Lock() + defer s.mu.Unlock() + + for tp, info := range s.offsets { + if sub := c.subs.Fetch(tp.Topic, tp.Partition); sub != nil { + sub.ResetOffset(info.Offset, info.Metadata) + } + delete(s.offsets, tp) + } } // Subscriptions returns the consumed topics and partitions @@ -118,29 +207,35 @@ func (c *Consumer) Subscriptions() map[string][]int32 { return c.subs.Info() } -// CommitOffsets manually commits marked offsets +// CommitOffsets allows to manually commit previously marked offsets. By default there is no +// need to call this function as the consumer will commit offsets automatically +// using the Config.Consumer.Offsets.CommitInterval setting. +// +// Please be aware that calling this function during an internal rebalance cycle may return +// broker errors (e.g. sarama.ErrUnknownMemberId or sarama.ErrIllegalGeneration). func (c *Consumer) CommitOffsets() error { c.commitMu.Lock() defer c.commitMu.Unlock() + memberID, generationID := c.membership() req := &sarama.OffsetCommitRequest{ Version: 2, ConsumerGroup: c.groupID, - ConsumerGroupGeneration: c.generationID, - ConsumerID: c.memberID, + ConsumerGroupGeneration: generationID, + ConsumerID: memberID, RetentionTime: -1, } - if rt := c.client.config.Consumer.Offsets.Retention; rt != 0 { - req.RetentionTime = int64(rt / time.Millisecond) + if ns := c.client.config.Consumer.Offsets.Retention; ns != 0 { + req.RetentionTime = int64(ns / time.Millisecond) } - var dirty bool snap := c.subs.Snapshot() + dirty := false for tp, state := range snap { if state.Dirty { - req.AddBlock(tp.Topic, tp.Partition, state.Info.Offset, 0, state.Info.Metadata) dirty = true + req.AddBlock(tp.Topic, tp.Partition, state.Info.Offset, 0, state.Info.Metadata) } } if !dirty { @@ -159,44 +254,59 @@ func (c *Consumer) CommitOffsets() error { return err } - for topic, perrs := range resp.Errors { - for partition, kerr := range perrs { + for topic, errs := range resp.Errors { + for partition, kerr := range errs { if kerr != sarama.ErrNoError { err = kerr } else if state, ok := snap[topicPartition{topic, partition}]; ok { - c.subs.Fetch(topic, partition).MarkCommitted(state.Info.Offset) + if sub := c.subs.Fetch(topic, partition); sub != nil { + sub.markCommitted(state.Info.Offset) + } } } } - return err } // Close safely closes the consumer and releases all resources func (c *Consumer) Close() (err error) { - close(c.dying) - <-c.dead + c.closeOnce.Do(func() { + close(c.dying) + <-c.dead - if e := c.release(); e != nil { - err = e - } - if e := c.csmr.Close(); e != nil { - err = e - } - close(c.messages) - close(c.errors) - - if e := c.leaveGroup(); e != nil { - err = e - } - close(c.notifications) + if e := c.release(); e != nil { + err = e + } + if e := c.consumer.Close(); e != nil { 
+ err = e + } + close(c.messages) + close(c.errors) - if c.client.own { - if e := c.client.Close(); e != nil { + if e := c.leaveGroup(); e != nil { err = e } - } + close(c.partitions) + close(c.notifications) + + // drain + for range c.messages { + } + for range c.errors { + } + for p := range c.partitions { + _ = p.Close() + } + for range c.notifications { + } + c.client.release() + if c.ownClient { + if e := c.client.Close(); e != nil { + err = e + } + } + }) return } @@ -214,64 +324,78 @@ func (c *Consumer) mainLoop() { default: } - // Remember previous subscriptions - var notification *Notification - if c.client.config.Group.Return.Notifications { - notification = newNotification(c.subs.Info()) - } + // Start next consume cycle + c.nextTick() + } +} - // Rebalance, fetch new subscriptions - subs, err := c.rebalance() - if err != nil { - c.rebalanceError(err, notification) - continue - } +func (c *Consumer) nextTick() { + // Remember previous subscriptions + var notification *Notification + if c.client.config.Group.Return.Notifications { + notification = newNotification(c.subs.Info()) + } - // Start the heartbeat - hbStop, hbDone := make(chan none), make(chan none) - go c.hbLoop(hbStop, hbDone) + // Refresh coordinator + if err := c.refreshCoordinator(); err != nil { + c.rebalanceError(err, nil) + return + } - // Subscribe to topic/partitions - if err := c.subscribe(subs); err != nil { - close(hbStop) - <-hbDone - c.rebalanceError(err, notification) - continue - } + // Release subscriptions + if err := c.release(); err != nil { + c.rebalanceError(err, nil) + return + } - // Start consuming and comitting offsets - cmStop, cmDone := make(chan none), make(chan none) - go c.cmLoop(cmStop, cmDone) - atomic.StoreInt32(&c.consuming, 1) + // Issue rebalance start notification + if c.client.config.Group.Return.Notifications { + c.handleNotification(notification) + } - // Update notification with new claims - if c.client.config.Group.Return.Notifications { - notification.claim(subs) - c.notifications <- notification - } + // Rebalance, fetch new subscriptions + subs, err := c.rebalance() + if err != nil { + c.rebalanceError(err, notification) + return + } - // Wait for signals - select { - case <-hbDone: - close(cmStop) - <-cmDone - case <-cmDone: - close(hbStop) - <-hbDone - case <-c.dying: - close(cmStop) - <-cmDone - close(hbStop) - <-hbDone - return - } + // Coordinate loops, make sure everything is + // stopped on exit + tomb := newLoopTomb() + defer tomb.Close() + + // Start the heartbeat + tomb.Go(c.hbLoop) + + // Subscribe to topic/partitions + if err := c.subscribe(tomb, subs); err != nil { + c.rebalanceError(err, notification) + return + } + + // Update/issue notification with new claims + if c.client.config.Group.Return.Notifications { + notification = notification.success(subs) + c.handleNotification(notification) + } + + // Start topic watcher loop + tomb.Go(c.twLoop) + + // Start consuming and committing offsets + tomb.Go(c.cmLoop) + atomic.StoreInt32(&c.consuming, 1) + + // Wait for signals + select { + case <-tomb.Dying(): + case <-c.dying: } } // heartbeat loop, triggered by the mainLoop -func (c *Consumer) hbLoop(stop <-chan none, done chan<- none) { - defer close(done) - +func (c *Consumer) hbLoop(stopped <-chan none) { ticker := time.NewTicker(c.client.config.Group.Heartbeat.Interval) defer ticker.Stop() @@ -286,16 +410,45 @@ func (c *Consumer) hbLoop(stop <-chan none, done chan<- none) { c.handleError(&Error{Ctx: "heartbeat", error: err}) return } - case <-stop: + case 
<-stopped: + return + case <-c.dying: return } } } -// commit loop, triggered by the mainLoop -func (c *Consumer) cmLoop(stop <-chan none, done chan<- none) { - defer close(done) +// topic watcher loop, triggered by the mainLoop +func (c *Consumer) twLoop(stopped <-chan none) { + ticker := time.NewTicker(c.client.config.Metadata.RefreshFrequency / 2) + defer ticker.Stop() + for { + select { + case <-ticker.C: + topics, err := c.client.Topics() + if err != nil { + c.handleError(&Error{Ctx: "topics", error: err}) + return + } + + for _, topic := range topics { + if !c.isKnownCoreTopic(topic) && + !c.isKnownExtraTopic(topic) && + c.isPotentialExtraTopic(topic) { + return + } + } + case <-stopped: + return + case <-c.dying: + return + } + } +} + +// commit loop, triggered by the mainLoop +func (c *Consumer) cmLoop(stopped <-chan none) { ticker := time.NewTicker(c.client.config.Consumer.Offsets.CommitInterval) defer ticker.Stop() @@ -306,15 +459,18 @@ func (c *Consumer) cmLoop(stop <-chan none, done chan<- none) { c.handleError(&Error{Ctx: "commit", error: err}) return } - case <-stop: + case <-stopped: + return + case <-c.dying: return } } } -func (c *Consumer) rebalanceError(err error, notification *Notification) { - if c.client.config.Group.Return.Notifications { - c.notifications <- notification +func (c *Consumer) rebalanceError(err error, n *Notification) { + if n != nil { + n.Type = RebalanceError + c.handleNotification(n) } switch err { @@ -329,6 +485,16 @@ func (c *Consumer) rebalanceError(err error, notification *Notification) { } } +func (c *Consumer) handleNotification(n *Notification) { + if c.client.config.Group.Return.Notifications { + select { + case c.notifications <- n: + case <-c.dying: + return + } + } +} + func (c *Consumer) handleError(e *Error) { if c.client.config.Consumer.Return.Errors { select { @@ -350,7 +516,13 @@ func (c *Consumer) release() (err error) { defer c.subs.Clear() // Wait for messages to be processed - time.Sleep(c.client.config.Consumer.MaxProcessingTime) + timeout := time.NewTimer(c.client.config.Group.Offsets.Synchronization.DwellTime) + defer timeout.Stop() + + select { + case <-c.dying: + case <-timeout.C: + } // Commit offsets, continue on errors if e := c.commitOffsetsWithRetry(c.client.config.Group.Offsets.Retry.Max); e != nil { @@ -370,10 +542,11 @@ func (c *Consumer) heartbeat() error { return err } + memberID, generationID := c.membership() resp, err := broker.Heartbeat(&sarama.HeartbeatRequest{ GroupId: c.groupID, - MemberId: c.memberID, - GenerationId: c.generationID, + MemberId: memberID, + GenerationId: generationID, }) if err != nil { c.closeCoordinator(broker, err) @@ -384,27 +557,27 @@ func (c *Consumer) heartbeat() error { // Performs a rebalance, part of the mainLoop() func (c *Consumer) rebalance() (map[string][]int32, error) { - sarama.Logger.Printf("cluster/consumer %s rebalance\n", c.memberID) - - if err := c.client.RefreshCoordinator(c.groupID); err != nil { - return nil, err - } + memberID, _ := c.membership() + sarama.Logger.Printf("cluster/consumer %s rebalance\n", memberID) - // Release subscriptions - if err := c.release(); err != nil { + allTopics, err := c.client.Topics() + if err != nil { return nil, err } + c.extraTopics = c.selectExtraTopics(allTopics) + sort.Strings(c.extraTopics) // Re-join consumer group strategy, err := c.joinGroup() switch { case err == sarama.ErrUnknownMemberId: + c.membershipMu.Lock() c.memberID = "" + c.membershipMu.Unlock() return nil, err case err != nil: return nil, err } - // 
sarama.Logger.Printf("cluster/consumer %s/%d joined group %s\n", c.memberID, c.generationID, c.groupID) // Sync consumer group state, fetch subscriptions subs, err := c.syncGroup(strategy) @@ -419,7 +592,7 @@ func (c *Consumer) rebalance() (map[string][]int32, error) { } // Performs the subscription, part of the mainLoop() -func (c *Consumer) subscribe(subs map[string][]int32) error { +func (c *Consumer) subscribe(tomb *loopTomb, subs map[string][]int32) error { // fetch offsets offsets, err := c.fetchOffsets(subs) if err != nil { @@ -436,8 +609,8 @@ func (c *Consumer) subscribe(subs map[string][]int32) error { wg.Add(1) info := offsets[topic][partition] - go func(t string, p int32) { - if e := c.createConsumer(t, p, info); e != nil { + go func(topic string, partition int32) { + if e := c.createConsumer(tomb, topic, partition, info); e != nil { mu.Lock() err = e mu.Unlock() @@ -459,16 +632,18 @@ func (c *Consumer) subscribe(subs map[string][]int32) error { // Send a request to the broker to join group on rebalance() func (c *Consumer) joinGroup() (*balancer, error) { + memberID, _ := c.membership() req := &sarama.JoinGroupRequest{ GroupId: c.groupID, - MemberId: c.memberID, + MemberId: memberID, SessionTimeout: int32(c.client.config.Group.Session.Timeout / time.Millisecond), ProtocolType: "consumer", } meta := &sarama.ConsumerGroupMemberMetadata{ - Version: 1, - Topics: c.topics, + Version: 1, + Topics: append(c.coreTopics, c.extraTopics...), + UserData: c.client.config.Group.Member.UserData, } err := req.AddGroupProtocolMetadata(string(StrategyRange), meta) if err != nil { @@ -507,8 +682,10 @@ func (c *Consumer) joinGroup() (*balancer, error) { } } + c.membershipMu.Lock() c.memberID = resp.MemberId c.generationID = resp.GenerationId + c.membershipMu.Unlock() return strategy, nil } @@ -516,18 +693,20 @@ func (c *Consumer) joinGroup() (*balancer, error) { // Send a request to the broker to sync the group on rebalance(). // Returns a list of topics and partitions to consume. 
func (c *Consumer) syncGroup(strategy *balancer) (map[string][]int32, error) { + memberID, generationID := c.membership() req := &sarama.SyncGroupRequest{ GroupId: c.groupID, - MemberId: c.memberID, - GenerationId: c.generationID, + MemberId: memberID, + GenerationId: generationID, } - for memberID, topics := range strategy.Perform(c.client.config.Group.PartitionStrategy) { - if err := req.AddGroupAssignmentMember(memberID, &sarama.ConsumerGroupMemberAssignment{ - Version: 1, - Topics: topics, - }); err != nil { - return nil, err + if strategy != nil { + for memberID, topics := range strategy.Perform(c.client.config.Group.PartitionStrategy) { + if err := req.AddGroupAssignmentMember(memberID, &sarama.ConsumerGroupMemberAssignment{ + Topics: topics, + }); err != nil { + return nil, err + } } } @@ -580,9 +759,6 @@ func (c *Consumer) fetchOffsets(subs map[string][]int32) (map[string]map[int32]o } } - // Wait for other cluster consumers to process, release and commit - time.Sleep(c.client.config.Consumer.MaxProcessingTime * 2) - broker, err := c.client.Coordinator(c.groupID) if err != nil { c.closeCoordinator(broker, err) @@ -620,9 +796,10 @@ func (c *Consumer) leaveGroup() error { return err } + memberID, _ := c.membership() if _, err = broker.LeaveGroup(&sarama.LeaveGroupRequest{ GroupId: c.groupID, - MemberId: c.memberID, + MemberId: memberID, }); err != nil { c.closeCoordinator(broker, err) } @@ -631,27 +808,37 @@ func (c *Consumer) leaveGroup() error { // -------------------------------------------------------------------- -func (c *Consumer) createConsumer(topic string, partition int32, info offsetInfo) error { - sarama.Logger.Printf("cluster/consumer %s consume %s/%d from %d\n", c.memberID, topic, partition, info.NextOffset(c.client.config.Consumer.Offsets.Initial)) +func (c *Consumer) createConsumer(tomb *loopTomb, topic string, partition int32, info offsetInfo) error { + memberID, _ := c.membership() + sarama.Logger.Printf("cluster/consumer %s consume %s/%d from %d\n", memberID, topic, partition, info.NextOffset(c.client.config.Consumer.Offsets.Initial)) // Create partitionConsumer - pc, err := newPartitionConsumer(c.csmr, topic, partition, info, c.client.config.Consumer.Offsets.Initial) + pc, err := newPartitionConsumer(c.consumer, topic, partition, info, c.client.config.Consumer.Offsets.Initial) if err != nil { - return nil + return err } // Store in subscriptions c.subs.Store(topic, partition, pc) // Start partition consumer goroutine - go pc.Loop(c.messages, c.errors) + tomb.Go(func(stopper <-chan none) { + if c.client.config.Group.Mode == ConsumerModePartitions { + pc.waitFor(stopper, c.errors) + } else { + pc.multiplex(stopper, c.messages, c.errors) + } + }) + if c.client.config.Group.Mode == ConsumerModePartitions { + c.partitions <- pc + } return nil } func (c *Consumer) commitOffsetsWithRetry(retries int) error { err := c.CommitOffsets() - if err != nil && retries > 0 && c.subs.HasDirty() { + if err != nil && retries > 0 { return c.commitOffsetsWithRetry(retries - 1) } return err @@ -667,3 +854,66 @@ func (c *Consumer) closeCoordinator(broker *sarama.Broker, err error) { _ = c.client.RefreshCoordinator(c.groupID) } } + +func (c *Consumer) selectExtraTopics(allTopics []string) []string { + extra := allTopics[:0] + for _, topic := range allTopics { + if !c.isKnownCoreTopic(topic) && c.isPotentialExtraTopic(topic) { + extra = append(extra, topic) + } + } + return extra +} + +func (c *Consumer) isKnownCoreTopic(topic string) bool { + pos := sort.SearchStrings(c.coreTopics, topic) 
+ return pos < len(c.coreTopics) && c.coreTopics[pos] == topic +} + +func (c *Consumer) isKnownExtraTopic(topic string) bool { + pos := sort.SearchStrings(c.extraTopics, topic) + return pos < len(c.extraTopics) && c.extraTopics[pos] == topic +} + +func (c *Consumer) isPotentialExtraTopic(topic string) bool { + rx := c.client.config.Group.Topics + if rx.Blacklist != nil && rx.Blacklist.MatchString(topic) { + return false + } + if rx.Whitelist != nil && rx.Whitelist.MatchString(topic) { + return true + } + return false +} + +func (c *Consumer) refreshCoordinator() error { + if err := c.refreshMetadata(); err != nil { + return err + } + return c.client.RefreshCoordinator(c.groupID) +} + +func (c *Consumer) refreshMetadata() (err error) { + if c.client.config.Metadata.Full { + err = c.client.RefreshMetadata() + } else { + var topics []string + if topics, err = c.client.Topics(); err == nil && len(topics) != 0 { + err = c.client.RefreshMetadata(topics...) + } + } + + // maybe we didn't have authorization to describe all topics + switch err { + case sarama.ErrTopicAuthorizationFailed: + err = c.client.RefreshMetadata(c.coreTopics...) + } + return +} + +func (c *Consumer) membership() (memberID string, generationID int32) { + c.membershipMu.RLock() + memberID, generationID = c.memberID, c.generationID + c.membershipMu.RUnlock() + return +} diff --git a/consumer_test.go b/consumer_test.go index 8fb5b7e..7c8b225 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -2,25 +2,22 @@ package cluster import ( "fmt" + "regexp" "sync/atomic" "time" + "github.com/Shopify/sarama" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" ) var _ = Describe("Consumer", func() { - var newConsumer = func(group string) (*Consumer, error) { + var newConsumerOf = func(group string, topics ...string) (*Consumer, error) { config := NewConfig() config.Consumer.Return.Errors = true - return NewConsumer(testKafkaAddrs, group, testTopics, config) - } - - var newConsumerOf = func(group, topic string) (*Consumer, error) { - config := NewConfig() - config.Consumer.Return.Errors = true - return NewConsumer(testKafkaAddrs, group, []string{topic}, config) + config.Consumer.Offsets.Initial = sarama.OffsetOldest + return NewConsumer(testKafkaAddrs, group, topics, config) } var subscriptionsOf = func(c *Consumer) GomegaAsyncAssertion { @@ -29,29 +26,9 @@ var _ = Describe("Consumer", func() { }, "10s", "100ms") } - var consume = func(consumerID, group string, max int, out chan *testConsumerMessage) { - go func() { - defer GinkgoRecover() - - cs, err := newConsumer(group) - Expect(err).NotTo(HaveOccurred()) - defer cs.Close() - cs.consumerID = consumerID - - for msg := range cs.Messages() { - out <- &testConsumerMessage{*msg, consumerID} - cs.MarkOffset(msg, "") - - if max--; max == 0 { - return - } - } - }() - } - It("should init and share", func() { // start CS1 - cs1, err := newConsumer(testGroup) + cs1, err := newConsumerOf(testGroup, testTopics...) Expect(err).NotTo(HaveOccurred()) // CS1 should consume all 8 partitions @@ -61,7 +38,7 @@ var _ = Describe("Consumer", func() { })) // start CS2 - cs2, err := newConsumer(testGroup) + cs2, err := newConsumerOf(testGroup, testTopics...) 
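+		// once CS2 joins, the group rebalances and the claimed partitions are
+		// shared between both members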
Expect(err).NotTo(HaveOccurred()) defer cs2.Close() @@ -106,9 +83,7 @@ var _ = Describe("Consumer", func() { Expect(err).NotTo(HaveOccurred()) defer cs5.Close() - // wait for rebalance, make sure no errors occurred - Eventually(func() bool { return atomic.LoadInt32(&cs5.consuming) == 1 }, "10s", "100ms").Should(BeTrue()) - time.Sleep(time.Second) + // make sure no errors occurred Expect(cs1.Errors()).ShouldNot(Receive()) Expect(cs2.Errors()).ShouldNot(Receive()) Expect(cs3.Errors()).ShouldNot(Receive()) @@ -116,8 +91,10 @@ var _ = Describe("Consumer", func() { Expect(cs5.Errors()).ShouldNot(Receive()) // close 4th, make sure the 5th takes over - cs4.Close() - Eventually(func() bool { return atomic.LoadInt32(&cs4.consuming) == 1 }, "10s", "100ms").Should(BeFalse()) + Expect(cs4.Close()).To(Succeed()) + subscriptionsOf(cs1).Should(HaveKeyWithValue("topic-a", HaveLen(1))) + subscriptionsOf(cs2).Should(HaveKeyWithValue("topic-a", HaveLen(1))) + subscriptionsOf(cs3).Should(HaveKeyWithValue("topic-a", HaveLen(1))) subscriptionsOf(cs4).Should(BeEmpty()) subscriptionsOf(cs5).Should(HaveKeyWithValue("topic-a", HaveLen(1))) @@ -129,6 +106,63 @@ var _ = Describe("Consumer", func() { Expect(cs5.Errors()).ShouldNot(Receive()) }) + It("should be allowed to subscribe to partitions via white/black-lists", func() { + config := NewConfig() + config.Consumer.Return.Errors = true + config.Group.Topics.Whitelist = regexp.MustCompile(`topic-\w+`) + config.Group.Topics.Blacklist = regexp.MustCompile(`[bcd]$`) + + cs, err := NewConsumer(testKafkaAddrs, testGroup, nil, config) + Expect(err).NotTo(HaveOccurred()) + defer cs.Close() + + subscriptionsOf(cs).Should(Equal(map[string][]int32{ + "topic-a": {0, 1, 2, 3}, + })) + }) + + It("should receive rebalance notifications", func() { + config := NewConfig() + config.Consumer.Return.Errors = true + config.Group.Return.Notifications = true + + cs, err := NewConsumer(testKafkaAddrs, testGroup, testTopics, config) + Expect(err).NotTo(HaveOccurred()) + defer cs.Close() + + select { + case n := <-cs.Notifications(): + Expect(n).To(Equal(&Notification{ + Type: RebalanceStart, + Current: map[string][]int32{}, + })) + case err := <-cs.Errors(): + Expect(err).NotTo(HaveOccurred()) + case <-cs.Messages(): + Fail("expected notification to arrive before message") + } + + select { + case n := <-cs.Notifications(): + Expect(n).To(Equal(&Notification{ + Type: RebalanceOK, + Claimed: map[string][]int32{ + "topic-a": {0, 1, 2, 3}, + "topic-b": {0, 1, 2, 3}, + }, + Released: map[string][]int32{}, + Current: map[string][]int32{ + "topic-a": {0, 1, 2, 3}, + "topic-b": {0, 1, 2, 3}, + }, + })) + case err := <-cs.Errors(): + Expect(err).NotTo(HaveOccurred()) + case <-cs.Messages(): + Fail("expected notification to arrive before message") + } + }) + It("should support manual mark/commit", func() { cs, err := newConsumerOf(testGroup, "topic-a") Expect(err).NotTo(HaveOccurred()) @@ -149,35 +183,151 @@ var _ = Describe("Consumer", func() { })) }) + It("should support manual mark/commit, reset/commit", func() { + cs, err := newConsumerOf(testGroup, "topic-a") + Expect(err).NotTo(HaveOccurred()) + defer cs.Close() + + subscriptionsOf(cs).Should(Equal(map[string][]int32{ + "topic-a": {0, 1, 2, 3}}, + )) + + cs.MarkPartitionOffset("topic-a", 1, 3, "") + cs.MarkPartitionOffset("topic-a", 2, 4, "") + cs.MarkPartitionOffset("topic-b", 1, 2, "") // should not throw NPE + Expect(cs.CommitOffsets()).NotTo(HaveOccurred()) + + cs.ResetPartitionOffset("topic-a", 1, 2, "") + 
cs.ResetPartitionOffset("topic-a", 2, 3, "") + cs.ResetPartitionOffset("topic-b", 1, 2, "") // should not throw NPE + Expect(cs.CommitOffsets()).NotTo(HaveOccurred()) + + offsets, err := cs.fetchOffsets(cs.Subscriptions()) + Expect(err).NotTo(HaveOccurred()) + Expect(offsets).To(Equal(map[string]map[int32]offsetInfo{ + "topic-a": {0: {Offset: -1}, 1: {Offset: 3}, 2: {Offset: 4}, 3: {Offset: -1}}, + })) + }) + + It("should not commit unprocessed offsets", func() { + const groupID = "panicking" + + cs, err := newConsumerOf(groupID, "topic-a") + Expect(err).NotTo(HaveOccurred()) + + subscriptionsOf(cs).Should(Equal(map[string][]int32{ + "topic-a": {0, 1, 2, 3}, + })) + + n := 0 + Expect(func() { + for range cs.Messages() { + n++ + panic("stop here!") + } + }).To(Panic()) + Expect(cs.Close()).To(Succeed()) + Expect(n).To(Equal(1)) + + bk, err := testClient.Coordinator(groupID) + Expect(err).NotTo(HaveOccurred()) + + req := &sarama.OffsetFetchRequest{ + Version: 1, + ConsumerGroup: groupID, + } + req.AddPartition("topic-a", 0) + req.AddPartition("topic-a", 1) + req.AddPartition("topic-a", 2) + req.AddPartition("topic-a", 3) + Expect(bk.FetchOffset(req)).To(Equal(&sarama.OffsetFetchResponse{ + Blocks: map[string]map[int32]*sarama.OffsetFetchResponseBlock{ + "topic-a": {0: {Offset: -1}, 1: {Offset: -1}, 2: {Offset: -1}, 3: {Offset: -1}}, + }, + })) + }) + + It("should consume partitions", func() { + count := int32(0) + consume := func(consumerID string) { + defer GinkgoRecover() + + config := NewConfig() + config.Group.Mode = ConsumerModePartitions + config.Consumer.Offsets.Initial = sarama.OffsetOldest + + cs, err := NewConsumer(testKafkaAddrs, "partitions", testTopics, config) + Expect(err).NotTo(HaveOccurred()) + defer cs.Close() + + for pc := range cs.Partitions() { + go func(pc PartitionConsumer) { + defer pc.Close() + + for msg := range pc.Messages() { + atomic.AddInt32(&count, 1) + cs.MarkOffset(msg, "") + } + }(pc) + } + } + + go consume("A") + go consume("B") + go consume("C") + + Eventually(func() int32 { + return atomic.LoadInt32(&count) + }, "30s", "100ms").Should(BeNumerically(">=", 2000)) + }) + It("should consume/commit/resume", func() { - acc := make(chan *testConsumerMessage, 150000) - consume("A", "fuzzing", 1500, acc) - consume("B", "fuzzing", 2000, acc) - consume("C", "fuzzing", 1500, acc) - consume("D", "fuzzing", 200, acc) - consume("E", "fuzzing", 100, acc) - - Expect(testSeed(5000)).NotTo(HaveOccurred()) + acc := make(chan *testConsumerMessage, 20000) + consume := func(consumerID string, max int32) { + defer GinkgoRecover() + + cs, err := NewConsumer(testKafkaAddrs, "fuzzing", testTopics, nil) + Expect(err).NotTo(HaveOccurred()) + defer cs.Close() + cs.consumerID = consumerID + + for msg := range cs.Messages() { + acc <- &testConsumerMessage{*msg, consumerID} + cs.MarkOffset(msg, "") + + if atomic.AddInt32(&max, -1) <= 0 { + return + } + } + } + + go consume("A", 1500) + go consume("B", 2000) + go consume("C", 1500) + go consume("D", 200) + go consume("E", 100) + time.Sleep(10 * time.Second) // wait for consumers to subscribe to topics + Expect(testSeed(5000, testTopics)).NotTo(HaveOccurred()) Eventually(func() int { return len(acc) }, "30s", "100ms").Should(BeNumerically(">=", 5000)) - consume("F", "fuzzing", 300, acc) - consume("G", "fuzzing", 400, acc) - consume("H", "fuzzing", 1000, acc) - consume("I", "fuzzing", 2000, acc) - Expect(testSeed(5000)).NotTo(HaveOccurred()) + go consume("F", 300) + go consume("G", 400) + go consume("H", 1000) + go consume("I", 2000) + 
Expect(testSeed(5000, testTopics)).NotTo(HaveOccurred()) Eventually(func() int { return len(acc) }, "30s", "100ms").Should(BeNumerically(">=", 8000)) - consume("J", "fuzzing", 1000, acc) - Expect(testSeed(5000)).NotTo(HaveOccurred()) + go consume("J", 1000) + Expect(testSeed(5000, testTopics)).NotTo(HaveOccurred()) Eventually(func() int { return len(acc) }, "30s", "100ms").Should(BeNumerically(">=", 9000)) - consume("K", "fuzzing", 1000, acc) - consume("L", "fuzzing", 3000, acc) - Expect(testSeed(5000)).NotTo(HaveOccurred()) + go consume("K", 1000) + go consume("L", 3000) + Expect(testSeed(5000, testTopics)).NotTo(HaveOccurred()) Eventually(func() int { return len(acc) }, "30s", "100ms").Should(BeNumerically(">=", 12000)) - consume("M", "fuzzing", 1000, acc) - Expect(testSeed(5000)).NotTo(HaveOccurred()) + go consume("M", 1000) + Expect(testSeed(5000, testTopics)).NotTo(HaveOccurred()) Eventually(func() int { return len(acc) }, "30s", "100ms").Should(BeNumerically(">=", 15000)) close(acc) @@ -190,4 +340,11 @@ var _ = Describe("Consumer", func() { Expect(uniques).To(HaveLen(15000)) }) + It("should allow close to be called multiple times", func() { + cs, err := newConsumerOf(testGroup, testTopics...) + Expect(err).NotTo(HaveOccurred()) + Expect(cs.Close()).NotTo(HaveOccurred()) + Expect(cs.Close()).NotTo(HaveOccurred()) + }) + }) diff --git a/examples_test.go b/examples_test.go new file mode 100644 index 0000000..cb084f4 --- /dev/null +++ b/examples_test.go @@ -0,0 +1,123 @@ +package cluster_test + +import ( + "fmt" + "log" + "os" + "os/signal" + "regexp" + + cluster "github.com/bsm/sarama-cluster" +) + +// This example shows how to use the consumer to read messages +// from a multiple topics through a multiplexed channel. +func ExampleConsumer() { + + // init (custom) config, enable errors and notifications + config := cluster.NewConfig() + config.Consumer.Return.Errors = true + config.Group.Return.Notifications = true + + // init consumer + brokers := []string{"127.0.0.1:9092"} + topics := []string{"my_topic", "other_topic"} + consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config) + if err != nil { + panic(err) + } + defer consumer.Close() + + // trap SIGINT to trigger a shutdown. + signals := make(chan os.Signal, 1) + signal.Notify(signals, os.Interrupt) + + // consume errors + go func() { + for err := range consumer.Errors() { + log.Printf("Error: %s\n", err.Error()) + } + }() + + // consume notifications + go func() { + for ntf := range consumer.Notifications() { + log.Printf("Rebalanced: %+v\n", ntf) + } + }() + + // consume messages, watch signals + for { + select { + case msg, ok := <-consumer.Messages(): + if ok { + fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value) + consumer.MarkOffset(msg, "") // mark message as processed + } + case <-signals: + return + } + } +} + +// This example shows how to use the consumer to read messages +// through individual partitions. +func ExampleConsumer_Partitions() { + + // init (custom) config, set mode to ConsumerModePartitions + config := cluster.NewConfig() + config.Group.Mode = cluster.ConsumerModePartitions + + // init consumer + brokers := []string{"127.0.0.1:9092"} + topics := []string{"my_topic", "other_topic"} + consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config) + if err != nil { + panic(err) + } + defer consumer.Close() + + // trap SIGINT to trigger a shutdown. 
+ signals := make(chan os.Signal, 1) + signal.Notify(signals, os.Interrupt) + + // consume partitions + for { + select { + case part, ok := <-consumer.Partitions(): + if !ok { + return + } + + // start a separate goroutine to consume messages + go func(pc cluster.PartitionConsumer) { + for msg := range pc.Messages() { + fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value) + consumer.MarkOffset(msg, "") // mark message as processed + } + }(part) + case <-signals: + return + } + } +} + +// This example shows how to use the consumer with +// topic whitelists. +func ExampleConfig_whitelist() { + + // init (custom) config, enable errors and notifications + config := cluster.NewConfig() + config.Group.Topics.Whitelist = regexp.MustCompile(`myservice.*`) + + // init consumer + consumer, err := cluster.NewConsumer([]string{"127.0.0.1:9092"}, "my-consumer-group", nil, config) + if err != nil { + panic(err) + } + defer consumer.Close() + + // consume messages + msg := <-consumer.Messages() + fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value) +} diff --git a/glide.lock b/glide.lock deleted file mode 100644 index de0fa3d..0000000 --- a/glide.lock +++ /dev/null @@ -1,56 +0,0 @@ -hash: 76e38c5c70ece28550aacde5afd4ea321c4986b615e0bd145f899664ffc8888b -updated: 2016-08-15T15:23:34.489333458+01:00 -imports: -- name: github.com/davecgh/go-spew - version: 5215b55f46b2b919f50a1df0eaa5886afe4e3b3d - subpackages: - - spew -- name: github.com/eapache/go-resiliency - version: b86b1ec0dd4209a588dc1285cdd471e73525c0b3 - subpackages: - - breaker -- name: github.com/eapache/go-xerial-snappy - version: bb955e01b9346ac19dc29eb16586c90ded99a98c -- name: github.com/eapache/queue - version: ded5959c0d4e360646dc9e9908cff48666781367 -- name: github.com/golang/snappy - version: d9eb7a3d35ec988b8585d4a0068e462c27d28380 -- name: github.com/klauspost/crc32 - version: 19b0b332c9e4516a6370a0456e6182c3b5036720 -- name: github.com/Shopify/sarama - version: 9bb4a68d57ff6f623363aa172f0a8297aa289ba7 -testImports: -- name: github.com/onsi/ginkgo - version: 059cec02d342bab423425a99b191186a03255e9e - subpackages: - - config - - extensions/table - - internal/codelocation - - internal/containernode - - internal/failer - - internal/leafnodes - - internal/remote - - internal/spec - - internal/specrunner - - internal/suite - - internal/testingtproxy - - internal/writer - - reporters - - reporters/stenographer - - types -- name: github.com/onsi/gomega - version: 0df7b7fdb38123c12a1f569f0affb2c2a7bd72cf - subpackages: - - format - - internal/assertion - - internal/asyncassertion - - internal/oraclematcher - - internal/testingtsupport - - matchers - - matchers/support/goraph/bipartitegraph - - matchers/support/goraph/edge - - matchers/support/goraph/node - - matchers/support/goraph/util - - types -- name: gopkg.in/yaml.v2 - version: a83829b6f1293c91addabc89d0571c246397bbf4 diff --git a/glide.yaml b/glide.yaml deleted file mode 100644 index 2bca63d..0000000 --- a/glide.yaml +++ /dev/null @@ -1,4 +0,0 @@ -package: github.com/bsm/sarama-cluster -import: -- package: github.com/Shopify/sarama - version: ^1.9.0 diff --git a/offsets.go b/offsets.go new file mode 100644 index 0000000..4223ac5 --- /dev/null +++ b/offsets.go @@ -0,0 +1,69 @@ +package cluster + +import ( + "sync" + + "github.com/Shopify/sarama" +) + +// OffsetStash allows to accumulate offsets and +// mark them as processed in a bulk +type OffsetStash struct { + offsets 
map[topicPartition]offsetInfo + mu sync.Mutex +} + +// NewOffsetStash inits a blank stash +func NewOffsetStash() *OffsetStash { + return &OffsetStash{offsets: make(map[topicPartition]offsetInfo)} +} + +// MarkOffset stashes the provided message offset +func (s *OffsetStash) MarkOffset(msg *sarama.ConsumerMessage, metadata string) { + s.MarkPartitionOffset(msg.Topic, msg.Partition, msg.Offset, metadata) +} + +// MarkPartitionOffset stashes the offset for the provided topic/partition combination +func (s *OffsetStash) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) { + s.mu.Lock() + defer s.mu.Unlock() + + key := topicPartition{Topic: topic, Partition: partition} + if info := s.offsets[key]; offset >= info.Offset { + info.Offset = offset + info.Metadata = metadata + s.offsets[key] = info + } +} + +// ResetPartitionOffset stashes the offset for the provided topic/partition combination. +// Difference between ResetPartitionOffset and MarkPartitionOffset is that, ResetPartitionOffset supports earlier offsets +func (s *OffsetStash) ResetPartitionOffset(topic string, partition int32, offset int64, metadata string) { + s.mu.Lock() + defer s.mu.Unlock() + + key := topicPartition{Topic: topic, Partition: partition} + if info := s.offsets[key]; offset <= info.Offset { + info.Offset = offset + info.Metadata = metadata + s.offsets[key] = info + } +} + +// ResetOffset stashes the provided message offset +// See ResetPartitionOffset for explanation +func (s *OffsetStash) ResetOffset(msg *sarama.ConsumerMessage, metadata string) { + s.ResetPartitionOffset(msg.Topic, msg.Partition, msg.Offset, metadata) +} + +// Offsets returns the latest stashed offsets by topic-partition +func (s *OffsetStash) Offsets() map[string]int64 { + s.mu.Lock() + defer s.mu.Unlock() + + res := make(map[string]int64, len(s.offsets)) + for tp, info := range s.offsets { + res[tp.String()] = info.Offset + } + return res +} diff --git a/offsets_test.go b/offsets_test.go new file mode 100644 index 0000000..0f86aba --- /dev/null +++ b/offsets_test.go @@ -0,0 +1,87 @@ +package cluster + +import ( + . "github.com/onsi/ginkgo" + . 
"github.com/onsi/gomega" +) + +var _ = Describe("OffsetStash", func() { + var subject *OffsetStash + + BeforeEach(func() { + subject = NewOffsetStash() + }) + + It("should update", func() { + Expect(subject.offsets).To(HaveLen(0)) + + subject.MarkPartitionOffset("topic", 0, 0, "m3ta") + Expect(subject.offsets).To(HaveLen(1)) + Expect(subject.offsets).To(HaveKeyWithValue( + topicPartition{Topic: "topic", Partition: 0}, + offsetInfo{Offset: 0, Metadata: "m3ta"}, + )) + + subject.MarkPartitionOffset("topic", 0, 200, "m3ta") + Expect(subject.offsets).To(HaveLen(1)) + Expect(subject.offsets).To(HaveKeyWithValue( + topicPartition{Topic: "topic", Partition: 0}, + offsetInfo{Offset: 200, Metadata: "m3ta"}, + )) + + subject.MarkPartitionOffset("topic", 0, 199, "m3t@") + Expect(subject.offsets).To(HaveLen(1)) + Expect(subject.offsets).To(HaveKeyWithValue( + topicPartition{Topic: "topic", Partition: 0}, + offsetInfo{Offset: 200, Metadata: "m3ta"}, + )) + + subject.MarkPartitionOffset("topic", 1, 300, "") + Expect(subject.offsets).To(HaveLen(2)) + Expect(subject.offsets).To(HaveKeyWithValue( + topicPartition{Topic: "topic", Partition: 1}, + offsetInfo{Offset: 300, Metadata: ""}, + )) + }) + + It("should reset", func() { + Expect(subject.offsets).To(HaveLen(0)) + + subject.MarkPartitionOffset("topic", 0, 0, "m3ta") + Expect(subject.offsets).To(HaveLen(1)) + Expect(subject.offsets).To(HaveKeyWithValue( + topicPartition{Topic: "topic", Partition: 0}, + offsetInfo{Offset: 0, Metadata: "m3ta"}, + )) + + subject.MarkPartitionOffset("topic", 0, 200, "m3ta") + Expect(subject.offsets).To(HaveLen(1)) + Expect(subject.offsets).To(HaveKeyWithValue( + topicPartition{Topic: "topic", Partition: 0}, + offsetInfo{Offset: 200, Metadata: "m3ta"}, + )) + + subject.ResetPartitionOffset("topic", 0, 199, "m3t@") + Expect(subject.offsets).To(HaveLen(1)) + Expect(subject.offsets).To(HaveKeyWithValue( + topicPartition{Topic: "topic", Partition: 0}, + offsetInfo{Offset: 199, Metadata: "m3t@"}, + )) + + subject.MarkPartitionOffset("topic", 1, 300, "") + Expect(subject.offsets).To(HaveLen(2)) + Expect(subject.offsets).To(HaveKeyWithValue( + topicPartition{Topic: "topic", Partition: 1}, + offsetInfo{Offset: 300, Metadata: ""}, + )) + + subject.ResetPartitionOffset("topic", 1, 200, "m3t@") + Expect(subject.offsets).To(HaveLen(2)) + Expect(subject.offsets).To(HaveKeyWithValue( + topicPartition{Topic: "topic", Partition: 1}, + offsetInfo{Offset: 200, Metadata: "m3t@"}, + )) + + }) + +}) diff --git a/partitions.go b/partitions.go index 9d97981..bfaa587 100644 --- a/partitions.go +++ b/partitions.go @@ -3,88 +3,161 @@ package cluster import ( "sort" "sync" + "time" "github.com/Shopify/sarama" ) +// PartitionConsumer allows code to consume individual partitions from the cluster. +// +// See docs for Consumer.Partitions() for more on how to implement this. +type PartitionConsumer interface { + sarama.PartitionConsumer + + // Topic returns the consumed topic name + Topic() string + + // Partition returns the consumed partition + Partition() int32 + + // InitialOffset returns the offset used for creating the PartitionConsumer instance. + // The returned offset can be a literal offset, or OffsetNewest, or OffsetOldest + InitialOffset() int64 + + // MarkOffset marks the offset of a message as preocessed. + MarkOffset(offset int64, metadata string) + + // ResetOffset resets the offset to a previously processed message. 
+ ResetOffset(offset int64, metadata string) +} + type partitionConsumer struct { - pcm sarama.PartitionConsumer + sarama.PartitionConsumer state partitionState mu sync.Mutex - closed bool + topic string + partition int32 + initialOffset int64 + + closeOnce sync.Once + closeErr error + dying, dead chan none } func newPartitionConsumer(manager sarama.Consumer, topic string, partition int32, info offsetInfo, defaultOffset int64) (*partitionConsumer, error) { - pcm, err := manager.ConsumePartition(topic, partition, info.NextOffset(defaultOffset)) + offset := info.NextOffset(defaultOffset) + pcm, err := manager.ConsumePartition(topic, partition, offset) // Resume from default offset, if requested offset is out-of-range if err == sarama.ErrOffsetOutOfRange { info.Offset = -1 - pcm, err = manager.ConsumePartition(topic, partition, defaultOffset) + offset = defaultOffset + pcm, err = manager.ConsumePartition(topic, partition, offset) } if err != nil { return nil, err } return &partitionConsumer{ - pcm: pcm, - state: partitionState{Info: info}, + PartitionConsumer: pcm, + state: partitionState{Info: info}, + + topic: topic, + partition: partition, + initialOffset: offset, dying: make(chan none), dead: make(chan none), }, nil } -func (c *partitionConsumer) Loop(messages chan<- *sarama.ConsumerMessage, errors chan<- error) { +// Topic implements PartitionConsumer +func (c *partitionConsumer) Topic() string { return c.topic } + +// Partition implements PartitionConsumer +func (c *partitionConsumer) Partition() int32 { return c.partition } + +// InitialOffset implements PartitionConsumer +func (c *partitionConsumer) InitialOffset() int64 { return c.initialOffset } + +// AsyncClose implements PartitionConsumer +func (c *partitionConsumer) AsyncClose() { + c.closeOnce.Do(func() { + c.closeErr = c.PartitionConsumer.Close() + close(c.dying) + }) +} + +// Close implements PartitionConsumer +func (c *partitionConsumer) Close() error { + c.AsyncClose() + <-c.dead + return c.closeErr +} + +func (c *partitionConsumer) waitFor(stopper <-chan none, errors chan<- error) { + defer close(c.dead) + + for { + select { + case err, ok := <-c.Errors(): + if !ok { + return + } + select { + case errors <- err: + case <-stopper: + return + case <-c.dying: + return + } + case <-stopper: + return + case <-c.dying: + return + } + } +} + +func (c *partitionConsumer) multiplex(stopper <-chan none, messages chan<- *sarama.ConsumerMessage, errors chan<- error) { defer close(c.dead) for { select { - case msg, ok := <-c.pcm.Messages(): + case msg, ok := <-c.Messages(): if !ok { return } select { case messages <- msg: + case <-stopper: + return case <-c.dying: return } - case err, ok := <-c.pcm.Errors(): + case err, ok := <-c.Errors(): if !ok { return } select { case errors <- err: + case <-stopper: + return case <-c.dying: return } + case <-stopper: + return case <-c.dying: return } } } -func (c *partitionConsumer) Close() error { - if c.closed { - return nil - } - - err := c.pcm.Close() - c.closed = true - close(c.dying) - <-c.dead - - return err -} - -func (c *partitionConsumer) State() partitionState { - if c == nil { - return partitionState{} - } - +func (c *partitionConsumer) getState() partitionState { c.mu.Lock() state := c.state c.mu.Unlock() @@ -92,11 +165,7 @@ func (c *partitionConsumer) State() partitionState { return state } -func (c *partitionConsumer) MarkCommitted(offset int64) { - if c == nil { - return - } - +func (c *partitionConsumer) markCommitted(offset int64) { c.mu.Lock() if offset == c.state.Info.Offset { 
c.state.Dirty = false @@ -104,14 +173,22 @@ func (c *partitionConsumer) MarkCommitted(offset int64) { c.mu.Unlock() } +// MarkOffset implements PartitionConsumer func (c *partitionConsumer) MarkOffset(offset int64, metadata string) { - if c == nil { - return + c.mu.Lock() + if next := offset + 1; next > c.state.Info.Offset { + c.state.Info.Offset = next + c.state.Info.Metadata = metadata + c.state.Dirty = true } + c.mu.Unlock() +} +// ResetOffset implements PartitionConsumer +func (c *partitionConsumer) ResetOffset(offset int64, metadata string) { c.mu.Lock() - if offset > c.state.Info.Offset { - c.state.Info.Offset = offset + if next := offset + 1; next <= c.state.Info.Offset { + c.state.Info.Offset = next c.state.Info.Metadata = metadata c.state.Dirty = true } @@ -121,8 +198,9 @@ func (c *partitionConsumer) MarkOffset(offset int64, metadata string) { // -------------------------------------------------------------------- type partitionState struct { - Info offsetInfo - Dirty bool + Info offsetInfo + Dirty bool + LastCommit time.Time } // -------------------------------------------------------------------- @@ -138,6 +216,18 @@ func newPartitionMap() *partitionMap { } } +func (m *partitionMap) IsSubscribedTo(topic string) bool { + m.mu.RLock() + defer m.mu.RUnlock() + + for tp := range m.data { + if tp.Topic == topic { + return true + } + } + return false +} + func (m *partitionMap) Fetch(topic string, partition int32) *partitionConsumer { m.mu.RLock() pc, _ := m.data[topicPartition{topic, partition}] @@ -151,25 +241,13 @@ func (m *partitionMap) Store(topic string, partition int32, pc *partitionConsume m.mu.Unlock() } -func (m *partitionMap) HasDirty() bool { - m.mu.RLock() - defer m.mu.RUnlock() - - for _, pc := range m.data { - if state := pc.State(); state.Dirty { - return true - } - } - return false -} - func (m *partitionMap) Snapshot() map[topicPartition]partitionState { m.mu.RLock() defer m.mu.RUnlock() snap := make(map[topicPartition]partitionState, len(m.data)) for tp, pc := range m.data { - snap[tp] = pc.State() + snap[tp] = pc.getState() } return snap } diff --git a/partitions_test.go b/partitions_test.go index 0b02fbe..4065ea5 100644 --- a/partitions_test.go +++ b/partitions_test.go @@ -21,9 +21,8 @@ var _ = Describe("partitionConsumer", func() { }) It("should set state", func() { - Expect(subject.State()).To(Equal(partitionState{ - Info: offsetInfo{2000, "m3ta"}, - Dirty: false, + Expect(subject.getState()).To(Equal(partitionState{ + Info: offsetInfo{2000, "m3ta"}, })) }) @@ -33,50 +32,56 @@ var _ = Describe("partitionConsumer", func() { defer pc.Close() close(pc.dead) - state := pc.State() + state := pc.getState() Expect(state.Info.Offset).To(Equal(int64(-1))) Expect(state.Info.Metadata).To(Equal("m3ta")) }) It("should update state", func() { subject.MarkOffset(2001, "met@") // should set state - Expect(subject.State()).To(Equal(partitionState{ - Info: offsetInfo{2001, "met@"}, + Expect(subject.getState()).To(Equal(partitionState{ + Info: offsetInfo{2002, "met@"}, Dirty: true, })) - subject.MarkCommitted(2001) // should reset dirty status - Expect(subject.State()).To(Equal(partitionState{ - Info: offsetInfo{2001, "met@"}, - Dirty: false, + subject.markCommitted(2002) // should reset dirty status + Expect(subject.getState()).To(Equal(partitionState{ + Info: offsetInfo{2002, "met@"}, })) subject.MarkOffset(2001, "me7a") // should not update state - Expect(subject.State()).To(Equal(partitionState{ - Info: offsetInfo{2001, "met@"}, - Dirty: false, + 
Expect(subject.getState()).To(Equal(partitionState{ + Info: offsetInfo{2002, "met@"}, })) subject.MarkOffset(2002, "me7a") // should bump state - Expect(subject.State()).To(Equal(partitionState{ - Info: offsetInfo{2002, "me7a"}, + Expect(subject.getState()).To(Equal(partitionState{ + Info: offsetInfo{2003, "me7a"}, Dirty: true, })) - subject.MarkCommitted(2001) // should not unset state - Expect(subject.State()).To(Equal(partitionState{ - Info: offsetInfo{2002, "me7a"}, + // After committing a later offset, try rewinding back to earlier offset with new metadata. + subject.ResetOffset(2001, "met@") + Expect(subject.getState()).To(Equal(partitionState{ + Info: offsetInfo{2002, "met@"}, + Dirty: true, + })) + + subject.markCommitted(2002) // should not unset state + Expect(subject.getState()).To(Equal(partitionState{ + Info: offsetInfo{2002, "met@"}, + })) + + subject.MarkOffset(2002, "me7a") // should bump state + Expect(subject.getState()).To(Equal(partitionState{ + Info: offsetInfo{2003, "me7a"}, Dirty: true, })) - }) - It("should not fail when nil", func() { - blank := (*partitionConsumer)(nil) - Expect(func() { - _ = blank.State() - blank.MarkOffset(2001, "met@") - blank.MarkCommitted(2001) - }).NotTo(Panic()) + subject.markCommitted(2003) + Expect(subject.getState()).To(Equal(partitionState{ + Info: offsetInfo{2003, "me7a"}, + })) }) }) @@ -121,11 +126,11 @@ var _ = Describe("partitionMap", func() { subject.Store("topic", 0, pc0) subject.Store("topic", 1, pc1) - subject.Fetch("topic", 1).MarkOffset(2001, "met@") + subject.Fetch("topic", 1).MarkOffset(2000, "met@") Expect(subject.Snapshot()).To(Equal(map[topicPartition]partitionState{ - topicPartition{"topic", 0}: {offsetInfo{2000, "m3ta"}, false}, - topicPartition{"topic", 1}: {offsetInfo{2001, "met@"}, true}, + {"topic", 0}: {Info: offsetInfo{2000, "m3ta"}, Dirty: false}, + {"topic", 1}: {Info: offsetInfo{2001, "met@"}, Dirty: true}, })) }) diff --git a/testdata/server.properties b/testdata/server.properties index 972b2aa..06e7a49 100644 --- a/testdata/server.properties +++ b/testdata/server.properties @@ -14,3 +14,4 @@ log.retention.check.interval.ms=60000 log.cleaner.enable=true zookeeper.connect=localhost:22181 zookeeper.connection.timeout.ms=1000000 +offsets.topic.replication.factor=1 diff --git a/util.go b/util.go new file mode 100644 index 0000000..e7cb5dd --- /dev/null +++ b/util.go @@ -0,0 +1,75 @@ +package cluster + +import ( + "fmt" + "sort" + "sync" +) + +type none struct{} + +type topicPartition struct { + Topic string + Partition int32 +} + +func (tp *topicPartition) String() string { + return fmt.Sprintf("%s-%d", tp.Topic, tp.Partition) +} + +type offsetInfo struct { + Offset int64 + Metadata string +} + +func (i offsetInfo) NextOffset(fallback int64) int64 { + if i.Offset > -1 { + return i.Offset + } + return fallback +} + +type int32Slice []int32 + +func (p int32Slice) Len() int { return len(p) } +func (p int32Slice) Less(i, j int) bool { return p[i] < p[j] } +func (p int32Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } + +func (p int32Slice) Diff(o int32Slice) (res []int32) { + on := len(o) + for _, x := range p { + n := sort.Search(on, func(i int) bool { return o[i] >= x }) + if n < on && o[n] == x { + continue + } + res = append(res, x) + } + return +} + +// -------------------------------------------------------------------- + +type loopTomb struct { + c chan none + o sync.Once + w sync.WaitGroup +} + +func newLoopTomb() *loopTomb { + return &loopTomb{c: make(chan none)} +} + +func (t *loopTomb) stop() { 
t.o.Do(func() { close(t.c) }) }
+
+// Close stops all loops and blocks until they have exited.
+func (t *loopTomb) Close() { t.stop(); t.w.Wait() }
+
+// Dying returns a channel that is closed once the tomb has been stopped.
+func (t *loopTomb) Dying() <-chan none { return t.c }
+
+// Go runs f in its own goroutine; when f returns, the tomb is stopped so that
+// the sibling loops exit as well.
+func (t *loopTomb) Go(f func(<-chan none)) {
+	t.w.Add(1)
+
+	go func() {
+		defer t.stop()
+		defer t.w.Done()
+
+		f(t.c)
+	}()
+}
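
The new OffsetStash has no companion example in examples_test.go, so the following is a minimal usage sketch for stashing offsets while processing in batches. It assumes a local broker at 127.0.0.1:9092, a topic named my_topic, a batch size of 100, and a Consumer.MarkOffsets(*OffsetStash) helper for flushing the stash; that helper is not part of the hunks shown above.

package main

import (
	"log"

	cluster "github.com/bsm/sarama-cluster"
)

func main() {
	config := cluster.NewConfig()
	config.Consumer.Return.Errors = true

	consumer, err := cluster.NewConsumer([]string{"127.0.0.1:9092"}, "my-consumer-group", []string{"my_topic"}, config)
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	// drain errors so the consumer does not block
	go func() {
		for err := range consumer.Errors() {
			log.Println("consumer error:", err)
		}
	}()

	stash := cluster.NewOffsetStash()
	processed := 0
	for msg := range consumer.Messages() {
		// process msg here, then stash its offset instead of marking it right away
		stash.MarkOffset(msg, "")

		if processed++; processed%100 == 0 {
			// assumed helper: hand the accumulated offsets to the consumer in one go
			consumer.MarkOffsets(stash)
		}
	}
}

The stash only decouples processing from marking; marked offsets are still committed on the consumer's regular commit interval or when it is closed.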