Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix deadlock on DeferredConfirmations #47

Merged
merged 7 commits into from Mar 17, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 17 additions & 1 deletion confirms.go
Expand Up @@ -98,11 +98,14 @@ func (c *confirms) Multiple(confirmed Confirmation) {
c.resequence()
}

// Close closes all listeners, discarding any out of sequence confirmations
// Cleans up the confirms struct and its dependencies.
// Closes all listeners, discarding any out of sequence confirmations
func (c *confirms) Close() error {
c.m.Lock()
defer c.m.Unlock()

c.deferredConfirmations.Close()

for _, l := range c.listeners {
close(l)
}
Expand Down Expand Up @@ -158,6 +161,19 @@ func (d *deferredConfirmations) ConfirmMultiple(confirmation Confirmation) {
}
}

// Nacks all pending DeferredConfirmations being blocked by dc.Wait()
func (d *deferredConfirmations) Close() {
d.m.Lock()
defer d.m.Unlock()

for k, v := range d.confirmations {
v.confirmation = Confirmation{DeliveryTag: k, Ack: false}
v.wg.Done()
delete(d.confirmations, k)
}
}

// Waits for publisher confirmation. Returns true if server successfully received the publishing.
func (d *DeferredConfirmation) Wait() bool {
d.wg.Wait()
return d.confirmation.Ack
Expand Down
19 changes: 19 additions & 0 deletions confirms_test.go
Expand Up @@ -216,3 +216,22 @@ func TestDeferredConfirmationsConfirmMultiple(t *testing.T) {
t.Fatal("expected to receive true for result, received false")
}
}

func TestDeferredConfirmationsClose(t *testing.T) {
dcs := newDeferredConfirmations()
var wg sync.WaitGroup
var result bool
dc1 := dcs.Add(1)
dc2 := dcs.Add(2)
dc3 := dcs.Add(3)
wg.Add(1)
go func() {
result = !dc1.Wait() && !dc2.Wait() && !dc3.Wait()
wg.Done()
}()
dcs.Close()
wg.Wait()
if !result {
t.Fatal("expected to receive false for nacked confirmations, received true")
}
}
31 changes: 31 additions & 0 deletions integration_test.go
Expand Up @@ -1810,6 +1810,37 @@ func TestIntegrationGetNextPublishSeqNo(t *testing.T) {
}
}

// https://github.com/rabbitmq/amqp091-go/pull/44
func TestShouldNotWaitAfterConnectionClosedIssue44(t *testing.T) {
conn := integrationConnection(t, "TestShouldNotWaitAfterConnectionClosedIssue44")
ch, err := conn.Channel()
if err != nil {
t.Fatalf("channel error: %v", err)
}
err = ch.Confirm(false)
if err != nil {
t.Fatalf("confirm error: %v", err)
}
closed := conn.NotifyClose(make(chan *Error, 1))
go func() {
<-closed
}()

confirm, err := ch.PublishWithDeferredConfirm("amq.direct", "issue44", false, false, Publishing{Body: []byte("abc")})
if err != nil {
t.Fatalf("PublishWithDeferredConfirm error: %v", err)
}

ch.Close()
conn.Close()

ack := confirm.Wait()

if ack != false {
t.Fatalf("ack returned should be false %v", ack)
}
}

/*
* Support for integration tests
*/
Expand Down