diff --git a/cmd/ingester/app/consumer/consumer.go b/cmd/ingester/app/consumer/consumer.go
index 433f37921bb..feda7a606c5 100644
--- a/cmd/ingester/app/consumer/consumer.go
+++ b/cmd/ingester/app/consumer/consumer.go
@@ -42,55 +42,58 @@ type Consumer struct {
 	internalConsumer consumer.Consumer
 	processorFactory ProcessorFactory
 
-	close    chan struct{}
-	isClosed sync.WaitGroup
+	partitionIDToState map[int32]*consumerState
+}
+
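+// consumerState tracks the message and error handler goroutines of one
+// assigned partition, so that Close can wait for them to finish draining.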
+type consumerState struct {
+	wg                sync.WaitGroup
+	partitionConsumer sc.PartitionConsumer
 }
 
 // New is a constructor for a Consumer
 func New(params Params) (*Consumer, error) {
 	return &Consumer{
-		metricsFactory:   params.Factory,
-		logger:           params.Logger,
-		close:            make(chan struct{}, 1),
-		isClosed:         sync.WaitGroup{},
-		internalConsumer: params.InternalConsumer,
-		processorFactory: params.ProcessorFactory,
+		metricsFactory:     params.Factory,
+		logger:             params.Logger,
+		internalConsumer:   params.InternalConsumer,
+		processorFactory:   params.ProcessorFactory,
+		partitionIDToState: make(map[int32]*consumerState),
 	}, nil
 }
 
 // Start begins consuming messages in a go routine
 func (c *Consumer) Start() {
-	c.isClosed.Add(1)
-	c.logger.Info("Starting main loop")
-	go c.mainLoop()
+	go func() {
+		c.logger.Info("Starting main loop")
+		for pc := range c.internalConsumer.Partitions() {
+			if p, ok := c.partitionIDToState[pc.Partition()]; ok {
+				// This is a guard against simultaneously draining messages
+				// from the last time the partition was assigned and
+				// processing new messages for the same partition, which may lead
+				// to the cleanup process not completing
+				p.wg.Wait()
+			}
+			c.partitionIDToState[pc.Partition()] = &consumerState{partitionConsumer: pc}
+			go c.handleMessages(pc)
+			go c.handleErrors(pc.Partition(), pc.Errors())
+		}
+	}()
 }
 
 // Close closes the Consumer and underlying sarama consumer
 func (c *Consumer) Close() error {
-	close(c.close)
-	c.isClosed.Wait()
-	return c.internalConsumer.Close()
-}
-
-func (c *Consumer) mainLoop() {
-	for {
-		select {
-		case pc := <-c.internalConsumer.Partitions():
-			c.isClosed.Add(2)
-
-			go c.handleMessages(pc)
-			go c.handleErrors(pc.Partition(), pc.Errors())
-
-		case <-c.close:
-			c.isClosed.Done()
-			return
-		}
+	for _, p := range c.partitionIDToState {
+		c.closePartition(p.partitionConsumer)
+		p.wg.Wait()
 	}
+	c.logger.Info("Closing parent consumer")
+	return c.internalConsumer.Close()
 }
 
 func (c *Consumer) handleMessages(pc sc.PartitionConsumer) {
-	c.logger.Info("Starting message handler")
-	defer c.isClosed.Done()
+	c.logger.Info("Starting message handler", zap.Int32("partition", pc.Partition()))
+	c.partitionIDToState[pc.Partition()].wg.Add(1)
+	defer c.partitionIDToState[pc.Partition()].wg.Done()
 	defer c.closePartition(pc)
 
 	msgMetrics := c.newMsgMetrics(pc.Partition())
@@ -109,6 +112,7 @@ func (c *Consumer) handleMessages(pc sc.PartitionConsumer) {
 
 		msgProcessor.Process(&saramaMessageWrapper{msg})
 	}
+	c.logger.Info("Finished handling messages", zap.Int32("partition", pc.Partition()))
 }
 
 func (c *Consumer) closePartition(partitionConsumer sc.PartitionConsumer) {
@@ -118,12 +122,14 @@
 }
 
 func (c *Consumer) handleErrors(partition int32, errChan <-chan *sarama.ConsumerError) {
-	c.logger.Info("Starting error handler")
-	defer c.isClosed.Done()
+	c.logger.Info("Starting error handler", zap.Int32("partition", partition))
+	c.partitionIDToState[partition].wg.Add(1)
+	defer c.partitionIDToState[partition].wg.Done()
 
 	errMetrics := c.newErrMetrics(partition)
 	for err := range errChan {
 		errMetrics.errCounter.Inc(1)
 		c.logger.Error("Error consuming from Kafka", zap.Error(err))
 	}
+	c.logger.Info("Finished handling errors", zap.Int32("partition", partition))
 }
diff --git a/cmd/ingester/app/consumer/consumer_test.go b/cmd/ingester/app/consumer/consumer_test.go
index 747b6f10c96..fad87ad0436 100644
--- a/cmd/ingester/app/consumer/consumer_test.go
+++ b/cmd/ingester/app/consumer/consumer_test.go
@@ -15,31 +15,36 @@ package consumer
 
 import (
+	"fmt"
 	"sync"
 	"testing"
 	"time"
 
 	"github.com/Shopify/sarama"
+	smocks "github.com/Shopify/sarama/mocks"
 	"github.com/bsm/sarama-cluster"
 	"github.com/pkg/errors"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
+	"github.com/stretchr/testify/require"
 	"github.com/uber/jaeger-lib/metrics"
 	"github.com/uber/jaeger-lib/metrics/testutils"
 	"go.uber.org/zap"
 
 	kmocks "github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/mocks"
+	"github.com/jaegertracing/jaeger/cmd/ingester/app/processor"
 	pmocks "github.com/jaegertracing/jaeger/cmd/ingester/app/processor/mocks"
+	"github.com/jaegertracing/jaeger/pkg/kafka/consumer"
 )
 
 //go:generate mockery -dir ../../../../pkg/kafka/config/ -name Consumer
 //go:generate mockery -dir ../../../../../vendor/github.com/bsm/sarama-cluster/ -name PartitionConsumer
 
-type consumerTest struct {
-	saramaConsumer    *kmocks.Consumer
-	consumer          *Consumer
-	partitionConsumer *kmocks.PartitionConsumer
-}
+const (
+	topic     = "morekuzambu"
+	partition = int32(316)
+	msgOffset = int64(1111110111111)
+)
 
 func TestConstructor(t *testing.T) {
 	newConsumer, err := New(Params{})
@@ -47,125 +52,161 @@ func TestConstructor(t *testing.T) {
 	assert.NotNil(t, newConsumer)
 }
 
-func withWrappedConsumer(fn func(c *consumerTest)) {
-	sc := &kmocks.Consumer{}
-	logger, _ := zap.NewDevelopment()
-	metricsFactory := metrics.NewLocalFactory(0)
-	c := &consumerTest{
-		saramaConsumer: sc,
-		consumer: &Consumer{
-			metricsFactory:   metricsFactory,
-			logger:           logger,
-			close:            make(chan struct{}),
-			isClosed:         sync.WaitGroup{},
-			internalConsumer: sc,
-			processorFactory: ProcessorFactory{
-				topic:          "topic",
-				consumer:       sc,
-				metricsFactory: metricsFactory,
-				logger:         logger,
-				baseProcessor:  &pmocks.SpanProcessor{},
-				parallelism:    1,
-			},
-		},
-	}
 
+// partitionConsumerWrapper wraps a Sarama partition consumer into a Sarama cluster partition consumer
+type partitionConsumerWrapper struct {
+	topic     string
+	partition int32
 
-	c.partitionConsumer = &kmocks.PartitionConsumer{}
+	sarama.PartitionConsumer
+}
+
+func (s partitionConsumerWrapper) Partition() int32 {
+	return s.partition
+}
+
+func (s partitionConsumerWrapper) Topic() string {
+	return s.topic
+}
+
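+// newSaramaClusterConsumer fakes a cluster consumer whose Partitions()
+// channel yields the given partition consumer exactly once.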
+func newSaramaClusterConsumer(saramaPartitionConsumer sarama.PartitionConsumer) *kmocks.Consumer {
 	pcha := make(chan cluster.PartitionConsumer, 1)
-	pcha <- c.partitionConsumer
-	c.saramaConsumer.On("Partitions").Return((<-chan cluster.PartitionConsumer)(pcha))
-	c.saramaConsumer.On("Close").Return(nil)
-	c.saramaConsumer.On("MarkPartitionOffset", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
+	pcha <- &partitionConsumerWrapper{
+		topic:             topic,
+		partition:         partition,
+		PartitionConsumer: saramaPartitionConsumer,
+	}
+	saramaClusterConsumer := &kmocks.Consumer{}
+	saramaClusterConsumer.On("Partitions").Return((<-chan cluster.PartitionConsumer)(pcha))
+	saramaClusterConsumer.On("Close").Return(nil)
+	saramaClusterConsumer.On("MarkPartitionOffset", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
+	return saramaClusterConsumer
+}
+
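+// newConsumer mirrors New, but wires the test's metrics factory,
+// span processor, and mock cluster consumer in directly.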
+func newConsumer(
+	factory metrics.Factory,
+	topic string,
+	processor processor.SpanProcessor,
+	consumer consumer.Consumer) *Consumer {
 
-	fn(c)
+	logger, _ := zap.NewDevelopment()
+	return &Consumer{
+		metricsFactory:     factory,
+		logger:             logger,
+		internalConsumer:   consumer,
+		partitionIDToState: make(map[int32]*consumerState),
+
+		processorFactory: ProcessorFactory{
+			topic:          topic,
+			consumer:       consumer,
+			metricsFactory: factory,
+			logger:         logger,
+			baseProcessor:  processor,
+			parallelism:    1,
+		},
+	}
 }
 
 func TestSaramaConsumerWrapper_MarkPartitionOffset(t *testing.T) {
-	withWrappedConsumer(func(c *consumerTest) {
-		topic := "morekuzambu"
-		partition := int32(316)
-		offset := int64(1111110111111)
-		metadata := "meatbag"
-		c.saramaConsumer.On("MarkPartitionOffset", topic, partition, offset, metadata).Return()
+	sc := &kmocks.Consumer{}
+	metadata := "meatbag"
+	sc.On("MarkPartitionOffset", topic, partition, msgOffset, metadata).Return()
+	sc.MarkPartitionOffset(topic, partition, msgOffset, metadata)
+	sc.AssertCalled(t, "MarkPartitionOffset", topic, partition, msgOffset, metadata)
+}
 
-		c.saramaConsumer.MarkPartitionOffset(topic, partition, offset, metadata)
+func TestSaramaConsumerWrapper_start_Messages(t *testing.T) {
+	localFactory := metrics.NewLocalFactory(0)
+
+	msg := &sarama.ConsumerMessage{}
 
-		c.saramaConsumer.AssertCalled(t, "MarkPartitionOffset", topic, partition, offset, metadata)
+	isProcessed := sync.WaitGroup{}
+	isProcessed.Add(1)
+	mp := &pmocks.SpanProcessor{}
+	mp.On("Process", &saramaMessageWrapper{msg}).Return(func(msg processor.Message) error {
+		isProcessed.Done()
+		return nil
 	})
-}
 
-func TestSaramaConsumerWrapper_start_Messages(t *testing.T) {
-	withWrappedConsumer(func(c *consumerTest) {
-		msg := &sarama.ConsumerMessage{}
-		msg.Offset = 0
-		msgCh := make(chan *sarama.ConsumerMessage, 1)
-		msgCh <- msg
-
-		errCh := make(chan *sarama.ConsumerError, 1)
-		c.partitionConsumer.On("Partition").Return(int32(0))
-		c.partitionConsumer.On("Errors").Return((<-chan *sarama.ConsumerError)(errCh))
-		c.partitionConsumer.On("Messages").Return((<-chan *sarama.ConsumerMessage)(msgCh))
-		c.partitionConsumer.On("HighWaterMarkOffset").Return(int64(1234))
-		c.partitionConsumer.On("Close").Return(nil)
-
-		mp := &pmocks.SpanProcessor{}
-		mp.On("Process", &saramaMessageWrapper{msg}).Return(nil)
-		c.consumer.processorFactory.baseProcessor = mp
-
-		c.consumer.Start()
-		time.Sleep(100 * time.Millisecond)
-		close(msgCh)
-		close(errCh)
-		c.consumer.Close()
-
-		mp.AssertExpectations(t)
-
-		f := (c.consumer.metricsFactory).(*metrics.LocalFactory)
-		partitionTag := map[string]string{"partition": "0"}
-		testutils.AssertCounterMetrics(t, f, testutils.ExpectedMetric{
-			Name:  "sarama-consumer.messages",
-			Tags:  partitionTag,
-			Value: 1,
-		})
-		testutils.AssertGaugeMetrics(t, f, testutils.ExpectedMetric{
-			Name:  "sarama-consumer.current-offset",
-			Tags:  partitionTag,
-			Value: 0,
-		})
-		testutils.AssertGaugeMetrics(t, f, testutils.ExpectedMetric{
-			Name:  "sarama-consumer.offset-lag",
-			Tags:  partitionTag,
-			Value: 1233,
-		})
+	saramaConsumer := smocks.NewConsumer(t, &sarama.Config{})
+	mc := saramaConsumer.ExpectConsumePartition(topic, partition, msgOffset)
+	mc.ExpectMessagesDrainedOnClose()
+
+	saramaPartitionConsumer, e := saramaConsumer.ConsumePartition(topic, partition, msgOffset)
+	require.NoError(t, e)
+
+	undertest := newConsumer(localFactory, topic, mp, newSaramaClusterConsumer(saramaPartitionConsumer))
+
+	undertest.partitionIDToState = map[int32]*consumerState{
+		partition: {
+			partitionConsumer: &partitionConsumerWrapper{
+				topic:             topic,
+				partition:         partition,
+				PartitionConsumer: &kmocks.PartitionConsumer{},
+			},
+		},
+	}
+
+	undertest.Start()
+
+	mc.YieldMessage(msg)
+	isProcessed.Wait()
+
+	mp.AssertExpectations(t)
+	// Ensure that the partition consumer was updated in the map
+	assert.Equal(t, saramaPartitionConsumer.HighWaterMarkOffset(),
+		undertest.partitionIDToState[partition].partitionConsumer.HighWaterMarkOffset())
+	undertest.Close()
+
+	partitionTag := map[string]string{"partition": fmt.Sprint(partition)}
+	testutils.AssertCounterMetrics(t, localFactory, testutils.ExpectedMetric{
+		Name:  "sarama-consumer.messages",
+		Tags:  partitionTag,
+		Value: 1,
+	})
+	testutils.AssertGaugeMetrics(t, localFactory, testutils.ExpectedMetric{
+		Name:  "sarama-consumer.current-offset",
+		Tags:  partitionTag,
+		Value: 1,
+	})
+	testutils.AssertGaugeMetrics(t, localFactory, testutils.ExpectedMetric{
+		Name:  "sarama-consumer.offset-lag",
+		Tags:  partitionTag,
+		Value: 0,
 	})
 }
 
 func TestSaramaConsumerWrapper_start_Errors(t *testing.T) {
-	withWrappedConsumer(func(c *consumerTest) {
-		errCh := make(chan *sarama.ConsumerError, 1)
-		errCh <- &sarama.ConsumerError{
-			Topic: "some-topic",
-			Err:   errors.New("some error"),
+	localFactory := metrics.NewLocalFactory(0)
+
+	saramaConsumer := smocks.NewConsumer(t, &sarama.Config{})
+	mc := saramaConsumer.ExpectConsumePartition(topic, partition, msgOffset)
+	mc.ExpectErrorsDrainedOnClose()
+
+	saramaPartitionConsumer, e := saramaConsumer.ConsumePartition(topic, partition, msgOffset)
+	require.NoError(t, e)
+
+	undertest := newConsumer(localFactory, topic, &pmocks.SpanProcessor{}, newSaramaClusterConsumer(saramaPartitionConsumer))
+
+	undertest.Start()
+	mc.YieldError(errors.New("Daisy, Daisy"))
+
+	for i := 0; i < 1000; i++ {
+		time.Sleep(time.Millisecond)
+
+		c, _ := localFactory.Snapshot()
+		if len(c) == 0 {
+			continue
 		}
-		msgCh := make(chan *sarama.ConsumerMessage)
-
-		c.partitionConsumer.On("Partition").Return(int32(0))
-		c.partitionConsumer.On("Errors").Return((<-chan *sarama.ConsumerError)(errCh))
-		c.partitionConsumer.On("Messages").Return((<-chan *sarama.ConsumerMessage)(msgCh))
-		c.partitionConsumer.On("Close").Return(nil)
-
-		c.consumer.Start()
-		time.Sleep(100 * time.Millisecond)
-		close(msgCh)
-		close(errCh)
-		c.consumer.Close()
-		f := (c.consumer.metricsFactory).(*metrics.LocalFactory)
-		partitionTag := map[string]string{"partition": "0"}
-		testutils.AssertCounterMetrics(t, f, testutils.ExpectedMetric{
+		partitionTag := map[string]string{"partition": fmt.Sprint(partition)}
+		testutils.AssertCounterMetrics(t, localFactory, testutils.ExpectedMetric{
 			Name:  "sarama-consumer.errors",
 			Tags:  partitionTag,
 			Value: 1,
 		})
-	})
+		undertest.Close()
+		return
+	}
+
+	t.Fail()
 }