Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an example of reliable consumer #109

Merged
merged 1 commit into from Aug 18, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
94 changes: 86 additions & 8 deletions example_client_test.go
Expand Up @@ -30,13 +30,82 @@ func Example() {
addr := "amqp://guest:guest@localhost:5672/"
queue := New(queueName, addr)
message := []byte("message")
// Attempt to push a message every 2 seconds

ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*20))
defer cancel()
loop:
for {
time.Sleep(time.Second * 2)
if err := queue.Push(message); err != nil {
fmt.Printf("Push failed: %s\n", err)
} else {
fmt.Println("Push succeeded!")
select {
// Attempt to push a message every 2 seconds
case <-time.After(time.Second * 2):
if err := queue.Push(message); err != nil {
fmt.Printf("Push failed: %s\n", err)
} else {
fmt.Println("Push succeeded!")
}
case <-ctx.Done():
queue.Close()
break loop
}
}
}

func Example_consume() {
queueName := "job_queue"
addr := "amqp://guest:guest@localhost:5672/"
queue := New(queueName, addr)

// Give the connection sometime to setup
<-time.After(time.Second)

ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()

deliveries, err := queue.Consume()
if err != nil {
fmt.Printf("Could not start consuming: %s\n", err)
return
}

// This channel will receive a notification when a channel closed event
// happens. This must be different than Client.notifyChanClose because the
// library sends only one notification and Client.notifyChanClose already has
// a receiver in handleReconnect().
// Recommended to make it buffered to avoid deadlocks
chClosedCh := make(chan *amqp.Error, 1)
queue.channel.NotifyClose(chClosedCh)

for {
select {
case <-ctx.Done():
queue.Close()
return

case amqErr := <-chClosedCh:
// This case handles the event of closed channel e.g. abnormal shutdown
fmt.Printf("AMQP Channel closed due to: %s\n", amqErr)

deliveries, err = queue.Consume()
if err != nil {
// If the AMQP channel is not ready, it will continue the loop. Next
// iteration will enter this case because chClosedCh is closed by the
// library
fmt.Println("Error trying to consume, will try again")
continue
}

// Re-set channel to receive notifications
// The library closes this channel after abnormal shutdown
chClosedCh = make(chan *amqp.Error, 1)
queue.channel.NotifyClose(chClosedCh)

case delivery := <-deliveries:
// Ack a message every 2 seconds
fmt.Printf("Received message: %s\n", delivery.Body)
if err := delivery.Ack(false); err != nil {
fmt.Printf("Error acknowledging message: %s\n", err)
}
<-time.After(time.Second * 2)
}
}
}
Expand Down Expand Up @@ -189,15 +258,15 @@ func (client *Client) init(conn *amqp.Connection) error {
// and updates the close listener to reflect this.
func (client *Client) changeConnection(connection *amqp.Connection) {
client.connection = connection
client.notifyConnClose = make(chan *amqp.Error)
client.notifyConnClose = make(chan *amqp.Error, 1)
client.connection.NotifyClose(client.notifyConnClose)
}

// changeChannel takes a new channel to the queue,
// and updates the channel listeners to reflect this.
func (client *Client) changeChannel(channel *amqp.Channel) {
client.channel = channel
client.notifyChanClose = make(chan *amqp.Error)
client.notifyChanClose = make(chan *amqp.Error, 1)
client.notifyConfirm = make(chan amqp.Confirmation, 1)
client.channel.NotifyClose(client.notifyChanClose)
client.channel.NotifyPublish(client.notifyConfirm)
Expand Down Expand Up @@ -268,6 +337,15 @@ func (client *Client) Consume() (<-chan amqp.Delivery, error) {
if !client.isReady {
return nil, errNotConnected
}

if err := client.channel.Qos(
1, // prefetchCount
0, // prefrechSize
false, // global
); err != nil {
return nil, err
}

return client.channel.Consume(
client.queueName,
"", // Consumer
Expand Down