New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Solve the problem that the connection is closed but waiting will always wait #44
Conversation
Thank you for this contribution. I will try to reproduce the problem that this pull request solves. Do you have a way to cause the |
you can use wifi and close it before wait confirm |
before received confirm |
con, err := Dial("xxxxx")
if err != nil {
t.Fatal(err)
}
ch, err := con.Channel()
if err != nil {
t.Fatal(err)
}
err = ch.Confirm(false)
if err != nil {
t.Fatal(err)
}
closed := con.NotifyClose(make(chan *Error, 1))
go func() {
<-closed
t.Log("connection is closed")
}()
//broken point at here or sleep 5 seconds; close wifi
confirm, err := ch.PublishWithDeferredConfirm("xxx", "xxx", false, false, Publishing{
Body: []byte("abc"),
})
if err != nil {
t.Fatal(err)
}
confirm.Wait() |
When programs are processed in parallel, one channel sends normal messages and the other channel sends a non-existent Exchange, also causing the connection to be closed |
can you reproduce the problem with my method? |
Please be patient. The RabbitMQ core team is small and has many different priorities to consider. I might be able to work on this issue next week. |
excuse me! do you forget it? I see some deadlock issues is the same reason we use the package in production hope to resolve the issue |
Just ran into this while testing some edge cases on an app of ours that demands publisher confirmations. I managed to trigger this deadlock by starting the app and then deleting the exchange through RabbitMQ's management UI. Similar to how it's described here, the exchange did not exist and somewhere along the way the client tries to close the connection. The proposed change here is effectively the same conclusion that I and #46 came to; the wait groups need to be cleaned up when the confirmations are closed. This solution returns // module code
func (d *DeferredConfirmation) Wait() bool {
d.wg.Wait()
return d.confirmation.Ack
}
// . . .
// somewhere in your app's publish() logic
if ok := dc.Wait(); !ok {
// Ack was false
} With this expectation in mind for apps that use I made some project-friendly adjustments to be added to this PR here: shifengbin/pull/1 Edit: I just tried my changes from there in my application and it successfully reconnects and ends the wait groups which allows me to catch the error. Now I can continue development. Thanks for reporting this! Hopefully it gets merged so it gets fixed for everyone else. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks good to me. The error happens even when the connection get closed by the client before the confirm wait.
Can you add an integration test like this in order to approve:
// https://github.com/rabbitmq/amqp091-go/pull/44
func TestShouldNotWaitAfterConnectionClosedIssue44(t *testing.T) {
con := integrationConnection(t, "TestShouldNotWaitAfterConnectionClosedIssue44")
ch, err := con.Channel()
if err != nil {
t.Fatalf("channel error: %v", err)
}
err = ch.Confirm(false)
if err != nil {
t.Fatalf("confirm error: %v", err)
}
closed := con.NotifyClose(make(chan *Error, 1))
go func() {
<-closed
}()
//broken point at here or sleep 5 seconds; close wifi
confirm, err :=
ch.PublishWithDeferredConfirm("xxx", "xxx", false, false, Publishing{
Body: []byte("abc"),
})
if err != nil {
t.Fatalf("PublishWithDeferredConfirm error: %v", err)
}
ch.Close()
con.Close()
ack := confirm.Wait()
if ack != false {
t.Fatalf("ack returned should be false %v", ack)
}
}
This PR is superseded by #47 |
steps:
1.connected
2.select confirm
3.publishwithconfirm (network error, connection closed)
4.wait (always wait forever)