diff --git a/go.mod b/go.mod index 0d227b693a..188a64cc36 100644 --- a/go.mod +++ b/go.mod @@ -54,7 +54,7 @@ require ( github.com/dghubble/go-twitter v0.0.0-20220816163853-8a0df96f1e6d github.com/dghubble/oauth1 v0.7.1 github.com/didip/tollbooth v4.0.2+incompatible - github.com/eclipse/paho.mqtt.golang v1.4.1 + github.com/eclipse/paho.mqtt.golang v1.4.2-0.20221018190109-a1800d8df9a4 github.com/fasthttp-contrib/sessions v0.0.0-20160905201309-74f6ac73d5d5 github.com/ghodss/yaml v1.0.0 github.com/go-redis/redis/v8 v8.11.5 @@ -362,8 +362,5 @@ replace github.com/gobwas/pool => github.com/gobwas/pool v0.2.1 replace github.com/toolkits/concurrent => github.com/niean/gotools v0.0.0-20151221085310-ff3f51fc5c60 -// this fork is used due to a feature missing from upstream -replace github.com/eclipse/paho.mqtt.golang => github.com/shivamkm07/paho.mqtt.golang v1.3.6-0.20220106130409-e28a1db639f8 - // this is a fork which addresses a performance issues due to go routines replace dubbo.apache.org/dubbo-go/v3 => dubbo.apache.org/dubbo-go/v3 v3.0.3-0.20220610080020-48691a404537 diff --git a/go.sum b/go.sum index 9c2264ead1..a28741789c 100644 --- a/go.sum +++ b/go.sum @@ -468,6 +468,8 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8 github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/eclipse/paho.mqtt.golang v1.4.2-0.20221018190109-a1800d8df9a4 h1:yJj84YKRTY+zu/s9peWf0kuSq38zKT4KJUaFcJ1uRJM= +github.com/eclipse/paho.mqtt.golang v1.4.2-0.20221018190109-a1800d8df9a4/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA= github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M= github.com/emicklei/go-restful/v3 v3.8.0 h1:eCZ8ulSerjdAiaNpF7GxXIE7ZCMo1moN1qX+S609eVw= github.com/emicklei/go-restful/v3 v3.8.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= @@ -1276,8 +1278,6 @@ github.com/shirou/gopsutil v3.20.11+incompatible h1:LJr4ZQK4mPpIV5gOa4jCOKOGb4ty github.com/shirou/gopsutil v3.20.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/shirou/gopsutil/v3 v3.21.6 h1:vU7jrp1Ic/2sHB7w6UNs7MIkn7ebVtTb5D9j45o9VYE= github.com/shirou/gopsutil/v3 v3.21.6/go.mod h1:JfVbDpIBLVzT8oKbvMg9P3wEIMDDpVn+LwHTKj0ST88= -github.com/shivamkm07/paho.mqtt.golang v1.3.6-0.20220106130409-e28a1db639f8 h1:BXKXQzeHuVnSrHAKjvq9ICrgPC27tJ/hXWLMQo36c5s= -github.com/shivamkm07/paho.mqtt.golang v1.3.6-0.20220106130409-e28a1db639f8/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sijms/go-ora/v2 v2.5.3 h1:klGKmhqRONVTtIzTdfYTvrW94kdJkdmZl93u2A3vchI= github.com/sijms/go-ora/v2 v2.5.3/go.mod h1:EHxlY6x7y9HAsdfumurRfTd+v8NrEOTR3Xl4FWlH6xk= diff --git a/pubsub/mqtt/mqtt.go b/pubsub/mqtt/mqtt.go index 5f1e1f1801..3e56d0f57f 100644 --- a/pubsub/mqtt/mqtt.go +++ b/pubsub/mqtt/mqtt.go @@ -247,9 +247,6 @@ func (m *mqttPubSub) startSubscription(ctx context.Context) error { // onMessage returns the callback to be invoked when there's a new message from a topic func (m *mqttPubSub) onMessage(ctx context.Context) func(client mqtt.Client, mqttMsg mqtt.Message) { return func(client mqtt.Client, mqttMsg mqtt.Message) { - // Turn off auto-ACK - mqttMsg.AutoAckOff() - ack := false defer func() { // Do not send N/ACKs on retained messages @@ -336,6 +333,8 @@ func (m *mqttPubSub) connect(ctx context.Context, clientID string) (mqtt.Client, return nil, err } opts := m.createClientOptions(uri, clientID) + // Turn off auto-ack + opts.SetAutoAckDisabled(true) client := mqtt.NewClient(opts) // Add all routes before we connect to catch messages that may be delivered before client.Subscribe is invoked diff --git a/tests/certification/pubsub/mqtt/go.mod b/tests/certification/pubsub/mqtt/go.mod index 7624ae2487..f2efdae7a3 100644 --- a/tests/certification/pubsub/mqtt/go.mod +++ b/tests/certification/pubsub/mqtt/go.mod @@ -9,7 +9,7 @@ require ( github.com/dapr/dapr v1.9.0-rc.3 github.com/dapr/go-sdk v1.5.1-0.20221004175845-b465b1fa0721 github.com/dapr/kit v0.0.3-0.20220930182601-272e358ba6a7 - github.com/eclipse/paho.mqtt.golang v1.4.1 + github.com/eclipse/paho.mqtt.golang v1.4.2-0.20221018190109-a1800d8df9a4 github.com/stretchr/testify v1.8.0 go.uber.org/multierr v1.8.0 ) @@ -142,5 +142,3 @@ require ( replace github.com/dapr/components-contrib/tests/certification => ../../ replace github.com/dapr/components-contrib => ../../../../ - -replace github.com/eclipse/paho.mqtt.golang => github.com/shivamkm07/paho.mqtt.golang v1.3.6-0.20220106130409-e28a1db639f8 diff --git a/tests/certification/pubsub/mqtt/go.sum b/tests/certification/pubsub/mqtt/go.sum index 3d150b3213..165ef0271f 100644 --- a/tests/certification/pubsub/mqtt/go.sum +++ b/tests/certification/pubsub/mqtt/go.sum @@ -107,6 +107,8 @@ github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3 github.com/eapache/go-resiliency v1.2.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/eclipse/paho.mqtt.golang v1.4.2-0.20221018190109-a1800d8df9a4 h1:yJj84YKRTY+zu/s9peWf0kuSq38zKT4KJUaFcJ1uRJM= +github.com/eclipse/paho.mqtt.golang v1.4.2-0.20221018190109-a1800d8df9a4/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA= github.com/emicklei/go-restful/v3 v3.8.0 h1:eCZ8ulSerjdAiaNpF7GxXIE7ZCMo1moN1qX+S609eVw= github.com/emicklei/go-restful/v3 v3.8.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -469,8 +471,6 @@ github.com/savsgio/gotils v0.0.0-20220530130905-52f3993e8d6d h1:Q+gqLBOPkFGHyCJx github.com/savsgio/gotils v0.0.0-20220530130905-52f3993e8d6d/go.mod h1:Gy+0tqhJvgGlqnTF8CVGP0AaGRjwBtXs/a5PA0Y3+A4= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= -github.com/shivamkm07/paho.mqtt.golang v1.3.6-0.20220106130409-e28a1db639f8 h1:BXKXQzeHuVnSrHAKjvq9ICrgPC27tJ/hXWLMQo36c5s= -github.com/shivamkm07/paho.mqtt.golang v1.3.6-0.20220106130409-e28a1db639f8/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=