Skip to content

Commit

Permalink
Merge pull request #627 from klDen/master
Browse files Browse the repository at this point in the history
Fixes Travis CI build of PR 598
  • Loading branch information
bai committed May 11, 2020
2 parents 4ce194f + f02501b commit bc09a91
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 12 deletions.
12 changes: 12 additions & 0 deletions core/internal/helpers/sarama.go
Expand Up @@ -124,6 +124,18 @@ func GetSaramaConfigFromClientProfile(profileName string) *sarama.Config {
saslName := viper.GetString(configRoot + ".sasl")

saramaConfig.Net.SASL.Enable = true
mechanism := viper.GetString("sasl." + saslName + ".mechanism")
if mechanism == "SCRAM-SHA-256" {
saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256
saramaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &XDGSCRAMClient{HashGeneratorFcn: SHA256}
}
} else if mechanism == "SCRAM-SHA-512" {
saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
saramaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
}
}
saramaConfig.Net.SASL.Handshake = viper.GetBool("sasl." + saslName + ".handshake-first")
saramaConfig.Net.SASL.User = viper.GetString("sasl." + saslName + ".username")
saramaConfig.Net.SASL.Password = viper.GetString("sasl." + saslName + ".password")
Expand Down
35 changes: 35 additions & 0 deletions core/internal/helpers/scram.go
@@ -0,0 +1,35 @@
package helpers

import (
"crypto/sha256"
"crypto/sha512"

"github.com/xdg/scram"
)

var SHA256 scram.HashGeneratorFcn = sha256.New
var SHA512 scram.HashGeneratorFcn = sha512.New

type XDGSCRAMClient struct {
*scram.Client
*scram.ClientConversation
scram.HashGeneratorFcn
}

func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
if err != nil {
return err
}
x.ClientConversation = x.Client.NewConversation()
return nil
}

func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {
response, err = x.ClientConversation.Step(challenge)
return
}

func (x *XDGSCRAMClient) Done() bool {
return x.ClientConversation.Done()
}
11 changes: 11 additions & 0 deletions core/internal/helpers/zookeeper.go
Expand Up @@ -120,6 +120,11 @@ func (z *BurrowZookeeperClient) GetW(path string) ([]byte, *zk.Stat, <-chan zk.E
return z.client.GetW(path)
}

// Exists returns a boolean stating whether or not the specified path exists.
func (z *BurrowZookeeperClient) Exists(path string) (bool, *zk.Stat, error) {
return z.client.Exists(path)
}

// ExistsW returns a boolean stating whether or not the specified path exists. This method also sets a watch on the node
// (exists if it does not currently exist, or a data watch otherwise), providing an event channel that will receive a
// message when the watch fires
Expand Down Expand Up @@ -179,6 +184,12 @@ func (m *MockZookeeperClient) GetW(path string) ([]byte, *zk.Stat, <-chan zk.Eve
return args.Get(0).([]byte), args.Get(1).(*zk.Stat), args.Get(2).(<-chan zk.Event), args.Error(3)
}

// Exists mocks protocol.ZookeeperClient.Exists
func (m *MockZookeeperClient) Exists(path string) (bool, *zk.Stat, error) {
args := m.Called(path)
return args.Bool(0), args.Get(1).(*zk.Stat), args.Error(2)
}

// ExistsW mocks protocol.ZookeeperClient.ExistsW
func (m *MockZookeeperClient) ExistsW(path string) (bool, *zk.Stat, <-chan zk.Event, error) {
args := m.Called(path)
Expand Down
14 changes: 10 additions & 4 deletions core/internal/zookeeper/coordinator.go
Expand Up @@ -124,10 +124,16 @@ func (zc *Coordinator) createRecursive(path string) error {

parts := strings.Split(path, "/")
for i := 2; i <= len(parts); i++ {
_, err := zc.App.Zookeeper.Create(strings.Join(parts[:i], "/"), []byte{}, 0, zk.WorldACL(zk.PermAll))
// Ignore when the node exists already
if (err != nil) && (err != zk.ErrNodeExists) {
return err
// If the rootpath exists, skip the Create process to avoid "zk: not authenticated" error
exist, _, errExists := zc.App.Zookeeper.Exists(strings.Join(parts[:i], "/"))
if !exist {
_, err := zc.App.Zookeeper.Create(strings.Join(parts[:i], "/"), []byte{}, 0, zk.WorldACL(zk.PermAll))
// Ignore when the node exists already
if (err != nil) && (err != zk.ErrNodeExists) {
return err
}
} else {
return errExists
}
}
return nil
Expand Down
6 changes: 5 additions & 1 deletion core/protocol/protocol.go
Expand Up @@ -141,13 +141,17 @@ type ZookeeperClient interface {
// the children of the specified path, providing an event channel that will receive a message when the watch fires
GetW(path string) ([]byte, *zk.Stat, <-chan zk.Event, error)

// For the given path in Zookeeper, return a boolean stating whether or not the node exists.
// The method does not set watch on the node, but verifies existence of a node to avoid authentication error.
Exists(path string) (bool, *zk.Stat, error)

// For the given path in Zookeeper, return a boolean stating whether or not the node exists. This method also sets
// a watch on the node (exists if it does not currently exist, or a data watch otherwise), providing an event
// channel that will receive a message when the watch fires
ExistsW(path string) (bool, *zk.Stat, <-chan zk.Event, error)

// Create makes a new ZNode at the specified path with the contents set to the data byte-slice. Flags can be
// provided to specify that this is an ephemeral or sequence node, and an ACL must be provided. If no ACL is\
// provided to specify that this is an ephemeral or sequence node, and an ACL must be provided. If no ACL is
// desired, specify
// zk.WorldACL(zk.PermAll)
Create(string, []byte, int32, []zk.ACL) (string, error)
Expand Down
18 changes: 11 additions & 7 deletions go.mod
Expand Up @@ -26,13 +26,17 @@ require (
github.com/spf13/pflag v1.0.5 // indirect
github.com/spf13/viper v1.6.3
github.com/stretchr/objx v0.2.0 // indirect
github.com/stretchr/testify v1.5.1
go.uber.org/zap v1.15.0
golang.org/x/crypto v0.0.0-20200429183012-4b2356b1ed79 // indirect
golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect
golang.org/x/net v0.0.0-20200505041828-1ed23360d12c // indirect
golang.org/x/sys v0.0.0-20200501145240-bc7a7d42d5c3 // indirect
golang.org/x/tools v0.0.0-20200505023115-26f46d2f7ef8 // indirect
github.com/stretchr/testify v1.4.0
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
go.uber.org/atomic v1.5.1 // indirect
go.uber.org/multierr v1.4.0 // indirect
go.uber.org/zap v1.13.0
golang.org/x/crypto v0.0.0-20200109152110-61a87790db17 // indirect
golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f // indirect
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa // indirect
golang.org/x/sys v0.0.0-20200113162924-86b910548bc1 // indirect
golang.org/x/text v0.3.2 // indirect
golang.org/x/tools v0.0.0-20200114052453-d31a08c2edf2 // indirect
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Expand Up @@ -199,7 +199,10 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5
github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
Expand Down

0 comments on commit bc09a91

Please sign in to comment.