Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GODRIVER-2533 Fix data race from NumberSessionsInProgress. #1085

Merged
merged 4 commits into from Oct 4, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 4 additions & 1 deletion mongo/client.go
Expand Up @@ -792,7 +792,10 @@ func (c *Client) Watch(ctx context.Context, pipeline interface{},
// NumberSessionsInProgress returns the number of sessions that have been started for this client but have not been
// closed (i.e. EndSession has not been called).
func (c *Client) NumberSessionsInProgress() int {
return c.sessionPool.CheckedOut()
// The underlying session pool uses an int64 for checkedOut to allow atomic
// access. We convert to an int here to maintain backward compatability with
// older versions of the driver that did not atomically access checkedOut.
return int(c.sessionPool.CheckedOut())
}

// Timeout returns the timeout set for this client.
Expand Down
37 changes: 37 additions & 0 deletions mongo/integration/sessions_test.go
Expand Up @@ -475,6 +475,43 @@ func TestSessions(t *testing.T) {
assert.True(mt, limitedSessionUse, limitedSessMsg, len(ops))

})

// Regression test for GODRIVER-2533. Note that this test assumes the race detector is enabled
// (GODRIVER-2072).
mt.Run("NumberSessionsInProgress data race", func(mt *mtest.T) {
// Start two goroutines under the same 100ms Context that continously run
// NumberSessionsInProgress and a basic collection operation
// (CountDocuments) simultaneously.
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()

go func(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This select statement seems like it will block until the ctx.Done() channel is closed and will never reach the call below. To get around that, either add a default case or check if the context is done in the for loop.

E.g.

for ctx.Err() == nil {
	_ = mt.Client.NumberSessionsInProgress()
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little concerned about the busy waiting here as it occupies the CPU while waiting. Will this be an issue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIL about the anti-pattern of busy-waiting, thanks 🧑‍🔧 . I modified the test to only run each operation 5 times and used a channel to sync the goroutines' execution as opposed to a context. Note that the test fails now only about 75% of the time when checkedOut is not atomically accessed (as opposed to closer to 95% of the time with the previous design). I think that's fine, though.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think short busy-wait loops in tests that are trying to exercise a specific non-deterministic behavior are probably OK, but I do think the "run N times" loops are a good way to achieve that also. I left some suggestions about possible ways to further improve the reliability of the tests in my subsequent review.


_ = mt.Client.NumberSessionsInProgress()
}
benjirewis marked this conversation as resolved.
Show resolved Hide resolved
}(ctx)

go func(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
}
benjirewis marked this conversation as resolved.
Show resolved Hide resolved

_, err := mt.Coll.CountDocuments(context.Background(), bson.D{})
benjirewis marked this conversation as resolved.
Show resolved Hide resolved
assert.Nil(mt, err, "CountDocument error: %v", err)
}
}(ctx)

// Wait for context to be done to ensure goroutines stop using mtest Client
// before test cleanup.
<-ctx.Done()
})
}

func assertCollectionCount(mt *mtest.T, expectedCount int64) {
Expand Down
16 changes: 9 additions & 7 deletions x/mongo/driver/session/session_pool.go
Expand Up @@ -8,6 +8,7 @@ package session

import (
"sync"
"sync/atomic"

"go.mongodb.org/mongo-driver/mongo/description"
"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
Expand All @@ -29,13 +30,14 @@ type topologyDescription struct {

// Pool is a pool of server sessions that can be reused.
type Pool struct {
// number of sessions checked out of pool (accessed atomically)
checkedOut int64

descChan <-chan description.Topology
head *Node
tail *Node
latestTopology topologyDescription
mutex sync.Mutex // mutex to protect list and sessionTimeout

checkedOut int // number of sessions checked out of pool
}

func (p *Pool) createServerSession() (*Server, error) {
Expand All @@ -44,7 +46,7 @@ func (p *Pool) createServerSession() (*Server, error) {
return nil, err
}

p.checkedOut++
atomic.AddInt64(&p.checkedOut, 1)
return s, nil
}

Expand Down Expand Up @@ -100,7 +102,7 @@ func (p *Pool) GetSession() (*Server, error) {
p.head = p.head.next
}

p.checkedOut++
atomic.AddInt64(&p.checkedOut, 1)
return session, nil
}

Expand All @@ -118,7 +120,7 @@ func (p *Pool) ReturnSession(ss *Server) {
p.mutex.Lock()
defer p.mutex.Unlock()

p.checkedOut--
atomic.AddInt64(&p.checkedOut, -1)
p.updateTimeout()
// check sessions at end of queue for expired
// stop checking after hitting the first valid session
Expand Down Expand Up @@ -185,6 +187,6 @@ func (p *Pool) String() string {
}

// CheckedOut returns number of sessions checked out from pool.
func (p *Pool) CheckedOut() int {
return p.checkedOut
func (p *Pool) CheckedOut() int64 {
return atomic.LoadInt64(&p.checkedOut)
}