From e4e2ad70a81a4225d431cf9e82dc856d9b50e1c3 Mon Sep 17 00:00:00 2001 From: aman bansal Date: Fri, 24 Sep 2021 20:39:44 +0530 Subject: [PATCH] fix: avoid deadlock in publisher and subscriber (#1749) (#1751) * fix: avoid deadlock in publisher and subscriber --- db.go | 24 +++++++++++++++---- publisher.go | 21 +++++++++++++---- publisher_test.go | 60 +++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 95 insertions(+), 10 deletions(-) diff --git a/db.go b/db.go index 7bedd1dec..85fdb6ee8 100644 --- a/db.go +++ b/db.go @@ -1869,11 +1869,11 @@ func (db *DB) Subscribe(ctx context.Context, cb func(kv *KVList) error, matches } c := z.NewCloser(1) - recvCh, id := db.pub.newSubscriber(c, matches) + s := db.pub.newSubscriber(c, matches) slurp := func(batch *pb.KVList) error { for { select { - case kvs := <-recvCh: + case kvs := <-s.sendCh: batch.Kv = append(batch.Kv, kvs.Kv...) default: if len(batch.GetKv()) > 0 { @@ -1883,6 +1883,16 @@ func (db *DB) Subscribe(ctx context.Context, cb func(kv *KVList) error, matches } } } + + drain := func() { + for { + select { + case <- s.sendCh: + default: + return + } + } + } for { select { case <-c.HasBeenClosed(): @@ -1894,15 +1904,19 @@ func (db *DB) Subscribe(ctx context.Context, cb func(kv *KVList) error, matches return err case <-ctx.Done(): c.Done() - db.pub.deleteSubscriber(id) + atomic.StoreUint64(s.active, 0) + drain() + db.pub.deleteSubscriber(s.id) // Delete the subscriber to avoid further updates. return ctx.Err() - case batch := <-recvCh: + case batch := <-s.sendCh: err := slurp(batch) if err != nil { c.Done() + atomic.StoreUint64(s.active, 0) + drain() // Delete the subscriber if there is an error by the callback. - db.pub.deleteSubscriber(id) + db.pub.deleteSubscriber(s.id) return err } } diff --git a/publisher.go b/publisher.go index 6694433f2..f4c31b2ea 100644 --- a/publisher.go +++ b/publisher.go @@ -18,6 +18,7 @@ package badger import ( "sync" + "sync/atomic" "github.com/dgraph-io/badger/v3/pb" "github.com/dgraph-io/badger/v3/trie" @@ -26,9 +27,13 @@ import ( ) type subscriber struct { + id uint64 matches []pb.Match - sendCh chan<- *pb.KVList + sendCh chan *pb.KVList subCloser *z.Closer + // this will be atomic pointer which will be used to + // track whether the subscriber is active or not + active *uint64 } type publisher struct { @@ -106,26 +111,32 @@ func (p *publisher) publishUpdates(reqs requests) { } for id, kvs := range batchedUpdates { - p.subscribers[id].sendCh <- kvs + if atomic.LoadUint64(p.subscribers[id].active) == 1 { + p.subscribers[id].sendCh <- kvs + } } } -func (p *publisher) newSubscriber(c *z.Closer, matches []pb.Match) (<-chan *pb.KVList, uint64) { +func (p *publisher) newSubscriber(c *z.Closer, matches []pb.Match) subscriber { p.Lock() defer p.Unlock() ch := make(chan *pb.KVList, 1000) id := p.nextID // Increment next ID. p.nextID++ - p.subscribers[id] = subscriber{ + active := uint64(1) + s := subscriber{ + active: &active, + id: id, matches: matches, sendCh: ch, subCloser: c, } + p.subscribers[id] = s for _, m := range matches { p.indexer.AddMatch(m, id) } - return ch, id + return s } // cleanSubscribers stops all the subscribers. Ideally, It should be called while closing DB. diff --git a/publisher_test.go b/publisher_test.go index a573f8b0a..c29622439 100644 --- a/publisher_test.go +++ b/publisher_test.go @@ -18,14 +18,74 @@ package badger import ( "context" "fmt" + "github.com/pkg/errors" "sync" + "sync/atomic" "testing" + "time" "github.com/stretchr/testify/require" "github.com/dgraph-io/badger/v3/pb" ) +// This test will result in deadlock for commits before this. +// Exiting this test gracefully will be the proof that the +// publisher is no longer stuck in deadlock. +func TestPublisherDeadlock(t *testing.T) { + runBadgerTest(t, nil, func(t *testing.T, db *DB) { + var subWg sync.WaitGroup + subWg.Add(1) + + var firstUpdate sync.WaitGroup + firstUpdate.Add(1) + + + var subDone sync.WaitGroup + subDone.Add(1) + go func() { + subWg.Done() + match := pb.Match{Prefix: []byte("ke"), IgnoreBytes: ""} + err := db.Subscribe(context.Background(), func(kvs *pb.KVList) error { + firstUpdate.Done() + time.Sleep(time.Second * 20) + return errors.New("error returned") + }, []pb.Match{match}) + require.Error(t, err, errors.New("error returned")) + subDone.Done() + }() + subWg.Wait() + go func() { + err := db.Update(func(txn *Txn) error { + e := NewEntry([]byte(fmt.Sprintf("key%d", 0)), []byte(fmt.Sprintf("value%d", 0))) + return txn.SetEntry(e) + }) + require.NoError(t, err) + } () + + firstUpdate.Wait() + req := int64(0) + for i := 1; i < 1110; i++ { + time.Sleep(time.Millisecond * 10) + go func(i int) { + err := db.Update(func(txn *Txn) error { + e := NewEntry([]byte(fmt.Sprintf("key%d", i)), []byte(fmt.Sprintf("value%d", i))) + return txn.SetEntry(e) + }) + require.NoError(t, err) + atomic.AddInt64(&req, 1) + }(i) + } + for { + if atomic.LoadInt64(&req) == 1109 { + break + } + time.Sleep(time.Second) + } + subDone.Wait() + }) +} + func TestPublisherOrdering(t *testing.T) { runBadgerTest(t, nil, func(t *testing.T, db *DB) { order := []string{}