Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream restart bug fix by introducing manager #412

Merged
merged 12 commits into from
Sep 1, 2022
104 changes: 104 additions & 0 deletions internal/stream/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package stream

import (
"context"

"github.com/razorpay/metro/internal/subscriber"
"github.com/razorpay/metro/internal/subscription"
"github.com/razorpay/metro/pkg/httpclient"
"github.com/razorpay/metro/pkg/logger"
)

type PushStreamManager struct {
ctx context.Context
cancelFunc func()
doneCh chan struct{}
config *httpclient.Config
ps *PushStream
}

// NewPushStreamManager creates the PushStream for subName and wraps it in a
// PushStreamManager.
//
// The stream itself is created with the caller's ctx; the manager keeps a
// cancellable child of ctx whose cancel function is invoked by Stop. When the
// stream cannot be created the error is logged (including its message) and
// returned together with a nil manager.
func NewPushStreamManager(ctx context.Context, nodeID string, subName string, subscriptionCore subscription.ICore, subscriberCore subscriber.ICore, config *httpclient.Config) (*PushStreamManager, error) {
	ps, err := NewPushStream(ctx, nodeID, subName, subscriptionCore, subscriberCore, config)
	if err != nil {
		logger.Ctx(ctx).Errorw("push stream manager: Failed to setup push stream for subscription", "logFields", map[string]interface{}{
			"subscription": subName,
			"nodeID":       nodeID,
		}, "error", err.Error())
		return nil, err
	}

	// Use a distinct name for the derived context instead of shadowing ctx.
	managerCtx, cancelFunc := context.WithCancel(ctx)
	return &PushStreamManager{
		ctx:        managerCtx,
		cancelFunc: cancelFunc,
		ps:         ps,
		doneCh:     make(chan struct{}),
		config:     config,
	}, nil
}

// Run starts the push stream manager that is used to manage underlying stream.
//
// It launches two goroutines and returns immediately: one runs the current
// stream via Start, the other is the manager loop that restarts the stream
// whenever an error arrives on its error channel and stops it once the manager
// context is cancelled. doneCh is closed when the manager loop exits, so a
// caller blocked in Stop is released only after the stream's Stop has returned.
func (psm *PushStreamManager) Run() {
	logger.Ctx(psm.ctx).Infow("push stream manager: started running stream manager", "subscription", psm.ps.subscription.Name)

	// run the stream in a separate go routine, this go routine is not part of the worker error group
	// as the worker should continue to run if a single subscription stream exists with error.
	// The stream is passed in as an argument so this goroutine never reads
	// psm.ps while the manager loop is replacing it during a restart.
	go func(ctx context.Context, ps *PushStream) {
		if err := ps.Start(); err != nil {
			logger.Ctx(ctx).Infow("push stream manager: stream exited",
				"subscription", ps.subscription.Name,
				"error", err.Error(),
			)
		}
	}(psm.ctx, psm.ps)

	go func() {
		// Deferred calls run last-in-first-out: the exit log is emitted first,
		// then doneCh is closed to release Stop.
		defer close(psm.doneCh)
		defer logger.Ctx(psm.ctx).Infow("push stream manager: exiting stream manager", "subscription", psm.ps.subscription.Name)

		for {
			select {
			case <-psm.ctx.Done():
				ps := psm.ps
				stopped := make(chan error, 1)
				go func() { stopped <- ps.Stop() }()

				// Start reports failures with a send on an unbuffered channel.
				// Keep receiving while Stop is in flight so such a send cannot
				// block the stream from finishing its shutdown.
				for {
					select {
					case err := <-stopped:
						if err != nil {
							logger.Ctx(psm.ctx).Errorw(
								"push stream manager: stream stop error",
								"subscription", ps.subscription.Name,
								"error", err.Error(),
							)
						}
						return
					case <-ps.GetErrorChannel():
						// The stream is being shut down; nothing to restart.
					}
				}
			case <-psm.ps.GetErrorChannel():
				logger.Ctx(psm.ctx).Infow("push stream manager: restarting stream handler", "subscription", psm.ps.subscription.Name)
				psm.restartPushStream()
			}
		}
	}()
}

