stream: handle application termination in producer daemon;
Upon termination of the application, we start shutting down the producer
daemon and cancel all contexts. However, we have to delay canceling the
context used to write the messages to the output (by one second by
default) so the last batch can still be written. Should we fail to
write the messages in that time frame, we handle the resulting cancel
error by logging a warning.
ajscholl committed Jan 15, 2021
1 parent 84cb5f7 commit 8d3155b
Showing 2 changed files with 110 additions and 26 deletions.
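
The commit message describes a delayed cancel: the write context must outlive the kernel context by a grace period. The helper below is a hypothetical sketch of that idea, not code from this commit (gosoline implements the delay inside its exec package, configured via the CancelDelay setting visible further down in the test diff):

package stream

import (
	"context"
	"time"
)

// withDelayedCancel is a hypothetical sketch: the returned context is
// canceled only `delay` after the parent context, giving an in-flight
// write a grace period (one second by default, per the commit message).
func withDelayedCancel(parent context.Context, delay time.Duration) (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		<-parent.Done()   // the application is shutting down
		time.Sleep(delay) // grace period for the final write
		cancel()          // now really abort the write
	}()
	return ctx, cancel
}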
16 changes: 13 additions & 3 deletions pkg/stream/producer_daemon.go
@@ -6,6 +6,7 @@ import (
 	"github.com/applike/gosoline/pkg/cfg"
 	"github.com/applike/gosoline/pkg/clock"
 	"github.com/applike/gosoline/pkg/coffin"
+	"github.com/applike/gosoline/pkg/exec"
 	"github.com/applike/gosoline/pkg/kernel"
 	"github.com/applike/gosoline/pkg/mon"
 	"sync"
@@ -111,12 +112,14 @@ func (d *ProducerDaemon) Run(kernelCtx context.Context) error {
 	d.ticker = d.tickerFactory(d.settings.Interval)
 
 	cfn := coffin.New()
-	cfn.GoWithContextf(kernelCtx, d.tickerLoop, "panic during running the ticker loop")
-
+	// start the output loops before the ticker loop - the output loop can't terminate until
+	// we call close, while the ticker loop can if the context is already canceled
 	for i := 0; i < d.settings.RunnerCount; i++ {
 		cfn.GoWithContextf(kernelCtx, d.outputLoop, "panic during running the output loop")
 	}
 
+	cfn.GoWithContextf(kernelCtx, d.tickerLoop, "panic during running the ticker loop")
+
 	select {
 	case <-cfn.Dying():
 		if err := d.close(); err != nil {
@@ -270,8 +273,15 @@ func (d *ProducerDaemon) outputLoop(ctx context.Context) error {
 		return nil
 	}
 
+	// no need for a delayed cancel context or similar here - if you need that, your output should already provide it
 	if err := d.output.Write(ctx, batch); err != nil {
-		d.logger.Errorf(err, "can not write messages to output in producer %s", d.name)
+		if exec.IsRequestCanceled(err) {
+			// we were not fast enough to write all messages and have just lost some of them.
+			// however, if that were a problem for you, you shouldn't be using the producer daemon at all.
+			d.logger.Warnf("can not write messages to output in producer %s because of canceled context", d.name)
+		} else {
+			d.logger.Errorf(err, "can not write messages to output in producer %s", d.name)
+		}
+	}
 
 	d.writeMetricBatchSize(len(batch))
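
The exec.IsRequestCanceled check above decides between a warning and an error. As a rough mental model (an assumption, not the package's actual code), a minimal stand-in could unwrap the error chain and match context.Canceled:

package stream

import (
	"context"
	"errors"
)

// isRequestCanceled is a minimal stand-in for exec.IsRequestCanceled,
// assuming a canceled request surfaces as a (possibly wrapped)
// context.Canceled error; the real check may recognize further
// library-specific cancellation errors as well.
func isRequestCanceled(err error) bool {
	return errors.Is(err, context.Canceled)
}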
120 changes: 97 additions & 23 deletions pkg/stream/producer_daemon_test.go
@@ -4,9 +4,12 @@ import (
 	"context"
 	"fmt"
 	"github.com/applike/gosoline/pkg/clock"
+	"github.com/applike/gosoline/pkg/exec"
+	"github.com/applike/gosoline/pkg/mon"
 	monMocks "github.com/applike/gosoline/pkg/mon/mocks"
 	"github.com/applike/gosoline/pkg/stream"
 	streamMocks "github.com/applike/gosoline/pkg/stream/mocks"
+	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/suite"
 	"testing"
 	"time"
@@ -15,25 +18,39 @@ import (
 type ProducerDaemonTestSuite struct {
 	suite.Suite
 
-	ctx    context.Context
-	cancel context.CancelFunc
-	wait   chan error
-	output *streamMocks.Output
-	ticker *clock.FakeTicker
-	daemon *stream.ProducerDaemon
+	ctx      context.Context
+	cancel   context.CancelFunc
+	wait     chan error
+	output   *streamMocks.Output
+	ticker   *clock.FakeTicker
+	executor exec.Executor
+	daemon   *stream.ProducerDaemon
 }
 
 func (s *ProducerDaemonTestSuite) SetupTest() {
 	s.ctx, s.cancel = context.WithCancel(context.Background())
 	s.wait = make(chan error)
 }
 
-func (s *ProducerDaemonTestSuite) SetupDaemon(batchSize int, aggregationSize int, interval time.Duration, marshaller stream.AggregateMarshaller) {
-	logger := monMocks.NewLoggerMockedAll()
+func (s *ProducerDaemonTestSuite) SetupDaemon(maxLogLevel string, batchSize int, aggregationSize int, interval time.Duration, marshaller stream.AggregateMarshaller) {
+	logger := monMocks.NewLoggerMockedUntilLevel(maxLogLevel)
 	metric := monMocks.NewMetricWriterMockedAll()
 
 	s.output = new(streamMocks.Output)
 	s.ticker = clock.NewFakeTicker()
+	s.executor = exec.NewBackoffExecutor(logger, &exec.ExecutableResource{
+		Type: "test",
+		Name: "test-output",
+	}, &exec.BackoffSettings{
+		Enabled:             true,
+		Blocking:            true,
+		CancelDelay:         time.Second,
+		InitialInterval:     time.Millisecond * 50,
+		RandomizationFactor: 0.5,
+		Multiplier:          1.5,
+		MaxInterval:         time.Second * 3,
+		MaxElapsedTime:      time.Second * 15,
+	}, exec.CheckRequestCanceled)
 
 	tickerFactory := func(_ time.Duration) clock.Ticker {
 		return s.ticker
@@ -65,16 +82,36 @@ func (s *ProducerDaemonTestSuite) stop() error {
 	return err
 }
 
+func (s *ProducerDaemonTestSuite) expectMessage(msg []stream.WritableMessage) {
+	call := s.output.On("Write", s.ctx, msg)
+	call.Run(func(args mock.Arguments) {
+		ctx := args.Get(0).(context.Context)
+
+		// simulate an executor like the real output would use
+		_, err := s.executor.Execute(ctx, func(ctx context.Context) (interface{}, error) {
+			// return a context canceled error should the context already have been canceled (as the real output would)
+			select {
+			case <-ctx.Done():
+				return nil, context.Canceled
+			default:
+				return nil, nil
+			}
+		})
+
+		call.Return(err)
+	}).Once()
+}
+
 func (s *ProducerDaemonTestSuite) TestRun() {
-	s.SetupDaemon(1, 1, time.Hour, stream.MarshalJsonMessage)
+	s.SetupDaemon(mon.Info, 1, 1, time.Hour, stream.MarshalJsonMessage)
 	err := s.stop()
 
 	s.NoError(err, "there should be no error on run")
 	s.output.AssertExpectations(s.T())
 }
 
 func (s *ProducerDaemonTestSuite) TestWriteBatch() {
-	s.SetupDaemon(2, 1, time.Hour, stream.MarshalJsonMessage)
+	s.SetupDaemon(mon.Info, 2, 1, time.Hour, stream.MarshalJsonMessage)
 
 	messages := []stream.WritableMessage{
 		&stream.Message{Body: "1"},
@@ -89,8 +126,8 @@ func (s *ProducerDaemonTestSuite) TestWriteBatch() {
 	expected2 := []stream.WritableMessage{
 		&stream.Message{Body: "3"},
 	}
-	s.output.On("Write", s.ctx, expected1).Return(nil)
-	s.output.On("Write", s.ctx, expected2).Return(nil)
+	s.expectMessage(expected1)
+	s.expectMessage(expected2)
 
 	err := s.daemon.Write(context.Background(), messages)
 	s.NoError(err, "there should be no error on write")
@@ -102,7 +139,7 @@ func (s *ProducerDaemonTestSuite) TestWriteBatch() {
 }
 
 func (s *ProducerDaemonTestSuite) TestWriteBatchOnClose() {
-	s.SetupDaemon(3, 1, time.Hour, stream.MarshalJsonMessage)
+	s.SetupDaemon(mon.Info, 3, 1, time.Hour, stream.MarshalJsonMessage)
 
 	messages := []stream.WritableMessage{
 		&stream.Message{Body: "1"},
@@ -116,7 +153,7 @@ func (s *ProducerDaemonTestSuite) TestWriteBatchOnClose() {
 		&stream.Message{Body: "1"},
 		&stream.Message{Body: "2"},
 	}
-	s.output.On("Write", s.ctx, expected1).Return(nil)
+	s.expectMessage(expected1)
 
 	err = s.stop()
 
@@ -125,7 +162,7 @@ func (s *ProducerDaemonTestSuite) TestWriteBatchOnClose() {
 }
 
 func (s *ProducerDaemonTestSuite) TestWriteBatchOnTick() {
-	s.SetupDaemon(3, 1, time.Hour, stream.MarshalJsonMessage)
+	s.SetupDaemon(mon.Info, 3, 1, time.Hour, stream.MarshalJsonMessage)
 
 	messages := []stream.WritableMessage{
 		&stream.Message{Body: "1"},
@@ -139,7 +176,7 @@ func (s *ProducerDaemonTestSuite) TestWriteBatchOnTick() {
 		&stream.Message{Body: "1"},
 		&stream.Message{Body: "2"},
 	}
-	s.output.On("Write", s.ctx, expected1).Return(nil)
+	s.expectMessage(expected1)
 
 	s.ticker.Trigger(time.Now())
 
@@ -150,7 +187,7 @@ func (s *ProducerDaemonTestSuite) TestWriteBatchOnTick() {
 }
 
 func (s *ProducerDaemonTestSuite) TestWriteBatchOnTickAfterWrite() {
-	s.SetupDaemon(2, 1, time.Hour, stream.MarshalJsonMessage)
+	s.SetupDaemon(mon.Info, 2, 1, time.Hour, stream.MarshalJsonMessage)
 
 	messages := []stream.WritableMessage{
 		&stream.Message{Body: "1"},
@@ -162,15 +199,15 @@ func (s *ProducerDaemonTestSuite) TestWriteBatchOnTickAfterWrite() {
 		&stream.Message{Body: "1"},
 		&stream.Message{Body: "2"},
 	}
-	s.output.On("Write", s.ctx, expected1).Return(nil)
+	s.expectMessage(expected1)
 
 	err := s.daemon.Write(context.Background(), messages)
 	s.NoError(err, "there should be no error on write")
 
 	expected2 := []stream.WritableMessage{
 		&stream.Message{Body: "3"},
 	}
-	s.output.On("Write", s.ctx, expected2).Return(nil)
+	s.expectMessage(expected2)
 
 	s.ticker.Trigger(time.Now())
 	time.Sleep(time.Millisecond)
@@ -181,7 +218,7 @@ func (s *ProducerDaemonTestSuite) TestWriteBatchOnTickAfterWrite() {
 }
 
 func (s *ProducerDaemonTestSuite) TestWriteAggregate() {
-	s.SetupDaemon(2, 3, time.Hour, stream.MarshalJsonMessage)
+	s.SetupDaemon(mon.Info, 2, 3, time.Hour, stream.MarshalJsonMessage)
 
 	messages := []stream.WritableMessage{
 		&stream.Message{Body: "1"},
@@ -195,7 +232,7 @@ func (s *ProducerDaemonTestSuite) TestWriteAggregate() {
 	s.NoError(err)
 
 	expected := []stream.WritableMessage{aggregateMessage}
-	s.output.On("Write", s.ctx, expected).Return(nil)
+	s.expectMessage(expected)
 
 	err = s.daemon.Write(context.Background(), messages)
 	s.NoError(err, "there should be no error on write")
@@ -207,7 +244,7 @@ func (s *ProducerDaemonTestSuite) TestWriteAggregate() {
 }
 
 func (s *ProducerDaemonTestSuite) TestAggregateErrorOnWrite() {
-	s.SetupDaemon(2, 3, time.Hour, func(body interface{}, attributes ...map[string]interface{}) (*stream.Message, error) {
+	s.SetupDaemon(mon.Info, 2, 3, time.Hour, func(body interface{}, attributes ...map[string]interface{}) (*stream.Message, error) {
 		return nil, fmt.Errorf("aggregate marshal error")
 	})
 
@@ -232,7 +269,7 @@ func (s *ProducerDaemonTestSuite) TestAggregateErrorOnWrite() {
 }
 
 func (s *ProducerDaemonTestSuite) TestAggregateErrorOnClose() {
-	s.SetupDaemon(2, 3, time.Hour, func(body interface{}, attributes ...map[string]interface{}) (*stream.Message, error) {
+	s.SetupDaemon(mon.Info, 2, 3, time.Hour, func(body interface{}, attributes ...map[string]interface{}) (*stream.Message, error) {
 		return nil, fmt.Errorf("aggregate marshal error")
 	})
 
@@ -254,6 +291,43 @@ func (s *ProducerDaemonTestSuite) TestAggregateErrorOnClose() {
 	s.output.AssertExpectations(s.T())
 }
 
+func (s *ProducerDaemonTestSuite) TestWriteWithCanceledError() {
+	s.SetupDaemon(mon.Warn, 5, 5, time.Hour, stream.MarshalJsonMessage)
+
+	messages := []stream.WritableMessage{
+		&stream.Message{Body: "1"},
+		&stream.Message{Body: "2"},
+	}
+	aggregateMessage, err := stream.MarshalJsonMessage(messages, map[string]interface{}{
+		stream.AttributeAggregate: true,
+	})
+	s.NoError(err)
+
+	expected := []stream.WritableMessage{aggregateMessage}
+	s.output.On("Write", s.ctx, expected).Run(func(args mock.Arguments) {
+		ctx := args.Get(0).(context.Context)
+		select {
+		case _, ok := <-ctx.Done():
+			s.False(ok, "expected the context to have been canceled")
+		default:
+			s.Fail("expected the context to have been canceled")
+		}
+	}).Return(context.Canceled).Once()
+
+	_, err = stream.MarshalJsonMessage(messages, map[string]interface{}{
+		stream.AttributeAggregate: true,
+	})
+	s.NoError(err)
+
+	err = s.daemon.Write(context.Background(), messages)
+	s.NoError(err, "there should be no error on write")
+
+	err = s.stop()
+
+	s.NoError(err)
+	s.output.AssertExpectations(s.T())
+}
+
 func TestProducerDaemonTestSuite(t *testing.T) {
 	suite.Run(t, new(ProducerDaemonTestSuite))
 }
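
Assuming the standard Go toolchain, the new test can be run on its own; testify's suite runner exposes the -testify.m flag to filter suite methods by regular expression:

go test ./pkg/stream/ -run TestProducerDaemonTestSuite -testify.m TestWriteWithCanceledError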
