Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream restart bug fix by introducing manager #412

Merged
merged 12 commits into from
Sep 1, 2022
99 changes: 99 additions & 0 deletions internal/stream/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package stream

import (
"context"

"github.com/razorpay/metro/internal/subscriber"
"github.com/razorpay/metro/internal/subscription"
"github.com/razorpay/metro/pkg/httpclient"
"github.com/razorpay/metro/pkg/logger"
)

// PushStreamManager manages push stream
type PushStreamManager struct {
ctx context.Context
cancelFunc func()
doneCh chan struct{}
config *httpclient.Config
ps *PushStream
}

// NewPushStreamManager return a push stream manager obj which is used to manage push stream
func NewPushStreamManager(ctx context.Context, nodeID string, subName string, subscriptionCore subscription.ICore, subscriberCore subscriber.ICore, config *httpclient.Config) (*PushStreamManager, error) {
ps, err := newPushStream(ctx, nodeID, subName, subscriptionCore, subscriberCore, config)
if err != nil {
return nil, err
}
ctx, cancelFunc := context.WithCancel(ctx)
return &PushStreamManager{
ctx: ctx,
cancelFunc: cancelFunc,
ps: ps,
doneCh: make(chan struct{}),
config: config,
}, nil
}

// Run starts the push stream manager that is used to manage underlying stream
func (psm *PushStreamManager) Run() {
defer close(psm.doneCh)

logger.Ctx(psm.ctx).Infow("push stream manager: started running stream manager", "subscription", psm.ps.subscription.Name)
psm.startPushStream()

go func() {
for {
select {
case <-psm.ctx.Done():
if err := psm.ps.Stop(); err != nil {
logger.Ctx(psm.ctx).Infow("push stream manager: error stopping stream", "subscription", psm.ps.subscription.Name, "error", err.Error())
}
return
case err := <-psm.ps.GetErrorChannel():
logger.Ctx(psm.ctx).Infow("push stream manager: restarting stream handler", "subscription", psm.ps.subscription.Name, "error", err.Error())
psm.restartPushStream()
}
}
}()
}

// Stop stops the stream manager along with the underlying stream
func (psm *PushStreamManager) Stop() {
logger.Ctx(psm.ctx).Infow("push stream manager: stop invoked", "subscription", psm.ps.subscription.Name)
psm.cancelFunc()
<-psm.doneCh
vnktram marked this conversation as resolved.
Show resolved Hide resolved
}

func newPushStream(ctx context.Context, nodeID string, subName string, subscriptionCore subscription.ICore, subscriberCore subscriber.ICore, config *httpclient.Config) (*PushStream, error) {
ps, err := NewPushStream(ctx, nodeID, subName, subscriptionCore, subscriberCore, config)
if err != nil {
logger.Ctx(ctx).Errorw("push stream manager: Failed to setup push stream for subscription", "logFields", map[string]interface{}{
"subscription": subName,
"nodeID": nodeID,
})
return nil, err
}
return ps, nil
}

func (psm *PushStreamManager) startPushStream() {
// run the stream in a separate go routine, this go routine is not part of the worker error group
// as the worker should continue to run if a single subscription stream exists with error
go func(ctx context.Context) {
err := psm.ps.Start()
if err != nil {
logger.Ctx(ctx).Errorw(
"push stream manager: stream exited",
"subscription", psm.ps.subscription.Name,
"error", err.Error(),
)
}
}(psm.ctx)
}

func (psm *PushStreamManager) restartPushStream() {
psm.ps.Stop()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please wrap this under a semaphore

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also let's verify if stop returns an error before we start a new one.

Suggested change
psm.ps.Stop()
err := psm.ps.Stop()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vnktram Why we want to wrap this inside a semaphore. This is called from the select for loop, and at a time, it will be executed by one go routine

psm.ps, _ = newPushStream(psm.ctx, psm.ps.nodeID, psm.ps.subscription.Name, psm.ps.subscriptionCore, psm.ps.subscriberCore, psm.config)
psm.startPushStream()
workerComponentRestartCount.WithLabelValues(env, "stream", psm.ps.subscription.Topic, psm.ps.subscription.Name).Inc()
}
115 changes: 115 additions & 0 deletions internal/stream/manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package stream

