Skip to content

Commit

Permalink
fix: consume and publish blocked after rabbitmq reconnecting (#2492)
Browse files Browse the repository at this point in the history
* Support direct generation of grpc method when package and service names of proto files are different.

* fix req.Interface() return nil.

* Get rid of dependence on 'Micro-Topic'

* Revert "Get rid of dependence on 'Micro-Topic'"

This reverts commit 3ff6944.

* Revert "fix req.Interface() return nil."

This reverts commit 90a1b34.

* Revert "Revert "fix req.Interface() return nil.""

This reverts commit e64737b.

* Revert "Revert "Get rid of dependence on 'Micro-Topic'""

This reverts commit 141bb0a.

* fix: consume and publish blocked after reconnecting

Co-authored-by: maxinglun <maxinglun@zhijiaxing.net>
  • Loading branch information
bosima and maxinglun committed Apr 28, 2022
1 parent 3677719 commit 8293988
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 35 deletions.
64 changes: 29 additions & 35 deletions plugins/broker/rabbitmq/connection.go
Expand Up @@ -11,8 +11,8 @@ import (
"sync"
"time"

"go-micro.dev/v4/logger"
"github.com/streadway/amqp"
"go-micro.dev/v4/logger"
)

var (
Expand Down Expand Up @@ -129,42 +129,36 @@ func (r *rabbitMQConn) reconnect(secure bool, config *amqp.Config) {
chanNotifyClose := make(chan *amqp.Error)
channel := r.ExchangeChannel.channel
channel.NotifyClose(chanNotifyClose)
channelNotifyReturn := make(chan amqp.Return)
channel.NotifyReturn(channelNotifyReturn)

// block until closed
select {
case result, ok := <-channelNotifyReturn:
if !ok {
// Channel closed, probably also the channel or connection.

// To avoid deadlocks it is necessary to consume the messages from all channels.
for notifyClose != nil || chanNotifyClose != nil {
// block until closed
select {
case err := <-chanNotifyClose:
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error(err)
}
// block all resubscribe attempt - they are useless because there is no connection to rabbitmq
// create channel 'waitConnection' (at this point channel is nil or closed, create it without unnecessary checks)
r.Lock()
r.connected = false
r.waitConnection = make(chan struct{})
r.Unlock()
chanNotifyClose = nil
case err := <-notifyClose:
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error(err)
}
// block all resubscribe attempt - they are useless because there is no connection to rabbitmq
// create channel 'waitConnection' (at this point channel is nil or closed, create it without unnecessary checks)
r.Lock()
r.connected = false
r.waitConnection = make(chan struct{})
r.Unlock()
notifyClose = nil
case <-r.close:
return
}
// Do what you need with messageFailing.
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("notify error reason: %s, description: %s", result.ReplyText, result.Exchange)
}
case err := <-chanNotifyClose:
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error(err)
}
// block all resubscribe attempt - they are useless because there is no connection to rabbitmq
// create channel 'waitConnection' (at this point channel is nil or closed, create it without unnecessary checks)
r.Lock()
r.connected = false
r.waitConnection = make(chan struct{})
r.Unlock()
case err := <-notifyClose:
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error(err)
}
// block all resubscribe attempt - they are useless because there is no connection to rabbitmq
// create channel 'waitConnection' (at this point channel is nil or closed, create it without unnecessary checks)
r.Lock()
r.connected = false
r.waitConnection = make(chan struct{})
r.Unlock()
case <-r.close:
return
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions plugins/broker/rabbitmq/rabbitmq.go
Expand Up @@ -102,6 +102,10 @@ func (s *subscriber) resubscribe() {
return
//wait until we reconect to rabbit
case <-s.r.conn.waitConnection:
// When the connection is disconnected, the waitConnection will be re-assigned, so '<-s.r.conn.waitConnection' maybe blocked.
// Here, it returns once a second, and then the latest waitconnection will be used
case <-time.After(time.Second):
continue
}

// it may crash (panic) in case of Consume without connection, so recheck it
Expand Down

0 comments on commit 8293988

Please sign in to comment.