Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a general purpose unbounded buffer implementation #3099

Merged
merged 3 commits into from Nov 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
78 changes: 16 additions & 62 deletions balancer_conn_wrappers.go
Expand Up @@ -25,6 +25,7 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/buffer"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/resolver"
)
Expand All @@ -35,73 +36,25 @@ type scStateUpdate struct {
state connectivity.State
}

// scStateUpdateBuffer is an unbounded FIFO queue of *scStateUpdate values.
//
// At most one update sits in the one-slot channel c, where the consumer can
// receive it; every other pending update waits in backlog, in arrival order.
// mu guards backlog and serializes the sends onto c.
// TODO make a general purpose buffer that uses interface{}.
type scStateUpdateBuffer struct {
	c       chan *scStateUpdate
	mu      sync.Mutex
	backlog []*scStateUpdate
}

// newSCStateUpdateBuffer returns an empty buffer whose channel has room for
// exactly one pending update.
func newSCStateUpdateBuffer() *scStateUpdateBuffer {
	buf := new(scStateUpdateBuffer)
	buf.c = make(chan *scStateUpdate, 1)
	return buf
}

// put enqueues t without ever blocking on the channel.
func (b *scStateUpdateBuffer) put(t *scStateUpdate) {
	b.mu.Lock()
	defer b.mu.Unlock()

	if len(b.backlog) != 0 {
		// Older updates are still waiting; queue behind them so that order
		// is preserved.
		b.backlog = append(b.backlog, t)
		return
	}

	select {
	case b.c <- t:
		// The channel slot was free, so t is immediately available.
	default:
		// The slot is occupied; t becomes the first backlog entry.
		b.backlog = append(b.backlog, t)
	}
}

// load moves the oldest backlog entry onto the channel if the channel slot is
// free. It does nothing when the backlog is empty or the slot is occupied.
func (b *scStateUpdateBuffer) load() {
	b.mu.Lock()
	defer b.mu.Unlock()

	if len(b.backlog) == 0 {
		return
	}

	select {
	case b.c <- b.backlog[0]:
		// Clear the slot before re-slicing so the backing array does not
		// keep the update reachable.
		b.backlog[0] = nil
		b.backlog = b.backlog[1:]
	default:
	}
}

// get returns the channel on which buffered scStateUpdate values are
// delivered.
//
// After each receive, the caller should call load so that the next buffered
// scStateUpdate, if any, is sent onto the channel.
func (b *scStateUpdateBuffer) get() <-chan *scStateUpdate {
	return b.c
}

// ccBalancerWrapper is a wrapper on top of cc for balancers.
// It implements balancer.ClientConn interface.
type ccBalancerWrapper struct {
cc *ClientConn
balancerMu sync.Mutex // synchronizes calls to the balancer
balancer balancer.Balancer
stateChangeQueue *scStateUpdateBuffer
done *grpcsync.Event
cc *ClientConn
balancerMu sync.Mutex // synchronizes calls to the balancer
balancer balancer.Balancer
scBuffer *buffer.Unbounded
done *grpcsync.Event

mu sync.Mutex
subConns map[*acBalancerWrapper]struct{}
}

