Skip to content

Commit

Permalink
refactor(pubsublite): retryableStream cleanups (#3392)
Browse files Browse the repository at this point in the history
Added initialResponseRequired for clarity. Use unsafeTerminate when mutex is already held.
  • Loading branch information
tmdiep committed Dec 22, 2020
1 parent 81fc2d1 commit 87b972c
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 22 deletions.
6 changes: 3 additions & 3 deletions pubsublite/internal/wire/assigner.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,12 @@ func (a *assigner) newStream(ctx context.Context) (grpc.ClientStream, error) {
return a.assignmentClient.AssignPartitions(ctx)
}

func (a *assigner) initialRequest() (interface{}, bool) {
return a.initialReq, false // No initial response expected
func (a *assigner) initialRequest() (interface{}, initialResponseRequired) {
return a.initialReq, initialResponseRequired(false)
}

func (a *assigner) validateInitialResponse(_ interface{}) error {
// Should not be called.
// Should not be called as initialResponseRequired=false above.
return errors.New("pubsublite: unexpected initial response")
}

Expand Down
6 changes: 2 additions & 4 deletions pubsublite/internal/wire/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,8 @@ func (c *committer) newStream(ctx context.Context) (grpc.ClientStream, error) {
return c.cursorClient.StreamingCommitCursor(ctx)
}

func (c *committer) initialRequest() (req interface{}, needsResp bool) {
req = c.initialReq
needsResp = true
return
func (c *committer) initialRequest() (interface{}, initialResponseRequired) {
return c.initialReq, initialResponseRequired(true)
}

func (c *committer) validateInitialResponse(response interface{}) error {
Expand Down
4 changes: 2 additions & 2 deletions pubsublite/internal/wire/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ func (pp *singlePartitionPublisher) newStream(ctx context.Context) (grpc.ClientS
return pp.pubClient.Publish(addTopicRoutingMetadata(ctx, pp.topic))
}

func (pp *singlePartitionPublisher) initialRequest() (interface{}, bool) {
return pp.initialReq, true
func (pp *singlePartitionPublisher) initialRequest() (interface{}, initialResponseRequired) {
return pp.initialReq, initialResponseRequired(true)
}

func (pp *singlePartitionPublisher) validateInitialResponse(response interface{}) error {
Expand Down
21 changes: 12 additions & 9 deletions pubsublite/internal/wire/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ const (
streamTerminated streamStatus = 3
)

type initialResponseRequired bool

// streamHandler provides hooks for different Pub/Sub Lite streaming APIs
// (e.g. publish, subscribe, streaming cursor, etc.) to use retryableStream.
// All Pub/Sub Lite streaming APIs implement a similar handshaking protocol,
Expand All @@ -56,7 +58,7 @@ type streamHandler interface {
newStream(context.Context) (grpc.ClientStream, error)
// initialRequest should return the initial request and whether an initial
// response is expected.
initialRequest() (interface{}, bool)
initialRequest() (interface{}, initialResponseRequired)
validateInitialResponse(interface{}) error

// onStreamStatusChange is used to notify stream handlers when the stream has
Expand Down Expand Up @@ -120,10 +122,9 @@ func (rs *retryableStream) Start() {
rs.mu.Lock()
defer rs.mu.Unlock()

if rs.status != streamUninitialized {
return
if rs.status == streamUninitialized {
go rs.connectStream()
}
go rs.connectStream()
}

// Stop gracefully closes the stream without error.
Expand All @@ -136,6 +137,7 @@ func (rs *retryableStream) Stop() {
// in progress.
func (rs *retryableStream) Send(request interface{}) (sent bool) {
rs.mu.Lock()
defer rs.mu.Unlock()

if rs.stream != nil {
err := rs.stream.SendMsg(request)
Expand All @@ -150,13 +152,9 @@ func (rs *retryableStream) Send(request interface{}) (sent bool) {
case isRetryableSendError(err):
go rs.connectStream()
default:
rs.mu.Unlock() // terminate acquires the mutex.
rs.terminate(err)
return
rs.unsafeTerminate(err)
}
}

rs.mu.Unlock()
return
}

Expand Down Expand Up @@ -196,6 +194,8 @@ func (rs *retryableStream) unsafeClearStream() {
func (rs *retryableStream) setCancel(cancel context.CancelFunc) {
rs.mu.Lock()
defer rs.mu.Unlock()

rs.unsafeClearStream()
rs.cancelStream = cancel
}

Expand Down Expand Up @@ -337,7 +337,10 @@ func (rs *retryableStream) listen(recvStream grpc.ClientStream) {
func (rs *retryableStream) terminate(err error) {
rs.mu.Lock()
defer rs.mu.Unlock()
rs.unsafeTerminate(err)
}

func (rs *retryableStream) unsafeTerminate(err error) {
if rs.status == streamTerminated {
return
}
Expand Down
4 changes: 2 additions & 2 deletions pubsublite/internal/wire/streams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ func (sh *testStreamHandler) validateInitialResponse(response interface{}) error
return nil
}

func (sh *testStreamHandler) initialRequest() (interface{}, bool) {
return sh.InitialReq, true
func (sh *testStreamHandler) initialRequest() (interface{}, initialResponseRequired) {
return sh.InitialReq, initialResponseRequired(true)
}

func (sh *testStreamHandler) onStreamStatusChange(status streamStatus) {
Expand Down
4 changes: 2 additions & 2 deletions pubsublite/internal/wire/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,8 @@ func (s *subscribeStream) newStream(ctx context.Context) (grpc.ClientStream, err
return s.subClient.Subscribe(addSubscriptionRoutingMetadata(ctx, s.subscription))
}

func (s *subscribeStream) initialRequest() (interface{}, bool) {
return s.initialReq, true
func (s *subscribeStream) initialRequest() (interface{}, initialResponseRequired) {
return s.initialReq, initialResponseRequired(true)
}

func (s *subscribeStream) validateInitialResponse(response interface{}) error {
Expand Down

0 comments on commit 87b972c

Please sign in to comment.