From 20e949e2d35137012a706035a0f188db562d75f8 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 6 Sep 2022 18:57:31 +0200 Subject: [PATCH] use PublishWithContext (#115) Signed-off-by: Gabriele Santomaggio Signed-off-by: Gabriele Santomaggio --- _examples/pubsub/pubsub.go | 6 +++++- _examples/simple-producer/producer.go | 5 ++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/_examples/pubsub/pubsub.go b/_examples/pubsub/pubsub.go index e30cd51..50c0937 100644 --- a/_examples/pubsub/pubsub.go +++ b/_examples/pubsub/pubsub.go @@ -15,6 +15,7 @@ import ( "io" "log" "os" + "time" amqp "github.com/rabbitmq/amqp091-go" ) @@ -87,6 +88,9 @@ func redial(ctx context.Context, url string) chan chan session { // publish publishes messages to a reconnecting session to a fanout exchange. // It receives from the application specific source of messages. func publish(sessions chan chan session, messages <-chan message) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + for session := range sessions { var ( running bool @@ -122,7 +126,7 @@ func publish(sessions chan chan session, messages <-chan message) { case body = <-pending: routingKey := "ignored for fanout exchanges, application dependent for other exchanges" - err := pub.Publish(exchange, routingKey, false, false, amqp.Publishing{ + err := pub.PublishWithContext(ctx, exchange, routingKey, false, false, amqp.Publishing{ Body: body, }) // Retry failed delivery on the next session diff --git a/_examples/simple-producer/producer.go b/_examples/simple-producer/producer.go index c2f9edf..81407cf 100644 --- a/_examples/simple-producer/producer.go +++ b/_examples/simple-producer/producer.go @@ -4,6 +4,7 @@ package main import ( + "context" "flag" "fmt" "log" @@ -101,12 +102,14 @@ func publish(done chan bool, amqpURI, exchange, exchangeType, routingKey, body s } Log.Println("declared Exchange, publishing messages") + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() for { seqNo := channel.GetNextPublishSeqNo() Log.Printf("publishing %dB body (%q)", len(body), body) - if err := channel.Publish( + if err := channel.PublishWithContext(ctx, exchange, // publish to an exchange routingKey, // routing to 0 or more queues false, // mandatory