Skip to content

Commit

Permalink
Add AMQP transport (#746) (#746)
Browse files Browse the repository at this point in the history
amqp transport publisher

amqp transport tests

lint fixes for amqp transport

fixed formatting and punctuation

zzz

default ContentType is null, increased max length of correlationId to 255

refractored subscriber EncodeResponseFunc into encode func, and send func

zzz
  • Loading branch information
mattfung authored and peterbourgon committed Nov 8, 2018
1 parent ba206c1 commit 12210fb
Show file tree
Hide file tree
Showing 8 changed files with 1,249 additions and 0 deletions.
2 changes: 2 additions & 0 deletions transport/amqp/doc.go
@@ -0,0 +1,2 @@
// Package amqp implements an AMQP transport.
package amqp
22 changes: 22 additions & 0 deletions transport/amqp/encode-decode.go
@@ -0,0 +1,22 @@
package amqp

import (
"context"
"github.com/streadway/amqp"
)

// DecodeRequestFunc extracts a user-domain request object from
// an AMQP Delivery object. It is designed to be used in AMQP Subscribers.
type DecodeRequestFunc func(context.Context, *amqp.Delivery) (request interface{}, err error)

// EncodeRequestFunc encodes the passed request object into
// an AMQP Publishing object. It is designed to be used in AMQP Publishers.
type EncodeRequestFunc func(context.Context, *amqp.Publishing, interface{}) error

// EncodeResponseFunc encodes the passed reponse object to
// an AMQP Publishing object. It is designed to be used in AMQP Subscribers.
type EncodeResponseFunc func(context.Context, *amqp.Publishing, interface{}) error

// DecodeResponseFunc extracts a user-domain response object from
// an AMQP Delivery object. It is designed to be used in AMQP Publishers.
type DecodeResponseFunc func(context.Context, *amqp.Delivery) (response interface{}, err error)
152 changes: 152 additions & 0 deletions transport/amqp/publisher.go
@@ -0,0 +1,152 @@
package amqp

import (
"context"
"time"

"github.com/go-kit/kit/endpoint"
"github.com/streadway/amqp"
)

// The golang AMQP implementation requires the []byte representation of
// correlation id strings to have a maximum length of 255 bytes.
const maxCorrelationIdLength = 255

// Publisher wraps an AMQP channel and queue, and provides a method that
// implements endpoint.Endpoint.
type Publisher struct {
ch Channel
q *amqp.Queue
enc EncodeRequestFunc
dec DecodeResponseFunc
before []RequestFunc
after []PublisherResponseFunc
timeout time.Duration
}

// NewPublisher constructs a usable Publisher for a single remote method.
func NewPublisher(
ch Channel,
q *amqp.Queue,
enc EncodeRequestFunc,
dec DecodeResponseFunc,
options ...PublisherOption,
) *Publisher {
p := &Publisher{
ch: ch,
q: q,
enc: enc,
dec: dec,
timeout: 10 * time.Second,
}
for _, option := range options {
option(p)
}
return p
}

// PublisherOption sets an optional parameter for clients.
type PublisherOption func(*Publisher)

// PublisherBefore sets the RequestFuncs that are applied to the outgoing AMQP
// request before it's invoked.
func PublisherBefore(before ...RequestFunc) PublisherOption {
return func(p *Publisher) { p.before = append(p.before, before...) }
}

// PublisherAfter sets the ClientResponseFuncs applied to the incoming AMQP
// request prior to it being decoded. This is useful for obtaining anything off
// of the response and adding onto the context prior to decoding.
func PublisherAfter(after ...PublisherResponseFunc) PublisherOption {
return func(p *Publisher) { p.after = append(p.after, after...) }
}

// PublisherTimeout sets the available timeout for an AMQP request.
func PublisherTimeout(timeout time.Duration) PublisherOption {
return func(p *Publisher) { p.timeout = timeout }
}

// Endpoint returns a usable endpoint that invokes the remote endpoint.
func (p Publisher) Endpoint() endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
ctx, cancel := context.WithTimeout(ctx, p.timeout)
defer cancel()

pub := amqp.Publishing{
ReplyTo: p.q.Name,
CorrelationId: randomString(randInt(5, maxCorrelationIdLength)),
}

if err := p.enc(ctx, &pub, request); err != nil {
return nil, err
}

for _, f := range p.before {
ctx = f(ctx, &pub)
}

deliv, err := p.publishAndConsumeFirstMatchingResponse(ctx, &pub)
if err != nil {
return nil, err
}

for _, f := range p.after {
ctx = f(ctx, deliv)
}
response, err := p.dec(ctx, deliv)
if err != nil {
return nil, err
}

return response, nil
}
}

// publishAndConsumeFirstMatchingResponse publishes the specified Publishing
// and returns the first Delivery object with the matching correlationId.
// If the context times out while waiting for a reply, an error will be returned.
func (p Publisher) publishAndConsumeFirstMatchingResponse(
ctx context.Context,
pub *amqp.Publishing,
) (*amqp.Delivery, error) {
err := p.ch.Publish(
getPublishExchange(ctx),
getPublishKey(ctx),
false, //mandatory
false, //immediate
*pub,
)
if err != nil {
return nil, err
}
autoAck := getConsumeAutoAck(ctx)

msg, err := p.ch.Consume(
p.q.Name,
"", //consumer
autoAck,
false, //exclusive
false, //noLocal
false, //noWait
getConsumeArgs(ctx),
)
if err != nil {
return nil, err
}

for {
select {
case d := <-msg:
if d.CorrelationId == pub.CorrelationId {
if !autoAck {
d.Ack(false) //multiple
}
return &d, nil
}

case <-ctx.Done():
return nil, ctx.Err()
}
}

}

0 comments on commit 12210fb

Please sign in to comment.