Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sample doc format update #1142

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 19 additions & 1 deletion example_test.go
Expand Up @@ -24,23 +24,41 @@ import (

// Shows different ways to create a Conn.
func ExampleConnect() {
// Simple connect, no options
nc, _ := nats.Connect("demo.nats.io")
nc.Close()

// Connect with username/password
nc, _ = nats.Connect("nats://derek:secretpassword@demo.nats.io:4222")
nc.Close()

// Establish TLS connection
nc, _ = nats.Connect("tls://derek:secretpassword@demo.nats.io:4443")
nc.Close()

// Connect to multiple servers
nc, _ = nats.Connect("demo.nats.io,nats://127.0.0.1:1223,nats://127.0.0.1:1224")
nc.Close()

// Connect with Options
nc, _ = nats.Connect("demo.nats.io",
nats.MaxReconnects(10),
nats.ReconnectWait(5*time.Second),
nats.Timeout(1*time.Second))

nc.Close()
}

func ExampleOptions_Connect() {
opts := nats.Options{
Url: "demo.nats.io",
AllowReconnect: true,
MaxReconnect: 10,
ReconnectWait: 5 * time.Second,
Timeout: 1 * time.Second,
}

nc, _ = opts.Connect()
nc, _ := opts.Connect()
nc.Close()
}

Expand Down
209 changes: 140 additions & 69 deletions nats.go
Expand Up @@ -46,9 +46,15 @@ import (
"github.com/nats-io/nats.go/util"
)

// Default Constants
// Client version and language name.
// Used by nats-server to help identify a client connection.
const (
Version = "1.20.0"
LangString = "go"
)

// Default values used by [Connect] and [GetDefaultOptions].
const (
Version = "1.20.0"
DefaultURL = "nats://127.0.0.1:4222"
DefaultPort = 4222
DefaultMaxReconnect = 60
Expand All @@ -62,9 +68,11 @@ const (
DefaultReconnectBufSize = 8 * 1024 * 1024 // 8MB
RequestChanLen = 8
DefaultDrainTimeout = 30 * time.Second
LangString = "go"
)

// Internal constants used to identify errors returned by nats-server.
//
// These are for internal use. To identify errors returned by the client, use predefined error variables.
const (
// STALE_CONNECTION is for detection and proper handling of stale connections.
STALE_CONNECTION = "stale connection"
Expand All @@ -88,17 +96,42 @@ const (
MAX_CONNECTIONS_ERR = "maximum connections exceeded"
)

// Errors
// # NATS Errors
//
// Generic errors returned by Core NATS client.
var (
ErrConnectionClosed = errors.New("nats: connection closed")
ErrConnectionDraining = errors.New("nats: connection draining")
ErrDrainTimeout = errors.New("nats: draining connection timed out")
// ErrConnectionClosed is returned when attempting to perform any server interaction
// when the connection was already closed.
ErrConnectionClosed = errors.New("nats: connection closed")

// ErrConnectionDraining is returned when attempting
// to publish or subscribe on a connection in draining state.
ErrConnectionDraining = errors.New("nats: connection draining")

// ErrDrainTimeout is returned when Drain() times out before drainig all active subscriptions.
// Drain timeout can be configured using DrainTimeout connection option.
ErrDrainTimeout = errors.New("nats: draining connection timed out")

// ErrDrainTimeout is returned when Drain() is called when NATS
// connection is in either of CONNECTING or RECONNECTING state.
ErrConnectionReconnecting = errors.New("nats: connection reconnecting")
ErrSecureConnRequired = errors.New("nats: secure connection required")
ErrSecureConnWanted = errors.New("nats: secure connection not available")
ErrBadSubscription = errors.New("nats: invalid subscription")
ErrTypeSubscription = errors.New("nats: invalid subscription type")
ErrBadSubject = errors.New("nats: invalid subject")

// ErrSecureConnWanted is returned when Secure connection option is used,
// but the server does not support TLS connections.
ErrSecureConnWanted = errors.New("nats: secure connection not available")

// ErrBadSubscription is a multi-purpose error returned when a subscription is not valid.
// It can either be nil, closed, or configured inproperly.
ErrBadSubscription = errors.New("nats: invalid subscription")

// ErrTypeSubscription is returned when attempting to invoke an operation on Subscription
// which is not supported by given Subscription type.
ErrTypeSubscription = errors.New("nats: invalid subscription type")

// ErrBadSubject is returned when subject validation fails (subject is either empty or has invalid format).
ErrBadSubject = errors.New("nats: invalid subject")

// ErrBadQueueName is returned when queue name validation fails.
ErrBadQueueName = errors.New("nats: invalid queue name")
ErrSlowConsumer = errors.New("nats: slow consumer, messages dropped")
ErrTimeout = errors.New("nats: timeout")
Expand Down Expand Up @@ -139,13 +172,17 @@ var (
ErrNoResponders = errors.New("nats: no responders available for request")
ErrMaxConnectionsExceeded = errors.New("nats: server maximum connections exceeded")
ErrConnectionNotTLS = errors.New("nats: connection is not tls")

// DEPRECATED: not used
ErrSecureConnRequired = errors.New("nats: secure connection required")
)

func init() {
rand.Seed(time.Now().UnixNano())
}

// GetDefaultOptions returns default configuration options for the client.
// For detailed usage of specific values, see [Options].
func GetDefaultOptions() Options {
return Options{
AllowReconnect: true,
Expand All @@ -163,13 +200,15 @@ func GetDefaultOptions() Options {
}

// DEPRECATED: Use GetDefaultOptions() instead.
//
// DefaultOptions is not safe for use by multiple clients.
// For details see #308.
var DefaultOptions = GetDefaultOptions()

// Status represents the state of the connection.
type Status int

// All possible connection states. Connection state can be verified using [Conn.Status]
const (
DISCONNECTED = Status(iota)
CONNECTED
Expand All @@ -180,6 +219,7 @@ const (
DRAINING_PUBS
)

// String returns a string representation of client connection state.
func (s Status) String() string {
switch s {
case DISCONNECTED:
Expand All @@ -200,36 +240,41 @@ func (s Status) String() string {
return "unknown status"
}

// ConnHandler is used for asynchronous events such as
// disconnected and closed connections.
type ConnHandler func(*Conn)

// ConnErrHandler is used to process asynchronous events like
// disconnected connection with the error (if any).
type ConnErrHandler func(*Conn, error)

// ErrHandler is used to process asynchronous errors encountered
// while processing inbound messages.
type ErrHandler func(*Conn, *Subscription, error)

// UserJWTHandler is used to fetch and return the account signed
// JWT for this user.
type UserJWTHandler func() (string, error)

// SignatureHandler is used to sign a nonce from the server while
// authenticating with nkeys. The user should sign the nonce and
// return the raw signature. The client will base64 encode this to
// send to the server.
type SignatureHandler func([]byte) ([]byte, error)

// AuthTokenHandler is used to generate a new token.
type AuthTokenHandler func() string

// ReconnectDelayHandler is used to get from the user the desired
// delay the library should pause before attempting to reconnect
// again. Note that this is invoked after the library tried the
// whole list of URLs and failed to reconnect.
type ReconnectDelayHandler func(attempts int) time.Duration
// Handler functions used to configure callbacks in [Options]
type (
// ConnHandler is used for handling asynchronous connection events in callbacks.
// It is used in several callbacks on [Options].
ConnHandler func(*Conn)

// ConnErrHandler is used to process asynchronous connection events like
// disconnected connection with the error (if any).
// It is used in several callbacks on [Options].
ConnErrHandler func(*Conn, error)

// ErrHandler is used to process asynchronous errors encountered
// while processing inbound messages.
// Used to configure AsyncErrorCB on [Options].
ErrHandler func(*Conn, *Subscription, error)

// UserJWTHandler is used to fetch and return the account signed
// JWT for this user.
UserJWTHandler func() (string, error)

// SignatureHandler is used to sign a nonce from the server while
// authenticating with nkeys. The user should sign the nonce and
// return the raw signature. The client will base64 encode this to
// send to the server.
SignatureHandler func([]byte) ([]byte, error)

// AuthTokenHandler is used to generate a new token.
AuthTokenHandler func() string

// ReconnectDelayHandler is used to get from the user the desired
// delay the library should pause before attempting to reconnect
// again. Note that this is invoked after the library tried the
// whole list of URLs and failed to reconnect.
ReconnectDelayHandler func(attempts int) time.Duration
)

// asyncCB is used to preserve order for async callbacks.
type asyncCB struct {
Expand All @@ -245,19 +290,25 @@ type asyncCallbacksHandler struct {
}

// Option is a function on the options for a connection.
// It is used to enable passing connection options to [Connect].
type Option func(*Options) error

// CustomDialer can be used to specify any dialer, not necessarily
// a *net.Dialer.
// CustomDialer can be used to specify any dialer for a connection,
// not necessarily a [*net.Dialer].
type CustomDialer interface {
Dial(network, address string) (net.Conn, error)
}

// InProcessConnProvider is used to configure a connection
// to the NATS server running in the same process.
type InProcessConnProvider interface {
InProcessConn() (net.Conn, error)
}

// Options can be used to create a customized connection.
// Options can be used to create a customized NATS server connection.
//
// It exposes [Options.Connect] method to established the connection with the provided configuration.
// Alternatively, it is used configure [Connect] function via [Option] type functions.
type Options struct {

// Url represents a single NATS server url to which the client
Expand Down Expand Up @@ -496,13 +547,22 @@ const (
)

// A Conn represents a bare connection to a nats-server.
// It can send and receive []byte payloads.
//
// It exposes methods used across the client to interact with the server:
// - Publishing messages
// - Creating subscriptions to receive messages
// - Getting information about the connected servers
// - Creating JetStreamContext to interact with JetStream API (if enabled)
// - Flushing the connection
//
// The connection is safe to use in multiple Go routines concurrently.
type Conn struct {
// Keep all members for which we use atomic at the beginning of the
// struct and make sure they are all 64bits (or use padding if necessary).
// atomic.* functions crash on 32bit machines if operand is not aligned
// at 64bit. See https://github.com/golang/go/issues/599

// Statistics on the current connection (number of in/out messages, bytes and number of reconnects).
Statistics
mu sync.RWMutex
// Opts holds the configuration of the Conn.
Expand Down Expand Up @@ -607,30 +667,33 @@ type Subscription struct {
dropped int
}

// Msg represents a message delivered by NATS. This structure is used
// Msg represents a message sent to or delivered by NATS. This structure is used
// by Subscribers and PublishMsg().
//
// # Types of Acknowledgements
//
// In case using JetStream, there are multiple ways to ack a Msg:
//
// // Acknowledgement that a message has been processed.
// msg.Ack()
//
// // Negatively acknowledges a message.
// msg.Nak()
//
// // Terminate a message so that it is not redelivered further.
// msg.Term()
//
// // Signal the server that the message is being worked on and reset redelivery timer.
// msg.InProgress()
// - [Msg.Ack]
// - [Msg.Nak]
// - [Msg.Term]
// - [Msg.InProgress]
type Msg struct {
// Subject contains the subject on which the message is sent/delivered.
Subject string
Reply string
Header Header
Data []byte
Sub *Subscription

// Reply is the subjet on which the requestor expects the response.
Reply string

// Header is a map of message headers.
Header Header

// Data contains payload of the message.
Data []byte

// Sub contains the Subscription associated with the received message.
// When using PublishMsg(), this field can be empty and is not used.
Sub *Subscription

// Internal
next *Msg
wsz int
Expand Down Expand Up @@ -773,11 +836,18 @@ type connectInfo struct {
type MsgHandler func(msg *Msg)

// Connect will attempt to connect to the NATS system.
// The url can contain username/password semantics. e.g. nats://derek:pass@localhost:4222
// Comma separated arrays are also supported, e.g. urlA, urlB.
// Options start with the defaults but can be overridden.
// To connect to a NATS Server's websocket port, use the `ws` or `wss` scheme, such as
// `ws://localhost:8080`. Note that websocket schemes cannot be mixed with others (nats/tls).
//
// The url can contain username/password semantics, e.g. nats://derek:pass@localhost:4222
//
// Comma-separated list of URLs can be passed in order to connect to a cluster.
// Client will randomize the order in which connection attempts will be executed.
//
// nats.Connect("nats://127.0.0.1:1222,nats://127.0.0.1:1223,nats://127.0.0.1:1224")
//
// Options start with the defaults [GetDefaultOptions] but can be overridden.
//
// To connect to a NATS Server's websocket port, use the "ws" or "wss" scheme, such as
// ws://localhost:8080. Note that websocket schemes cannot be mixed with others (nats/tls).
func Connect(url string, options ...Option) (*Conn, error) {
opts := GetDefaultOptions()
opts.Servers = processUrlString(url)
Expand Down Expand Up @@ -908,6 +978,7 @@ func ReconnectWait(t time.Duration) Option {

// MaxReconnects is an Option to set the maximum number of reconnect attempts.
// Defaults to 60.
// This option can be used to configure [Connect].
func MaxReconnects(max int) Option {
return func(o *Options) error {
o.MaxReconnect = max
Expand Down Expand Up @@ -3412,7 +3483,7 @@ func (nc *Conn) Publish(subj string, data []byte) error {
}

// Header represents the optional Header for a NATS message,
// based on the implementation of http.Header.
// based on the implementation of [http.Header].
type Header map[string][]string

// Add adds the key, value pair to the header. It is case-sensitive
Expand Down