Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MQTT Pubsub Certification Testing + AutAckOff Fix for MQTT #1420

Merged
merged 4 commits into from
Jan 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/certification.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ jobs:
PR_COMPONENTS=$(yq -I0 --tojson eval - << EOF
- pubsub.kafka
- pubsub.rabbitmq
- pubsub.mqtt
EOF
)
echo "::set-output name=pr-components::$PR_COMPONENTS"
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -305,3 +305,5 @@ require (
require github.com/sony/gobreaker v0.4.2-0.20210216022020-dd874f9dd33b // indirect

replace k8s.io/client => github.com/kubernetes-client/go v0.0.0-20190928040339-c757968c4c36

replace github.com/eclipse/paho.mqtt.golang => github.com/shivamkm07/paho.mqtt.golang v1.3.6-0.20220106130409-e28a1db639f8
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -353,8 +353,6 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/eclipse/paho.mqtt.golang v1.3.5 h1:sWtmgNxYM9P2sP+xEItMozsR3w0cqZFlqnNN1bdl41Y=
github.com/eclipse/paho.mqtt.golang v1.3.5/go.mod h1:eTzb4gxwwyWpqBUHGQZ4ABAV7+Jgm1PklsYT/eo8Hcc=
github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
Expand Down Expand Up @@ -1088,6 +1086,8 @@ github.com/sergi/go-diff v1.2.0 h1:XU+rvMAioB0UC3q1MFrIQy4Vo5/4VsRDQQXHsEya6xQ=
github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
github.com/shirou/gopsutil/v3 v3.21.6 h1:vU7jrp1Ic/2sHB7w6UNs7MIkn7ebVtTb5D9j45o9VYE=
github.com/shirou/gopsutil/v3 v3.21.6/go.mod h1:JfVbDpIBLVzT8oKbvMg9P3wEIMDDpVn+LwHTKj0ST88=
github.com/shivamkm07/paho.mqtt.golang v1.3.6-0.20220106130409-e28a1db639f8 h1:BXKXQzeHuVnSrHAKjvq9ICrgPC27tJ/hXWLMQo36c5s=
github.com/shivamkm07/paho.mqtt.golang v1.3.6-0.20220106130409-e28a1db639f8/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA=
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24 h1:pntxY8Ary0t43dCZ5dqY4YTJCObLY1kIXl0uzMv+7DE=
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4=
github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5IYyJwS/kOiWx8mHo=
Expand Down
1 change: 1 addition & 0 deletions pubsub/mqtt/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ func (m *mqttPubSub) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handl
token := m.consumer.SubscribeMultiple(
m.topics,
func(client mqtt.Client, mqttMsg mqtt.Message) {
mqttMsg.AutoAckOff()
msg := pubsub.NewMessage{
Topic: mqttMsg.Topic(),
Data: mqttMsg.Payload(),
Expand Down
43 changes: 43 additions & 0 deletions tests/certification/pubsub/mqtt/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# MQTT certifcation testing

This project aims to test the MQTT Pub/Sub component under various conditions.

## Test plan

### Basic Test

* Bring up a MQTT cluster
* Start 1 sidecar/application(App1)
* Publishes 1000+ unique messages
* App: Simulate periodic errors
* Component: Retries on error
* App: Observes successful messages
* Test: Confirms that all expected messages were received

### Multiple Publishers-Subscribers

* Start second sidecar/application(App2)
* Each of the publishers publish a fixed number of messages to the topic
* Test: Confirms that both applications receive all published messages

### Infra Test

* Start a constant flow of publishing and subscribing(App1)
* Test: Keeps count of total sent/received
* Start another sidecar/application with persistent session(App2)
* Test: Publishes messages in background
* Each of the applications should receive messages
* Stop consumer connected with persistent session(App2)
* Test: Publishes messages in background
* Only App1 should receive messages
* Stop publisher as well so that none of the components are active
* No messages are published and received
* Restart second consumer with persistent session
* App2 receives all lost messages
* Restart publisher so that both components are active
* Test: Confirms that both applications received all published messages and no messages were lost

### Network Test
* Simulate network interruption
* Test: Begins trying to reconnect & publish
* Component: Begins trying to reconnect & re-subscribe
20 changes: 20 additions & 0 deletions tests/certification/pubsub/mqtt/components/consumer1/mqtt.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: messagebus
spec:
type: pubsub.mqtt
version: v1
metadata:
- name: url
value: "tcp://localhost:1884"
- name: consumerID
value: "testConsumer1"
- name: retain
value: true
- name: qos
value: 2
- name: cleanSession
value: false
- name: backOffMaxRetries
value: 5
20 changes: 20 additions & 0 deletions tests/certification/pubsub/mqtt/components/consumer2/mqtt.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: messagebus
spec:
type: pubsub.mqtt
version: v1
metadata:
- name: url
value: "tcp://localhost:1884"
- name: consumerID
value: "testConsumer2"
- name: retain
value: false
- name: qos
value: 2
- name: cleanSession
value: false
- name: backOffMaxRetries
value: 5
8 changes: 8 additions & 0 deletions tests/certification/pubsub/mqtt/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
apiVersion: dapr.io/v1alpha1
kind: Configuration
metadata:
name: pubsubroutingconfig
spec:
features:
- name: PubSub.Routing
enabled: true
8 changes: 8 additions & 0 deletions tests/certification/pubsub/mqtt/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
version: '2'
services:
emqx:
image: emqx/emqx:4.2.7
hostname: emqx
container_name: emqx
ports:
- "1884:1883"
118 changes: 118 additions & 0 deletions tests/certification/pubsub/mqtt/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
module github.com/dapr/components-contrib/tests/certification/pubsub/mqtt

go 1.17

require (
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/dapr/components-contrib v1.5.0-rc.1.0.20220105071850-a013b58d6cee
github.com/dapr/components-contrib/tests/certification v1.4.0-rc2
github.com/dapr/dapr v1.5.2-0.20220106203753-0e6bcbabc8ba
github.com/dapr/go-sdk v1.3.0
github.com/dapr/kit v0.0.2-0.20210614175626-b9074b64d233
github.com/eclipse/paho.mqtt.golang v1.3.5
github.com/stretchr/testify v1.7.0
go.uber.org/multierr v1.7.0
)

require (
contrib.go.opencensus.io/exporter/prometheus v0.4.0 // indirect
contrib.go.opencensus.io/exporter/zipkin v0.1.1 // indirect
github.com/AdhityaRamadhanus/fasthttpcors v0.0.0-20170121111917-d4c07198763a // indirect
github.com/PuerkitoBio/purell v1.1.1 // indirect
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
github.com/andybalholm/brotli v1.0.2 // indirect
github.com/antlr/antlr4 v0.0.0-20200503195918-621b933c7a7f // indirect
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.1.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fasthttp/router v1.3.8 // indirect
github.com/ghodss/yaml v1.0.0 // indirect
github.com/go-kit/log v0.1.0 // indirect
github.com/go-logfmt/logfmt v0.5.1 // indirect
github.com/go-logr/logr v0.3.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/cel-go v0.9.0 // indirect
github.com/google/go-cmp v0.5.6 // indirect
github.com/google/gofuzz v1.1.0 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/googleapis/gnostic v0.5.1 // indirect
github.com/gorilla/mux v1.8.0 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/grandcat/zeroconf v0.0.0-20190424104450-85eadb44205c // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.2.2 // indirect
github.com/hashicorp/consul/api v1.3.0 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.1 // indirect
github.com/hashicorp/go-immutable-radix v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-rootcerts v1.0.0 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/serf v0.8.2 // indirect
github.com/imdario/mergo v0.3.10 // indirect
github.com/json-iterator/go v1.1.11 // indirect
github.com/klauspost/compress v1.13.4 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/miekg/dns v1.1.35 // indirect
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/mapstructure v1.4.1 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/openzipkin/zipkin-go v0.2.2 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.11.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/prometheus/statsd_exporter v0.22.3 // indirect
github.com/savsgio/gotils v0.0.0-20210217112953-d4a072536008 // indirect
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stoewer/go-strcase v1.2.0 // indirect
github.com/tylertreat/comcast v1.0.1 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasthttp v1.31.1-0.20211216042702-258a4c17b4f4 // indirect
go.opencensus.io v0.23.0 // indirect
go.opentelemetry.io/otel v0.19.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect
golang.org/x/net v0.0.0-20210825183410-e898025ed96a // indirect
golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac // indirect
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba // indirect
google.golang.org/appengine v1.6.6 // indirect
google.golang.org/genproto v0.0.0-20210831024726-fe130286e0e2 // indirect
google.golang.org/grpc v1.40.0 // indirect
google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
k8s.io/api v0.20.0 // indirect
k8s.io/apiextensions-apiserver v0.20.0 // indirect
k8s.io/apimachinery v0.20.0 // indirect
k8s.io/client-go v0.20.0 // indirect
k8s.io/klog/v2 v2.4.0 // indirect
k8s.io/utils v0.0.0-20201110183641-67b214c5f920 // indirect
sigs.k8s.io/controller-runtime v0.7.0 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.0.2 // indirect
sigs.k8s.io/yaml v1.2.0 // indirect
)

replace github.com/dapr/components-contrib/tests/certification => ../../

replace github.com/dapr/components-contrib => ../../../../

replace github.com/eclipse/paho.mqtt.golang => github.com/shivamkm07/paho.mqtt.golang v1.3.6-0.20220106130409-e28a1db639f8

// Uncomment for local development for testing with changes
// in the Dapr runtime. Don't commit with this uncommented!
//
// replace github.com/dapr/dapr => ../../../../../dapr