Skip to content

Commit

Permalink
MQTT Pubsub Certification Testing + AutAckOff Fix for MQTT (dapr#1420)
Browse files Browse the repository at this point in the history
* MQTT Certification Test

Signed-off-by: shivam <shivamkm07@gmail.com>

* Using paho.mqtt.golang fork with AutoAck fix

Signed-off-by: shivam <shivamkm07@gmail.com>

* Adding MQTT component in certification.yml

Signed-off-by: shivam <shivamkm07@gmail.com>

Co-authored-by: Artur Souza <artursouza.ms@outlook.com>
Signed-off-by: jigargandhi <jigarr.gandhi@gmail.com>
  • Loading branch information
2 people authored and jigargandhi committed Jan 21, 2022
1 parent 0fd86e1 commit df1e831
Show file tree
Hide file tree
Showing 12 changed files with 2,321 additions and 2 deletions.
1 change: 1 addition & 0 deletions .github/workflows/certification.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ jobs:
PR_COMPONENTS=$(yq -I0 --tojson eval - << EOF
- pubsub.kafka
- pubsub.rabbitmq
- pubsub.mqtt
EOF
)
echo "::set-output name=pr-components::$PR_COMPONENTS"
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -307,3 +307,5 @@ require (
require github.com/sony/gobreaker v0.4.2-0.20210216022020-dd874f9dd33b // indirect

replace k8s.io/client => github.com/kubernetes-client/go v0.0.0-20190928040339-c757968c4c36

replace github.com/eclipse/paho.mqtt.golang => github.com/shivamkm07/paho.mqtt.golang v1.3.6-0.20220106130409-e28a1db639f8
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -353,8 +353,6 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/eclipse/paho.mqtt.golang v1.3.5 h1:sWtmgNxYM9P2sP+xEItMozsR3w0cqZFlqnNN1bdl41Y=
github.com/eclipse/paho.mqtt.golang v1.3.5/go.mod h1:eTzb4gxwwyWpqBUHGQZ4ABAV7+Jgm1PklsYT/eo8Hcc=
github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
Expand Down Expand Up @@ -1092,6 +1090,8 @@ github.com/shirou/gopsutil v3.21.10+incompatible h1:AL2kpVykjkqeN+MFe1WcwSBVUjGj
github.com/shirou/gopsutil v3.21.10+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shirou/gopsutil/v3 v3.21.6 h1:vU7jrp1Ic/2sHB7w6UNs7MIkn7ebVtTb5D9j45o9VYE=
github.com/shirou/gopsutil/v3 v3.21.6/go.mod h1:JfVbDpIBLVzT8oKbvMg9P3wEIMDDpVn+LwHTKj0ST88=
github.com/shivamkm07/paho.mqtt.golang v1.3.6-0.20220106130409-e28a1db639f8 h1:BXKXQzeHuVnSrHAKjvq9ICrgPC27tJ/hXWLMQo36c5s=
github.com/shivamkm07/paho.mqtt.golang v1.3.6-0.20220106130409-e28a1db639f8/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA=
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24 h1:pntxY8Ary0t43dCZ5dqY4YTJCObLY1kIXl0uzMv+7DE=
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4=
github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5IYyJwS/kOiWx8mHo=
Expand Down
1 change: 1 addition & 0 deletions pubsub/mqtt/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ func (m *mqttPubSub) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handl
token := m.consumer.SubscribeMultiple(
m.topics,
func(client mqtt.Client, mqttMsg mqtt.Message) {
mqttMsg.AutoAckOff()
msg := pubsub.NewMessage{
Topic: mqttMsg.Topic(),
Data: mqttMsg.Payload(),
Expand Down
43 changes: 43 additions & 0 deletions tests/certification/pubsub/mqtt/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# MQTT certifcation testing

This project aims to test the MQTT Pub/Sub component under various conditions.

## Test plan

### Basic Test

* Bring up a MQTT cluster
* Start 1 sidecar/application(App1)
* Publishes 1000+ unique messages
* App: Simulate periodic errors
* Component: Retries on error
* App: Observes successful messages
* Test: Confirms that all expected messages were received

### Multiple Publishers-Subscribers

* Start second sidecar/application(App2)
* Each of the publishers publish a fixed number of messages to the topic
* Test: Confirms that both applications receive all published messages

### Infra Test

* Start a constant flow of publishing and subscribing(App1)
* Test: Keeps count of total sent/received
* Start another sidecar/application with persistent session(App2)
* Test: Publishes messages in background
* Each of the applications should receive messages
* Stop consumer connected with persistent session(App2)
* Test: Publishes messages in background
* Only App1 should receive messages
* Stop publisher as well so that none of the components are active
* No messages are published and received
* Restart second consumer with persistent session
* App2 receives all lost messages
* Restart publisher so that both components are active
* Test: Confirms that both applications received all published messages and no messages were lost

### Network Test
* Simulate network interruption
* Test: Begins trying to reconnect & publish
* Component: Begins trying to reconnect & re-subscribe
20 changes: 20 additions & 0 deletions tests/certification/pubsub/mqtt/components/consumer1/mqtt.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: messagebus
spec:
type: pubsub.mqtt
version: v1
metadata:
- name: url
value: "tcp://localhost:1884"
- name: consumerID
value: "testConsumer1"
- name: retain
value: true
- name: qos
value: 2
- name: cleanSession
value: false
- name: backOffMaxRetries
value: 5
20 changes: 20 additions & 0 deletions tests/certification/pubsub/mqtt/components/consumer2/mqtt.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: messagebus
spec:
type: pubsub.mqtt
version: v1
metadata:
- name: url
value: "tcp://localhost:1884"
- name: consumerID
value: "testConsumer2"
- name: retain
value: false
- name: qos
value: 2
- name: cleanSession
value: false
- name: backOffMaxRetries
value: 5
8 changes: 8 additions & 0 deletions tests/certification/pubsub/mqtt/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
apiVersion: dapr.io/v1alpha1
kind: Configuration
metadata:
name: pubsubroutingconfig
spec:
features:
- name: PubSub.Routing
enabled: true
8 changes: 8 additions & 0 deletions tests/certification/pubsub/mqtt/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
version: '2'
services:
emqx:
image: emqx/emqx:4.2.7
hostname: emqx
container_name: emqx
ports:
- "1884:1883"
118 changes: 118 additions & 0 deletions tests/certification/pubsub/mqtt/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
module github.com/dapr/components-contrib/tests/certification/pubsub/mqtt

go 1.17

require (
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/dapr/components-contrib v1.5.0-rc.1.0.20220105071850-a013b58d6cee
github.com/dapr/components-contrib/tests/certification v1.4.0-rc2
github.com/dapr/dapr v1.5.2-0.20220106203753-0e6bcbabc8ba
github.com/dapr/go-sdk v1.3.0
github.com/dapr/kit v0.0.2-0.20210614175626-b9074b64d233
github.com/eclipse/paho.mqtt.golang v1.3.5
github.com/stretchr/testify v1.7.0
go.uber.org/multierr v1.7.0
)

require (
contrib.go.opencensus.io/exporter/prometheus v0.4.0 // indirect
contrib.go.opencensus.io/exporter/zipkin v0.1.1 // indirect
github.com/AdhityaRamadhanus/fasthttpcors v0.0.0-20170121111917-d4c07198763a // indirect
github.com/PuerkitoBio/purell v1.1.1 // indirect
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
github.com/andybalholm/brotli v1.0.2 // indirect
github.com/antlr/antlr4 v0.0.0-20200503195918-621b933c7a7f // indirect
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.1.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fasthttp/router v1.3.8 // indirect
github.com/ghodss/yaml v1.0.0 // indirect
github.com/go-kit/log v0.1.0 // indirect
github.com/go-logfmt/logfmt v0.5.1 // indirect
github.com/go-logr/logr v0.3.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/cel-go v0.9.0 // indirect
github.com/google/go-cmp v0.5.6 // indirect
github.com/google/gofuzz v1.1.0 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/googleapis/gnostic v0.5.1 // indirect
github.com/gorilla/mux v1.8.0 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/grandcat/zeroconf v0.0.0-20190424104450-85eadb44205c // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.2.2 // indirect
github.com/hashicorp/consul/api v1.3.0 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.1 // indirect
github.com/hashicorp/go-immutable-radix v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-rootcerts v1.0.0 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/serf v0.8.2 // indirect
github.com/imdario/mergo v0.3.10 // indirect
github.com/json-iterator/go v1.1.11 // indirect
github.com/klauspost/compress v1.13.4 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/miekg/dns v1.1.35 // indirect
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/mapstructure v1.4.1 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/openzipkin/zipkin-go v0.2.2 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.11.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/prometheus/statsd_exporter v0.22.3 // indirect
github.com/savsgio/gotils v0.0.0-20210217112953-d4a072536008 // indirect
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stoewer/go-strcase v1.2.0 // indirect
github.com/tylertreat/comcast v1.0.1 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasthttp v1.31.1-0.20211216042702-258a4c17b4f4 // indirect
go.opencensus.io v0.23.0 // indirect
go.opentelemetry.io/otel v0.19.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect
golang.org/x/net v0.0.0-20210825183410-e898025ed96a // indirect
golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac // indirect
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba // indirect
google.golang.org/appengine v1.6.6 // indirect
google.golang.org/genproto v0.0.0-20210831024726-fe130286e0e2 // indirect
google.golang.org/grpc v1.40.0 // indirect
google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
k8s.io/api v0.20.0 // indirect
k8s.io/apiextensions-apiserver v0.20.0 // indirect
k8s.io/apimachinery v0.20.0 // indirect
k8s.io/client-go v0.20.0 // indirect
k8s.io/klog/v2 v2.4.0 // indirect
k8s.io/utils v0.0.0-20201110183641-67b214c5f920 // indirect
sigs.k8s.io/controller-runtime v0.7.0 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.0.2 // indirect
sigs.k8s.io/yaml v1.2.0 // indirect
)

replace github.com/dapr/components-contrib/tests/certification => ../../

replace github.com/dapr/components-contrib => ../../../../

replace github.com/eclipse/paho.mqtt.golang => github.com/shivamkm07/paho.mqtt.golang v1.3.6-0.20220106130409-e28a1db639f8

// Uncomment for local development for testing with changes
// in the Dapr runtime. Don't commit with this uncommented!
//
// replace github.com/dapr/dapr => ../../../../../dapr

0 comments on commit df1e831

Please sign in to comment.