Skip to content

Commit

Permalink
Merge pull request #574 from GilGil1/master
Browse files Browse the repository at this point in the history
Custom open connection for different network types.
This allows the user to provide a custom function which will be used to open the network connection enabling support for edge cases not supported by the inbuilt function.
  • Loading branch information
MattBrittan committed Dec 22, 2021
2 parents aa9a7a1 + b7ff17c commit 6f41a8c
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 1 deletion.
6 changes: 5 additions & 1 deletion client.go
Expand Up @@ -393,7 +393,11 @@ func (c *client) attemptConnection() (net.Conn, byte, bool, error) {
tlsCfg = c.options.OnConnectAttempt(broker, c.options.TLSConfig)
}
// Start by opening the network connection (tcp, tls, ws) etc
conn, err = openConnection(broker, tlsCfg, c.options.ConnectTimeout, c.options.HTTPHeaders, c.options.WebsocketOptions, c.options.Dialer)
if c.options.CustomOpenConnectionFn != nil{
conn, err = c.options.CustomOpenConnectionFn(broker, c.options)
} else {
conn, err = openConnection(broker, tlsCfg, c.options.ConnectTimeout, c.options.HTTPHeaders, c.options.WebsocketOptions, c.options.Dialer)
}
if err != nil {
ERROR.Println(CLI, err.Error())
WARN.Println(CLI, "failed to connect to broker, trying next")
Expand Down
64 changes: 64 additions & 0 deletions client_test.go
@@ -0,0 +1,64 @@
/*
* Copyright (c) 2021 IBM Corp and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* https://www.eclipse.org/legal/epl-2.0/
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Seth Hoenig
* Allan Stockdill-Mander
* Mike Robertson
* Matt Brittan
*/
package mqtt

import (
"net"
"net/url"
"strings"
"testing"
"time"
)

func TestCustomConnectionFunction(t *testing.T) {
// Set netpipe to emu
netClient, netServer := net.Pipe()
defer netClient.Close()
defer netServer.Close()
var firstMessage = ""
go func() {
// read first message only
bytes := make([]byte, 1024)
n, err := netServer.Read(bytes)
if err != nil {
t.Errorf("%v", err)
}
firstMessage = string(bytes[:n])
}()
// Set custom network connection function and client connect
var customConnectionFunc OpenConnectionFunc = func(uri *url.URL, options ClientOptions) (net.Conn, error) {
return netClient, nil
}
options := &ClientOptions{
CustomOpenConnectionFn: customConnectionFunc,
}
brokerAddr := netServer.LocalAddr().Network()
options.AddBroker(brokerAddr)
client := NewClient(options)

// Try to connect using custom function, wait for 2 seconds, to pass MQTT first message
if token := client.Connect(); token.WaitTimeout(2*time.Second) && token.Error() != nil {
t.Errorf("%v", token.Error())
}

// Analyze first message sent by client and received by the server
if len(firstMessage) <= 0 || !strings.Contains(firstMessage, "MQTT") {
t.Error("no message recieved on connect")
}
}
17 changes: 17 additions & 0 deletions options.go
Expand Up @@ -57,6 +57,11 @@ type ReconnectHandler func(Client, *ClientOptions)
// ConnectionAttemptHandler is invoked prior to making the initial connection.
type ConnectionAttemptHandler func(broker *url.URL, tlsCfg *tls.Config) *tls.Config

// OpenConnectionFunc is invoked to establish the underlying network connection
// Its purpose if for custom network transports.
// Does not carry out any MQTT specific handshakes.
type OpenConnectionFunc func(uri *url.URL, options ClientOptions) (net.Conn, error)

// ClientOptions contains configurable options for an Client. Note that these should be set using the
// relevant methods (e.g. AddBroker) rather than directly. See those functions for information on usage.
// WARNING: Create the below using NewClientOptions unless you have a compelling reason not to. It is easy
Expand Down Expand Up @@ -98,6 +103,7 @@ type ClientOptions struct {
WebsocketOptions *WebsocketOptions
MaxResumePubInFlight int // // 0 = no limit; otherwise this is the maximum simultaneous messages sent while resuming
Dialer *net.Dialer
CustomOpenConnectionFn OpenConnectionFunc
}

// NewClientOptions will create a new ClientClientOptions type with some
Expand Down Expand Up @@ -140,6 +146,7 @@ func NewClientOptions() *ClientOptions {
HTTPHeaders: make(map[string][]string),
WebsocketOptions: &WebsocketOptions{},
Dialer: &net.Dialer{Timeout: 30 * time.Second},
CustomOpenConnectionFn: nil,
}
return o
}
Expand Down Expand Up @@ -429,3 +436,13 @@ func (o *ClientOptions) SetDialer(dialer *net.Dialer) *ClientOptions {
o.Dialer = dialer
return o
}

// SetCustomOpenConectionFn replaces the inbuilt function that establishes a network connection with a custom function.
// The passed in function should return an open `net.Conn` or an error (see the existing openConnection function for an example)
// It enables custom networking types in addition to the defaults (tcp, tls, websockets...)
func (o *ClientOptions) SetCustomOpenConectionFn(customOpenConnectionfn OpenConnectionFunc) *ClientOptions {
if customOpenConnectionfn != nil {
o.CustomOpenConnectionFn = customOpenConnectionfn
}
return o
}
39 changes: 39 additions & 0 deletions options_test.go
@@ -0,0 +1,39 @@
/*
* Copyright (c) 2021 IBM Corp and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* https://www.eclipse.org/legal/epl-2.0/
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Seth Hoenig
* Allan Stockdill-Mander
* Mike Robertson
* Måns Ansgariusson
*/

// Portions copyright © 2018 TIBCO Software Inc.
package mqtt

import (
"fmt"
"net"
"net/url"
"testing"
)

func TestSetCustomConnectionOptions(t *testing.T) {
var customConnectionFunc OpenConnectionFunc = func(uri *url.URL, options ClientOptions) (net.Conn, error) {
return nil, fmt.Errorf("not implemented open connection func")
}
options := &ClientOptions{}
options = options.SetCustomOpenConectionFn(customConnectionFunc)
if options.CustomOpenConnectionFn == nil {
t.Error("custom open connection function cannot be set")
}
}

0 comments on commit 6f41a8c

Please sign in to comment.