Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: avoid deadlock in publisher and subscriber (#1749) #1751

Merged
merged 1 commit into from Sep 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 19 additions & 5 deletions db.go
Expand Up @@ -1869,11 +1869,11 @@ func (db *DB) Subscribe(ctx context.Context, cb func(kv *KVList) error, matches
}

c := z.NewCloser(1)
recvCh, id := db.pub.newSubscriber(c, matches)
s := db.pub.newSubscriber(c, matches)
slurp := func(batch *pb.KVList) error {
for {
select {
case kvs := <-recvCh:
case kvs := <-s.sendCh:
batch.Kv = append(batch.Kv, kvs.Kv...)
default:
if len(batch.GetKv()) > 0 {
Expand All @@ -1883,6 +1883,16 @@ func (db *DB) Subscribe(ctx context.Context, cb func(kv *KVList) error, matches
}
}
}

drain := func() {
for {
select {
case <- s.sendCh:
default:
return
}
}
}
for {
select {
case <-c.HasBeenClosed():
Expand All @@ -1894,15 +1904,19 @@ func (db *DB) Subscribe(ctx context.Context, cb func(kv *KVList) error, matches
return err
case <-ctx.Done():
c.Done()
db.pub.deleteSubscriber(id)
atomic.StoreUint64(s.active, 0)
drain()
db.pub.deleteSubscriber(s.id)
// Delete the subscriber to avoid further updates.
return ctx.Err()
case batch := <-recvCh:
case batch := <-s.sendCh:
err := slurp(batch)
if err != nil {
c.Done()
atomic.StoreUint64(s.active, 0)
drain()
// Delete the subscriber if there is an error by the callback.
db.pub.deleteSubscriber(id)
db.pub.deleteSubscriber(s.id)
return err
}
}
Expand Down
21 changes: 16 additions & 5 deletions publisher.go
Expand Up @@ -18,6 +18,7 @@ package badger

import (
"sync"
"sync/atomic"

"github.com/dgraph-io/badger/v3/pb"
"github.com/dgraph-io/badger/v3/trie"
Expand All @@ -26,9 +27,13 @@ import (
)

type subscriber struct {
id uint64
matches []pb.Match
sendCh chan<- *pb.KVList
sendCh chan *pb.KVList
subCloser *z.Closer
// active points to a flag that is read and written atomically:
// 1 while the subscriber is active, 0 once it is shutting down.
active *uint64
}

type publisher struct {
Expand Down Expand Up @@ -106,26 +111,32 @@ func (p *publisher) publishUpdates(reqs requests) {
}

for id, kvs := range batchedUpdates {
p.subscribers[id].sendCh <- kvs
if atomic.LoadUint64(p.subscribers[id].active) == 1 {
p.subscribers[id].sendCh <- kvs
}
}
}

func (p *publisher) newSubscriber(c *z.Closer, matches []pb.Match) (<-chan *pb.KVList, uint64) {
func (p *publisher) newSubscriber(c *z.Closer, matches []pb.Match) subscriber {
p.Lock()
defer p.Unlock()
ch := make(chan *pb.KVList, 1000)
id := p.nextID
// Increment next ID.
p.nextID++
p.subscribers[id] = subscriber{
active := uint64(1)
s := subscriber{
active: &active,
id: id,
matches: matches,
sendCh: ch,
subCloser: c,
}
p.subscribers[id] = s
for _, m := range matches {
p.indexer.AddMatch(m, id)
}
return ch, id
return s
}

// cleanSubscribers stops all the subscribers. Ideally, it should be called while closing the DB.
Expand Down
60 changes: 60 additions & 0 deletions publisher_test.go
Expand Up @@ -18,14 +18,74 @@ package badger
import (
"context"
"fmt"
"github.com/pkg/errors"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/dgraph-io/badger/v3/pb"
)

// TestPublisherDeadlock keeps writing matching keys while a subscriber's
// callback is slow and then fails. Commits before this fix deadlock here, so
// the test returning at all is the proof that the publisher is no longer
// stuck.
//
// All assertions that can abort the test run on the test goroutine; the
// spawned goroutines only record results, because t.FailNow (used by the
// require helpers) must not be called from other goroutines.
func TestPublisherDeadlock(t *testing.T) {
	runBadgerTest(t, nil, func(t *testing.T, db *DB) {
		// numUpdates is the number of writes issued after the first write
		// has reached the subscriber callback.
		const numUpdates = 1109

		// subWg signals that the subscriber goroutine has started.
		var subWg sync.WaitGroup
		subWg.Add(1)

		// firstUpdate signals that the callback has been invoked once.
		var firstUpdate sync.WaitGroup
		firstUpdate.Add(1)

		// subDone signals that Subscribe has returned.
		var subDone sync.WaitGroup
		subDone.Add(1)

		// subErr is written by the subscriber goroutine and read only after
		// subDone.Wait(), so it needs no extra synchronization.
		var subErr error
		go func() {
			defer subDone.Done()
			subWg.Done()
			match := pb.Match{Prefix: []byte("ke"), IgnoreBytes: ""}
			subErr = db.Subscribe(context.Background(), func(kvs *pb.KVList) error {
				firstUpdate.Done()
				// Stay busy for longer than the write loop below takes
				// (numUpdates * 10ms), so updates pile up behind the callback.
				time.Sleep(time.Second * 20)
				return errors.New("error returned")
			}, []pb.Match{match})
		}()
		subWg.Wait()

		// writers tracks every update goroutine so the test waits for all of
		// them even when one fails; succeeded counts the updates that
		// committed without error.
		var writers sync.WaitGroup
		var succeeded int64
		write := func(i int) {
			defer writers.Done()
			err := db.Update(func(txn *Txn) error {
				e := NewEntry([]byte(fmt.Sprintf("key%d", i)), []byte(fmt.Sprintf("value%d", i)))
				return txn.SetEntry(e)
			})
			if err != nil {
				// t.Errorf is safe to call from a non-test goroutine.
				t.Errorf("update %d failed: %v", i, err)
				return
			}
			atomic.AddInt64(&succeeded, 1)
		}

		writers.Add(1)
		go write(0)

		firstUpdate.Wait()
		for i := 1; i <= numUpdates; i++ {
			time.Sleep(time.Millisecond * 10)
			writers.Add(1)
			go write(i)
		}

		writers.Wait()
		subDone.Wait()

		require.Error(t, subErr, "Subscribe should return the callback's error")
		require.Equal(t, int64(numUpdates+1), atomic.LoadInt64(&succeeded))
	})
}

func TestPublisherOrdering(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
order := []string{}
Expand Down