From 8293988499abfb1db4086d6795a62296cea72bed Mon Sep 17 00:00:00 2001 From: bosima Date: Thu, 28 Apr 2022 17:55:06 +0800 Subject: [PATCH] fix: consume and publish blocked after rabbitmq reconnecting (#2492) * Support direct generation of grpc method when package and service names of proto files are different. * fix req.Interface() return nil. * Get rid of dependence on 'Micro-Topic' * Revert "Get rid of dependence on 'Micro-Topic'" This reverts commit 3ff69443364d39f5fda3a32fc2249826e2d207dd. * Revert "fix req.Interface() return nil." This reverts commit 90a1b34195e07772fa6f2074e1cf22237ac2a87f. * Revert "Revert "fix req.Interface() return nil."" This reverts commit e64737b7da8d1767c4456881f6730f1c196cea60. * Revert "Revert "Get rid of dependence on 'Micro-Topic'"" This reverts commit 141bb0a557c81cb6d1c651b085b3e65483d5e681. * fix: consume and publish blocked after reconnecting Co-authored-by: maxinglun --- plugins/broker/rabbitmq/connection.go | 64 ++++++++++++--------------- plugins/broker/rabbitmq/rabbitmq.go | 4 ++ 2 files changed, 33 insertions(+), 35 deletions(-) diff --git a/plugins/broker/rabbitmq/connection.go b/plugins/broker/rabbitmq/connection.go index 9adca67dd0..c2724528c6 100644 --- a/plugins/broker/rabbitmq/connection.go +++ b/plugins/broker/rabbitmq/connection.go @@ -11,8 +11,8 @@ import ( "sync" "time" - "go-micro.dev/v4/logger" "github.com/streadway/amqp" + "go-micro.dev/v4/logger" ) var ( @@ -129,42 +129,36 @@ func (r *rabbitMQConn) reconnect(secure bool, config *amqp.Config) { chanNotifyClose := make(chan *amqp.Error) channel := r.ExchangeChannel.channel channel.NotifyClose(chanNotifyClose) - channelNotifyReturn := make(chan amqp.Return) - channel.NotifyReturn(channelNotifyReturn) - - // block until closed - select { - case result, ok := <-channelNotifyReturn: - if !ok { - // Channel closed, probably also the channel or connection. + + // To avoid deadlocks it is necessary to consume the messages from all channels. + for notifyClose != nil || chanNotifyClose != nil { + // block until closed + select { + case err := <-chanNotifyClose: + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Error(err) + } + // block all resubscribe attempt - they are useless because there is no connection to rabbitmq + // create channel 'waitConnection' (at this point channel is nil or closed, create it without unnecessary checks) + r.Lock() + r.connected = false + r.waitConnection = make(chan struct{}) + r.Unlock() + chanNotifyClose = nil + case err := <-notifyClose: + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Error(err) + } + // block all resubscribe attempt - they are useless because there is no connection to rabbitmq + // create channel 'waitConnection' (at this point channel is nil or closed, create it without unnecessary checks) + r.Lock() + r.connected = false + r.waitConnection = make(chan struct{}) + r.Unlock() + notifyClose = nil + case <-r.close: return } - // Do what you need with messageFailing. - if logger.V(logger.ErrorLevel, logger.DefaultLogger) { - logger.Errorf("notify error reason: %s, description: %s", result.ReplyText, result.Exchange) - } - case err := <-chanNotifyClose: - if logger.V(logger.ErrorLevel, logger.DefaultLogger) { - logger.Error(err) - } - // block all resubscribe attempt - they are useless because there is no connection to rabbitmq - // create channel 'waitConnection' (at this point channel is nil or closed, create it without unnecessary checks) - r.Lock() - r.connected = false - r.waitConnection = make(chan struct{}) - r.Unlock() - case err := <-notifyClose: - if logger.V(logger.ErrorLevel, logger.DefaultLogger) { - logger.Error(err) - } - // block all resubscribe attempt - they are useless because there is no connection to rabbitmq - // create channel 'waitConnection' (at this point channel is nil or closed, create it without unnecessary checks) - r.Lock() - r.connected = false - r.waitConnection = make(chan struct{}) - r.Unlock() - case <-r.close: - return } } } diff --git a/plugins/broker/rabbitmq/rabbitmq.go b/plugins/broker/rabbitmq/rabbitmq.go index 9903d13dc8..e2b7c7f91f 100644 --- a/plugins/broker/rabbitmq/rabbitmq.go +++ b/plugins/broker/rabbitmq/rabbitmq.go @@ -102,6 +102,10 @@ func (s *subscriber) resubscribe() { return //wait until we reconect to rabbit case <-s.r.conn.waitConnection: + // When the connection is disconnected, the waitConnection will be re-assigned, so '<-s.r.conn.waitConnection' maybe blocked. + // Here, it returns once a second, and then the latest waitconnection will be used + case <-time.After(time.Second): + continue } // it may crash (panic) in case of Consume without connection, so recheck it