Skip to content

Commit

Permalink
Fix deadlock on DeferredConfirmations (#47)
Browse files Browse the repository at this point in the history
* 解决连接关闭一直wait问题

* Moved func to different struct, added test & doc

* change test to consider all .Wait() calls

* add integration test

* use default exchange for test

* nonexistent exchange

* more unique nonexistent exchange name

Co-authored-by: shifengbin <imfengbin@foxmail.com>
  • Loading branch information
SpencerTorres and shifengbin committed Mar 17, 2022
1 parent b52110e commit f030069
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 1 deletion.
18 changes: 17 additions & 1 deletion confirms.go
Expand Up @@ -98,11 +98,14 @@ func (c *confirms) Multiple(confirmed Confirmation) {
c.resequence()
}

// Close closes all listeners, discarding any out of sequence confirmations
// Cleans up the confirms struct and its dependencies.
// Closes all listeners, discarding any out of sequence confirmations
func (c *confirms) Close() error {
c.m.Lock()
defer c.m.Unlock()

c.deferredConfirmations.Close()

for _, l := range c.listeners {
close(l)
}
Expand Down Expand Up @@ -158,6 +161,19 @@ func (d *deferredConfirmations) ConfirmMultiple(confirmation Confirmation) {
}
}

// Nacks all pending DeferredConfirmations being blocked by dc.Wait()
func (d *deferredConfirmations) Close() {
d.m.Lock()
defer d.m.Unlock()

for k, v := range d.confirmations {
v.confirmation = Confirmation{DeliveryTag: k, Ack: false}
v.wg.Done()
delete(d.confirmations, k)
}
}

// Waits for publisher confirmation. Returns true if server successfully received the publishing.
func (d *DeferredConfirmation) Wait() bool {
d.wg.Wait()
return d.confirmation.Ack
Expand Down
19 changes: 19 additions & 0 deletions confirms_test.go
Expand Up @@ -216,3 +216,22 @@ func TestDeferredConfirmationsConfirmMultiple(t *testing.T) {
t.Fatal("expected to receive true for result, received false")
}
}

func TestDeferredConfirmationsClose(t *testing.T) {
dcs := newDeferredConfirmations()
var wg sync.WaitGroup
var result bool
dc1 := dcs.Add(1)
dc2 := dcs.Add(2)
dc3 := dcs.Add(3)
wg.Add(1)
go func() {
result = !dc1.Wait() && !dc2.Wait() && !dc3.Wait()
wg.Done()
}()
dcs.Close()
wg.Wait()
if !result {
t.Fatal("expected to receive false for nacked confirmations, received true")
}
}
31 changes: 31 additions & 0 deletions integration_test.go
Expand Up @@ -1810,6 +1810,37 @@ func TestIntegrationGetNextPublishSeqNo(t *testing.T) {
}
}

// https://github.com/rabbitmq/amqp091-go/pull/44
func TestShouldNotWaitAfterConnectionClosedIssue44(t *testing.T) {
conn := integrationConnection(t, "TestShouldNotWaitAfterConnectionClosedIssue44")
ch, err := conn.Channel()
if err != nil {
t.Fatalf("channel error: %v", err)
}
err = ch.Confirm(false)
if err != nil {
t.Fatalf("confirm error: %v", err)
}
closed := conn.NotifyClose(make(chan *Error, 1))
go func() {
<-closed
}()

confirm, err := ch.PublishWithDeferredConfirm("test-issue44", "issue44", false, false, Publishing{Body: []byte("abc")})
if err != nil {
t.Fatalf("PublishWithDeferredConfirm error: %v", err)
}

ch.Close()
conn.Close()

ack := confirm.Wait()

if ack != false {
t.Fatalf("ack returned should be false %v", ack)
}
}

/*
* Support for integration tests
*/
Expand Down

0 comments on commit f030069

Please sign in to comment.