Skip to content

Latest commit

 

History

History
30 lines (16 loc) · 1.42 KB

README.md

File metadata and controls

30 lines (16 loc) · 1.42 KB

Outbox

Important

This submodule is couple with github.com/jackc/pgx/v5 and therefore works for Postgres databases only. This is the reason why this is shipped in a submodule instead of being included in the base module.

This submodule implements the outbox pattern on top of the Bunnify publisher.

How does it work

Whenever you call the Publish function the event is stored in a database table created only if not exists on the creation of the Publisher. This function takes a transaction, which makes your entities + creation of the event atomic.

There is a go routine that every certain duration will try to fetch the outbox events pending for publishing if any. Each one of them will be published in the same way that the bunnify Publisher does. All published events will be marked as published in the database table (or deleted if configured).

Installation

go get github.com/pmorelli92/bunnify/outbox

Examples

Setup Publisher

// Setup publisher
publisher := connection.NewPublisher()
// Setup database connection
dbCtx := context.TODO()
db, err := pgxpool.New(dbCtx, "postgresql://db:pass@localhost:5432/db")
if err != nil {
t.Fatal(err)
}
// Setup outbox publisher
outboxPublisher, err := outbox.NewPublisher(
dbCtx, db, *publisher,
outbox.WithLoopingInterval(1*time.Second),
outbox.WithNoficationChannel(notificationChannel))
if err != nil {
t.Fatal(err)
}

Publish

orderCreatedID := uuid.NewString()
eventToPublish := bunnify.NewPublishableEvent(orderCreated{ID: orderCreatedID})
publisherCtx, _ := otel.Tracer("amqp").Start(context.Background(), "outbox-publisher")
tx, err := db.Begin(dbCtx)
if err != nil {
t.Fatal(err)
}
defer func() {
_ = tx.Rollback(dbCtx)
}()
err = outboxPublisher.Publish(publisherCtx, tx, exchangeName, routingKey, eventToPublish)
if err != nil {
t.Fatal(err)
}
if err := tx.Commit(dbCtx); err != nil {
t.Fatal(err)
}

Configuration

// as it is not interesting to have the event as a timelog history.
func WithDeleteAfterPublish() func(*publisherOption) {
return func(opt *publisherOption) {
opt.deleteAfterPublish = true
}
}
// WithLoopingInterval specifies the interval on which the
// loop to check the pending to publish events is executed
func WithLoopingInterval(interval time.Duration) func(*publisherOption) {
return func(opt *publisherOption) {
opt.loopInterval = interval
}
}
// WithNoficationChannel specifies a go channel to receive messages
// such as connection established, reconnecting, event published, consumed, etc.
func WithNoficationChannel(notificationCh chan<- bunnify.Notification) func(*publisherOption) {
return func(opt *publisherOption) {
opt.notificationChannel = notificationCh
}
}