Skip to content

pmorelli92/bunnify

Repository files navigation

Go Report Card GitHub license Tests Coverage Status

Bunnify is a library to publish and consume events for AMQP.

Important

While from my perspective the library is working just fine, I am still changing things here and there. Even tough the library is tagged as semver, I will start respecting it after the v1.0.0, and I won't guarantee backwards compatibility before that.

Features

Easy setup: Bunnify is designed to be easy to set up and use. Simply reference the library and start publishing and consuming events.

Automatic payload marshaling and unmarshaling: You can consume the same payload you published, without worrying about the details of marshaling and unmarshaling. Bunnify handles these actions for you, abstracting them away from the developer.

Automatic reconnection: If your connection to the AMQP server is interrupted, Bunnify will automatically handle the reconnection for you. This ensures that your events are published and consumed without interruption.

Built-in event metadata handling: The library automatically handles event metadata, including correlation IDs and other important details.

Retries and dead lettering: You can configure how many times an event can be retried and to send the event to a dead letter queue when the processing fails.

Tracing out of the box: Automatically injects and extracts traces when publishing and consuming. Minimal setup required is shown on the tracer test.

Prometheus metrics: Prometheus gatherer will collect automatically the following metrics:

  • amqp_events_received
  • amqp_events_without_handler
  • amqp_events_not_parsable
  • amqp_events_nack
  • amqp_events_processed_duration
  • amqp_events_publish_succeed
  • amqp_events_publish_failed

Only dependencies needed: The intention of the library is to avoid having lots of unneeded dependencies. I will always try to triple check the dependencies and use the least quantity of libraries to achieve the functionality required.

  • github.com/rabbitmq/amqp091-go: Handles the connection with AMQP protocol.
  • github.com/google/uuid: Generates UUID for events ID and correlation ID.
  • go.uber.org/goleak: Used on tests to verify that there are no leaks of routines on the handling of channels.
  • go.opentelemetry.io/otel: Handles the injection and extraction of the traces on the events.
  • github.com/prometheus/client_golang: Used in order to export metrics to Prometheus.

Outbox publisher: There is a submodule that you can refer with go get github.com/pmorelli92/bunnify/outbox. This publisher is wrapping the default bunnify publisher and stores all events in a database table which will be looped in an async way to be published to AMQP. You can read more here.

Motivation

Every workplace I have been had their own AMQP library. Most of the time the problems that they try to solve are reconnection, logging, correlation, handling the correct body type for events and dead letter. Most of this libraries are good but also built upon some other internal libraries and with some company's specifics that makes them impossible to open source.

Some developers are often spoiled with these as they provide a good dev experience and that is great; but if you cannot use it in side projects or if you start your own company, what is the point?

Bunnify aims to provide a flexible and adaptable solution that can be used in a variety of environments and scenarios. By abstracting away many of the technical details of AMQP publishing and consumption, Bunnify makes it easy to get started with event-driven architecture without needing to be an AMQP expert.

Installation

go get github.com/pmorelli92/bunnify

Examples

You can find all the working examples under the tests folder.

Consumer

connection := bunnify.NewConnection(
bunnify.WithURI("amqp://localhost:5672"),
bunnify.WithReconnectInterval(1*time.Second),
bunnify.WithNotificationChannel(notificationChannel))
if err := connection.Start(); err != nil {
t.Fatal(err)
}
var consumedEvent bunnify.ConsumableEvent[orderCreated]
eventHandler := func(ctx context.Context, event bunnify.ConsumableEvent[orderCreated]) error {
consumedEvent = event
return nil
}
consumer := connection.NewConsumer(
queueName,
bunnify.WithQuorumQueue(),
bunnify.WithBindingToExchange(exchangeName),
bunnify.WithHandler(routingKey, eventHandler))
if err := consumer.Consume(); err != nil {
t.Fatal(err)
}

Dead letter consumer

