From f030069fc8f68db166a41f92ace0d58a8a60387d Mon Sep 17 00:00:00 2001 From: Spencer Torres Date: Thu, 17 Mar 2022 09:38:00 -0400 Subject: [PATCH] Fix deadlock on DeferredConfirmations (#47) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 解决连接关闭一直wait问题 * Moved func to different struct, added test & doc * change test to consider all .Wait() calls * add integration test * use default exchange for test * nonexistent exchange * more unique nonexistent exchange name Co-authored-by: shifengbin --- confirms.go | 18 +++++++++++++++++- confirms_test.go | 19 +++++++++++++++++++ integration_test.go | 31 +++++++++++++++++++++++++++++++ 3 files changed, 67 insertions(+), 1 deletion(-) diff --git a/confirms.go b/confirms.go index 654d755..3eba406 100644 --- a/confirms.go +++ b/confirms.go @@ -98,11 +98,14 @@ func (c *confirms) Multiple(confirmed Confirmation) { c.resequence() } -// Close closes all listeners, discarding any out of sequence confirmations +// Cleans up the confirms struct and its dependencies. +// Closes all listeners, discarding any out of sequence confirmations func (c *confirms) Close() error { c.m.Lock() defer c.m.Unlock() + c.deferredConfirmations.Close() + for _, l := range c.listeners { close(l) } @@ -158,6 +161,19 @@ func (d *deferredConfirmations) ConfirmMultiple(confirmation Confirmation) { } } +// Nacks all pending DeferredConfirmations being blocked by dc.Wait() +func (d *deferredConfirmations) Close() { + d.m.Lock() + defer d.m.Unlock() + + for k, v := range d.confirmations { + v.confirmation = Confirmation{DeliveryTag: k, Ack: false} + v.wg.Done() + delete(d.confirmations, k) + } +} + +// Waits for publisher confirmation. Returns true if server successfully received the publishing. func (d *DeferredConfirmation) Wait() bool { d.wg.Wait() return d.confirmation.Ack diff --git a/confirms_test.go b/confirms_test.go index ea8acdb..5539bf3 100644 --- a/confirms_test.go +++ b/confirms_test.go @@ -216,3 +216,22 @@ func TestDeferredConfirmationsConfirmMultiple(t *testing.T) { t.Fatal("expected to receive true for result, received false") } } + +func TestDeferredConfirmationsClose(t *testing.T) { + dcs := newDeferredConfirmations() + var wg sync.WaitGroup + var result bool + dc1 := dcs.Add(1) + dc2 := dcs.Add(2) + dc3 := dcs.Add(3) + wg.Add(1) + go func() { + result = !dc1.Wait() && !dc2.Wait() && !dc3.Wait() + wg.Done() + }() + dcs.Close() + wg.Wait() + if !result { + t.Fatal("expected to receive false for nacked confirmations, received true") + } +} diff --git a/integration_test.go b/integration_test.go index e7617e6..2fcdd11 100644 --- a/integration_test.go +++ b/integration_test.go @@ -1810,6 +1810,37 @@ func TestIntegrationGetNextPublishSeqNo(t *testing.T) { } } +// https://github.com/rabbitmq/amqp091-go/pull/44 +func TestShouldNotWaitAfterConnectionClosedIssue44(t *testing.T) { + conn := integrationConnection(t, "TestShouldNotWaitAfterConnectionClosedIssue44") + ch, err := conn.Channel() + if err != nil { + t.Fatalf("channel error: %v", err) + } + err = ch.Confirm(false) + if err != nil { + t.Fatalf("confirm error: %v", err) + } + closed := conn.NotifyClose(make(chan *Error, 1)) + go func() { + <-closed + }() + + confirm, err := ch.PublishWithDeferredConfirm("test-issue44", "issue44", false, false, Publishing{Body: []byte("abc")}) + if err != nil { + t.Fatalf("PublishWithDeferredConfirm error: %v", err) + } + + ch.Close() + conn.Close() + + ack := confirm.Wait() + + if ack != false { + t.Fatalf("ack returned should be false %v", ack) + } +} + /* * Support for integration tests */