Skip to content

Commit

Permalink
Reduce exposure to opentracing lib
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeffail committed Nov 25, 2021
1 parent 0b38cf6 commit 5983fd7
Show file tree
Hide file tree
Showing 71 changed files with 512 additions and 273 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ Benthos [exposes lots of metrics][metrics] either to Statsd, Prometheus or for d

### Tracing

Benthos also [emits opentracing events][tracers] to a tracer of your choice (currently only [Jaeger][jaeger] is supported) which can be used to visualise the processors within a pipeline.
Benthos also [emits tracing events][tracers] to a tracer of your choice (currently only [Jaeger][jaeger] is supported) which can be used to visualise the processors within a pipeline.

## Configuration

Expand Down
2 changes: 1 addition & 1 deletion internal/component/buffer/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"time"

"github.com/Jeffail/benthos/v3/internal/shutdown"
"github.com/Jeffail/benthos/v3/internal/tracing"
"github.com/Jeffail/benthos/v3/lib/buffer"
"github.com/Jeffail/benthos/v3/lib/log"
"github.com/Jeffail/benthos/v3/lib/message/tracing"
"github.com/Jeffail/benthos/v3/lib/metrics"
"github.com/Jeffail/benthos/v3/lib/response"
"github.com/Jeffail/benthos/v3/lib/types"
Expand Down
16 changes: 2 additions & 14 deletions internal/component/input/span_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ import (
"github.com/Jeffail/benthos/v3/internal/bloblang/mapping"
"github.com/Jeffail/benthos/v3/internal/docs"
"github.com/Jeffail/benthos/v3/internal/interop"
"github.com/Jeffail/benthos/v3/internal/tracing"
"github.com/Jeffail/benthos/v3/lib/input/reader"
"github.com/Jeffail/benthos/v3/lib/log"
"github.com/Jeffail/benthos/v3/lib/message/tracing"
"github.com/Jeffail/benthos/v3/lib/types"
"github.com/opentracing/opentracing-go"
)

// ExtractTracingSpanMappingDocs returns a docs spec for a mapping field.
Expand Down Expand Up @@ -78,20 +77,9 @@ func (s *SpanReader) ReadWithContext(ctx context.Context) (types.Message, reader
return m, afn, nil
}

textMap := make(opentracing.TextMapCarrier, len(spanMap))
for k, v := range spanMap {
if vStr, ok := v.(string); ok {
textMap[k] = vStr
}
}

parent, err := opentracing.GlobalTracer().Extract(opentracing.TextMap, textMap)
if err != nil {
if err := tracing.InitSpansFromParentTextMap("input_"+s.inputName, spanMap, m); err != nil {
s.log.Errorf("Extraction of parent tracing span failed: %v", err)
return m, afn, nil
}

tracing.InitSpansFromParent("input_"+s.inputName, parent, m)
return m, afn, nil
}

Expand Down
26 changes: 8 additions & 18 deletions internal/component/processor/processor_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,12 @@ import (
"time"

"github.com/Jeffail/benthos/v3/internal/shutdown"
"github.com/Jeffail/benthos/v3/internal/tracing"
"github.com/Jeffail/benthos/v3/lib/message"
"github.com/Jeffail/benthos/v3/lib/message/tracing"
"github.com/Jeffail/benthos/v3/lib/metrics"
"github.com/Jeffail/benthos/v3/lib/processor"
"github.com/Jeffail/benthos/v3/lib/response"
"github.com/Jeffail/benthos/v3/lib/types"
"github.com/opentracing/opentracing-go"
olog "github.com/opentracing/opentracing-go/log"
)

