diff --git a/example_client_test.go b/example_client_test.go index 2bdc500..63034cf 100644 --- a/example_client_test.go +++ b/example_client_test.go @@ -30,13 +30,82 @@ func Example() { addr := "amqp://guest:guest@localhost:5672/" queue := New(queueName, addr) message := []byte("message") - // Attempt to push a message every 2 seconds + + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*20)) + defer cancel() +loop: for { - time.Sleep(time.Second * 2) - if err := queue.Push(message); err != nil { - fmt.Printf("Push failed: %s\n", err) - } else { - fmt.Println("Push succeeded!") + select { + // Attempt to push a message every 2 seconds + case <-time.After(time.Second * 2): + if err := queue.Push(message); err != nil { + fmt.Printf("Push failed: %s\n", err) + } else { + fmt.Println("Push succeeded!") + } + case <-ctx.Done(): + queue.Close() + break loop + } + } +} + +func Example_consume() { + queueName := "job_queue" + addr := "amqp://guest:guest@localhost:5672/" + queue := New(queueName, addr) + + // Give the connection sometime to setup + <-time.After(time.Second) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + + deliveries, err := queue.Consume() + if err != nil { + fmt.Printf("Could not start consuming: %s\n", err) + return + } + + // This channel will receive a notification when a channel closed event + // happens. This must be different than Client.notifyChanClose because the + // library sends only one notification and Client.notifyChanClose already has + // a receiver in handleReconnect(). + // Recommended to make it buffered to avoid deadlocks + chClosedCh := make(chan *amqp.Error, 1) + queue.channel.NotifyClose(chClosedCh) + + for { + select { + case <-ctx.Done(): + queue.Close() + return + + case amqErr := <-chClosedCh: + // This case handles the event of closed channel e.g. abnormal shutdown + fmt.Printf("AMQP Channel closed due to: %s\n", amqErr) + + deliveries, err = queue.Consume() + if err != nil { + // If the AMQP channel is not ready, it will continue the loop. Next + // iteration will enter this case because chClosedCh is closed by the + // library + fmt.Println("Error trying to consume, will try again") + continue + } + + // Re-set channel to receive notifications + // The library closes this channel after abnormal shutdown + chClosedCh = make(chan *amqp.Error, 1) + queue.channel.NotifyClose(chClosedCh) + + case delivery := <-deliveries: + // Ack a message every 2 seconds + fmt.Printf("Received message: %s\n", delivery.Body) + if err := delivery.Ack(false); err != nil { + fmt.Printf("Error acknowledging message: %s\n", err) + } + <-time.After(time.Second * 2) } } } @@ -189,7 +258,7 @@ func (client *Client) init(conn *amqp.Connection) error { // and updates the close listener to reflect this. func (client *Client) changeConnection(connection *amqp.Connection) { client.connection = connection - client.notifyConnClose = make(chan *amqp.Error) + client.notifyConnClose = make(chan *amqp.Error, 1) client.connection.NotifyClose(client.notifyConnClose) } @@ -197,7 +266,7 @@ func (client *Client) changeConnection(connection *amqp.Connection) { // and updates the channel listeners to reflect this. func (client *Client) changeChannel(channel *amqp.Channel) { client.channel = channel - client.notifyChanClose = make(chan *amqp.Error) + client.notifyChanClose = make(chan *amqp.Error, 1) client.notifyConfirm = make(chan amqp.Confirmation, 1) client.channel.NotifyClose(client.notifyChanClose) client.channel.NotifyPublish(client.notifyConfirm) @@ -268,6 +337,15 @@ func (client *Client) Consume() (<-chan amqp.Delivery, error) { if !client.isReady { return nil, errNotConnected } + + if err := client.channel.Qos( + 1, // prefetchCount + 0, // prefrechSize + false, // global + ); err != nil { + return nil, err + } + return client.channel.Consume( client.queueName, "", // Consumer