diff --git a/_examples/simple-consumer/consumer.go b/_examples/simple-consumer/consumer.go index 6ea634d..e8771fc 100644 --- a/_examples/simple-consumer/consumer.go +++ b/_examples/simple-consumer/consumer.go @@ -66,7 +66,7 @@ type Consumer struct { } func SetupCloseHandler(consumer *Consumer) { - c := make(chan os.Signal) + c := make(chan os.Signal, 2) signal.Notify(c, os.Interrupt, syscall.SIGTERM) go func() { <-c @@ -88,8 +88,10 @@ func NewConsumer(amqpURI, exchange, exchangeType, queueName, key, ctag string) ( var err error + config := amqp.Config{Properties: amqp.NewConnectionProperties()} + config.Properties.SetClientConnectionName("sample-consumer") Log.Printf("dialing %q", amqpURI) - c.conn, err = amqp.Dial(amqpURI) + c.conn, err = amqp.DialConfig(amqpURI, config) if err != nil { return nil, fmt.Errorf("Dial: %s", err) } diff --git a/_examples/simple-producer/producer.go b/_examples/simple-producer/producer.go index 75b9cb0..c2f9edf 100644 --- a/_examples/simple-producer/producer.go +++ b/_examples/simple-producer/producer.go @@ -42,7 +42,7 @@ func main() { } func SetupCloseHandler(done chan bool) { - c := make(chan os.Signal) + c := make(chan os.Signal, 2) signal.Notify(c, os.Interrupt, syscall.SIGTERM) go func() { <-c @@ -55,8 +55,10 @@ func publish(done chan bool, amqpURI, exchange, exchangeType, routingKey, body s // This function dials, connects, declares, publishes, and tears down, // all in one go. In a real service, you probably want to maintain a // long-lived connection as state, and publish against that. + config := amqp.Config{Properties: amqp.NewConnectionProperties()} + config.Properties.SetClientConnectionName("sample-producer") Log.Printf("dialing %q", amqpURI) - connection, err := amqp.Dial(amqpURI) + connection, err := amqp.DialConfig(amqpURI, config) if err != nil { return fmt.Errorf("Dial: %s", err) } diff --git a/connection.go b/connection.go index 6269f71..04de993 100644 --- a/connection.go +++ b/connection.go @@ -76,6 +76,13 @@ type Config struct { Dial func(network, addr string) (net.Conn, error) } +// NewConnectionProperties initialises an amqp.Table struct to empty value. This +// amqp.Table can be used as Properties in amqp.Config to set the connection +// name, using amqp.DialConfig() +func NewConnectionProperties() Table { + return make(Table) +} + // Connection manages the serialization and deserialization of frames from IO // and dispatches the frames to the appropriate channel. All RPC methods and // asynchronous Publishing, Delivery, Ack, Nack and Return messages are diff --git a/connection_test.go b/connection_test.go index a0d64b0..8f8e068 100644 --- a/connection_test.go +++ b/connection_test.go @@ -268,3 +268,20 @@ func TestReaderGoRoutineTerminatesWhenMsgIsProcessedDuringClose(t *testing.T) { t.Log("waiting for go-routines to terminate") wg.Wait() } + +func TestConnectionConfigPropertiesWithClientProvidedConnectionName(t *testing.T) { + const expectedConnectionName = "amqp091-go-test" + + connectionProperties := NewConnectionProperties() + connectionProperties.SetClientConnectionName(expectedConnectionName) + + currentConnectionName, ok := connectionProperties["connection_name"] + if !ok { + t.Fatal("Connection name was not set by Table.SetClientConnectionName") + } + if currentConnectionName != expectedConnectionName { + t.Fatalf("Connection name is set to: %s. Expected: %s", + currentConnectionName, + expectedConnectionName) + } +} diff --git a/examples_test.go b/examples_test.go index d3c5caa..79bdf69 100644 --- a/examples_test.go +++ b/examples_test.go @@ -422,3 +422,15 @@ func ExampleConnection_NotifyBlocked() { // Your application domain channel setup publishings publishAllTheThings(conn) } + +func ExampleTable_SetClientConnectionName() { + // Sets the well-known connection_name property in amqp.Config. The connection + // name will be visible in RabbitMQ Management UI. + config := amqp.Config{Properties: amqp.NewConnectionProperties()} + config.Properties.SetClientConnectionName("my-client-app") + conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") + if err != nil { + log.Fatalf("connection.open: %s", err) + } + defer conn.Close() +} diff --git a/types.go b/types.go index 1f12377..80f7362 100644 --- a/types.go +++ b/types.go @@ -268,6 +268,14 @@ func (t Table) Validate() error { return validateField(t) } +// Sets the connection name property. This property can be used in +// amqp.Config to set a custom connection name during amqp.DialConfig(). This +// can be helpful to identify specific connections in RabbitMQ, for debugging or +// tracing purposes. +func (t Table) SetClientConnectionName(connName string) { + t["connection_name"] = connName +} + type message interface { id() (uint16, uint16) wait() bool