func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.BuildOptions) *ccBalancerWrapper {
ccb := &ccBalancerWrapper{
cc: cc,
stateChangeQueue: newSCStateUpdateBuffer(),
done: grpcsync.NewEvent(),
subConns: make(map[*acBalancerWrapper]struct{}),
cc: cc,
scBuffer: buffer.NewUnbounded(),
done: grpcsync.NewEvent(),
subConns: make(map[*acBalancerWrapper]struct{}),
}
go ccb.watcher()
ccb.balancer = b.Build(ccb, bopts)
Expand All @@ -113,16 +66,17 @@ func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.Bui
func (ccb *ccBalancerWrapper) watcher() {
for {
select {
case t := <-ccb.stateChangeQueue.get():
ccb.stateChangeQueue.load()
case t := <-ccb.scBuffer.Get():
ccb.scBuffer.Load()
if ccb.done.HasFired() {
break
}
ccb.balancerMu.Lock()
su := t.(*scStateUpdate)
if ub, ok := ccb.balancer.(balancer.V2Balancer); ok {
ub.UpdateSubConnState(t.sc, balancer.SubConnState{ConnectivityState: t.state})
ub.UpdateSubConnState(su.sc, balancer.SubConnState{ConnectivityState: su.state})
} else {
ccb.balancer.HandleSubConnStateChange(t.sc, t.state)
ccb.balancer.HandleSubConnStateChange(su.sc, su.state)
}
ccb.balancerMu.Unlock()
case <-ccb.done.Done():
Expand Down Expand Up @@ -158,7 +112,7 @@ func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s co
if sc == nil {
return
}
ccb.stateChangeQueue.put(&scStateUpdate{
ccb.scBuffer.Put(&scStateUpdate{
sc: sc,
state: s,
})
Expand Down
78 changes: 78 additions & 0 deletions internal/buffer/unbounded.go
@@ -0,0 +1,78 @@
/*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Package buffer provides an implementation of an unbounded buffer.
package buffer

import "sync"

// Unbounded is a FIFO buffer with no capacity limit that works without any
// extra goroutines. It is typically used for passing updates from one entity
// to another within gRPC.
//
// At most one value is held in the one-slot channel c, ready for the reader;
// all other pending values wait in backlog in arrival order.
//
// All methods on this type are thread-safe. Sends onto the channel are
// non-blocking, so the only thing a method can block on is the mutex.
type Unbounded struct {
	c       chan interface{}
	mu      sync.Mutex
	backlog []interface{}
}

// NewUnbounded returns a new, empty Unbounded.
func NewUnbounded() *Unbounded {
	ub := new(Unbounded)
	ub.c = make(chan interface{}, 1)
	return ub
}

// Put adds t to the unbounded buffer. It never blocks on the read channel.
func (b *Unbounded) Put(t interface{}) {
	b.mu.Lock()
	defer b.mu.Unlock()

	if len(b.backlog) != 0 {
		// Earlier values are still queued; t must go behind them so that
		// values are delivered in the order they were put.
		b.backlog = append(b.backlog, t)
		return
	}

	select {
	case b.c <- t:
		// The channel slot was free; t is immediately readable.
	default:
		// The slot is occupied; t becomes the first backlog entry.
		b.backlog = append(b.backlog, t)
	}
}

// Load sends the earliest buffered value, if any, onto the read channel
// returned by Get(). It does nothing if the backlog is empty or the channel
// already holds a value. Users are expected to call this every time they read
// a value from the read channel.
func (b *Unbounded) Load() {
	b.mu.Lock()
	defer b.mu.Unlock()

	if len(b.backlog) == 0 {
		return
	}

	select {
	case b.c <- b.backlog[0]:
		// Clear the slot before re-slicing so the backing array does not
		// keep the value reachable.
		b.backlog[0] = nil
		b.backlog = b.backlog[1:]
	default:
	}
}

// Get returns a read channel on which values added to the buffer, via Put(),
// are sent.
//
// Upon reading a value from this channel, users are expected to call Load() to
// send the next buffered value onto the channel if there is any.
func (b *Unbounded) Get() <-chan interface{} {
return b.c
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the buffer ordered? If Put(A) happens before Put(C), is Get guaranteed to return A before C? If so, consider this series of events where goroutines 1 and 2 are executing concurrently.

Goroutine 1:

1.1. Put(A)
1.2. Put(B)
1.3. Get()
1.4. Load()

Goroutine 2:

2.1. Put(C)

Between 1.3 and 1.4, let's say 2.1 happens. Since <- b.Get has completed, the chan is empty, so 2.1 puts C into the chan. Now the channel has C, but the backlog has B, so C would be returned before B the next time Get() is called even though B was inserted before C. (1.4's execution doesn't matter anymore because Load's select executes the empty default and simply returns.)

If this isn't ordered, ignore everything I said :D (but maybe add a note on the Get saying that this may return values out of order, though?)

P.S. I know that this is just code being moved, but thought it'd be interesting to mention.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Get doesn't actually return values. It simply returns the channel and it is the caller's responsibility to read from that channel. So, in your example when 1.3 returns, it does not mean that the underlying channel in the buffer implementation has been drained. It just means that goroutine-1 has a channel from which it can read the same two values that it pushed.

Copy link
Contributor

@adtac adtac Oct 29, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand that, but to clarify, I meant x := <-b.Get() when I said "1.3. Get()":

barrier := make(chan bool)

go func() {
  b.Put(1)
  b.Put(2)
  v1 := <-b.Get()
  barrier<-true
  b.Load()
  v2 := <-b.Get()
  b.Load()
}()

go func() {
  <-barrier // used to ensure that b.Put(3) strictly happens after b.Put(2)
  b.Put(3)
  v3 := <-b.Get()
  b.Load()
}()

The question is: is v2 guaranteed to be 2?

I think v2 = 3, v3 = 2 is possible if b.Put(3) happens before the post-barrier b.Load(), which breaks ordering, no? Because 2 was inserted before 3.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right with your last statement, but I don't think that breaks ordering. This buffer has the same behavior as a buffered channel. In your example, if you used a vanilla channel with a buffer of 3, you can still run into the same issue that you have mentioned here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True. I assumed that since Put is non-blocking (unlike a vanilla channel), a user might expect the result to be in the order that Puts returned (like a vanilla channel). That is, in a vanilla channel of capacity 2, if ch <- x completes before ch <- y, then receiving from the channel guarantees that x will be returned before y. This buffer, however, does not guarantee it, correct?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that I impose no restriction on which channel send completes first (that's entirely random); just that if x completes first, then x will be returned first when receiving from the channel.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Coming back to your original question:

The question is: is v2 guaranteed to be 2?

If you can guarantee that the assignment to v2 happens before the assignment to v3, then we can guarantee that v2 will be equal to 2.

If you look at it from the point of view of reading from the channel returned from Get, it still maintains order. Now, if you use the returned channel in different goroutines, the order in which you see values will be the order in which the reads execute.

Maybe I'm missing your point.

}
111 changes: 111 additions & 0 deletions internal/buffer/unbounded_test.go
@@ -0,0 +1,111 @@
/*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package buffer

import (
"reflect"
"sort"
"sync"
"testing"
)

// Workload dimensions shared by the tests in this file.
const (
	numWriters = 10 // number of distinct values written (0 through numWriters-1)
	numWrites  = 10 // number of times each value is written
)

// wantReads contains the set of values expected to be read by the reader
// goroutine in the tests: every value in [0, numWriters) repeated numWrites
// times, in ascending order.
var wantReads []int

func init() {
	const total = numWriters * numWrites
	wantReads = make([]int, 0, total)
	for k := 0; k < total; k++ {
		// Integer division yields numWrites consecutive copies of each value.
		wantReads = append(wantReads, k/numWrites)
	}
}

// TestSingleWriter starts one reader and one writer goroutine and makes sure
// that the reader gets all the value added to the buffer by the writer.
func TestSingleWriter(t *testing.T) {
ub := NewUnbounded()
reads := []int{}

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
ch := ub.Get()
for i := 0; i < numWriters*numWrites; i++ {
r := <-ch
reads = append(reads, r.(int))
ub.Load()
}
}()

wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < numWriters; i++ {
for j := 0; j < numWrites; j++ {
ub.Put(i)
}
}
}()

wg.Wait()
if !reflect.DeepEqual(reads, wantReads) {
t.Errorf("reads: %#v, wantReads: %#v", reads, wantReads)
}
}

// TestMultipleWriters starts multiple writers and one reader goroutine and
// makes sure that the reader gets all the data written by all writers.
func TestMultipleWriters(t *testing.T) {
ub := NewUnbounded()
reads := []int{}

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
ch := ub.Get()
for i := 0; i < numWriters*numWrites; i++ {
r := <-ch
reads = append(reads, r.(int))
ub.Load()
}
}()

wg.Add(numWriters)
for i := 0; i < numWriters; i++ {
go func(index int) {
defer wg.Done()
for j := 0; j < numWrites; j++ {
ub.Put(index)
}
}(i)
}

wg.Wait()
sort.Ints(reads)
if !reflect.DeepEqual(reads, wantReads) {
t.Errorf("reads: %#v, wantReads: %#v", reads, wantReads)
}
}