Skip to content

This module is a wrapper for the rabbitmq/amqp091-go package

Notifications You must be signed in to change notification settings

astreter/amqpwrapper

Folders and files

Name
Last commit message
Last commit date

Latest commit

 

History

36 Commits
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

amqp-wrapper

This module is a wrapper for the rabbitmq/amqp091-go package.

Installation

go get -u github.com/astreter/amqpwrapper/v2

Example: publishing messages to a queue

import (
    "context"
    "github.com/astreter/amqpwrapper/v2"
    "sync"
)

func main() {
    ctx, cancelF := context.WithCancel(context.Background())
    
    wg := new(sync.WaitGroup)
    
    amqp, err := amqpwrapper.NewRabbitChannel(
        ctx,
        wg, // application can wait until all ongoing deliveries are processed
        &amqpwrapper.Config{
            URL: "amqp://user:password@localhost:5672",
            Debug: true, // true - amqpwrapper writes logs about each delivery
            ConfirmSends: true // true - the publisher waits for confirmation from RabbitMQ server that a message has been delivered 
        },
    )
    if err != nil {
        panic(err)
    }
    
    go func() {
        <-amqp.Cancel() // such channel informs that amqp was lost and failed to reconnect
        cancelF()
    }()

	request := map[string]interface{}{"send": "something"}
    if err = amqp.Publish(context.Background(), request, "exchange_name", "routing_key"); err != nil {
        panic(err)
    }

    <-ctx.Done()
    wg.Wait()
}

Example: receiving messages from a queue

import (
    "context"
    "github.com/astreter/amqpwrapper/v2"
    "sync"
)

func main() {
    ctx, cancelF := context.WithCancel(context.Background())
    
    wg := new(sync.WaitGroup)
    
    amqp, err := amqpwrapper.NewRabbitChannel(
        ctx,
        wg, // application can wait until all ongoing deliveries are processed
        &amqpwrapper.Config{
            URL: "amqp://user:password@localhost:5672",
            Debug: true, // true - amqpwrapper writes logs about each delivery
            ConfirmSends: true // true - the publisher waits for confirmation from RabbitMQ server that a message has been delivered 
        },
    )
    if err != nil {
        panic(err)
    }
    
    go func() {
        <-amqp.Cancel() // such channel informs that amqp was lost and failed to reconnect
        cancelF()
    }()


    if err = amqp.DefineExchange(
		"exchange_name",
		false, // false - creates new exchange if it doesn't exist yet; true - relies on such exchange already exists
    ); err != nil {
        panic(err)
    }

    if err = amqp.SetUpConsumer(
        "exchange_name",
        "routing_key",
        func(ctx context.Context, delivery amqp.Delivery){
            //todo: process the delivery
        },
        amqpwrapper.WithOptionThreads(5), // number of threads which can process deliveries in parallel
    ); err != nil {
        panic(err)
    }

    <-ctx.Done()
    wg.Wait()
}

About

This module is a wrapper for the rabbitmq/amqp091-go package

Resources

Stars

Watchers

Forks

Packages

No packages published

Languages