From a5974e10da0901ceddb882b1bb65bd5e92d41bf0 Mon Sep 17 00:00:00 2001 From: Loc Mai Date: Wed, 27 Jul 2022 00:21:03 +0700 Subject: [PATCH] [ingester/fix] Apply sanitizers to avoid panic on span.process=nil (#3819) --- cmd/ingester/app/processor/span_processor.go | 6 +++- .../app/processor/span_processor_test.go | 30 ++++++++++++------- 2 files changed, 24 insertions(+), 12 deletions(-) diff --git a/cmd/ingester/app/processor/span_processor.go b/cmd/ingester/app/processor/span_processor.go index ad1c8aa7479..4817103384b 100644 --- a/cmd/ingester/app/processor/span_processor.go +++ b/cmd/ingester/app/processor/span_processor.go @@ -19,6 +19,7 @@ import ( "fmt" "io" + "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer" "github.com/jaegertracing/jaeger/plugin/storage/kafka" "github.com/jaegertracing/jaeger/storage/spanstore" ) @@ -45,6 +46,7 @@ type SpanProcessorParams struct { // KafkaSpanProcessor implements SpanProcessor for Kafka messages type KafkaSpanProcessor struct { unmarshaller kafka.Unmarshaller + sanitizer sanitizer.SanitizeSpan writer spanstore.Writer io.Closer } @@ -54,6 +56,7 @@ func NewSpanProcessor(params SpanProcessorParams) *KafkaSpanProcessor { return &KafkaSpanProcessor{ unmarshaller: params.Unmarshaller, writer: params.Writer, + sanitizer: sanitizer.NewChainedSanitizer(sanitizer.NewStandardSanitizers()...), } } @@ -63,6 +66,7 @@ func (s KafkaSpanProcessor) Process(message Message) error { if err != nil { return fmt.Errorf("cannot unmarshall byte array into span: %w", err) } + // TODO context should be propagated from upstream components - return s.writer.WriteSpan(context.TODO(), span) + return s.writer.WriteSpan(context.TODO(), s.sanitizer(span)) } diff --git a/cmd/ingester/app/processor/span_processor_test.go b/cmd/ingester/app/processor/span_processor_test.go index 0fae62e278f..5df0159b7a6 100644 --- a/cmd/ingester/app/processor/span_processor_test.go +++ b/cmd/ingester/app/processor/span_processor_test.go @@ -20,6 +20,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" cmocks "github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/mocks" "github.com/jaegertracing/jaeger/model" @@ -33,25 +34,32 @@ func TestNewSpanProcessor(t *testing.T) { } func TestSpanProcessor_Process(t *testing.T) { - writer := &smocks.Writer{} - unmarshallerMock := &umocks.Unmarshaller{} - processor := &KafkaSpanProcessor{ - unmarshaller: unmarshallerMock, - writer: writer, - } + mockUnmarshaller := &umocks.Unmarshaller{} + mockWriter := &smocks.Writer{} + processor := NewSpanProcessor(SpanProcessorParams{ + Unmarshaller: mockUnmarshaller, + Writer: mockWriter, + }) message := &cmocks.Message{} - data := []byte("police") - span := &model.Span{} + data := []byte("irrelevant, mock unmarshaller should return the span") + span := &model.Span{ + Process: nil, // we want to make sure sanitizers will fix this data issue. + } message.On("Value").Return(data) - unmarshallerMock.On("Unmarshal", data).Return(span, nil) - writer.On("WriteSpan", context.Background(), span).Return(nil) + mockUnmarshaller.On("Unmarshal", data).Return(span, nil) + mockWriter.On("WriteSpan", context.Background(), span). + Return(nil). + Run(func(args mock.Arguments) { + span := args[1].(*model.Span) + assert.NotNil(t, span.Process, "sanitizer must fix Process=nil data issue") + }) assert.Nil(t, processor.Process(message)) message.AssertExpectations(t) - writer.AssertExpectations(t) + mockWriter.AssertExpectations(t) } func TestSpanProcessor_ProcessError(t *testing.T) {