eventHandler := func(ctx context.Context, event bunnify.ConsumableEvent[orderCreated]) error {
return fmt.Errorf("error, this event will go to dead-letter")
}
var deadEvent bunnify.ConsumableEvent[orderCreated]
deadEventHandler := func(ctx context.Context, event bunnify.ConsumableEvent[orderCreated]) error {
deadEvent = event
return nil
}
// Exercise
connection := bunnify.NewConnection()
if err := connection.Start(); err != nil {
t.Fatal(err)
}
consumer := connection.NewConsumer(
queueName,
bunnify.WithQoS(2, 0),
bunnify.WithBindingToExchange(exchangeName),
bunnify.WithHandler(routingKey, eventHandler),
bunnify.WithDeadLetterQueue(deadLetterQueueName))
if err := consumer.Consume(); err != nil {
t.Fatal(err)
}
deadLetterConsumer := connection.NewConsumer(
deadLetterQueueName,
bunnify.WithHandler(routingKey, deadEventHandler))
if err := deadLetterConsumer.Consume(); err != nil {
t.Fatal(err)
}

Using a default handler

var consumedEvents []bunnify.ConsumableEvent[json.RawMessage]
eventHandler := func(ctx context.Context, event bunnify.ConsumableEvent[json.RawMessage]) error {
consumedEvents = append(consumedEvents, event)
return nil
}
// Bind only to queue received messages
consumer := connection.NewConsumer(
queueName,
bunnify.WithDefaultHandler(eventHandler))
if err := consumer.Consume(); err != nil {
t.Fatal(err)
}
orderCreatedEvent := orderCreated{ID: uuid.NewString()}
orderUpdatedEvent := orderUpdated{ID: uuid.NewString(), UpdatedAt: time.Now()}
publisher := connection.NewPublisher()
// Publish directly to the queue, without routing key
err := publisher.Publish(
context.TODO(),
"",
queueName,
bunnify.NewPublishableEvent(orderCreatedEvent))
if err != nil {
t.Fatal(err)
}
// Publish directly to the queue, without routing key
err = publisher.Publish(
context.TODO(),
"",
queueName,
bunnify.NewPublishableEvent(orderUpdatedEvent))
if err != nil {
t.Fatal(err)
}

Publisher

publisher := connection.NewPublisher()
orderCreatedID := uuid.NewString()
eventToPublish := bunnify.NewPublishableEvent(orderCreated{
ID: orderCreatedID,
})
err := publisher.Publish(
context.TODO(),
exchangeName,
routingKey,
eventToPublish)
if err != nil {
t.Fatal(err)
}

Enable Prometheus metrics

r := prometheus.NewRegistry()
err := bunnify.InitMetrics(r)
if err != nil {
t.Fatal(err)
}

if err = assertMetrics(r,
"amqp_events_publish_succeed",
"amqp_events_received",
"amqp_events_ack",
"amqp_events_processed_duration"); err != nil {
t.Fatal(err)
}

Enable tracing

// Setup tracing
otel.SetTracerProvider(tracesdk.NewTracerProvider())
otel.SetTextMapPropagator(propagation.TraceContext{})

publishingContext, _ := otel.Tracer("amqp").Start(context.Background(), "publish-test")
err := publisher.Publish(
publishingContext,
exchangeName,
routingKey,
bunnify.NewPublishableEvent(struct{}{}))
if err != nil {
t.Fatal(err)
}

var actualTraceID trace.TraceID
eventHandler := func(ctx context.Context, _ bunnify.ConsumableEvent[any]) error {
actualTraceID = trace.SpanFromContext(ctx).SpanContext().TraceID()
return nil
}

Retries

actualProcessing := 0
eventHandler := func(ctx context.Context, event bunnify.ConsumableEvent[orderCreated]) error {
actualProcessing++
return fmt.Errorf("error, this event should be retried")
}
// Exercise
connection := bunnify.NewConnection()
if err := connection.Start(); err != nil {
t.Fatal(err)
}
consumer := connection.NewConsumer(
queueName,
bunnify.WithQuorumQueue(),
bunnify.WithRetries(expectedRetries),
bunnify.WithBindingToExchange(exchangeName),
bunnify.WithHandler(routingKey, eventHandler))
if err := consumer.Consume(); err != nil {
t.Fatal(err)
}

