From 0d435a690ea21a3f64b0534d1fa244f512601493 Mon Sep 17 00:00:00 2001 From: maxinglun Date: Thu, 7 Apr 2022 20:16:13 +0800 Subject: [PATCH 1/8] Support direct generation of grpc method when package and service names of proto files are different. --- cmd/protoc-gen-micro/plugin/micro/micro.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/cmd/protoc-gen-micro/plugin/micro/micro.go b/cmd/protoc-gen-micro/plugin/micro/micro.go index be7f017706..91c1ab584a 100644 --- a/cmd/protoc-gen-micro/plugin/micro/micro.go +++ b/cmd/protoc-gen-micro/plugin/micro/micro.go @@ -131,7 +131,8 @@ func (g *micro) generateService(file *generator.FileDescriptor, service *pb.Serv origServName := service.GetName() serviceName := strings.ToLower(service.GetName()) - if pkg := file.GetPackage(); pkg != "" { + pkg := file.GetPackage() + if pkg != "" { serviceName = pkg } servName := generator.CamelCase(origServName) @@ -209,7 +210,7 @@ func (g *micro) generateService(file *generator.FileDescriptor, service *pb.Serv descExpr = fmt.Sprintf("&%s.Streams[%d]", serviceDescVar, streamIndex) streamIndex++ } - g.generateClientMethod(serviceName, servName, serviceDescVar, method, descExpr) + g.generateClientMethod(pkg, serviceName, servName, serviceDescVar, method, descExpr) } g.P("// Server API for ", servName, " service") @@ -331,8 +332,12 @@ func (g *micro) generateClientSignature(servName string, method *pb.MethodDescri return fmt.Sprintf("%s(ctx %s.Context%s, opts ...%s.CallOption) (%s, error)", methName, contextPkg, reqArg, clientPkg, respName) } -func (g *micro) generateClientMethod(reqServ, servName, serviceDescVar string, method *pb.MethodDescriptorProto, descExpr string) { +func (g *micro) generateClientMethod(pkg, reqServ, servName, serviceDescVar string, method *pb.MethodDescriptorProto, descExpr string) { reqMethod := fmt.Sprintf("%s.%s", servName, method.GetName()) + useGrpc := g.gen.Param["use_grpc"] + if useGrpc != "" { + reqMethod = fmt.Sprintf("/%s.%s/%s", pkg, servName, method.GetName()) + } methName := generator.CamelCase(method.GetName()) inType := g.typeName(method.GetInputType()) outType := g.typeName(method.GetOutputType()) From 90a1b34195e07772fa6f2074e1cf22237ac2a87f Mon Sep 17 00:00:00 2001 From: maxinglun Date: Sat, 9 Apr 2022 12:17:13 +0800 Subject: [PATCH 2/8] fix req.Interface() return nil. --- server/rpc_router.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/rpc_router.go b/server/rpc_router.go index 7985b9ab6d..bcef5bb3f5 100644 --- a/server/rpc_router.go +++ b/server/rpc_router.go @@ -557,7 +557,7 @@ func (router *router) ProcessMessage(ctx context.Context, msg Message) (err erro } // read the body into the handler request value - if err = cc.ReadBody(req.Interface()); err != nil { + if err = cc.ReadBody(req.Addr().Interface()); err != nil { return err } From 3ff69443364d39f5fda3a32fc2249826e2d207dd Mon Sep 17 00:00:00 2001 From: maxinglun Date: Sat, 9 Apr 2022 21:00:55 +0800 Subject: [PATCH 3/8] Get rid of dependence on 'Micro-Topic' --- plugins/broker/rabbitmq/rabbitmq.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/plugins/broker/rabbitmq/rabbitmq.go b/plugins/broker/rabbitmq/rabbitmq.go index 03244b718d..9903d13dc8 100644 --- a/plugins/broker/rabbitmq/rabbitmq.go +++ b/plugins/broker/rabbitmq/rabbitmq.go @@ -7,9 +7,9 @@ import ( "sync" "time" + "github.com/streadway/amqp" "go-micro.dev/v4/broker" "go-micro.dev/v4/cmd" - "github.com/streadway/amqp" ) type rbroker struct { @@ -267,6 +267,13 @@ func (r *rbroker) Subscribe(topic string, handler broker.Handler, opts ...broker for k, v := range msg.Headers { header[k], _ = v.(string) } + + // Get rid of dependence on 'Micro-Topic' + msgTopic := header["Micro-Topic"] + if msgTopic == "" { + header["Micro-Topic"] = msg.RoutingKey + } + m := &broker.Message{ Header: header, Body: msg.Body, From 141bb0a557c81cb6d1c651b085b3e65483d5e681 Mon Sep 17 00:00:00 2001 From: maxinglun Date: Sun, 10 Apr 2022 10:41:02 +0800 Subject: [PATCH 4/8] Revert "Get rid of dependence on 'Micro-Topic'" This reverts commit 3ff69443364d39f5fda3a32fc2249826e2d207dd. --- plugins/broker/rabbitmq/rabbitmq.go | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/plugins/broker/rabbitmq/rabbitmq.go b/plugins/broker/rabbitmq/rabbitmq.go index 9903d13dc8..03244b718d 100644 --- a/plugins/broker/rabbitmq/rabbitmq.go +++ b/plugins/broker/rabbitmq/rabbitmq.go @@ -7,9 +7,9 @@ import ( "sync" "time" - "github.com/streadway/amqp" "go-micro.dev/v4/broker" "go-micro.dev/v4/cmd" + "github.com/streadway/amqp" ) type rbroker struct { @@ -267,13 +267,6 @@ func (r *rbroker) Subscribe(topic string, handler broker.Handler, opts ...broker for k, v := range msg.Headers { header[k], _ = v.(string) } - - // Get rid of dependence on 'Micro-Topic' - msgTopic := header["Micro-Topic"] - if msgTopic == "" { - header["Micro-Topic"] = msg.RoutingKey - } - m := &broker.Message{ Header: header, Body: msg.Body, From e64737b7da8d1767c4456881f6730f1c196cea60 Mon Sep 17 00:00:00 2001 From: maxinglun Date: Sun, 10 Apr 2022 10:41:14 +0800 Subject: [PATCH 5/8] Revert "fix req.Interface() return nil." This reverts commit 90a1b34195e07772fa6f2074e1cf22237ac2a87f. --- server/rpc_router.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/rpc_router.go b/server/rpc_router.go index bcef5bb3f5..7985b9ab6d 100644 --- a/server/rpc_router.go +++ b/server/rpc_router.go @@ -557,7 +557,7 @@ func (router *router) ProcessMessage(ctx context.Context, msg Message) (err erro } // read the body into the handler request value - if err = cc.ReadBody(req.Addr().Interface()); err != nil { + if err = cc.ReadBody(req.Interface()); err != nil { return err } From 47b322922587c74420aa4c9ca4a009e5d89842ee Mon Sep 17 00:00:00 2001 From: maxinglun Date: Sun, 10 Apr 2022 20:56:39 +0800 Subject: [PATCH 6/8] Revert "Revert "fix req.Interface() return nil."" This reverts commit e64737b7da8d1767c4456881f6730f1c196cea60. --- server/rpc_router.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/rpc_router.go b/server/rpc_router.go index 7985b9ab6d..bcef5bb3f5 100644 --- a/server/rpc_router.go +++ b/server/rpc_router.go @@ -557,7 +557,7 @@ func (router *router) ProcessMessage(ctx context.Context, msg Message) (err erro } // read the body into the handler request value - if err = cc.ReadBody(req.Interface()); err != nil { + if err = cc.ReadBody(req.Addr().Interface()); err != nil { return err } From 0f7c8c32764b94bbc352e9eef6cbfcf2f6bc6cfd Mon Sep 17 00:00:00 2001 From: maxinglun Date: Sun, 10 Apr 2022 21:01:54 +0800 Subject: [PATCH 7/8] Revert "Revert "Get rid of dependence on 'Micro-Topic'"" This reverts commit 141bb0a557c81cb6d1c651b085b3e65483d5e681. --- plugins/broker/rabbitmq/rabbitmq.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/plugins/broker/rabbitmq/rabbitmq.go b/plugins/broker/rabbitmq/rabbitmq.go index 03244b718d..9903d13dc8 100644 --- a/plugins/broker/rabbitmq/rabbitmq.go +++ b/plugins/broker/rabbitmq/rabbitmq.go @@ -7,9 +7,9 @@ import ( "sync" "time" + "github.com/streadway/amqp" "go-micro.dev/v4/broker" "go-micro.dev/v4/cmd" - "github.com/streadway/amqp" ) type rbroker struct { @@ -267,6 +267,13 @@ func (r *rbroker) Subscribe(topic string, handler broker.Handler, opts ...broker for k, v := range msg.Headers { header[k], _ = v.(string) } + + // Get rid of dependence on 'Micro-Topic' + msgTopic := header["Micro-Topic"] + if msgTopic == "" { + header["Micro-Topic"] = msg.RoutingKey + } + m := &broker.Message{ Header: header, Body: msg.Body, From d20db4264a75aee8842ce566f521a0ea04332eeb Mon Sep 17 00:00:00 2001 From: maxinglun Date: Wed, 27 Apr 2022 14:22:31 +0800 Subject: [PATCH 8/8] fix: consume and publish blocked after reconnecting --- plugins/broker/rabbitmq/connection.go | 64 ++++++++++++--------------- plugins/broker/rabbitmq/rabbitmq.go | 4 ++ 2 files changed, 33 insertions(+), 35 deletions(-) diff --git a/plugins/broker/rabbitmq/connection.go b/plugins/broker/rabbitmq/connection.go index 9adca67dd0..c2724528c6 100644 --- a/plugins/broker/rabbitmq/connection.go +++ b/plugins/broker/rabbitmq/connection.go @@ -11,8 +11,8 @@ import ( "sync" "time" - "go-micro.dev/v4/logger" "github.com/streadway/amqp" + "go-micro.dev/v4/logger" ) var ( @@ -129,42 +129,36 @@ func (r *rabbitMQConn) reconnect(secure bool, config *amqp.Config) { chanNotifyClose := make(chan *amqp.Error) channel := r.ExchangeChannel.channel channel.NotifyClose(chanNotifyClose) - channelNotifyReturn := make(chan amqp.Return) - channel.NotifyReturn(channelNotifyReturn) - - // block until closed - select { - case result, ok := <-channelNotifyReturn: - if !ok { - // Channel closed, probably also the channel or connection. + + // To avoid deadlocks it is necessary to consume the messages from all channels. + for notifyClose != nil || chanNotifyClose != nil { + // block until closed + select { + case err := <-chanNotifyClose: + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Error(err) + } + // block all resubscribe attempt - they are useless because there is no connection to rabbitmq + // create channel 'waitConnection' (at this point channel is nil or closed, create it without unnecessary checks) + r.Lock() + r.connected = false + r.waitConnection = make(chan struct{}) + r.Unlock() + chanNotifyClose = nil + case err := <-notifyClose: + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Error(err) + } + // block all resubscribe attempt - they are useless because there is no connection to rabbitmq + // create channel 'waitConnection' (at this point channel is nil or closed, create it without unnecessary checks) + r.Lock() + r.connected = false + r.waitConnection = make(chan struct{}) + r.Unlock() + notifyClose = nil + case <-r.close: return } - // Do what you need with messageFailing. - if logger.V(logger.ErrorLevel, logger.DefaultLogger) { - logger.Errorf("notify error reason: %s, description: %s", result.ReplyText, result.Exchange) - } - case err := <-chanNotifyClose: - if logger.V(logger.ErrorLevel, logger.DefaultLogger) { - logger.Error(err) - } - // block all resubscribe attempt - they are useless because there is no connection to rabbitmq - // create channel 'waitConnection' (at this point channel is nil or closed, create it without unnecessary checks) - r.Lock() - r.connected = false - r.waitConnection = make(chan struct{}) - r.Unlock() - case err := <-notifyClose: - if logger.V(logger.ErrorLevel, logger.DefaultLogger) { - logger.Error(err) - } - // block all resubscribe attempt - they are useless because there is no connection to rabbitmq - // create channel 'waitConnection' (at this point channel is nil or closed, create it without unnecessary checks) - r.Lock() - r.connected = false - r.waitConnection = make(chan struct{}) - r.Unlock() - case <-r.close: - return } } } diff --git a/plugins/broker/rabbitmq/rabbitmq.go b/plugins/broker/rabbitmq/rabbitmq.go index 9903d13dc8..e2b7c7f91f 100644 --- a/plugins/broker/rabbitmq/rabbitmq.go +++ b/plugins/broker/rabbitmq/rabbitmq.go @@ -102,6 +102,10 @@ func (s *subscriber) resubscribe() { return //wait until we reconect to rabbit case <-s.r.conn.waitConnection: + // When the connection is disconnected, the waitConnection will be re-assigned, so '<-s.r.conn.waitConnection' maybe blocked. + // Here, it returns once a second, and then the latest waitconnection will be used + case <-time.After(time.Second): + continue } // it may crash (panic) in case of Consume without connection, so recheck it