Skip to content

Commit

Permalink
[ingester/fix] Apply sanitizers to avoid panic on span.process=nil (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
locmai committed Jul 26, 2022
1 parent 7b33160 commit a5974e1
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 12 deletions.
6 changes: 5 additions & 1 deletion cmd/ingester/app/processor/span_processor.go
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"io"

"github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer"
"github.com/jaegertracing/jaeger/plugin/storage/kafka"
"github.com/jaegertracing/jaeger/storage/spanstore"
)
Expand All @@ -45,6 +46,7 @@ type SpanProcessorParams struct {
// KafkaSpanProcessor implements SpanProcessor for Kafka messages
type KafkaSpanProcessor struct {
unmarshaller kafka.Unmarshaller
sanitizer sanitizer.SanitizeSpan
writer spanstore.Writer
io.Closer
}
Expand All @@ -54,6 +56,7 @@ func NewSpanProcessor(params SpanProcessorParams) *KafkaSpanProcessor {
return &KafkaSpanProcessor{
unmarshaller: params.Unmarshaller,
writer: params.Writer,
sanitizer: sanitizer.NewChainedSanitizer(sanitizer.NewStandardSanitizers()...),
}
}

Expand All @@ -63,6 +66,7 @@ func (s KafkaSpanProcessor) Process(message Message) error {
if err != nil {
return fmt.Errorf("cannot unmarshall byte array into span: %w", err)
}

// TODO context should be propagated from upstream components
return s.writer.WriteSpan(context.TODO(), span)
return s.writer.WriteSpan(context.TODO(), s.sanitizer(span))
}
30 changes: 19 additions & 11 deletions cmd/ingester/app/processor/span_processor_test.go
Expand Up @@ -20,6 +20,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

cmocks "github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/mocks"
"github.com/jaegertracing/jaeger/model"
Expand All @@ -33,25 +34,32 @@ func TestNewSpanProcessor(t *testing.T) {
}

func TestSpanProcessor_Process(t *testing.T) {
writer := &smocks.Writer{}
unmarshallerMock := &umocks.Unmarshaller{}
processor := &KafkaSpanProcessor{
unmarshaller: unmarshallerMock,
writer: writer,
}
mockUnmarshaller := &umocks.Unmarshaller{}
mockWriter := &smocks.Writer{}
processor := NewSpanProcessor(SpanProcessorParams{
Unmarshaller: mockUnmarshaller,
Writer: mockWriter,
})

message := &cmocks.Message{}
data := []byte("police")
span := &model.Span{}
data := []byte("irrelevant, mock unmarshaller should return the span")
span := &model.Span{
Process: nil, // we want to make sure sanitizers will fix this data issue.
}

message.On("Value").Return(data)
unmarshallerMock.On("Unmarshal", data).Return(span, nil)
writer.On("WriteSpan", context.Background(), span).Return(nil)
mockUnmarshaller.On("Unmarshal", data).Return(span, nil)
mockWriter.On("WriteSpan", context.Background(), span).
Return(nil).
Run(func(args mock.Arguments) {
span := args[1].(*model.Span)
assert.NotNil(t, span.Process, "sanitizer must fix Process=nil data issue")
})

assert.Nil(t, processor.Process(message))

message.AssertExpectations(t)
writer.AssertExpectations(t)
mockWriter.AssertExpectations(t)
}

func TestSpanProcessor_ProcessError(t *testing.T) {
Expand Down

0 comments on commit a5974e1

Please sign in to comment.