fix: avoid deadlock in publisher and subscriber (#1749) (#1751)
* fix: avoid deadlock in publisher and subscriber
aman-bansal authored and mangalaman93 committed Feb 14, 2023
1 parent 3fbcb17 commit 6db8e30
Showing 3 changed files with 95 additions and 10 deletions.
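The commit message does not spell the deadlock out, but the diff below suggests the following scenario: publishUpdates holds the publisher mutex while sending on a subscriber's buffered channel, and a subscriber whose callback failed (or whose context was cancelled) stops receiving and calls deleteSubscriber, which needs that same mutex. Once the channel fills, neither side can proceed. A minimal, self-contained sketch of that lock-plus-blocking-send pattern (hub, publish, and unsubscribe are illustrative names, not Badger's API):

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    type hub struct {
        sync.Mutex
        subCh chan int // Badger's channel is buffered (cap 1000); unbuffered here to force the block quickly
    }

    func (h *hub) publish(v int) {
        h.Lock() // the publisher holds the lock while sending
        defer h.Unlock()
        h.subCh <- v // blocks forever once the subscriber stops receiving
    }

    func (h *hub) unsubscribe() {
        h.Lock() // needs the same lock the blocked publisher is holding
        defer h.Unlock()
        h.subCh = nil
    }

    func main() {
        h := &hub{subCh: make(chan int)}
        go h.publish(1) // blocks on the send while holding h.Mutex
        time.Sleep(100 * time.Millisecond)

        done := make(chan struct{})
        go func() {
            h.unsubscribe() // waits on h.Mutex, which will never be released
            close(done)
        }()

        select {
        case <-done:
            fmt.Println("no deadlock")
        case <-time.After(time.Second):
            fmt.Println("deadlock: publisher holds the lock while blocked on a full channel")
        }
    }

The fix below takes the other route: rather than making the publisher's send interruptible, the subscriber advertises that it has gone away (an atomic flag) and empties its channel before asking for the lock.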
24 changes: 19 additions & 5 deletions db.go
@@ -1869,11 +1869,11 @@ func (db *DB) Subscribe(ctx context.Context, cb func(kv *KVList) error, matches
}

c := z.NewCloser(1)
- recvCh, id := db.pub.newSubscriber(c, matches)
+ s := db.pub.newSubscriber(c, matches)
slurp := func(batch *pb.KVList) error {
for {
select {
- case kvs := <-recvCh:
+ case kvs := <-s.sendCh:
batch.Kv = append(batch.Kv, kvs.Kv...)
default:
if len(batch.GetKv()) > 0 {
@@ -1883,6 +1883,16 @@ func (db *DB) Subscribe(ctx context.Context, cb func(kv *KVList) error, matches
}
}
}

+ drain := func() {
+ for {
+ select {
+ case <-s.sendCh:
+ default:
+ return
+ }
+ }
+ }
for {
select {
case <-c.HasBeenClosed():
@@ -1894,15 +1904,19 @@
return err
case <-ctx.Done():
c.Done()
- db.pub.deleteSubscriber(id)
+ atomic.StoreUint64(s.active, 0)
+ drain()
+ db.pub.deleteSubscriber(s.id)
// Delete the subscriber to avoid further updates.
return ctx.Err()
- case batch := <-recvCh:
+ case batch := <-s.sendCh:
err := slurp(batch)
if err != nil {
c.Done()
+ atomic.StoreUint64(s.active, 0)
+ drain()
// Delete the subscriber if there is an error by the callback.
- db.pub.deleteSubscriber(id)
+ db.pub.deleteSubscriber(s.id)
return err
}
}
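In the Subscribe changes above, the unsubscribe path now runs in a fixed order: flip the shared active flag to 0, drain the buffered channel, and only then call deleteSubscriber, which takes the publisher lock. A short sketch of the non-blocking drain idiom (plain int channel for brevity; assumed semantics rather than Badger's exact code):

    // drainPending discards whatever is currently buffered in ch without ever blocking:
    // the select falls through to default as soon as the buffer is empty.
    func drainPending(ch <-chan int) {
        for {
            select {
            case <-ch:
                // drop the update; this subscriber no longer processes them
            default:
                return // buffer empty; now safe to take the publisher lock and unsubscribe
            }
        }
    }

Draining matters because the publisher may already be blocked in a send, or may send once more after passing its active check; emptying the buffer lets that send complete, so the publisher releases its lock before deleteSubscriber asks for it.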
21 changes: 16 additions & 5 deletions publisher.go
@@ -18,6 +18,7 @@ package badger

import (
"sync"
"sync/atomic"

"github.com/dgraph-io/badger/v3/pb"
"github.com/dgraph-io/badger/v3/trie"
@@ -26,9 +27,13 @@ import (
)

type subscriber struct {
+ id uint64
matches []pb.Match
- sendCh chan<- *pb.KVList
+ sendCh chan *pb.KVList
subCloser *z.Closer
+ // this will be atomic pointer which will be used to
+ // track whether the subscriber is active or not
+ active *uint64
}

type publisher struct {
@@ -106,26 +111,32 @@ func (p *publisher) publishUpdates(reqs requests) {
}

for id, kvs := range batchedUpdates {
- p.subscribers[id].sendCh <- kvs
+ if atomic.LoadUint64(p.subscribers[id].active) == 1 {
+ p.subscribers[id].sendCh <- kvs
+ }
}
}

- func (p *publisher) newSubscriber(c *z.Closer, matches []pb.Match) (<-chan *pb.KVList, uint64) {
+ func (p *publisher) newSubscriber(c *z.Closer, matches []pb.Match) subscriber {
p.Lock()
defer p.Unlock()
ch := make(chan *pb.KVList, 1000)
id := p.nextID
// Increment next ID.
p.nextID++
- p.subscribers[id] = subscriber{
+ active := uint64(1)
+ s := subscriber{
+ active: &active,
+ id: id,
matches: matches,
sendCh: ch,
subCloser: c,
}
+ p.subscribers[id] = s
for _, m := range matches {
p.indexer.AddMatch(m, id)
}
- return ch, id
+ return s
}

// cleanSubscribers stops all the subscribers. Ideally, It should be called while closing DB.
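One detail in the publisher changes above that the commit does not comment on (my reading of the diff): active is a *uint64 rather than a plain uint64 because newSubscriber now returns the subscriber struct by value while also storing a copy of it in p.subscribers, so both copies must share a single flag for the publisher to observe the Subscribe goroutine flipping it to 0. A small self-contained illustration:

    package main

    import (
        "fmt"
        "sync/atomic"
    )

    // sub mirrors the shape of the diff's subscriber type for illustration only.
    type sub struct {
        active *uint64
    }

    func main() {
        flag := uint64(1)
        stored := sub{active: &flag} // the copy the publisher keeps in its map
        returned := stored           // the copy handed back to the Subscribe goroutine

        atomic.StoreUint64(returned.active, 0)             // subscriber marks itself inactive
        fmt.Println(atomic.LoadUint64(stored.active) == 0) // the publisher's copy sees it: true
    }

The atomic check in publishUpdates alone still leaves a small window in which the flag flips between the load and the send; the buffered channel together with the drain loop on the subscriber side is what keeps a send that lands in that window from blocking.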
60 changes: 60 additions & 0 deletions publisher_test.go
@@ -18,14 +18,74 @@ package badger
import (
"context"
"fmt"
"github.com/pkg/errors"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/dgraph-io/badger/v3/pb"
)

// This test will result in deadlock for commits before this.
// Exiting this test gracefully will be the proof that the
// publisher is no longer stuck in deadlock.
func TestPublisherDeadlock(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
var subWg sync.WaitGroup
subWg.Add(1)

var firstUpdate sync.WaitGroup
firstUpdate.Add(1)


var subDone sync.WaitGroup
subDone.Add(1)
go func() {
subWg.Done()
match := pb.Match{Prefix: []byte("ke"), IgnoreBytes: ""}
err := db.Subscribe(context.Background(), func(kvs *pb.KVList) error {
firstUpdate.Done()
time.Sleep(time.Second * 20)
return errors.New("error returned")
}, []pb.Match{match})
require.Error(t, err, errors.New("error returned"))
subDone.Done()
}()
subWg.Wait()
go func() {
err := db.Update(func(txn *Txn) error {
e := NewEntry([]byte(fmt.Sprintf("key%d", 0)), []byte(fmt.Sprintf("value%d", 0)))
return txn.SetEntry(e)
})
require.NoError(t, err)
}()

firstUpdate.Wait()
req := int64(0)
for i := 1; i < 1110; i++ {
time.Sleep(time.Millisecond * 10)
go func(i int) {
err := db.Update(func(txn *Txn) error {
e := NewEntry([]byte(fmt.Sprintf("key%d", i)), []byte(fmt.Sprintf("value%d", i)))
return txn.SetEntry(e)
})
require.NoError(t, err)
atomic.AddInt64(&req, 1)
}(i)
}
for {
if atomic.LoadInt64(&req) == 1109 {
break
}
time.Sleep(time.Second)
}
subDone.Wait()
})
}

func TestPublisherOrdering(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
order := []string{}
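A plausible way to exercise the new regression test locally (standard Go tooling, not part of this commit) is to run it in isolation with the race detector; on the pre-fix code it should hang until the test timeout, while with this commit it should finish comfortably within the limit, allowing for the 20-second sleep inside the callback:

    go test -race -run TestPublisherDeadlock -timeout 5m .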