actualProcessing := 0
eventHandler := func(ctx context.Context, event bunnify.ConsumableEvent[orderCreated]) error {
actualProcessing++
return fmt.Errorf("error, this event should be retried")
}
// Exercise
connection := bunnify.NewConnection()
if err := connection.Start(); err != nil {
t.Fatal(err)
}
consumer := connection.NewConsumer(
queueName,
bunnify.WithQuorumQueue(),
bunnify.WithRetries(expectedRetries),
bunnify.WithBindingToExchange(exchangeName),
bunnify.WithHandler(routingKey, eventHandler))
if err := consumer.Consume(); err != nil {
t.Fatal(err)
}

Configuration

Both the connection and consumer structs can be configured with the typical functional options. You can find the options below:

// WithURI allows the consumer to specify the AMQP Server.
// It should be in the format of amqp://0.0.0.0:5672
func WithURI(URI string) func(*connectionOption) {
return func(opt *connectionOption) {
opt.uri = URI
}
}
// WithReconnectInterval establishes how much time to wait
// between each attempt of connection.
func WithReconnectInterval(interval time.Duration) func(*connectionOption) {
return func(opt *connectionOption) {
opt.reconnectInterval = interval
}
}
// WithNotificationChannel specifies a go channel to receive messages
// such as connection established, reconnecting, event published, consumed, etc.
func WithNotificationChannel(notificationCh chan<- Notification) func(*connectionOption) {
return func(opt *connectionOption) {
opt.notificationChannel = notificationCh
}
}

// WithBindingToExchange specifies the exchange on which the queue
// will bind for the handlers provided.
func WithBindingToExchange(exchange string) func(*consumerOption) {
return func(opt *consumerOption) {
opt.exchange = exchange
}
}
// WithQoS specifies the prefetch count and size for the consumer.
func WithQoS(prefetchCount, prefetchSize int) func(*consumerOption) {
return func(opt *consumerOption) {
opt.prefetchCount = prefetchCount
opt.prefetchSize = prefetchSize
}
}
// WithQuorumQueue specifies that the queue to consume will be created as quorum queue.
// Quorum queues are used when data safety is the priority.
func WithQuorumQueue() func(*consumerOption) {
return func(opt *consumerOption) {
opt.quorumQueue = true
}
}
// WithDeadLetterQueue indicates which queue will receive the events
// that were NACKed for this consumer.
func WithDeadLetterQueue(queueName string) func(*consumerOption) {
return func(opt *consumerOption) {
opt.deadLetterQueue = queueName
}
}
// WithDefaultHandler specifies a handler that can be use for any type
// of routing key without a defined handler. This is mostly convenient if you
// don't care about the specific payload of the event, which will be received as a byte array.
func WithDefaultHandler(handler EventHandler[json.RawMessage]) func(*consumerOption) {
return func(opt *consumerOption) {
opt.defaultHandler = newWrappedHandler(handler)
}
}
// WithHandler specifies under which routing key the provided handler will be invoked.
// The routing key indicated here will be bound to the queue if the WithBindingToExchange is supplied.
func WithHandler[T any](routingKey string, handler EventHandler[T]) func(*consumerOption) {
return func(opt *consumerOption) {
opt.handlers[routingKey] = newWrappedHandler(handler)
}
}

When publishing an event, you can override the event or the correlation ID if you need. This is also achievable with options:

// WithEventID specifies the eventID to be published
// if it is not used a random uuid will be generated.
func WithEventID(eventID string) func(*eventOptions) {
return func(opt *eventOptions) {
opt.eventID = eventID
}
}
// WithCorrelationID specifies the correlationID to be published
// if it is not used a random uuid will be generated.
func WithCorrelationID(correlationID string) func(*eventOptions) {
return func(opt *eventOptions) {
opt.correlationID = correlationID
}
}