Skip to content

Commit

Permalink
Merge pull request #2207 from shivamkm07/upgrade_mqtt
Browse files Browse the repository at this point in the history
Upgrade mqtt component to upstream
  • Loading branch information
berndverst committed Oct 21, 2022
2 parents 0326f13 + bbae625 commit 05b1423
Show file tree
Hide file tree
Showing 5 changed files with 8 additions and 14 deletions.
5 changes: 1 addition & 4 deletions go.mod
Expand Up @@ -54,7 +54,7 @@ require (
github.com/dghubble/go-twitter v0.0.0-20220816163853-8a0df96f1e6d
github.com/dghubble/oauth1 v0.7.1
github.com/didip/tollbooth v4.0.2+incompatible
github.com/eclipse/paho.mqtt.golang v1.4.1
github.com/eclipse/paho.mqtt.golang v1.4.2-0.20221018190109-a1800d8df9a4
github.com/fasthttp-contrib/sessions v0.0.0-20160905201309-74f6ac73d5d5
github.com/ghodss/yaml v1.0.0
github.com/go-redis/redis/v8 v8.11.5
Expand Down Expand Up @@ -362,8 +362,5 @@ replace github.com/gobwas/pool => github.com/gobwas/pool v0.2.1

replace github.com/toolkits/concurrent => github.com/niean/gotools v0.0.0-20151221085310-ff3f51fc5c60

// this fork is used due to a feature missing from upstream
replace github.com/eclipse/paho.mqtt.golang => github.com/shivamkm07/paho.mqtt.golang v1.3.6-0.20220106130409-e28a1db639f8

// this is a fork which addresses a performance issues due to go routines
replace dubbo.apache.org/dubbo-go/v3 => dubbo.apache.org/dubbo-go/v3 v3.0.3-0.20220610080020-48691a404537
4 changes: 2 additions & 2 deletions go.sum
Expand Up @@ -468,6 +468,8 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/eclipse/paho.mqtt.golang v1.4.2-0.20221018190109-a1800d8df9a4 h1:yJj84YKRTY+zu/s9peWf0kuSq38zKT4KJUaFcJ1uRJM=
github.com/eclipse/paho.mqtt.golang v1.4.2-0.20221018190109-a1800d8df9a4/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA=
github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
github.com/emicklei/go-restful/v3 v3.8.0 h1:eCZ8ulSerjdAiaNpF7GxXIE7ZCMo1moN1qX+S609eVw=
github.com/emicklei/go-restful/v3 v3.8.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
Expand Down Expand Up @@ -1276,8 +1278,6 @@ github.com/shirou/gopsutil v3.20.11+incompatible h1:LJr4ZQK4mPpIV5gOa4jCOKOGb4ty
github.com/shirou/gopsutil v3.20.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shirou/gopsutil/v3 v3.21.6 h1:vU7jrp1Ic/2sHB7w6UNs7MIkn7ebVtTb5D9j45o9VYE=
github.com/shirou/gopsutil/v3 v3.21.6/go.mod h1:JfVbDpIBLVzT8oKbvMg9P3wEIMDDpVn+LwHTKj0ST88=
github.com/shivamkm07/paho.mqtt.golang v1.3.6-0.20220106130409-e28a1db639f8 h1:BXKXQzeHuVnSrHAKjvq9ICrgPC27tJ/hXWLMQo36c5s=
github.com/shivamkm07/paho.mqtt.golang v1.3.6-0.20220106130409-e28a1db639f8/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sijms/go-ora/v2 v2.5.3 h1:klGKmhqRONVTtIzTdfYTvrW94kdJkdmZl93u2A3vchI=
github.com/sijms/go-ora/v2 v2.5.3/go.mod h1:EHxlY6x7y9HAsdfumurRfTd+v8NrEOTR3Xl4FWlH6xk=
Expand Down
5 changes: 2 additions & 3 deletions pubsub/mqtt/mqtt.go
Expand Up @@ -247,9 +247,6 @@ func (m *mqttPubSub) startSubscription(ctx context.Context) error {
// onMessage returns the callback to be invoked when there's a new message from a topic
func (m *mqttPubSub) onMessage(ctx context.Context) func(client mqtt.Client, mqttMsg mqtt.Message) {
return func(client mqtt.Client, mqttMsg mqtt.Message) {
// Turn off auto-ACK
mqttMsg.AutoAckOff()

ack := false
defer func() {
// Do not send N/ACKs on retained messages
Expand Down Expand Up @@ -336,6 +333,8 @@ func (m *mqttPubSub) connect(ctx context.Context, clientID string) (mqtt.Client,
return nil, err
}
opts := m.createClientOptions(uri, clientID)
// Turn off auto-ack
opts.SetAutoAckDisabled(true)
client := mqtt.NewClient(opts)

// Add all routes before we connect to catch messages that may be delivered before client.Subscribe is invoked
Expand Down
4 changes: 1 addition & 3 deletions tests/certification/pubsub/mqtt/go.mod
Expand Up @@ -9,7 +9,7 @@ require (
github.com/dapr/dapr v1.9.0-rc.3
github.com/dapr/go-sdk v1.5.1-0.20221004175845-b465b1fa0721
github.com/dapr/kit v0.0.3-0.20220930182601-272e358ba6a7
github.com/eclipse/paho.mqtt.golang v1.4.1
github.com/eclipse/paho.mqtt.golang v1.4.2-0.20221018190109-a1800d8df9a4
github.com/stretchr/testify v1.8.0
go.uber.org/multierr v1.8.0
)
Expand Down Expand Up @@ -142,5 +142,3 @@ require (
replace github.com/dapr/components-contrib/tests/certification => ../../

replace github.com/dapr/components-contrib => ../../../../

replace github.com/eclipse/paho.mqtt.golang => github.com/shivamkm07/paho.mqtt.golang v1.3.6-0.20220106130409-e28a1db639f8
4 changes: 2 additions & 2 deletions tests/certification/pubsub/mqtt/go.sum
Expand Up @@ -107,6 +107,8 @@ github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3
github.com/eapache/go-resiliency v1.2.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/eclipse/paho.mqtt.golang v1.4.2-0.20221018190109-a1800d8df9a4 h1:yJj84YKRTY+zu/s9peWf0kuSq38zKT4KJUaFcJ1uRJM=
github.com/eclipse/paho.mqtt.golang v1.4.2-0.20221018190109-a1800d8df9a4/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA=
github.com/emicklei/go-restful/v3 v3.8.0 h1:eCZ8ulSerjdAiaNpF7GxXIE7ZCMo1moN1qX+S609eVw=
github.com/emicklei/go-restful/v3 v3.8.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
Expand Down Expand Up @@ -469,8 +471,6 @@ github.com/savsgio/gotils v0.0.0-20220530130905-52f3993e8d6d h1:Q+gqLBOPkFGHyCJx
github.com/savsgio/gotils v0.0.0-20220530130905-52f3993e8d6d/go.mod h1:Gy+0tqhJvgGlqnTF8CVGP0AaGRjwBtXs/a5PA0Y3+A4=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/shivamkm07/paho.mqtt.golang v1.3.6-0.20220106130409-e28a1db639f8 h1:BXKXQzeHuVnSrHAKjvq9ICrgPC27tJ/hXWLMQo36c5s=
github.com/shivamkm07/paho.mqtt.golang v1.3.6-0.20220106130409-e28a1db639f8/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
Expand Down

0 comments on commit 05b1423

Please sign in to comment.