// StopPushStream stops the stream manager along with the underlying stream
func (psm *PushStreamManager) Stop() error {
logger.Ctx(psm.ctx).Infow("push stream manager: stop invoked", "subscription", psm.ps.subscription.Name)
psm.cancelFunc()
<-psm.doneCh
vnktram marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

// restartPushStream stops the current stream, creates a replacement with the
// same node ID, subscription and cores, starts it in a new goroutine and
// increments the stream restart counter.
//
// If stopping the current stream or creating the replacement fails, the error
// is logged and the current stream is left in place: psm.ps is only swapped
// once a replacement has been created successfully, so the manager never holds
// the result of a failed NewPushStream call.
func (psm *PushStreamManager) restartPushStream() {
	old := psm.ps
	if err := old.Stop(); err != nil {
		logger.Ctx(psm.ctx).Errorw(
			"push stream manager: stream stop error",
			"subscription", old.subscription.Name,
			"error", err.Error(),
		)
		return
	}

	ps, err := NewPushStream(psm.ctx, old.nodeID, old.subscription.Name, old.subscriptionCore, old.subscriberCore, psm.config)
	if err != nil {
		logger.Ctx(psm.ctx).Errorw(
			"push stream manager: stream restart error",
			"subscription", old.subscription.Name,
			"error", err.Error(),
		)
		return
	}
	psm.ps = ps

	// The goroutine uses the local ps and its own error variable so it shares
	// no mutable state with this function or with later restarts.
	go func(ctx context.Context) {
		if err := ps.Start(); err != nil {
			logger.Ctx(ctx).Errorw(
				"push stream manager: stream restart error",
				"subscription", ps.subscription.Name,
				"error", err.Error(),
			)
		}
	}(psm.ctx)
	workerComponentRestartCount.WithLabelValues(env, "stream", ps.subscription.Topic, ps.subscription.Name).Inc()
}
88 changes: 88 additions & 0 deletions internal/stream/manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package stream

import (
"context"
"fmt"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/google/uuid"
"github.com/razorpay/metro/internal/subscriber"
mocks2 "github.com/razorpay/metro/internal/subscriber/mocks"
mocks1 "github.com/razorpay/metro/internal/subscription/mocks/core"
"github.com/razorpay/metro/pkg/httpclient"
"github.com/stretchr/testify/assert"
)

// TestNewPushStreamManager builds a manager through getMockPushStreamManager
// for both the success and the failure case and checks that an error is
// returned exactly when expected and that the manager is nil exactly then.
func TestNewPushStreamManager(t *testing.T) {
	mockCtrl := gomock.NewController(t)

	cases := []struct {
		wantErr bool
	}{
		{wantErr: false},
		{wantErr: true},
	}

	for i := range cases {
		wantErr := cases[i].wantErr
		manager, buildErr := getMockPushStreamManager(mockCtrl, wantErr)
		assert.Equal(t, wantErr, buildErr != nil)
		assert.Equal(t, manager == nil, wantErr)
	}
}

// TestPushStreamManager_Run starts a manager, pushes an error onto the
// stream's error channel and checks that the manager still holds a stream one
// second later.
func TestPushStreamManager_Run(t *testing.T) {
	ctrl := gomock.NewController(t)
	psm, err := getMockPushStreamManager(ctrl, false)
	assert.NoError(t, err)
	assert.NotNil(t, psm)
	psm.Run()

	// Push an error to errChan of stream and it should still be running
	psm.ps.GetErrorChannel() <- fmt.Errorf("Something went wrong")
	// A plain sleep gives the same one second delay without allocating a
	// ticker that is never stopped.
	time.Sleep(1 * time.Second)
	assert.NotNil(t, psm.ps)
}

// TestPushStreamManager_Stop runs a manager built from mocks and verifies that
// stopping it reports no error.
func TestPushStreamManager_Stop(t *testing.T) {
	mockCtrl := gomock.NewController(t)
	manager, buildErr := getMockPushStreamManager(mockCtrl, false)
	assert.NoError(t, buildErr)
	assert.NotNil(t, manager)
	manager.Run()

	// Stop the stream manager and it should be stopped without any error
	stopErr := manager.Stop()
	assert.NoError(t, stopErr)
}

func getMockPushStreamManager(ctrl *gomock.Controller, wantErr bool) (*PushStreamManager, error) {
ctx := context.Background()
subscriptionCoreMock := mocks1.NewMockICore(ctrl)
subscriberCoreMock := mocks2.NewMockICore(ctrl)
workerID := uuid.New().String()
httpConfig := &httpclient.Config{}
subModel := getMockSubModel("")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [golangci] reported by reviewdog 🐶
undeclared name: getMockSubModel (typecheck)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is defined in same test package but a different file but golang cli linting is not checking all the files.
golangci/golangci-lint#1574
Seems like we need to change some configs


if wantErr {
subscriptionCoreMock.EXPECT().Get(gomock.Any(), subName).AnyTimes().Return(nil, fmt.Errorf("Something went wrong"))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [golangci] reported by reviewdog 🐶
undeclared name: subName (typecheck)

} else {
subscriptionCoreMock.EXPECT().Get(gomock.Any(), subName).AnyTimes().Return(getMockSubModel(""), nil)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [golangci] reported by reviewdog 🐶
undeclared name: subName (typecheck)

}
subscriberCoreMock.EXPECT().NewSubscriber(
ctx,
workerID,
subModel,
defaultTimeoutMs,
defaultMaxOutstandingMsgs,
defaultMaxOuttandingBytes,
gomock.AssignableToTypeOf(make(chan *subscriber.PullRequest)),
gomock.AssignableToTypeOf(make(chan *subscriber.AckMessage)),
gomock.AssignableToTypeOf(make(chan *subscriber.ModAckMessage))).AnyTimes().Return(getMockSubscriber(ctx, ctrl), nil)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [golangci] reported by reviewdog 🐶
undeclared name: getMockSubscriber (typecheck)

return NewPushStreamManager(ctx, workerID, subName, subscriptionCoreMock, subscriberCoreMock, httpConfig)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [golangci] reported by reviewdog 🐶
undeclared name: subName (typecheck)

}
72 changes: 7 additions & 65 deletions internal/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ type PushStream struct {
processor *processor
doneCh chan struct{}
counter int64
restartChan chan bool
stopChan chan bool
errChan chan error
}

const (
Expand All @@ -46,14 +45,9 @@ const (
defaultMaxOuttandingBytes int64 = 0
)

// GetRestartChannel returns the stream's restartChan, the bool channel on
// which restart requests for this stream are sent and received.
func (ps *PushStream) GetRestartChannel() chan bool {
return ps.restartChan
}

// GetStopChannel returns the chan where stop request is received
func (ps *PushStream) GetStopChannel() chan bool {
return ps.stopChan
// GetErrorChannel returns the chan where stream errors are received
func (ps *PushStream) GetErrorChannel() chan error {
return ps.errChan
}

// Start reads the messages from the broker and publish them to the subscription endpoint
Expand All @@ -76,7 +70,7 @@ func (ps *PushStream) Start() error {
ps.subs, err = ps.subscriberCore.NewSubscriber(subscriberCtx, ps.nodeID, ps.subscription, defaultTimeoutMs,
defaultMaxOutstandingMsgs, defaultMaxOuttandingBytes, subscriberRequestCh, subscriberAckCh, subscriberModAckCh)
if err != nil {
ps.restartChan <- true
ps.errChan <- err
logger.Ctx(ps.ctx).Errorw("worker: error creating subscriber", "subscription", ps.subscription.Name, "error", err.Error())
return err
}
Expand Down Expand Up @@ -105,7 +99,7 @@ func (ps *PushStream) Start() error {
logger.Ctx(ps.ctx).Errorw("worker: error from subscriber, restarting", "logFields", ps.getLogFields(), "error", err.Error())
workerSubscriberErrors.WithLabelValues(env, ps.subscription.Topic, ps.subscription.Name, err.Error(), ps.subs.GetID()).Inc()
if err = ps.restartSubsciber(); err != nil {
ps.restartChan <- true
ps.errChan <- err
return err
}
} else {
Expand Down Expand Up @@ -182,31 +176,6 @@ func (ps *PushStream) Stop() error {
return nil
}

// Restart stops the push stream and, when the stop succeeds, starts it again
// in a new goroutine and increments the stream restart counter. A stop failure
// is logged and ends the restart; a start failure is only logged.
func (ps *PushStream) Restart(ctx context.Context) {
	logger.Ctx(ps.ctx).Infow("worker: push stream restart invoked", "subscription", ps.subscription.Name)

	if stopErr := ps.Stop(); stopErr != nil {
		logger.Ctx(ctx).Errorw(
			"worker: push stream stop error",
			"subscription", ps.subscription.Name,
			"error", stopErr.Error(),
		)
		return
	}

	startAsync := func(ctx context.Context) {
		startErr := ps.Start()
		if startErr == nil {
			return
		}
		logger.Ctx(ctx).Errorw(
			"worker: push stream restart error",
			"subscription", ps.subscription.Name,
			"error", startErr.Error(),
		)
	}
	go startAsync(ctx)

	workerComponentRestartCount.WithLabelValues(env, "stream", ps.subscription.Topic, ps.subscription.Name).Inc()
}

func (ps *PushStream) stopSubscriber() {
// stop the subscriber
if ps.subs != nil {
Expand Down Expand Up @@ -364,8 +333,7 @@ func NewPushStream(ctx context.Context, nodeID string, subName string, subscript
doneCh: doneCh,
httpClient: httpclient,
counter: 0,
restartChan: make(chan bool),
stopChan: make(chan bool),
errChan: make(chan error),
}, nil
}

Expand Down Expand Up @@ -413,29 +381,3 @@ func getRequestBytes(pushRequest *metrov1.PushEndpointRequest) *bytes.Buffer {
}
return bytes.NewBuffer(b)
}

// RunPushStreamManager starts a goroutine that serves restart and stop
// requests for the stream. The goroutine exits when ctx is done or after a
// stop request has been handled; a restart request invokes Restart and keeps
// the loop running.
func (ps *PushStream) RunPushStreamManager(ctx context.Context) {
	logger.Ctx(ctx).Infow("worker: started running stream manager", "subscription", ps.subscription.Name)

	manage := func() {
		defer logger.Ctx(ctx).Infow("worker: exiting stream manager", "subscription", ps.subscription.Name)

		for {
			select {
			case <-ctx.Done():
				return
			case <-ps.GetRestartChannel():
				logger.Ctx(ctx).Infow("worker: restarting stream handler", "subscription", ps.subscription.Name)
				ps.Restart(ctx)
			case <-ps.GetStopChannel():
				logger.Ctx(ctx).Infow("worker: stopping stream handler", "subscription", ps.subscription.Name)
				if stopErr := ps.Stop(); stopErr != nil {
					logger.Ctx(ctx).Infow("worker: error in exiting stream handler ", "error", stopErr.Error())
				}
				return
			}
		}
	}
	go manage()
}
25 changes: 7 additions & 18 deletions internal/tasks/subscriptiontask.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (sm *SubscriptionTask) handleNodeBindingUpdates(ctx context.Context, newBin
logger.Ctx(ctx).Infow("binding removed", "key", oldKey)
if handler, ok := sm.pushHandlers.Load(oldKey); ok && handler != nil {
logger.Ctx(ctx).Infow("handler found, calling stop", "key", oldKey)
handler.(*stream.PushStream).GetStopChannel() <- true
handler.(*stream.PushStreamManager).Stop()
sakshi-goyal-razorpay marked this conversation as resolved.
Show resolved Hide resolved
logger.Ctx(ctx).Infow("handler stopped", "key", oldKey)
sm.pushHandlers.Delete(oldKey)
}
Expand All @@ -215,26 +215,15 @@ func (sm *SubscriptionTask) handleNodeBindingUpdates(ctx context.Context, newBin
"subscription": newBinding.SubscriptionID,
"nodeID": newBinding.NodeID,
})
handler, err := stream.NewPushStream(ctx, newBinding.ID, newBinding.SubscriptionID, sm.subscriptionCore, sm.subscriber, sm.httpConfig)
handler, err := stream.NewPushStreamManager(ctx, newBinding.ID, newBinding.SubscriptionID, sm.subscriptionCore, sm.subscriber, sm.httpConfig)
if err != nil {
logger.Ctx(ctx).Errorw("subscriptionstask: Failed to setup handler for subscription", "logFields", map[string]interface{}{
"subscription": newBinding.SubscriptionID,
"bindingKey": newBinding.Key,
})
}

handler.RunPushStreamManager(ctx)

// run the stream in a separate go routine, this go routine is not part of the worker error group
// as the worker should continue to run if a single subscription stream exists with error
go func(ctx context.Context) {
err := handler.Start()
if err != nil {
logger.Ctx(ctx).Errorw("[worker]: push stream handler exited",
"subscription", newBinding.SubscriptionID,
"error", err.Error())
}
}(ctx)
handler.Run()

// store the handler
sm.pushHandlers.Store(newBinding.Key(), handler)
Expand All @@ -251,17 +240,17 @@ func (sm *SubscriptionTask) stopPushHandlers(ctx context.Context) {
wg := sync.WaitGroup{}
sm.pushHandlers.Range(func(_, handler interface{}) bool {
wg.Add(1)
ps := handler.(*stream.PushStream)
go func(ps *stream.PushStream, wg *sync.WaitGroup) {
psm := handler.(*stream.PushStreamManager)
go func(psm *stream.PushStreamManager, wg *sync.WaitGroup) {
defer wg.Done()

err := ps.Stop()
err := psm.Stop()
if err != nil {
logger.Ctx(ctx).Errorw("error stopping stream handler", "error", err)
return
}
logger.Ctx(ctx).Infow("successfully stopped stream handler")
}(ps, &wg)
}(psm, &wg)
return true
})
wg.Wait()
Expand Down