forked from eclipse/paho.mqtt.golang
/
main.go
176 lines (148 loc) · 5.48 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
/*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* https://www.eclipse.org/legal/epl-2.0/
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
* Contributors:
* Matt Brittan
*/
package main
// Connect to the broker, subscribe, and write messages received to a file
import (
"encoding/json"
"fmt"
"os"
"os/signal"
"syscall"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
// Settings for this subscriber; edit the values below to suit the environment it runs in.
const (
// TOPIC is the topic that main subscribes to and registers a route for
TOPIC = "topic1"
// QOS is the quality of service level requested when subscribing to TOPIC
QOS = 1
// SERVERADDRESS is the broker URL passed to AddBroker
SERVERADDRESS = "tcp://mosquitto:1883"
// CLIENTID is the client identifier passed to SetClientID
CLIENTID = "mqtt_subscriber"
WRITETOLOG = true // If true then received messages will be written to the console
WRITETODISK = false // If true then received messages will be written to the file below
// OUTPUTFILE is the path NewHandler creates when WRITETODISK is true
OUTPUTFILE = "/binds/receivedMessages.txt"
)
// handler provides the callback (handle) that is invoked when a message is received. The message is parsed and,
// when an output file is open, the count followed by the raw message is written to that file (putting the count
// first makes it easier to sort the file).
type handler struct {
// f is the output file; NewHandler leaves it nil when WRITETODISK is false and Close resets it to nil
f *os.File
}
// NewHandler returns a handler ready to process incoming messages. When WRITETODISK is true, OUTPUTFILE is created
// (os.Create truncates a file that already exists) and attached to the handler; otherwise the handler has no file
// and nothing is written to disk. It panics if the file cannot be created.
func NewHandler() *handler {
	if !WRITETODISK {
		return &handler{}
	}

	out, err := os.Create(OUTPUTFILE)
	if err != nil {
		panic(err)
	}
	return &handler{f: out}
}
// Close closes the output file, if one is open, and clears the reference so that a later call to Close (or to
// handle) skips the file. A failure to close is reported on stdout rather than returned.
func (o *handler) Close() {
	if o.f == nil {
		return // no file was opened, or Close has already run
	}
	if err := o.f.Close(); err != nil {
		// The trailing newline matters: without it this message runs into whatever is printed next
		fmt.Printf("ERROR closing file: %s\n", err)
	}
	o.f = nil
}
// Message is the structure that handle decodes the JSON payload of each received message into.
type Message struct {
// Count is the number handle writes (zero padded) at the start of each line in the output file
Count uint64
}
// handle is called when a message is received. The payload is decoded as JSON into a Message; if an output file is
// open, a line of the form "<zero padded count> <raw payload>" is appended to it and, if WRITETOLOG is true, the
// payload is also echoed to stdout.
//
// A payload that cannot be parsed is reported on stdout and then still recorded (using whatever count was decoded,
// zero if none was) so that a received message is never silently dropped.
func (o *handler) handle(_ mqtt.Client, msg mqtt.Message) {
	payload := msg.Payload()

	// We extract the count and write that out first to simplify checking for missing values
	var m Message
	if err := json.Unmarshal(payload, &m); err != nil {
		fmt.Printf("Message could not be parsed (%s): %s\n", payload, err)
	}

	if o.f != nil {
		// Write out the number (make it long enough that sorting works) and the payload
		if _, err := fmt.Fprintf(o.f, "%09d %s\n", m.Count, payload); err != nil {
			fmt.Printf("ERROR writing to file: %s\n", err)
		}
	}

	if WRITETOLOG {
		fmt.Printf("received message: %s\n", payload)
	}
}
// main connects to the broker at SERVERADDRESS, subscribes to TOPIC each time a connection is established, passes
// received messages to the handler, and then waits for an interrupt or SIGTERM before disconnecting.
func main() {
	// Enable logging by uncommenting the below
	// mqtt.ERROR = log.New(os.Stdout, "[ERROR] ", 0)
	// mqtt.CRITICAL = log.New(os.Stdout, "[CRITICAL] ", 0)
	// mqtt.WARN = log.New(os.Stdout, "[WARN] ", 0)
	// mqtt.DEBUG = log.New(os.Stdout, "[DEBUG] ", 0)

	// Create a handler that will deal with incoming messages
	h := NewHandler()
	defer h.Close()

	// Now we establish the connection to the mqtt broker
	opts := mqtt.NewClientOptions()
	opts.AddBroker(SERVERADDRESS)
	opts.SetClientID(CLIENTID)

	opts.SetOrderMatters(false)       // Allow out of order messages (use this option unless in order delivery is essential)
	opts.ConnectTimeout = time.Second // Minimal delays on connect
	opts.WriteTimeout = time.Second   // Minimal delays on writes
	opts.KeepAlive = 10               // Keepalive every 10 seconds so we quickly detect network outages
	opts.PingTimeout = time.Second    // local broker so response should be quick

	// Automate connection management (will keep trying to connect and will reconnect if network drops)
	opts.ConnectRetry = true
	opts.AutoReconnect = true

	// If using QOS2 and CleanSession = FALSE then it is possible that we will receive messages on topics that we
	// have not subscribed to here (if they were previously subscribed to they are part of the session and survive
	// disconnect/reconnect). Adding a DefaultPublishHandler lets us detect this.
	opts.DefaultPublishHandler = func(_ mqtt.Client, msg mqtt.Message) {
		// Print the topic and payload explicitly; formatting the mqtt.Message value itself with %s does not
		// produce readable output.
		fmt.Printf("UNEXPECTED MESSAGE: topic=%s payload=%s\n", msg.Topic(), msg.Payload())
	}

	// Log events
	opts.OnConnectionLost = func(_ mqtt.Client, err error) {
		// Include the reason so that an outage can be diagnosed from the console output
		fmt.Printf("connection lost: %s\n", err)
	}

	opts.OnConnect = func(c mqtt.Client) {
		fmt.Println("connection established")

		// Establish the subscription - doing this here means that it will happen every time a connection is established
		// (useful if opts.CleanSession is TRUE or the broker does not reliably store session data)
		t := c.Subscribe(TOPIC, QOS, h.handle)
		// the connection handler is called in a goroutine so blocking here would not cause an issue. However as blocking
		// in other handlers does cause problems it's best to just assume we should not block
		go func() {
			_ = t.Wait() // Can also use '<-t.Done()' in releases > 1.2.0
			if err := t.Error(); err != nil {
				fmt.Printf("ERROR SUBSCRIBING: %s\n", err)
				return
			}
			fmt.Println("subscribed to: ", TOPIC)
		}()
	}

	opts.OnReconnecting = func(mqtt.Client, *mqtt.ClientOptions) {
		fmt.Println("attempting to reconnect")
	}

	//
	// Connect to the broker
	//
	client := mqtt.NewClient(opts)

	// If using QOS2 and CleanSession = FALSE then messages may be transmitted to us before the subscribe completes.
	// Adding routes prior to connecting is a way of ensuring that these messages are processed
	client.AddRoute(TOPIC, h.handle)

	if token := client.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}
	fmt.Println("Connection is up")

	// Messages will be delivered asynchronously so we just need to wait for a signal to shutdown
	sig := make(chan os.Signal, 1) // buffered so a signal arriving before the receive below is not lost
	signal.Notify(sig, os.Interrupt, syscall.SIGTERM)

	<-sig
	fmt.Println("signal caught - exiting")
	client.Disconnect(1000) // the argument is the quiesce time in milliseconds
	fmt.Println("shutdown complete")
}