New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Stream restart bug fix by introducing manager #412
Conversation
subscriberCoreMock := mocks2.NewMockICore(ctrl) | ||
workerID := uuid.New().String() | ||
httpConfig := &httpclient.Config{} | ||
subModel := getMockSubModel("") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🚫 [golangci] reported by reviewdog 🐶
undeclared name: getMockSubModel
(typecheck)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is defined in same test package but a different file but golang cli linting is not checking all the files.
golangci/golangci-lint#1574
Seems like we need to change some configs
internal/stream/manager_test.go
Outdated
subModel := getMockSubModel("") | ||
|
||
if wantErr { | ||
subscriptionCoreMock.EXPECT().Get(gomock.Any(), subName).AnyTimes().Return(nil, fmt.Errorf("Something went wrong")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🚫 [golangci] reported by reviewdog 🐶
undeclared name: subName
(typecheck)
internal/stream/manager_test.go
Outdated
if wantErr { | ||
subscriptionCoreMock.EXPECT().Get(gomock.Any(), subName).AnyTimes().Return(nil, fmt.Errorf("Something went wrong")) | ||
} else { | ||
subscriptionCoreMock.EXPECT().Get(gomock.Any(), subName).AnyTimes().Return(getMockSubModel(""), nil) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🚫 [golangci] reported by reviewdog 🐶
undeclared name: subName
(typecheck)
defaultMaxOuttandingBytes, | ||
gomock.AssignableToTypeOf(make(chan *subscriber.PullRequest)), | ||
gomock.AssignableToTypeOf(make(chan *subscriber.AckMessage)), | ||
gomock.AssignableToTypeOf(make(chan *subscriber.ModAckMessage))).AnyTimes().Return(getMockSubscriber(ctx, ctrl), nil) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🚫 [golangci] reported by reviewdog 🐶
undeclared name: getMockSubscriber
(typecheck)
internal/stream/manager_test.go
Outdated
gomock.AssignableToTypeOf(make(chan *subscriber.PullRequest)), | ||
gomock.AssignableToTypeOf(make(chan *subscriber.AckMessage)), | ||
gomock.AssignableToTypeOf(make(chan *subscriber.ModAckMessage))).AnyTimes().Return(getMockSubscriber(ctx, ctrl), nil) | ||
return NewPushStreamManager(ctx, workerID, subName, subscriptionCoreMock, subscriberCoreMock, httpConfig) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🚫 [golangci] reported by reviewdog 🐶
undeclared name: subName
(typecheck)
Codecov Report
@@ Coverage Diff @@
## master #412 +/- ##
==========================================
+ Coverage 59.17% 59.54% +0.36%
==========================================
Files 124 125 +1
Lines 9570 9580 +10
==========================================
+ Hits 5663 5704 +41
+ Misses 3546 3515 -31
Partials 361 361
Flags with carried forward coverage won't be shown. Click here to find out more.
Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here. |
got, err := NewPushStreamManager( | ||
ctx, | ||
uuid.New().String(), | ||
subName, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🚫 [golangci] reported by reviewdog 🐶
undeclared name: subName
(typecheck)
psm, err := NewPushStreamManager( | ||
ctx, | ||
uuid.New().String(), | ||
subName, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🚫 [golangci] reported by reviewdog 🐶
undeclared name: subName
(typecheck)
psm, err := NewPushStreamManager( | ||
ctx, | ||
uuid.New().String(), | ||
subName, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🚫 [golangci] reported by reviewdog 🐶
undeclared name: subName
(typecheck)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me but please resolve the comments by reviewdog.
this is defined in same test package but a different file but golang cli linting is not checking all the files. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
internal/stream/manager.go
Outdated
} | ||
|
||
func (psm *PushStreamManager) restartPushStream() { | ||
psm.ps.Stop() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please wrap this under a semaphore
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also let's verify if stop returns an error before we start a new one.
psm.ps.Stop() | |
err := psm.ps.Stop() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@vnktram Why we want to wrap this inside a semaphore. This is called from the select for loop, and at a time, it will be executed by one go routine
Description
Stream restart was not working because same ctx was used due to which we were not able to restart it. Have moved all the stream manager code to separate module, and also, re-initialised stream before restarting.
Fixes https://razorpay.atlassian.net/browse/METRO-127
Type of change
How Has This Been Tested?
Is it a breaking change?
No, only the restart and stop functionality will be affected in case something is broken
Can this change be tested effectively via canary? No, as it only affects worker component, and canary is not done for worker yet
Checklist: