Skip to content

Commit

Permalink
separated dead letter test into two (#21)
Browse files Browse the repository at this point in the history
  • Loading branch information
pmorelli92 committed Oct 2, 2023
1 parent f356a80 commit 76f7495
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 25 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ You can find all the working examples under the `tests` folder.

### Consumer

https://github.com/pmorelli92/bunnify/blob/aa8f63943345a8e3d092e98b9cd69cf73e7a0ec9/tests/consumer_publish_test.go#L38-L59
https://github.com/pmorelli92/bunnify/blob/f356a80625d9dcdaec12d05953447ebcc24a1b13/tests/consumer_publish_test.go#L38-L61

### Dead letter consumer

Expand Down
97 changes: 97 additions & 0 deletions tests/consumer_publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package tests

import (
"context"
"encoding/json"
"fmt"
"testing"
"time"
Expand Down Expand Up @@ -110,3 +111,99 @@ func TestConsumerPublisher(t *testing.T) {

goleak.VerifyNone(t)
}

func TestConsumerDefaultHandler(t *testing.T) {
// Setup
queueName := uuid.NewString()

type orderCreated struct {
ID string `json:"id"`
}

type orderUpdated struct {
ID string `json:"id"`
UpdatedAt time.Time `json:"updatedAt"`
}

connection := bunnify.NewConnection()
if err := connection.Start(); err != nil {
t.Fatal(err)
}

var consumedEvents []bunnify.ConsumableEvent[json.RawMessage]
eventHandler := func(ctx context.Context, event bunnify.ConsumableEvent[json.RawMessage]) error {
consumedEvents = append(consumedEvents, event)
return nil
}

// Bind only to queue received messages
consumer := connection.NewConsumer(
queueName,
bunnify.WithDefaultHandler(eventHandler))

if err := consumer.Consume(); err != nil {
t.Fatal(err)
}

orderCreatedEvent := orderCreated{ID: uuid.NewString()}
orderUpdatedEvent := orderUpdated{ID: uuid.NewString(), UpdatedAt: time.Now()}
publisher := connection.NewPublisher()

// Publish directly to the queue, without routing key
err := publisher.Publish(
context.TODO(),
"",
queueName,
bunnify.NewPublishableEvent(orderCreatedEvent))
if err != nil {
t.Fatal(err)
}

// Publish directly to the queue, without routing key
err = publisher.Publish(
context.TODO(),
"",
queueName,
bunnify.NewPublishableEvent(orderUpdatedEvent))
if err != nil {
t.Fatal(err)
}

time.Sleep(50 * time.Millisecond)

if err := connection.Close(); err != nil {
t.Fatal(err)
}

// Assert
if len(consumedEvents) != 2 {
t.Fatalf("expected 2 events, got %d", len(consumedEvents))
}

// First event should be orderCreated
var receivedOrderCreated orderCreated
err = json.Unmarshal(consumedEvents[0].Payload, &receivedOrderCreated)
if err != nil {
t.Fatal(err)
}

if orderCreatedEvent.ID != receivedOrderCreated.ID {
t.Fatalf("expected created order ID to be %s got %s", orderCreatedEvent.ID, receivedOrderCreated.ID)
}

var receivedOrderUpdated orderUpdated
err = json.Unmarshal(consumedEvents[1].Payload, &receivedOrderUpdated)
if err != nil {
t.Fatal(err)
}

if orderUpdatedEvent.ID != receivedOrderUpdated.ID {
t.Fatalf("expected updated order ID to be %s got %s", orderUpdatedEvent.ID, receivedOrderUpdated.ID)
}

if !orderUpdatedEvent.UpdatedAt.Equal(receivedOrderUpdated.UpdatedAt) {
t.Fatalf("expected updated order time to be %s got %s", orderUpdatedEvent.UpdatedAt, receivedOrderUpdated.UpdatedAt)
}

goleak.VerifyNone(t)
}
42 changes: 18 additions & 24 deletions tests/dead_letter_receives_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package tests

import (
"context"
"encoding/json"
"fmt"
"testing"
"time"
Expand Down Expand Up @@ -36,9 +35,9 @@ func TestDeadLetterReceivesEvent(t *testing.T) {
return fmt.Errorf("error, this event will go to dead-letter")
}

var eventFromDeadLetter bunnify.ConsumableEvent[json.RawMessage]
defaultHandler := func(ctx context.Context, event bunnify.ConsumableEvent[json.RawMessage]) error {
eventFromDeadLetter = event
var deadEvent bunnify.ConsumableEvent[orderCreated]
deadEventHandler := func(ctx context.Context, event bunnify.ConsumableEvent[orderCreated]) error {
deadEvent = event
return nil
}

Expand All @@ -61,7 +60,7 @@ func TestDeadLetterReceivesEvent(t *testing.T) {

deadLetterConsumer := connection.NewConsumer(
deadLetterQueueName,
bunnify.WithDefaultHandler(defaultHandler))
bunnify.WithHandler(routingKey, deadEventHandler))

if err := deadLetterConsumer.Consume(); err != nil {
t.Fatal(err)
Expand All @@ -81,32 +80,27 @@ func TestDeadLetterReceivesEvent(t *testing.T) {
}

// Assert
if publishedEvent.ID != eventFromDeadLetter.ID {
t.Fatalf("expected event ID %s, got %s", publishedEvent.ID, eventFromDeadLetter.ID)
if publishedEvent.ID != deadEvent.ID {
t.Fatalf("expected event ID %s, got %s", publishedEvent.ID, deadEvent.ID)
}
if publishedEvent.CorrelationID != eventFromDeadLetter.CorrelationID {
t.Fatalf("expected correlation ID %s, got %s", publishedEvent.CorrelationID, eventFromDeadLetter.CorrelationID)
if publishedEvent.CorrelationID != deadEvent.CorrelationID {
t.Fatalf("expected correlation ID %s, got %s", publishedEvent.CorrelationID, deadEvent.CorrelationID)
}
if !publishedEvent.Timestamp.Equal(eventFromDeadLetter.Timestamp) {
t.Fatalf("expected timestamp %s, got %s", publishedEvent.Timestamp, eventFromDeadLetter.Timestamp)
if !publishedEvent.Timestamp.Equal(deadEvent.Timestamp) {
t.Fatalf("expected timestamp %s, got %s", publishedEvent.Timestamp, deadEvent.Timestamp)
}

var jsonData map[string]interface{}
if err = json.Unmarshal(eventFromDeadLetter.Payload, &jsonData); err != nil {
t.Fatal(err)
}

if publishedOrderCreated.ID != jsonData["id"].(string) {
t.Fatalf("expected order created ID %s, got %s", publishedOrderCreated.ID, jsonData["id"].(string))
if publishedOrderCreated.ID != deadEvent.Payload.ID {
t.Fatalf("expected order created ID %s, got %s", publishedOrderCreated.ID, deadEvent.Payload.ID)
}
if exchangeName != eventFromDeadLetter.DeliveryInfo.Exchange {
t.Fatalf("expected exchange %s, got %s", exchangeName, eventFromDeadLetter.DeliveryInfo.Exchange)
if exchangeName != deadEvent.DeliveryInfo.Exchange {
t.Fatalf("expected exchange %s, got %s", exchangeName, deadEvent.DeliveryInfo.Exchange)
}
if queueName != eventFromDeadLetter.DeliveryInfo.Queue {
t.Fatalf("expected queue %s, got %s", queueName, eventFromDeadLetter.DeliveryInfo.Queue)
if queueName != deadEvent.DeliveryInfo.Queue {
t.Fatalf("expected queue %s, got %s", queueName, deadEvent.DeliveryInfo.Queue)
}
if routingKey != eventFromDeadLetter.DeliveryInfo.RoutingKey {
t.Fatalf("expected routing key %s, got %s", routingKey, eventFromDeadLetter.DeliveryInfo.RoutingKey)
if routingKey != deadEvent.DeliveryInfo.RoutingKey {
t.Fatalf("expected routing key %s, got %s", routingKey, deadEvent.DeliveryInfo.RoutingKey)
}

goleak.VerifyNone(t)
Expand Down

0 comments on commit 76f7495

Please sign in to comment.