Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpcc: Fix potential deadlock in stream Sync #134

Merged
merged 2 commits into from Jun 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion rpcc/conn.go
Expand Up @@ -446,12 +446,12 @@ func (c *Conn) send(ctx context.Context, call *rpcCall) (err error) {
func (c *Conn) notify(method string, data []byte) {
c.mu.Lock()
stream := c.streams[method]
c.mu.Unlock()
if stream != nil {
// Stream writer must be able to handle incoming writes
// even after it has been removed (unsubscribed).
stream.write(method, data)
}
c.mu.Unlock()
}

// listen registers a new stream listener (chan) for the RPC notification
Expand Down
27 changes: 16 additions & 11 deletions rpcc/stream_sync_test.go
Expand Up @@ -2,6 +2,7 @@ package rpcc

import (
"context"
"fmt"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -81,9 +82,7 @@ func (s *fakeStream) Ready() <-chan struct{} { return nil }
func (s *fakeStream) RecvMsg(m interface{}) error { return nil }
func (s *fakeStream) Close() error { return nil }

var (
_ Stream = (*fakeStream)(nil)
)
var _ Stream = (*fakeStream)(nil)

func TestSyncError(t *testing.T) {
conn1, connCancel1 := newTestStreamConn()
Expand Down Expand Up @@ -149,7 +148,6 @@ func TestSyncError(t *testing.T) {
t.Error("Expected error, got nil")
}
})

}
}

Expand All @@ -163,18 +161,25 @@ func TestStreamSyncNotifyDeadlock(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer s1.Close()
s2, err := NewStream(ctx, "test2", conn)
if err != nil {
t.Fatal(err)
}
defer s2.Close()

go conn.notify("test1", []byte(`{"hello": "world"}`))
go conn.notify("test2", []byte(`{"hello": "world"}`))

// This could cause a deadlock due to competition for same mutexes:
// https://github.com/mafredri/cdp/issues/90
// https://github.com/mafredri/cdp/pull/91
err = Sync(s1, s2)
syncErr := make(chan error)
go func() {
// This could cause a deadlock due to competition for same mutexes:
// https://github.com/mafredri/cdp/issues/90
// https://github.com/mafredri/cdp/pull/91
syncErr <- Sync(s1, s2)
}()
for i := 0; i < 10; i++ {
conn.notify("test1", []byte(fmt.Sprintf(`{"hello": "world", "count": %d}`, i)))
conn.notify("test2", []byte(fmt.Sprintf(`{"hello": "world", "count": %d}`, i)))
}
err = <-syncErr
if err != nil {
t.Fatal(err)
}
Expand Down