Skip to content

Commit

Permalink
removes logrus from the example
Browse files Browse the repository at this point in the history
  • Loading branch information
d1egoaz committed Aug 11, 2020
1 parent 623c14f commit 3f365d9
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 19 deletions.
1 change: 0 additions & 1 deletion examples/interceptors/go.mod
Expand Up @@ -6,7 +6,6 @@ replace github.com/Shopify/sarama => ../../

require (
github.com/Shopify/sarama v1.27.0
github.com/sirupsen/logrus v1.6.0
go.opentelemetry.io/otel v0.10.0
go.opentelemetry.io/otel/exporters/stdout v0.10.0
google.golang.org/genproto v0.0.0-20200331122359-1ee6d9798940 // indirect
Expand Down
6 changes: 0 additions & 6 deletions examples/interceptors/go.sum
Expand Up @@ -56,8 +56,6 @@ github.com/jcmturner/gofork v1.0.0 h1:J7uCkflzTEhUZ64xqKnkDxq3kzc96ajM1Gli5ktUem
github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o=
github.com/klauspost/compress v1.10.10 h1:a/y8CglcM7gLGYmlbP/stPE5sR3hbhFRUjCBfd/0B3I=
github.com/klauspost/compress v1.10.10/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/konsorten/go-windows-terminal-sequences v1.0.3 h1:CE8S1cTafDpPvMhIxNJKvHsGVBgn1xWYf1NbHQhywc8=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs=
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
Expand All @@ -75,10 +73,7 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 h1:MkV+77GLUNo5oJ0jf870itWm3D0Sjh7+Za9gazKc5LQ=
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/sirupsen/logrus v1.6.0 h1:UBcNElsrwanuuMsnGSlYmtmgbb23qDR5dG+6X6Oo89I=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.6.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
Expand Down Expand Up @@ -113,7 +108,6 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1 h1:ogLJMz+qpzav7lGMh10LMvAkM/fAoGlaiiHYiFYdm80=
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
16 changes: 7 additions & 9 deletions examples/interceptors/main.go
Expand Up @@ -10,7 +10,6 @@ import (
"time"

"github.com/Shopify/sarama"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/exporters/stdout"
)

Expand All @@ -24,7 +23,7 @@ func main() {
flag.Parse()

if *brokers == "" {
log.Fatalln("at least one broker is required")
logger.Fatalln("at least one broker is required")
}
splitBrokers := strings.Split(*brokers, ",")
sarama.Logger = log.New(os.Stdout, "[Sarama] ", log.LstdFlags)
Expand All @@ -34,7 +33,7 @@ func main() {
stdout.WithQuantiles([]float64{0.5, 0.9, 0.99}),
}, nil)
if err != nil {
log.Fatalf("failed to initialize stdout export pipeline: %v", err)
logger.Fatalf("failed to initialize stdout export pipeline: %v", err)
}
defer pusher.Stop()

Expand All @@ -45,8 +44,7 @@ func main() {

producer, err := sarama.NewAsyncProducer(splitBrokers, conf)
if err != nil {
logrus.WithError(err).Error("Couldn't create a Kafka producer")
return
panic("Couldn't create a Kafka producer")
}
defer producer.AsyncClose()

Expand All @@ -58,21 +56,21 @@ func main() {
bulkSize := 2
duration := 5 * time.Second
ticker := time.NewTicker(duration)
logrus.Infof("Starting to produce %v messages every %v", bulkSize, duration)
logger.Printf("Starting to produce %v messages every %v", bulkSize, duration)
for {
select {
case t := <-ticker.C:
now := t.Format(time.RFC3339)
logrus.WithFields(logrus.Fields{"topic": *topic}).Infof("producing %v messages at %s", bulkSize, now)
logger.Printf("\nproducing %v messages to topic %s at %s", bulkSize, *topic, now)
for i := 0; i < bulkSize; i++ {
producer.Input() <- &sarama.ProducerMessage{
Topic: *topic, Key: nil,
Value: sarama.StringEncoder(fmt.Sprintf("test message %v/%v from kafka-client-go-test at %s", i+1, bulkSize, now)),
}
}
case <-signals:
logrus.Info("terminating the program")
logrus.Info("Bye :)")
logger.Println("terminating the program")
logger.Println("Bye :)")
return
}
}
Expand Down
10 changes: 7 additions & 3 deletions examples/interceptors/trace_interceptor.go
Expand Up @@ -33,13 +33,13 @@ func NewOtelInterceptor(brokers []string) *otelInterceptor {
return &oi
}

var (
const (
MessageIDHeaderName = "message_id"
SpanHeaderName = "span_id"
TraceHeaderName = "trace_id"
)

func (oi *otelInterceptor) OnSend(msg *sarama.ProducerMessage) {
func shouldIgnoreMsg(msg *sarama.ProducerMessage) bool {
// check message hasn't been here before (retries)
var traceFound, spanFound, msgIDFound bool
for _, h := range msg.Headers {
Expand All @@ -55,7 +55,11 @@ func (oi *otelInterceptor) OnSend(msg *sarama.ProducerMessage) {
msgIDFound = true
}
}
if traceFound && spanFound && msgIDFound {
return traceFound && spanFound && msgIDFound
}

func (oi *OTelInterceptor) OnSend(msg *sarama.ProducerMessage) {
if shouldIgnoreMsg(msg) {
return
}
_ = oi.tracer.WithSpan(context.TODO(), msg.Topic,
Expand Down

0 comments on commit 3f365d9

Please sign in to comment.