// V2 is a simpler interface to implement than types.Processor.
Expand Down Expand Up @@ -75,25 +73,17 @@ func (a *v2ToV1Processor) ProcessMessage(msg types.Message) ([]types.Message, ty
newParts := make([]types.Part, 0, msg.Len())

msg.Iter(func(i int, part types.Part) error {
span := tracing.GetSpan(part)
if span == nil {
span = opentracing.StartSpan(a.typeStr)
} else {
span = opentracing.StartSpan(
a.typeStr,
opentracing.ChildOf(span.Context()),
)
}
span := tracing.CreateChildSpan(a.typeStr, part)

nextParts, err := a.p.Process(context.Background(), part)
if err != nil {
newPart := part.Copy()
a.mErr.Incr(1)
processor.FlagErr(newPart, err)
span.SetTag("error", true)
span.LogFields(
olog.String("event", "error"),
olog.String("type", err.Error()),
span.LogKV(
"event", "error",
"type", err.Error(),
)
nextParts = append(nextParts, newPart)
}
Expand Down Expand Up @@ -181,9 +171,9 @@ func (a *v2BatchedToV1Processor) ProcessMessage(msg types.Message) ([]types.Mess
}
for _, s := range spans {
s.SetTag("error", true)
s.LogFields(
olog.String("event", "error"),
olog.String("type", err.Error()),
s.LogKV(
"event", "error",
"type", err.Error(),
)
}
newMsg.SetAll(parts)
Expand Down
17 changes: 6 additions & 11 deletions internal/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@ import (

"github.com/Jeffail/benthos/v3/internal/bloblang/field"
"github.com/Jeffail/benthos/v3/internal/interop"
"github.com/Jeffail/benthos/v3/internal/tracing"
"github.com/Jeffail/benthos/v3/lib/log"
"github.com/Jeffail/benthos/v3/lib/message"
"github.com/Jeffail/benthos/v3/lib/metrics"
"github.com/Jeffail/benthos/v3/lib/types"
"github.com/Jeffail/benthos/v3/lib/util/http/client"
"github.com/Jeffail/benthos/v3/lib/util/throttle"
"github.com/opentracing/opentracing-go"
olog "github.com/opentracing/opentracing-go/log"
)

// Client is a component able to send and receive Benthos messages over HTTP.
Expand Down Expand Up @@ -453,13 +452,9 @@ func (h *Client) checkStatus(code int) (succeeded bool, retStrat retryStrategy)
func (h *Client) SendToResponse(ctx context.Context, sendMsg, refMsg types.Message) (res *http.Response, err error) {
h.mCount.Incr(1)

var spans []opentracing.Span
var spans []*tracing.Span
if sendMsg != nil {
spans = make([]opentracing.Span, sendMsg.Len())
sendMsg.Iter(func(i int, p types.Part) error {
spans[i], _ = opentracing.StartSpanFromContext(message.GetContext(p), "http_request")
return nil
})
spans = tracing.CreateChildSpans("http_request", sendMsg)
defer func() {
for _, s := range spans {
s.Finish()
Expand All @@ -470,9 +465,9 @@ func (h *Client) SendToResponse(ctx context.Context, sendMsg, refMsg types.Messa
h.mErrRes.Incr(1)
h.mErr.Incr(1)
for _, s := range spans {
s.LogFields(
olog.String("event", "error"),
olog.String("type", e.Error()),
s.LogKV(
"event", "error",
"type", e.Error(),
)
}
}
Expand Down
4 changes: 2 additions & 2 deletions internal/impl/mongodb/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ import (
"github.com/Jeffail/benthos/v3/internal/docs"
"github.com/Jeffail/benthos/v3/internal/impl/mongodb/client"
"github.com/Jeffail/benthos/v3/internal/shutdown"
"github.com/Jeffail/benthos/v3/internal/tracing"
"github.com/Jeffail/benthos/v3/lib/log"
"github.com/Jeffail/benthos/v3/lib/metrics"
"github.com/Jeffail/benthos/v3/lib/processor"
"github.com/Jeffail/benthos/v3/lib/types"
"github.com/Jeffail/benthos/v3/lib/util/retries"
"github.com/opentracing/opentracing-go"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/writeconcern"
Expand Down Expand Up @@ -273,7 +273,7 @@ func (m *Processor) ProcessMessage(msg types.Message) ([]types.Message, types.Re
newMsg := msg.Copy()

var writeModels []mongo.WriteModel
processor.IteratePartsWithSpan("mongodb", m.parts, newMsg, func(i int, s opentracing.Span, p types.Part) error {
processor.IteratePartsWithSpanV2("mongodb", m.parts, newMsg, func(i int, s *tracing.Span, p types.Part) error {
var err error
var filterVal, documentVal types.Part
var upsertVal, filterValWanted, documentValWanted bool
Expand Down
212 changes: 212 additions & 0 deletions internal/tracing/opentracing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
package tracing

import (
"github.com/Jeffail/benthos/v3/lib/message"
"github.com/Jeffail/benthos/v3/lib/types"
"github.com/opentracing/opentracing-go"
)

// GetSpan returns a span attached to a message part. Returns nil if the part
// doesn't have a span attached.
func GetSpan(p types.Part) *Span {
return openTracingSpan(opentracing.SpanFromContext(message.GetContext(p)))
}

// CreateChildSpan takes a message part, extracts an existing span if there is
// one and returns child span.
func CreateChildSpan(operationName string, part types.Part) *Span {
span := GetSpan(part)
if span == nil {
span = openTracingSpan(opentracing.StartSpan(operationName))
} else {
span = openTracingSpan(opentracing.StartSpan(
operationName,
opentracing.ChildOf(span.unwrap().Context()),
))
}
return span
}

// CreateChildSpans takes a message, extracts spans per message part and returns
// a slice of child spans. The length of the returned slice is guaranteed to
// match the message size.
func CreateChildSpans(operationName string, msg types.Message) []*Span {
spans := make([]*Span, msg.Len())
msg.Iter(func(i int, part types.Part) error {
spans[i] = CreateChildSpan(operationName, part)
return nil
})
return spans
}

// PartsWithChildSpans takes a slice of message parts, extracts spans per part,
// creates new child spans, and returns a new slice of parts with those spans
// embedded. The original parts are unchanged.
func PartsWithChildSpans(operationName string, parts []types.Part) ([]types.Part, []*Span) {
spans := make([]*Span, 0, len(parts))
newParts := make([]types.Part, len(parts))
for i, part := range parts {
if part == nil {
continue
}

ctx := message.GetContext(part)
otSpan := opentracing.SpanFromContext(ctx)
if otSpan == nil {
otSpan = opentracing.StartSpan(operationName)
} else {
otSpan = opentracing.StartSpan(
operationName,
opentracing.ChildOf(otSpan.Context()),
)
}
ctx = opentracing.ContextWithSpan(ctx, otSpan)

newParts[i] = message.WithContext(ctx, part)
spans = append(spans, openTracingSpan(otSpan))
}
return newParts, spans
}

// WithChildSpans takes a message, extracts spans per message part, creates new
// child spans, and returns a new message with those spans embedded. The
// original message is unchanged.
func WithChildSpans(operationName string, msg types.Message) (types.Message, []*Span) {
parts := make([]types.Part, 0, msg.Len())
msg.Iter(func(i int, p types.Part) error {
parts = append(parts, p)
return nil
})

newParts, spans := PartsWithChildSpans(operationName, parts)
newMsg := message.New(nil)
newMsg.SetAll(newParts)

return newMsg, spans
}

// WithSiblingSpans takes a message, extracts spans per message part, creates
// new sibling spans, and returns a new message with those spans embedded. The
// original message is unchanged.
func WithSiblingSpans(operationName string, msg types.Message) types.Message {
parts := make([]types.Part, msg.Len())
msg.Iter(func(i int, part types.Part) error {
ctx := message.GetContext(part)
otSpan := opentracing.SpanFromContext(ctx)
if otSpan == nil {
otSpan = opentracing.StartSpan(operationName)
} else {
otSpan = opentracing.StartSpan(
operationName,
opentracing.FollowsFrom(otSpan.Context()),
)
}
ctx = opentracing.ContextWithSpan(ctx, otSpan)
parts[i] = message.WithContext(ctx, part)
return nil
})

newMsg := message.New(nil)
newMsg.SetAll(parts)
return newMsg
}

//------------------------------------------------------------------------------

// IterateWithChildSpans iterates all the parts of a message and, for each part,
// creates a new span from an existing span attached to the part and calls a
// func with that span before finishing the child span.
func IterateWithChildSpans(operationName string, msg types.Message, iter func(int, *Span, types.Part) error) error {
return msg.Iter(func(i int, p types.Part) error {
otSpan, _ := opentracing.StartSpanFromContext(message.GetContext(p), operationName)
err := iter(i, openTracingSpan(otSpan), p)
otSpan.Finish()
return err
})
}

// InitSpans sets up OpenTracing spans on each message part if one does not
// already exist.
func InitSpans(operationName string, msg types.Message) {
tracedParts := make([]types.Part, msg.Len())
msg.Iter(func(i int, p types.Part) error {
tracedParts[i] = InitSpan(operationName, p)
return nil
})
msg.SetAll(tracedParts)
}

// InitSpan sets up an OpenTracing span on a message part if one does not
// already exist.
func InitSpan(operationName string, part types.Part) types.Part {
if GetSpan(part) != nil {
return part
}
otSpan := opentracing.StartSpan(operationName)
ctx := opentracing.ContextWithSpan(message.GetContext(part), otSpan)
return message.WithContext(ctx, part)
}

// InitSpansFromParent sets up OpenTracing spans as children of a parent span on
// each message part if one does not already exist.
func InitSpansFromParent(operationName string, parent *Span, msg types.Message) {
tracedParts := make([]types.Part, msg.Len())
msg.Iter(func(i int, p types.Part) error {
tracedParts[i] = InitSpanFromParent(operationName, parent, p)
return nil
})
msg.SetAll(tracedParts)
}

// InitSpanFromParent sets up an OpenTracing span as children of a parent
// span on a message part if one does not already exist.
func InitSpanFromParent(operationName string, parent *Span, part types.Part) types.Part {
if GetSpan(part) != nil {
return part
}
span := opentracing.StartSpan(operationName, opentracing.ChildOf(parent.unwrap().Context()))
ctx := opentracing.ContextWithSpan(message.GetContext(part), span)
return message.WithContext(ctx, part)
}

// InitSpansFromParentTextMap obtains a span parent reference from a text map
// and creates child spans for each message.
func InitSpansFromParentTextMap(operationName string, textMapGeneric map[string]interface{}, msg types.Message) error {
textMap := make(opentracing.TextMapCarrier, len(textMapGeneric))
for k, v := range textMapGeneric {
if vStr, ok := v.(string); ok {
textMap[k] = vStr
}
}

parentCtx, err := opentracing.GlobalTracer().Extract(opentracing.TextMap, textMap)
if err != nil {
return err
}

tracedParts := make([]types.Part, msg.Len())
msg.Iter(func(i int, p types.Part) error {
otSpan := opentracing.StartSpan(
operationName,
opentracing.ChildOf(parentCtx),
)
ctx := opentracing.ContextWithSpan(message.GetContext(p), otSpan)
tracedParts[i] = message.WithContext(ctx, p)
return nil
})

msg.SetAll(tracedParts)
return nil
}

// FinishSpans calls Finish on all message parts containing a span.
func FinishSpans(msg types.Message) {
msg.Iter(func(i int, p types.Part) error {
span := GetSpan(p)
if span == nil {
return nil
}
span.unwrap().Finish()
return nil
})
}

0 comments on commit 5983fd7

Please sign in to comment.