Skip to content

Commit

Permalink
Stream restart bug fix by introducing manager (#412)
Browse files Browse the repository at this point in the history
  • Loading branch information
sakshi-goyal-razorpay committed Sep 1, 2022
1 parent 5523f71 commit 408b950
Show file tree
Hide file tree
Showing 5 changed files with 241 additions and 90 deletions.
102 changes: 102 additions & 0 deletions internal/stream/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package stream

import (
"context"

"github.com/razorpay/metro/internal/subscriber"
"github.com/razorpay/metro/internal/subscription"
"github.com/razorpay/metro/pkg/httpclient"
"github.com/razorpay/metro/pkg/logger"
)

// PushStreamManager manages push stream
type PushStreamManager struct {
ctx context.Context
cancelFunc func()
doneCh chan struct{}
config *httpclient.Config
ps *PushStream
}

// NewPushStreamManager return a push stream manager obj which is used to manage push stream
func NewPushStreamManager(ctx context.Context, nodeID string, subName string, subscriptionCore subscription.ICore, subscriberCore subscriber.ICore, config *httpclient.Config) (*PushStreamManager, error) {
ps, err := newPushStream(ctx, nodeID, subName, subscriptionCore, subscriberCore, config)
if err != nil {
return nil, err
}
ctx, cancelFunc := context.WithCancel(ctx)
return &PushStreamManager{
ctx: ctx,
cancelFunc: cancelFunc,
ps: ps,
doneCh: make(chan struct{}),
config: config,
}, nil
}

// Run starts the push stream manager that is used to manage underlying stream
func (psm *PushStreamManager) Run() {
defer close(psm.doneCh)

logger.Ctx(psm.ctx).Infow("push stream manager: started running stream manager", "subscription", psm.ps.subscription.Name)
psm.startPushStream()

go func() {
for {
select {
case <-psm.ctx.Done():
if err := psm.ps.Stop(); err != nil {
logger.Ctx(psm.ctx).Infow("push stream manager: error stopping stream", "subscription", psm.ps.subscription.Name, "error", err.Error())
}
return
case err := <-psm.ps.GetErrorChannel():
logger.Ctx(psm.ctx).Infow("push stream manager: restarting stream handler", "subscription", psm.ps.subscription.Name, "error", err.Error())
psm.restartPushStream()
}
}
}()
}

// Stop stops the stream manager along with the underlying stream
func (psm *PushStreamManager) Stop() {
logger.Ctx(psm.ctx).Infow("push stream manager: stop invoked", "subscription", psm.ps.subscription.Name)
psm.cancelFunc()
<-psm.doneCh
}

func newPushStream(ctx context.Context, nodeID string, subName string, subscriptionCore subscription.ICore, subscriberCore subscriber.ICore, config *httpclient.Config) (*PushStream, error) {
ps, err := NewPushStream(ctx, nodeID, subName, subscriptionCore, subscriberCore, config)
if err != nil {
logger.Ctx(ctx).Errorw("push stream manager: Failed to setup push stream for subscription", "logFields", map[string]interface{}{
"subscription": subName,
"nodeID": nodeID,
})
return nil, err
}
return ps, nil
}

func (psm *PushStreamManager) startPushStream() {
// run the stream in a separate go routine, this go routine is not part of the worker error group
// as the worker should continue to run if a single subscription stream exists with error
go func(ctx context.Context) {
err := psm.ps.Start()
if err != nil {
logger.Ctx(ctx).Errorw(
"push stream manager: stream exited",
"subscription", psm.ps.subscription.Name,
"error", err.Error(),
)
}
}(psm.ctx)
}

func (psm *PushStreamManager) restartPushStream() {
if err := psm.ps.Stop(); err != nil {
logger.Ctx(psm.ctx).Errorw("push stream manager: stream stop error", "subscription", psm.ps.subscription.Name, "error", err.Error())
return
}
psm.ps, _ = newPushStream(psm.ctx, psm.ps.nodeID, psm.ps.subscription.Name, psm.ps.subscriptionCore, psm.ps.subscriberCore, psm.config)
psm.startPushStream()
workerComponentRestartCount.WithLabelValues(env, "stream", psm.ps.subscription.Topic, psm.ps.subscription.Name).Inc()
}
119 changes: 119 additions & 0 deletions internal/stream/manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package stream

import (
"context"
"fmt"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/google/uuid"
"github.com/razorpay/metro/internal/subscriber"
mocks2 "github.com/razorpay/metro/internal/subscriber/mocks"
mocks1 "github.com/razorpay/metro/internal/subscription/mocks/core"
"github.com/razorpay/metro/pkg/httpclient"
"github.com/stretchr/testify/assert"
)

func TestNewPushStreamManager(t *testing.T) {
ctrl := gomock.NewController(t)
ctx := context.Background()

tests := []struct {
wantErr bool
}{
{
wantErr: false,
},
{
wantErr: true,
},
}

for _, test := range tests {
got, err := NewPushStreamManager(
ctx,
uuid.New().String(),
subName,
getSubscriptionCoreMock(ctrl, test.wantErr),
getSubscriberCoreMock(ctx, ctrl),
&httpclient.Config{},
)
assert.Equal(t, test.wantErr, err != nil)
assert.Equal(t, got == nil, test.wantErr)
}
}

func TestPushStreamManager_Run(t *testing.T) {
ctrl := gomock.NewController(t)
ctx := context.Background()

psm, err := NewPushStreamManager(
ctx,
uuid.New().String(),
subName,
getSubscriptionCoreMock(ctrl, false),
getSubscriberCoreMock(ctx, ctrl),
&httpclient.Config{},
)
assert.NoError(t, err)
assert.NotNil(t, psm)

psm.Run()
<-time.NewTicker(1 * time.Second).C

streamObj := psm.ps
psm.ps.GetErrorChannel() <- fmt.Errorf("Something went wrong")
<-time.NewTicker(1 * time.Second).C
assert.NotNil(t, psm.ps)
assert.NotEqual(t, psm.ps, streamObj)
}

func TestPushStreamManager_Stop(t *testing.T) {
ctrl := gomock.NewController(t)
ctx := context.Background()

psm, err := NewPushStreamManager(
ctx,
uuid.New().String(),
subName,
getSubscriptionCoreMock(ctrl, false),
getSubscriberCoreMock(ctx, ctrl),
&httpclient.Config{},
)
assert.NoError(t, err)
assert.NotNil(t, psm)
psm.Run()

// Stop the stream manager and it should be stopped without any error
psm.Stop()
assert.NotNil(t, psm.ctx.Err())
assert.Equal(t, psm.ctx.Err(), context.Canceled)
}

func getSubscriberCoreMock(ctx context.Context, ctrl *gomock.Controller) *mocks2.MockICore {
subscriberCoreMock := mocks2.NewMockICore(ctrl)
subModel := getMockSubModel("")

subscriberCoreMock.EXPECT().NewSubscriber(
ctx,
gomock.Any(),
subModel,
defaultTimeoutMs,
defaultMaxOutstandingMsgs,
defaultMaxOuttandingBytes,
gomock.AssignableToTypeOf(make(chan *subscriber.PullRequest)),
gomock.AssignableToTypeOf(make(chan *subscriber.AckMessage)),
gomock.AssignableToTypeOf(make(chan *subscriber.ModAckMessage))).AnyTimes().Return(getMockSubscriber(ctx, ctrl), nil)
return subscriberCoreMock
}

func getSubscriptionCoreMock(ctrl *gomock.Controller, wantErr bool) *mocks1.MockICore {
subscriptionCoreMock := mocks1.NewMockICore(ctrl)
if wantErr {
subscriptionCoreMock.EXPECT().Get(gomock.Any(), subName).AnyTimes().Return(nil, fmt.Errorf("Something went wrong"))
} else {
subscriptionCoreMock.EXPECT().Get(gomock.Any(), subName).AnyTimes().Return(getMockSubModel(""), nil)
}
return subscriptionCoreMock
}
72 changes: 7 additions & 65 deletions internal/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ type PushStream struct {
processor *processor
doneCh chan struct{}
counter int64
restartChan chan bool
stopChan chan bool
errChan chan error
}

const (
Expand All @@ -46,14 +45,9 @@ const (
defaultMaxOuttandingBytes int64 = 0
)

// GetRestartChannel returns the chan where restart request is received
func (ps *PushStream) GetRestartChannel() chan bool {
return ps.restartChan
}

// GetStopChannel returns the chan where stop request is received
func (ps *PushStream) GetStopChannel() chan bool {
return ps.stopChan
// GetErrorChannel returns the chan where stream errors are received
func (ps *PushStream) GetErrorChannel() chan error {
return ps.errChan
}

// Start reads the messages from the broker and publish them to the subscription endpoint
Expand All @@ -76,7 +70,7 @@ func (ps *PushStream) Start() error {
ps.subs, err = ps.subscriberCore.NewSubscriber(subscriberCtx, ps.nodeID, ps.subscription, defaultTimeoutMs,
defaultMaxOutstandingMsgs, defaultMaxOuttandingBytes, subscriberRequestCh, subscriberAckCh, subscriberModAckCh)
if err != nil {
ps.restartChan <- true
ps.errChan <- err
logger.Ctx(ps.ctx).Errorw("worker: error creating subscriber", "subscription", ps.subscription.Name, "error", err.Error())
return err
}
Expand Down Expand Up @@ -105,7 +99,7 @@ func (ps *PushStream) Start() error {
logger.Ctx(ps.ctx).Errorw("worker: error from subscriber, restarting", "logFields", ps.getLogFields(), "error", err.Error())
workerSubscriberErrors.WithLabelValues(env, ps.subscription.Topic, ps.subscription.Name, err.Error(), ps.subs.GetID()).Inc()
if err = ps.restartSubsciber(); err != nil {
ps.restartChan <- true
ps.errChan <- err
return err
}
} else {
Expand Down Expand Up @@ -182,31 +176,6 @@ func (ps *PushStream) Stop() error {
return nil
}

// Restart is used to restart the push subscription processing
func (ps *PushStream) Restart(ctx context.Context) {
logger.Ctx(ps.ctx).Infow("worker: push stream restart invoked", "subscription", ps.subscription.Name)
err := ps.Stop()
if err != nil {
logger.Ctx(ctx).Errorw(
"worker: push stream stop error",
"subscription", ps.subscription.Name,
"error", err.Error(),
)
return
}
go func(ctx context.Context) {
err := ps.Start()
if err != nil {
logger.Ctx(ctx).Errorw(
"worker: push stream restart error",
"subscription", ps.subscription.Name,
"error", err.Error(),
)
}
}(ctx)
workerComponentRestartCount.WithLabelValues(env, "stream", ps.subscription.Topic, ps.subscription.Name).Inc()
}

func (ps *PushStream) stopSubscriber() {
// stop the subscriber
if ps.subs != nil {
Expand Down Expand Up @@ -364,8 +333,7 @@ func NewPushStream(ctx context.Context, nodeID string, subName string, subscript
doneCh: doneCh,
httpClient: httpclient,
counter: 0,
restartChan: make(chan bool),
stopChan: make(chan bool),
errChan: make(chan error),
}, nil
}

Expand Down Expand Up @@ -413,29 +381,3 @@ func getRequestBytes(pushRequest *metrov1.PushEndpointRequest) *bytes.Buffer {
}
return bytes.NewBuffer(b)
}

// RunPushStreamManager return a runs a push stream manager used to handle restart/stop requests
func (ps *PushStream) RunPushStreamManager(ctx context.Context) {
logger.Ctx(ctx).Infow("worker: started running stream manager", "subscription", ps.subscription.Name)

go func() {
defer logger.Ctx(ctx).Infow("worker: exiting stream manager", "subscription", ps.subscription.Name)

for {
select {
case <-ctx.Done():
return
case <-ps.GetRestartChannel():
logger.Ctx(ctx).Infow("worker: restarting stream handler", "subscription", ps.subscription.Name)
ps.Restart(ctx)
case <-ps.GetStopChannel():
logger.Ctx(ctx).Infow("worker: stopping stream handler", "subscription", ps.subscription.Name)
err := ps.Stop()
if err != nil {
logger.Ctx(ctx).Infow("worker: error in exiting stream handler ", "error", err.Error())
}
return
}
}
}()
}
3 changes: 2 additions & 1 deletion internal/stream/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,9 @@ func getMockSubscriber(ctx context.Context, ctrl *gomock.Controller) *mocks3.Moc
ReceivedMessages: messages,
}
counter++
} else {
resCh <- &metrov1.PullResponse{}
}
resCh <- &metrov1.PullResponse{}
case <-cancelChan:
return
}
Expand Down

0 comments on commit 408b950

Please sign in to comment.