fix: avoid deadlock in publisher and subscriber #1749

Merged: 3 commits, Sep 24, 2021
Changes from 1 commit
24 changes: 19 additions & 5 deletions db.go
@@ -2095,11 +2095,11 @@ func (db *DB) Subscribe(ctx context.Context, cb func(kv *KVList) error, matches
}

c := z.NewCloser(1)
recvCh, id := db.pub.newSubscriber(c, matches)
s := db.pub.newSubscriber(c, matches)
slurp := func(batch *pb.KVList) error {
for {
select {
case kvs := <-recvCh:
case kvs := <-s.sendCh:
batch.Kv = append(batch.Kv, kvs.Kv...)
default:
if len(batch.GetKv()) > 0 {
@@ -2109,6 +2109,16 @@ func (db *DB) Subscribe(ctx context.Context, cb func(kv *KVList) error, matches
}
}
}

drain := func() {
for {
select {
case <-s.sendCh:
default:
return
}
}
}
for {
select {
case <-c.HasBeenClosed():
@@ -2120,15 +2130,19 @@ func (db *DB) Subscribe(ctx context.Context, cb func(kv *KVList) error, matches
return err
case <-ctx.Done():
c.Done()
db.pub.deleteSubscriber(id)
atomic.StoreUint64(s.active, 0)
drain()
db.pub.deleteSubscriber(s.id)
// Delete the subscriber to avoid further updates.
return ctx.Err()
case batch := <-recvCh:
case batch := <-s.sendCh:
err := slurp(batch)
if err != nil {
c.Done()
atomic.StoreUint64(s.active, 0)
drain()
// Delete the subscriber if the callback returns an error.
db.pub.deleteSubscriber(id)
db.pub.deleteSubscriber(s.id)
return err
}
}
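To make the subscriber-side change above easier to follow, here is a minimal, self-contained sketch of the tear-down ordering the diff introduces: mark the subscriber inactive, drain its channel so an in-flight send from the publisher can complete, and only then take the publisher lock to unregister. The names (pubLock, active, sendCh) are toy stand-ins, not Badger's actual types.

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

func main() {
    var pubLock sync.Mutex      // stands in for the publisher's mutex
    active := uint64(1)         // 1 = subscriber still wants updates
    sendCh := make(chan int, 4) // tiny buffer so it fills quickly

    // Publisher side: hold the lock while sending, as publishUpdates does.
    go func() {
        for i := 0; ; i++ {
            pubLock.Lock()
            if atomic.LoadUint64(&active) == 1 {
                sendCh <- i // may block until the subscriber reads or drains
            }
            pubLock.Unlock()
            time.Sleep(time.Millisecond)
        }
    }()

    time.Sleep(20 * time.Millisecond) // let the buffer fill; the sender is now blocked

    // Subscriber tear-down, in the order the fix prescribes.
    drain := func() {
        for {
            select {
            case <-sendCh: // discard buffered updates, unblocking a stuck send
            default:
                return
            }
        }
    }
    atomic.StoreUint64(&active, 0) // 1. no new sends once the publisher re-checks the flag
    drain()                        // 2. frees the publisher if it is blocked on sendCh
    pubLock.Lock()                 // 3. safe now: the lock is not held by a blocked sender
    fmt.Println("subscriber unregistered without deadlock")
    pubLock.Unlock()
}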
21 changes: 16 additions & 5 deletions publisher.go
@@ -18,6 +18,7 @@ package badger

import (
"sync"
"sync/atomic"

"github.com/dgraph-io/badger/v3/pb"
"github.com/dgraph-io/badger/v3/trie"
@@ -26,9 +27,13 @@ import (
)

type subscriber struct {
id uint64
matches []pb.Match
sendCh chan<- *pb.KVList
sendCh chan *pb.KVList
subCloser *z.Closer
// active is a pointer to a uint64 that is read and written atomically
// to track whether the subscriber is still active.
active *uint64
}

type publisher struct {
@@ -106,26 +111,32 @@ func (p *publisher) publishUpdates(reqs requests) {
}

for id, kvs := range batchedUpdates {
p.subscribers[id].sendCh <- kvs
if atomic.LoadUint64(p.subscribers[id].active) == 1 {
p.subscribers[id].sendCh <- kvs
}
}
}

func (p *publisher) newSubscriber(c *z.Closer, matches []pb.Match) (<-chan *pb.KVList, uint64) {
func (p *publisher) newSubscriber(c *z.Closer, matches []pb.Match) subscriber {
p.Lock()
defer p.Unlock()
ch := make(chan *pb.KVList, 1000)
id := p.nextID
// Increment next ID.
p.nextID++
p.subscribers[id] = subscriber{
active := uint64(1)
s := subscriber{
active: &active,
id: id,
matches: matches,
sendCh: ch,
subCloser: c,
}
p.subscribers[id] = s
for _, m := range matches {
p.indexer.AddMatch(m, id)
}
return ch, id
return s
}

// cleanSubscribers stops all the subscribers. Ideally, it should be called while closing the DB.
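For context on the publisher-side change, this is my reading of the failure mode the active flag removes: the old publishUpdates did an unconditional, blocking send on each subscriber's channel while holding the publisher mutex, and deleteSubscriber needs that same mutex, so a subscriber whose callback had stopped reading (for example, after returning an error) could leave both goroutines waiting on each other forever. A small stand-alone sketch of that stall, using a watchdog timer so the demo reports the problem instead of hanging:

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var pubLock sync.Mutex
    sendCh := make(chan int, 1) // pretend the 1000-slot buffer is already full
    sendCh <- 0

    // Old-style publisher: no active check, so the send cannot be skipped.
    go func() {
        pubLock.Lock()
        sendCh <- 1 // blocks forever: the subscriber has stopped reading
        pubLock.Unlock()
    }()

    // Old-style subscriber tear-down: deleteSubscriber needs the same lock.
    unregistered := make(chan struct{})
    go func() {
        time.Sleep(10 * time.Millisecond) // let the publisher take the lock first
        pubLock.Lock()                    // blocks: the publisher never releases it
        pubLock.Unlock()
        close(unregistered)
    }()

    select {
    case <-unregistered:
        fmt.Println("unregistered cleanly (does not happen with the old ordering)")
    case <-time.After(500 * time.Millisecond):
        fmt.Println("stalled: the publisher holds the lock while blocked on sendCh")
    }
}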
55 changes: 55 additions & 0 deletions publisher_test.go
@@ -18,14 +18,69 @@ package badger
import (
"context"
"fmt"
"github.com/pkg/errors"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/dgraph-io/badger/v3/pb"
)

// This test would deadlock on commits before this fix.
// The test exiting gracefully is proof that the publisher
// is no longer stuck in a deadlock.
func TestPublisherDeadlock(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
var subWg sync.WaitGroup
subWg.Add(1)

var firstUpdate sync.WaitGroup
firstUpdate.Add(1)


var subDone sync.WaitGroup
subDone.Add(1)
go func() {
subWg.Done()
match := pb.Match{Prefix: []byte("ke"), IgnoreBytes: ""}
db.Subscribe(context.Background(), func(kvs *pb.KVList) error {
firstUpdate.Done()
time.Sleep(time.Second * 20)
return errors.New("error returned")
}, []pb.Match{match})
subDone.Done()
}()
subWg.Wait()
go db.Update(func(txn *Txn) error {
e := NewEntry([]byte(fmt.Sprintf("key%d", 0)), []byte(fmt.Sprintf("value%d", 0)))
return txn.SetEntry(e)
})

firstUpdate.Wait()
req := int64(0)
for i := 1; i < 1110; i++ {
time.Sleep(time.Millisecond * 10)
go func(i int) {
db.Update(func(txn *Txn) error {
e := NewEntry([]byte(fmt.Sprintf("key%d", i)), []byte(fmt.Sprintf("value%d", i)))
return txn.SetEntry(e)
})
atomic.AddInt64(&req, 1)
}(i)
}
for {
if atomic.LoadInt64(&req) == 1109 {
break
}
time.Sleep(time.Second)
}
subDone.Wait()
})
}

func TestPublisherOrdering(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
order := []string{}
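If you want to exercise just the new regression test locally, standard Go tooling should do it from the repository root (this command is not part of the PR; note that the 20-second sleep in the callback and the staggered writes make a single run slow):

    go test -v -run TestPublisherDeadlock -timeout 10m .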