Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Custom open connection for different network types #574

Merged
merged 3 commits into from Dec 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 5 additions & 1 deletion client.go
Expand Up @@ -393,7 +393,11 @@ func (c *client) attemptConnection() (net.Conn, byte, bool, error) {
tlsCfg = c.options.OnConnectAttempt(broker, c.options.TLSConfig)
}
// Start by opening the network connection (tcp, tls, ws) etc
conn, err = openConnection(broker, tlsCfg, c.options.ConnectTimeout, c.options.HTTPHeaders, c.options.WebsocketOptions, c.options.Dialer)
if c.options.CustomOpenConnectionFn != nil{
conn, err = c.options.CustomOpenConnectionFn(broker, c.options)
} else {
conn, err = openConnection(broker, tlsCfg, c.options.ConnectTimeout, c.options.HTTPHeaders, c.options.WebsocketOptions, c.options.Dialer)
}
if err != nil {
ERROR.Println(CLI, err.Error())
WARN.Println(CLI, "failed to connect to broker, trying next")
Expand Down
64 changes: 64 additions & 0 deletions client_test.go
@@ -0,0 +1,64 @@
/*
* Copyright (c) 2021 IBM Corp and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* https://www.eclipse.org/legal/epl-2.0/
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Seth Hoenig
* Allan Stockdill-Mander
* Mike Robertson
* Matt Brittan
*/
package mqtt

import (
"net"
"net/url"
"strings"
"testing"
"time"
)

func TestCustomConnectionFunction(t *testing.T) {
// Set netpipe to emu
netClient, netServer := net.Pipe()
defer netClient.Close()
defer netServer.Close()
var firstMessage = ""
go func() {
// read first message only
bytes := make([]byte, 1024)
n, err := netServer.Read(bytes)
if err != nil {
t.Errorf("%v", err)
}
firstMessage = string(bytes[:n])
}()
// Set custom network connection function and client connect
var customConnectionFunc OpenConnectionFunc = func(uri *url.URL, options ClientOptions) (net.Conn, error) {
return netClient, nil
}
options := &ClientOptions{
CustomOpenConnectionFn: customConnectionFunc,
}
brokerAddr := netServer.LocalAddr().Network()
options.AddBroker(brokerAddr)
client := NewClient(options)

// Try to connect using custom function, wait for 2 seconds, to pass MQTT first message
if token := client.Connect(); token.WaitTimeout(2*time.Second) && token.Error() != nil {
t.Errorf("%v", token.Error())
}

// Analyze first message sent by client and received by the server
if len(firstMessage) <= 0 || !strings.Contains(firstMessage, "MQTT") {
t.Error("no message recieved on connect")
}
}
17 changes: 17 additions & 0 deletions options.go
Expand Up @@ -57,6 +57,11 @@ type ReconnectHandler func(Client, *ClientOptions)
// ConnectionAttemptHandler is invoked prior to making the initial connection.
type ConnectionAttemptHandler func(broker *url.URL, tlsCfg *tls.Config) *tls.Config

// OpenConnectionFunc is invoked to establish the underlying network connection
// Its purpose if for custom network transports.
// Does not carry out any MQTT specific handshakes.
type OpenConnectionFunc func(uri *url.URL, options ClientOptions) (net.Conn, error)

// ClientOptions contains configurable options for an Client. Note that these should be set using the
// relevant methods (e.g. AddBroker) rather than directly. See those functions for information on usage.
// WARNING: Create the below using NewClientOptions unless you have a compelling reason not to. It is easy
Expand Down Expand Up @@ -98,6 +103,7 @@ type ClientOptions struct {
WebsocketOptions *WebsocketOptions
MaxResumePubInFlight int // // 0 = no limit; otherwise this is the maximum simultaneous messages sent while resuming
Dialer *net.Dialer
CustomOpenConnectionFn OpenConnectionFunc
}

// NewClientOptions will create a new ClientClientOptions type with some
Expand Down Expand Up @@ -140,6 +146,7 @@ func NewClientOptions() *ClientOptions {
HTTPHeaders: make(map[string][]string),
WebsocketOptions: &WebsocketOptions{},
Dialer: &net.Dialer{Timeout: 30 * time.Second},
CustomOpenConnectionFn: nil,
}
return o
}
Expand Down Expand Up @@ -429,3 +436,13 @@ func (o *ClientOptions) SetDialer(dialer *net.Dialer) *ClientOptions {
o.Dialer = dialer
return o
}

// SetCustomOpenConectionFn replaces the inbuilt function that establishes a network connection with a custom function.
// The passed in function should return an open `net.Conn` or an error (see the existing openConnection function for an example)
// It enables custom networking types in addition to the defaults (tcp, tls, websockets...)
func (o *ClientOptions) SetCustomOpenConectionFn(customOpenConnectionfn OpenConnectionFunc) *ClientOptions {
if customOpenConnectionfn != nil {
o.CustomOpenConnectionFn = customOpenConnectionfn
}
return o
}
39 changes: 39 additions & 0 deletions options_test.go
@@ -0,0 +1,39 @@
/*
* Copyright (c) 2021 IBM Corp and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* https://www.eclipse.org/legal/epl-2.0/
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Seth Hoenig
* Allan Stockdill-Mander
* Mike Robertson
* Måns Ansgariusson
*/

// Portions copyright © 2018 TIBCO Software Inc.
package mqtt

import (
"fmt"
"net"
"net/url"
"testing"
)

func TestSetCustomConnectionOptions(t *testing.T) {
var customConnectionFunc OpenConnectionFunc = func(uri *url.URL, options ClientOptions) (net.Conn, error) {
return nil, fmt.Errorf("not implemented open connection func")
}
options := &ClientOptions{}
options = options.SetCustomOpenConectionFn(customConnectionFunc)
if options.CustomOpenConnectionFn == nil {
t.Error("custom open connection function cannot be set")
}
}