import (
"context"
"fmt"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/google/uuid"
"github.com/razorpay/metro/internal/subscriber"
mocks2 "github.com/razorpay/metro/internal/subscriber/mocks"
mocks1 "github.com/razorpay/metro/internal/subscription/mocks/core"
"github.com/razorpay/metro/pkg/httpclient"
"github.com/stretchr/testify/assert"
)

func TestNewPushStreamManager(t *testing.T) {
ctrl := gomock.NewController(t)
ctx := context.Background()

tests := []struct {
wantErr bool
}{
{
wantErr: false,
},
{
wantErr: true,
},
}

for _, test := range tests {
got, err := NewPushStreamManager(
ctx,
uuid.New().String(),
subName,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [golangci] reported by reviewdog 🐶
undeclared name: subName (typecheck)

getSubscriptionCoreMock(ctrl, test.wantErr),
getSubscriberCoreMock(ctx, ctrl),
&httpclient.Config{},
)
assert.Equal(t, test.wantErr, err != nil)
assert.Equal(t, got == nil, test.wantErr)
}
}

func TestPushStreamManager_Run(t *testing.T) {
ctrl := gomock.NewController(t)
ctx := context.Background()

psm, err := NewPushStreamManager(
ctx,
uuid.New().String(),
subName,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [golangci] reported by reviewdog 🐶
undeclared name: subName (typecheck)

getSubscriptionCoreMock(ctrl, false),
getSubscriberCoreMock(ctx, ctrl),
&httpclient.Config{},
)
assert.NoError(t, err)
assert.NotNil(t, psm)
psm.Run()
streamObj := psm.ps
psm.ps.GetErrorChannel() <- fmt.Errorf("Something went wrong")
<-time.NewTicker(1 * time.Second).C
assert.NotNil(t, psm.ps)
assert.NotEqual(t, psm.ps, streamObj)
}

func TestPushStreamManager_Stop(t *testing.T) {
ctrl := gomock.NewController(t)
ctx := context.Background()

psm, err := NewPushStreamManager(
ctx,
uuid.New().String(),
subName,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [golangci] reported by reviewdog 🐶
undeclared name: subName (typecheck)

getSubscriptionCoreMock(ctrl, false),
getSubscriberCoreMock(ctx, ctrl),
&httpclient.Config{},
)
assert.NoError(t, err)
assert.NotNil(t, psm)
psm.Run()

// Stop the stream manager and it should be stopped without any error
psm.Stop()
assert.NotNil(t, psm.ctx.Err())
assert.Equal(t, psm.ctx.Err(), context.Canceled)
}

func getSubscriberCoreMock(ctx context.Context, ctrl *gomock.Controller) *mocks2.MockICore {
subscriberCoreMock := mocks2.NewMockICore(ctrl)
subModel := getMockSubModel("")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [golangci] reported by reviewdog 🐶
undeclared name: getMockSubModel (typecheck)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is defined in same test package but a different file but golang cli linting is not checking all the files.
golangci/golangci-lint#1574
Seems like we need to change some configs

subscriberCoreMock.EXPECT().NewSubscriber(
ctx,
gomock.Any(),
subModel,
defaultTimeoutMs,
defaultMaxOutstandingMsgs,
defaultMaxOuttandingBytes,
gomock.AssignableToTypeOf(make(chan *subscriber.PullRequest)),
gomock.AssignableToTypeOf(make(chan *subscriber.AckMessage)),
gomock.AssignableToTypeOf(make(chan *subscriber.ModAckMessage))).AnyTimes().Return(getMockSubscriber(ctx, ctrl), nil)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [golangci] reported by reviewdog 🐶
undeclared name: getMockSubscriber (typecheck)

return subscriberCoreMock
}

