Merge upstream (#2)
* Add SASL-SCRAM ability to Kafka connection

This PR adds the ability to connect to Kafka via SASL SCRAM-SHA-256 or SCRAM-SHA-512.

It adds an entry to the SASL profile configuration:
key=mechanism
value type=string
required=no
default value=(empty)
which accepts either of the values SCRAM-SHA-256 or SCRAM-SHA-512.

Partially addresses linkedin#526
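
A minimal configuration sketch (the sasl.* keys follow the viper lookups in this diff; the client-profile block, names, and credentials are illustrative assumptions):

    [sasl.myprofile]
    username="kafka-user"
    password="kafka-secret"
    mechanism="SCRAM-SHA-512"
    handshake-first=true

    [client-profile.myclient]
    kafka-version="2.5.0"
    sasl="myprofile"

Leaving mechanism empty keeps the existing plain-SASL behavior.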

* Ignore ZooKeeper znode Create if the path already exists

Currently, Burrow attempts to create its znode paths on startup. This
causes problems when authentication is required to connect to ZooKeeper.

The fix is to skip creating a znode path if it already exists (see the
sketch below).
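
A sketch of the pattern, using the go-zookeeper client that Burrow builds on; the helper name ensurePath is hypothetical, not the actual Burrow code:

    package helpers

    import "github.com/samuel/go-zookeeper/zk"

    // ensurePath creates a znode only when it is not already present, so
    // startup does not fail on paths that were pre-created, for example on
    // an ensemble that requires authentication.
    func ensurePath(conn *zk.Conn, path string) error {
        exists, _, err := conn.Exists(path)
        if err != nil {
            return err
        }
        if exists {
            return nil
        }
        _, err = conn.Create(path, []byte{}, 0, zk.WorldACL(zk.PermAll))
        if err == zk.ErrNodeExists {
            // Created concurrently by another client; treat as success.
            return nil
        }
        return err
    }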

* Yext-specific Dockerfile

The config file and directory used by Burrow are updated for M4 and Khan.

* Update module and import references

go build -o build/Burrow github.com/rjh-yext/Burrow
pulls in linkedin's branch of Burrow.

References to linkedin are changed to the current fork (see the go.mod
sketch below).
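
The heart of the rename is the module directive in go.mod (a sketch; the actual go.mod contents are not shown in this diff), after which import paths in the source are rewritten to match:

    module github.com/rjh-yext/Burrow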

* Fix Travis CI build.

* Add Prometheus Metrics Exporter

* Add support for Kafka 2.5.0 and Go 1.14

* NameToCertificate only allows associating a single certificate with a given name. Leave that field nil to let the library select the first compatible chain from Certificates.

* Update sarama with a fix for IBM/sarama#1692

* Removing CI jobs from upstream merge.

Co-authored-by: Roger Hwang <rhwang@yext.com>
Co-authored-by: rjh-yext <59895513+rjh-yext@users.noreply.github.com>
Co-authored-by: klDen <kl@kenzymele.com>
Co-authored-by: Michael Wain <michaelwain1990@gmail.com>
Co-authored-by: Vlad Gorodetsky <v@gor.io>
Co-authored-by: Vlad Gorodetsky <17348+bai@users.noreply.github.com>
7 people committed May 15, 2020
1 parent 9d3a2c5 commit 26d79f3
Showing 13 changed files with 696 additions and 185 deletions.
44 changes: 0 additions & 44 deletions .github/workflows/ci.yml

This file was deleted.

49 changes: 0 additions & 49 deletions .github/workflows/release.yml

This file was deleted.

2 changes: 1 addition & 1 deletion Dockerfile
@@ -1,5 +1,5 @@
# stage 1: builder
-FROM golang:1.13.7-alpine as builder
+FROM golang:1.14.2-alpine as builder

ENV BURROW_SRC /usr/src/Burrow/

14 changes: 13 additions & 1 deletion core/internal/helpers/sarama.go
@@ -55,6 +55,7 @@ var kafkaVersions = map[string]sarama.KafkaVersion{
"2.2.1": sarama.V2_2_0_0,
"2.3.0": sarama.V2_3_0_0,
"2.4.0": sarama.V2_4_0_0,
"2.5.0": sarama.V2_5_0_0,
}

func parseKafkaVersion(kafkaVersion string) sarama.KafkaVersion {
@@ -113,7 +114,6 @@ func GetSaramaConfigFromClientProfile(profileName string) *sarama.Config {
panic("cannot read TLS certificate or key file: " + err.Error())
}
saramaConfig.Net.TLS.Config.Certificates = []tls.Certificate{cert}
-saramaConfig.Net.TLS.Config.BuildNameToCertificate()
}
}
saramaConfig.Net.TLS.Config.InsecureSkipVerify = viper.GetBool("tls." + tlsName + ".noverify")
@@ -124,6 +124,18 @@ func GetSaramaConfigFromClientProfile(profileName string) *sarama.Config {
saslName := viper.GetString(configRoot + ".sasl")

saramaConfig.Net.SASL.Enable = true
+mechanism := viper.GetString("sasl." + saslName + ".mechanism")
+if mechanism == "SCRAM-SHA-256" {
+saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256
+saramaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
+return &XDGSCRAMClient{HashGeneratorFcn: SHA256}
+}
+} else if mechanism == "SCRAM-SHA-512" {
+saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
+saramaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
+return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
+}
+}
saramaConfig.Net.SASL.Handshake = viper.GetBool("sasl." + saslName + ".handshake-first")
saramaConfig.Net.SASL.User = viper.GetString("sasl." + saslName + ".username")
saramaConfig.Net.SASL.Password = viper.GetString("sasl." + saslName + ".password")
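Outside Burrow, the same SCRAM client plugs into a plain sarama configuration; a sketch, with a hypothetical broker address and credentials:

    cfg := sarama.NewConfig()
    cfg.Version = sarama.V2_5_0_0
    cfg.Net.SASL.Enable = true
    cfg.Net.SASL.User = "user"
    cfg.Net.SASL.Password = "secret"
    cfg.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
    cfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
        return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
    }
    client, err := sarama.NewClient([]string{"broker:9092"}, cfg)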
35 changes: 35 additions & 0 deletions core/internal/helpers/scram.go
@@ -0,0 +1,35 @@
package helpers

import (
"crypto/sha256"
"crypto/sha512"

"github.com/xdg/scram"
)

// SHA256 and SHA512 are the hash generator functions for the two supported SCRAM mechanisms.
var SHA256 scram.HashGeneratorFcn = sha256.New
var SHA512 scram.HashGeneratorFcn = sha512.New

// XDGSCRAMClient adapts the xdg/scram client to the sarama.SCRAMClient interface.
type XDGSCRAMClient struct {
*scram.Client
*scram.ClientConversation
scram.HashGeneratorFcn
}

// Begin creates a SCRAM client for the given credentials and opens a new conversation.
func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
if err != nil {
return err
}
x.ClientConversation = x.Client.NewConversation()
return nil
}

// Step advances the conversation with a server challenge and returns the client response.
func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {
response, err = x.ClientConversation.Step(challenge)
return
}

// Done reports whether the SCRAM conversation has completed.
func (x *XDGSCRAMClient) Done() bool {
return x.ClientConversation.Done()
}
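
For illustration, the conversation can also be driven by hand; with the xdg/scram client the first Step takes an empty challenge and returns the client-first message. A sketch in the same package, with hypothetical credentials:

    // exampleSCRAMConversation shows the Begin/Step flow on its own.
    func exampleSCRAMConversation() (string, error) {
        client := &XDGSCRAMClient{HashGeneratorFcn: SHA512}
        if err := client.Begin("user", "secret", ""); err != nil {
            return "", err
        }
        // Later Steps consume server challenges until Done reports true.
        return client.Step("")
    }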
11 changes: 11 additions & 0 deletions core/internal/helpers/zookeeper.go
@@ -120,6 +120,11 @@ func (z *BurrowZookeeperClient) GetW(path string) ([]byte, *zk.Stat, <-chan zk.Event, error) {
return z.client.GetW(path)
}

+// Exists returns a boolean stating whether or not the specified path exists.
+func (z *BurrowZookeeperClient) Exists(path string) (bool, *zk.Stat, error) {
+return z.client.Exists(path)
+}

// ExistsW returns a boolean stating whether or not the specified path exists. This method also sets a watch on the node
// (exists if it does not currently exist, or a data watch otherwise), providing an event channel that will receive a
// message when the watch fires
@@ -179,6 +184,12 @@ func (m *MockZookeeperClient) GetW(path string) ([]byte, *zk.Stat, <-chan zk.Event, error) {
return args.Get(0).([]byte), args.Get(1).(*zk.Stat), args.Get(2).(<-chan zk.Event), args.Error(3)
}

+// Exists mocks protocol.ZookeeperClient.Exists
+func (m *MockZookeeperClient) Exists(path string) (bool, *zk.Stat, error) {
+args := m.Called(path)
+return args.Bool(0), args.Get(1).(*zk.Stat), args.Error(2)
+}

// ExistsW mocks protocol.ZookeeperClient.ExistsW
func (m *MockZookeeperClient) ExistsW(path string) (bool, *zk.Stat, <-chan zk.Event, error) {
args := m.Called(path)
3 changes: 2 additions & 1 deletion core/internal/httpserver/coordinator.go
@@ -115,7 +115,6 @@ func (hc *Coordinator) Configure() {
panic("cannot read TLS certificate or key file: " + err.Error())
}
server.TLSConfig.Certificates = []tls.Certificate{cert}
-server.TLSConfig.BuildNameToCertificate()
}
hc.servers[name] = server
hc.theCert[name] = certFile
@@ -130,6 +129,8 @@ func (hc *Coordinator) Configure() {
// This is a healthcheck URL. Please don't change it
hc.router.GET("/burrow/admin", hc.handleAdmin)

+hc.router.Handler(http.MethodGet, "/metrics", hc.handlePrometheusMetrics())

// All valid paths go here
hc.router.GET("/v3/kafka", hc.handleClusterList)
hc.router.GET("/v3/kafka/:cluster", hc.handleClusterDetail)
182 changes: 182 additions & 0 deletions core/internal/httpserver/prometheus.go
@@ -0,0 +1,182 @@
package httpserver

import (
"net/http"
"strconv"

"github.com/prometheus/client_golang/prometheus"

"github.com/linkedin/Burrow/core/protocol"

"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

var (
consumerTotalLagGauge = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "burrow_kafka_consumer_lag_total",
Help: "The sum of all partition current lag values for the group",
},
[]string{"cluster", "consumer_group"},
)

consumerStatusGauge = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "burrow_kafka_consumer_status",
Help: "The status of the consumer group. It is calculated from the highest status for the individual partitions. Statuses are an index list from NOTFOUND, OK, WARN, ERR, STOP, STALL, REWIND",
},
[]string{"cluster", "consumer_group"},
)

consumerPartitionCurrentOffset = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "burrow_kafka_consumer_current_offset",
Help: "Latest offset that Burrow is storing for this partition",
},
[]string{"cluster", "consumer_group", "topic", "partition"},
)

consumerPartitionLagGauge = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "burrow_kafka_consumer_partition_lag",
Help: "Number of messages the consumer group is behind by for a partition as reported by Burrow",
},
[]string{"cluster", "consumer_group", "topic", "partition"},
)

topicPartitionOffsetGauge = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "burrow_kafka_topic_partition_offset",
Help: "Latest offset the topic that Burrow is storing for this partition",
},
[]string{"cluster", "topic", "partition"},
)
)

// handlePrometheusMetrics refreshes the exported gauges from the storage and evaluator modules on each scrape, then delegates to the standard promhttp handler.
func (hc *Coordinator) handlePrometheusMetrics() http.HandlerFunc {
promHandler := promhttp.Handler()

return http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) {
for _, cluster := range listClusters(hc.App) {
for _, consumer := range listConsumers(hc.App, cluster) {
consumerStatus := getFullConsumerStatus(hc.App, cluster, consumer)

// Skip groups whose status is unknown or whose evaluation is incomplete.
if consumerStatus == nil ||
consumerStatus.Status == protocol.StatusNotFound ||
consumerStatus.Complete < 1.0 {
continue
}

labels := map[string]string{
"cluster": cluster,
"consumer_group": consumer,
}

consumerTotalLagGauge.With(labels).Set(float64(consumerStatus.TotalLag))
consumerStatusGauge.With(labels).Set(float64(consumerStatus.Status))

for _, partition := range consumerStatus.Partitions {
if partition.Complete < 1.0 {
continue
}

labels := map[string]string{
"cluster": cluster,
"consumer_group": consumer,
"topic": partition.Topic,
"partition": strconv.FormatInt(int64(partition.Partition), 10),
}

consumerPartitionCurrentOffset.With(labels).Set(float64(partition.End.Offset))
consumerPartitionLagGauge.With(labels).Set(float64(partition.CurrentLag))
}
}

// Topics
for _, topic := range listTopics(hc.App, cluster) {
for partitionNumber, offset := range getTopicDetail(hc.App, cluster, topic) {
topicPartitionOffsetGauge.With(map[string]string{
"cluster": cluster,
"topic": topic,
"partition": strconv.FormatInt(int64(partitionNumber), 10),
}).Set(float64(offset))
}
}
}

promHandler.ServeHTTP(resp, req)
})
}

func listClusters(app *protocol.ApplicationContext) []string {
request := &protocol.StorageRequest{
RequestType: protocol.StorageFetchClusters,
Reply: make(chan interface{}),
}
app.StorageChannel <- request
response := <-request.Reply
if response == nil {
return []string{}
}

return response.([]string)
}

func listConsumers(app *protocol.ApplicationContext, cluster string) []string {
request := &protocol.StorageRequest{
RequestType: protocol.StorageFetchConsumers,
Cluster: cluster,
Reply: make(chan interface{}),
}
app.StorageChannel <- request
response := <-request.Reply
if response == nil {
return []string{}
}

return response.([]string)
}

func getFullConsumerStatus(app *protocol.ApplicationContext, cluster, consumer string) *protocol.ConsumerGroupStatus {
request := &protocol.EvaluatorRequest{
Cluster: cluster,
Group: consumer,
ShowAll: true,
Reply: make(chan *protocol.ConsumerGroupStatus),
}
app.EvaluatorChannel <- request
response := <-request.Reply
return response
}

func listTopics(app *protocol.ApplicationContext, cluster string) []string {
request := &protocol.StorageRequest{
RequestType: protocol.StorageFetchTopics,
Cluster: cluster,
Reply: make(chan interface{}),
}
app.StorageChannel <- request
response := <-request.Reply
if response == nil {
return []string{}
}

return response.([]string)
}

func getTopicDetail(app *protocol.ApplicationContext, cluster, topic string) []int64 {
request := &protocol.StorageRequest{
RequestType: protocol.StorageFetchTopic,
Cluster: cluster,
Topic: topic,
Reply: make(chan interface{}),
}
app.StorageChannel <- request
response := <-request.Reply
if response == nil {
return []int64{}
}

return response.([]int64)
}
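
With the exporter registered on /metrics (see the coordinator change above), a scrape returns series like these; cluster, group, topic, and values are illustrative:

    # HELP burrow_kafka_consumer_lag_total The sum of all partition current lag values for the group
    # TYPE burrow_kafka_consumer_lag_total gauge
    burrow_kafka_consumer_lag_total{cluster="local",consumer_group="example-group"} 42
    burrow_kafka_consumer_status{cluster="local",consumer_group="example-group"} 1
    burrow_kafka_consumer_partition_lag{cluster="local",consumer_group="example-group",partition="0",topic="example-topic"} 7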
