feat(Publisher): Add DB.SubscribeAsync API. #1834
Open
rigelbm wants to merge 1 commit into dgraph-io:main from rigelbm:subscribe-async
rigelbm requested review from akon-dey, gajanan-dgraph, billprovince, joshua-goldstein and skrdgraph as code owners December 4, 2022 20:07
@akon-dey @joshua-goldstein Gentle ping. Is this something you are interested in?
The Problem:

In one of my personal projects, I have an API that uses `DB.Subscribe` to subscribe to changes to the DB and add these changes to an unbounded queue. An over-simplified version of it would be:

```
func (x *X) Watch() {
	go func() {
		// Subscribe blocks until the context is cancelled or the callback fails.
		_ = x.db.Subscribe(
			context.Background(),
			func(kvs *pb.KVList) error {
				x.queue.Add(kvs)
				return nil
			},
			[]pb.Match{{Prefix: []byte("foobar")}})
	}()
}
```

The way I test it, in pseudo-Go, is:

```
func TestWatch() {
	x := ...
	x.Watch()
	doChangesToDb(x.db)
	verifyQueue(x.queue)
}
```

The problem, as I hope you can see, is a race condition. There's no guarantee that I have actually subscribed by the time `x.Watch()` returns. When I call `doChangesToDb(x.db)`, depending on the timing of the goroutine started in `x.Watch()`, I might miss some or even all of the changes.

Because `DB.Subscribe` is blocking, the caller has no way to know for certain that the subscription is active. The only guaranteed signal is the first callback invocation, but waiting for it is not always convenient or even possible. The next best workaround is to wait for the moment just before the `DB.Subscribe` call:

```
func (x *X) Watch() {
	wg := sync.WaitGroup{}
	wg.Add(1)
	go func() {
		// Signals that the goroutine is running, not that the subscription exists.
		wg.Done()
		_ = x.db.Subscribe(
			context.Background(),
			func(kvs *pb.KVList) error {
				x.queue.Add(kvs)
				return nil
			},
			[]pb.Match{{Prefix: []byte("foobar")}})
	}()
	wg.Wait()
}
```

This workaround is used extensively in `publisher_test.go`. The problem with it is that, although very likely to work, it is not guaranteed. Go reserves the right to preempt any goroutine, even one that isn't blocked. The Go scheduler marks any goroutine that runs for more than 10ms as preemptible. If the time between the `wg.Done()` call and the `db.pub.newSubscriber(c, matches)` call (inside `DB.Subscribe`) is just long enough, the goroutine might be preempted and you end up with the same problem as before. Who knows, maybe GC kicked in at the wrong time.

Although this is very unlikely to happen, I would sleep much better if it were actually impossible (I wish to depend on this behaviour not only for the tests, but for the actual correctness of my project).

The Solution:

I hope it is clear by now that the problem is caused by the API being blocking. The solution, then, is to add a non-blocking version of the API. The proposed API receives only the `[]pb.Match` query and returns a `<-chan *KVList` channel and an `UnsubscribeFunc` function. Consumers read the changes from the channel, and call the function to cancel the subscription. I believe this API is much more idiomatic Go, as it uses channels for communication, which lets the caller `select` and `for range` on it. You can see how much simpler the calling code becomes in the new `publisher_test.go`, where I add a new version of each test using the new API while keeping the old tests intact.

I have also rewritten the original `DB.Subscribe` to use the new `DB.SubscribeAsync` underneath, so as to reuse code and ensure that both behave the same.

This is my first PR to badger. Please, be kind :). Also, thank you for the awesome project and for any time spent reviewing this PR. You folks rock!
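To make the ordering guarantee concrete, here is a minimal, self-contained sketch of the non-blocking pattern described above. It does not use badger: `publisher`, `update`, `subscribeAsync` and `publish` are hypothetical stand-ins, and the real `DB.SubscribeAsync` signature in this PR may differ (the sketch takes a single prefix and a buffer size instead of `[]pb.Match`). The only point it illustrates is that the subscriber is registered before the call returns, so no `WaitGroup` or sleep is needed.

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// update is a simplified, hypothetical stand-in for an entry of a KVList.
type update struct {
	Key, Value []byte
}

// subscriber holds the prefix filter and the delivery channel.
type subscriber struct {
	prefix []byte
	ch     chan update
}

// publisher is a toy in-memory publisher, not badger's implementation.
type publisher struct {
	mu   sync.Mutex
	subs map[int]*subscriber
	next int
}

func newPublisher() *publisher {
	return &publisher{subs: make(map[int]*subscriber)}
}

// subscribeAsync registers the subscriber synchronously and returns a
// receive-only channel plus an unsubscribe function. Any publish call that
// starts after subscribeAsync returns is guaranteed to see the subscriber.
func (p *publisher) subscribeAsync(prefix []byte, buf int) (<-chan update, func()) {
	p.mu.Lock()
	defer p.mu.Unlock()

	id := p.next
	p.next++
	s := &subscriber{prefix: prefix, ch: make(chan update, buf)}
	p.subs[id] = s

	var once sync.Once
	unsubscribe := func() {
		// once makes unsubscribe safe to call more than once.
		once.Do(func() {
			p.mu.Lock()
			defer p.mu.Unlock()
			delete(p.subs, id)
			close(s.ch) // lets consumers finish a for-range loop
		})
	}
	return s.ch, unsubscribe
}

// publish delivers u to every subscriber whose prefix matches the key.
// In this toy version it blocks while holding the lock if a subscriber's
// buffer is full; a real implementation would need a better policy.
func (p *publisher) publish(u update) {
	p.mu.Lock()
	defer p.mu.Unlock()
	for _, s := range p.subs {
		if bytes.HasPrefix(u.Key, s.prefix) {
			s.ch <- u
		}
	}
}

func main() {
	p := newPublisher()

	// The subscription is active as soon as this call returns.
	ch, unsubscribe := p.subscribeAsync([]byte("foo"), 8)

	p.publish(update{Key: []byte("foobar"), Value: []byte("v1")})
	p.publish(update{Key: []byte("other"), Value: []byte("v2")}) // no match

	unsubscribe()
	for u := range ch {
		fmt.Printf("%s=%s\n", u.Key, u.Value) // prints "foobar=v1"
	}
}
```

Because the caller gets a channel back, it can also combine the subscription with other events in a `select`, which is awkward to do with a blocking callback API.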
rigelbm force-pushed the subscribe-async branch from a433e0c to ec5f31e March 28, 2023 16:13
Updated PR to latest
Thanks for the PR, I will take a look at the change.