func getSubscriptionCoreMock(ctrl *gomock.Controller, wantErr bool) *mocks1.MockICore {
subscriptionCoreMock := mocks1.NewMockICore(ctrl)
if wantErr {
subscriptionCoreMock.EXPECT().Get(gomock.Any(), subName).AnyTimes().Return(nil, fmt.Errorf("Something went wrong"))
} else {
subscriptionCoreMock.EXPECT().Get(gomock.Any(), subName).AnyTimes().Return(getMockSubModel(""), nil)
}
return subscriptionCoreMock
}
72 changes: 7 additions & 65 deletions internal/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ type PushStream struct {
processor *processor
doneCh chan struct{}
counter int64
restartChan chan bool
stopChan chan bool
errChan chan error
}

const (
Expand All @@ -46,14 +45,9 @@ const (
defaultMaxOuttandingBytes int64 = 0
)

// GetRestartChannel returns the chan where restart request is received
func (ps *PushStream) GetRestartChannel() chan bool {
return ps.restartChan
}

// GetStopChannel returns the chan where stop request is received
func (ps *PushStream) GetStopChannel() chan bool {
return ps.stopChan
// GetErrorChannel returns the chan where stream errors are received
func (ps *PushStream) GetErrorChannel() chan error {
return ps.errChan
}

// Start reads the messages from the broker and publish them to the subscription endpoint
Expand All @@ -76,7 +70,7 @@ func (ps *PushStream) Start() error {
ps.subs, err = ps.subscriberCore.NewSubscriber(subscriberCtx, ps.nodeID, ps.subscription, defaultTimeoutMs,
defaultMaxOutstandingMsgs, defaultMaxOuttandingBytes, subscriberRequestCh, subscriberAckCh, subscriberModAckCh)
if err != nil {
ps.restartChan <- true
ps.errChan <- err
logger.Ctx(ps.ctx).Errorw("worker: error creating subscriber", "subscription", ps.subscription.Name, "error", err.Error())
return err
}
Expand Down Expand Up @@ -105,7 +99,7 @@ func (ps *PushStream) Start() error {
logger.Ctx(ps.ctx).Errorw("worker: error from subscriber, restarting", "logFields", ps.getLogFields(), "error", err.Error())
workerSubscriberErrors.WithLabelValues(env, ps.subscription.Topic, ps.subscription.Name, err.Error(), ps.subs.GetID()).Inc()
if err = ps.restartSubsciber(); err != nil {
ps.restartChan <- true
ps.errChan <- err
return err
}
} else {
Expand Down Expand Up @@ -182,31 +176,6 @@ func (ps *PushStream) Stop() error {
return nil
}

// Restart is used to restart the push subscription processing
func (ps *PushStream) Restart(ctx context.Context) {
logger.Ctx(ps.ctx).Infow("worker: push stream restart invoked", "subscription", ps.subscription.Name)
err := ps.Stop()
if err != nil {
logger.Ctx(ctx).Errorw(
"worker: push stream stop error",
"subscription", ps.subscription.Name,
"error", err.Error(),
)
return
}
go func(ctx context.Context) {
err := ps.Start()
if err != nil {
logger.Ctx(ctx).Errorw(
"worker: push stream restart error",
"subscription", ps.subscription.Name,
"error", err.Error(),
)
}
}(ctx)
workerComponentRestartCount.WithLabelValues(env, "stream", ps.subscription.Topic, ps.subscription.Name).Inc()
}

func (ps *PushStream) stopSubscriber() {
// stop the subscriber
if ps.subs != nil {
Expand Down Expand Up @@ -364,8 +333,7 @@ func NewPushStream(ctx context.Context, nodeID string, subName string, subscript
doneCh: doneCh,
httpClient: httpclient,
counter: 0,
restartChan: make(chan bool),
stopChan: make(chan bool),
errChan: make(chan error),
}, nil
}

Expand Down Expand Up @@ -413,29 +381,3 @@ func getRequestBytes(pushRequest *metrov1.PushEndpointRequest) *bytes.Buffer {
}
return bytes.NewBuffer(b)
}

// RunPushStreamManager return a runs a push stream manager used to handle restart/stop requests
func (ps *PushStream) RunPushStreamManager(ctx context.Context) {
logger.Ctx(ctx).Infow("worker: started running stream manager", "subscription", ps.subscription.Name)

go func() {
defer logger.Ctx(ctx).Infow("worker: exiting stream manager", "subscription", ps.subscription.Name)

for {
select {
case <-ctx.Done():
return
case <-ps.GetRestartChannel():
logger.Ctx(ctx).Infow("worker: restarting stream handler", "subscription", ps.subscription.Name)
ps.Restart(ctx)
case <-ps.GetStopChannel():
logger.Ctx(ctx).Infow("worker: stopping stream handler", "subscription", ps.subscription.Name)
err := ps.Stop()
if err != nil {
logger.Ctx(ctx).Infow("worker: error in exiting stream handler ", "error", err.Error())
}
return
}
}
}()
}
35 changes: 11 additions & 24 deletions internal/tasks/subscriptiontask.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,10 @@ func (sm *SubscriptionTask) handleNodeBindingUpdates(ctx context.Context, newBin
logger.Ctx(ctx).Infow("binding removed", "key", oldKey)
if handler, ok := sm.pushHandlers.Load(oldKey); ok && handler != nil {
logger.Ctx(ctx).Infow("handler found, calling stop", "key", oldKey)
handler.(*stream.PushStream).GetStopChannel() <- true
logger.Ctx(ctx).Infow("handler stopped", "key", oldKey)
go func(ctx context.Context) {
handler.(*stream.PushStreamManager).Stop()
logger.Ctx(ctx).Infow("handler stopped", "key", oldKey)
}(ctx)
sm.pushHandlers.Delete(oldKey)
}
}
Expand All @@ -215,26 +217,15 @@ func (sm *SubscriptionTask) handleNodeBindingUpdates(ctx context.Context, newBin
"subscription": newBinding.SubscriptionID,
"nodeID": newBinding.NodeID,
})
handler, err := stream.NewPushStream(ctx, newBinding.ID, newBinding.SubscriptionID, sm.subscriptionCore, sm.subscriber, sm.httpConfig)
handler, err := stream.NewPushStreamManager(ctx, newBinding.ID, newBinding.SubscriptionID, sm.subscriptionCore, sm.subscriber, sm.httpConfig)
if err != nil {
logger.Ctx(ctx).Errorw("subscriptionstask: Failed to setup handler for subscription", "logFields", map[string]interface{}{
"subscription": newBinding.SubscriptionID,
"bindingKey": newBinding.Key,
"bindingKey": newBinding.Key(),
})
}

handler.RunPushStreamManager(ctx)

// run the stream in a separate go routine, this go routine is not part of the worker error group
// as the worker should continue to run if a single subscription stream exists with error
go func(ctx context.Context) {
err := handler.Start()
if err != nil {
logger.Ctx(ctx).Errorw("[worker]: push stream handler exited",
"subscription", newBinding.SubscriptionID,
"error", err.Error())
}
}(ctx)
handler.Run()

// store the handler
sm.pushHandlers.Store(newBinding.Key(), handler)
Expand All @@ -251,17 +242,13 @@ func (sm *SubscriptionTask) stopPushHandlers(ctx context.Context) {
wg := sync.WaitGroup{}
sm.pushHandlers.Range(func(_, handler interface{}) bool {
wg.Add(1)
ps := handler.(*stream.PushStream)
go func(ps *stream.PushStream, wg *sync.WaitGroup) {
psm := handler.(*stream.PushStreamManager)
go func(psm *stream.PushStreamManager, wg *sync.WaitGroup) {
defer wg.Done()

err := ps.Stop()
if err != nil {
logger.Ctx(ctx).Errorw("error stopping stream handler", "error", err)
return
}
psm.Stop()
logger.Ctx(ctx).Infow("successfully stopped stream handler")
}(ps, &wg)
}(psm, &wg)
return true
})
wg.Wait()
Expand Down