AMQPAL

AMQP Abstraction Layer

An abstraction layer that lets you work with AMQP through interchangeable adapters.

Supported adapters: amqp and phpamqplib (the adapter names accepted by the 'name' option in the example).

Example

use AMQPAL\Adapter;
use AMQPAL\Options;

$options = [
    'name' => 'amqp', // or 'phpamqplib'
    'options' => [
        'host' => 'localhost',
        'username' => 'guest',
        'password' => 'guest',
        'vhost' => '/'
    ]
];

$factory = new Adapter\AdapterFactory();
$adapter = $factory->createAdapter($options);

$connection = $adapter->getConnection();
$channel = $connection->createChannel();

/*
 * Creating exchange...
 */
$exchangeOptions = new Options\ExchangeOptions([
    'name' => 'exchange-name',
    'type' => 'direct'
]);

$exchange = $channel->createExchange($exchangeOptions);

// or:
$exchange = $channel->createExchange([
    'name' => 'exchange-name',
    'type' => 'direct'
]);

/*
 * Creating queue...
 */
$queueOptions = new Options\QueueOptions([
    'name' => 'queue-name',
]);

$queue = $channel->createQueue($queueOptions);

// or:
$queue = $channel->createQueue([
    'name' => 'queue-name',
]);

$queue->declareQueue();
$queue->bind('exchange-name');

// publishing a message...
$exchange->publish('my message in the queue');

// get the next message in the queue...
$message = $queue->get();

// or consume the queue with a callback...
$callback = function (Adapter\Message $message, Adapter\QueueInterface $queue) {
    // ack the message...
    $queue->ack($message->getDeliveryTag());
    
    // return false to stop consuming...
    return false;
};

// set the channel QoS so that only one message is fetched at a time
$channel->setQos(null, 1);
// and consume...
$queue->consume($callback); // this call blocks until the callback returns false
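
The callback contract used above (acknowledge each message, return false to stop consuming) can be tried out without a broker. The following is a minimal sketch and not AMQPAL code: InMemoryQueue and all of its methods are hypothetical stand-ins that only mimic the consume loop described in the example, so the script runs with plain PHP.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for a queue; NOT part of AMQPAL.
final class InMemoryQueue
{
    /** @var array<int, string> pending message bodies keyed by delivery tag */
    private array $pending = [];

    /** @var int[] delivery tags that have been acknowledged */
    public array $acked = [];

    private int $nextTag = 1;

    public function push(string $body): void
    {
        $this->pending[$this->nextTag++] = $body;
    }

    public function ack(int $deliveryTag): void
    {
        unset($this->pending[$deliveryTag]);
        $this->acked[] = $deliveryTag;
    }

    // Deliver pending messages one at a time until the callback returns
    // false or nothing is left. A real consumer would block and wait for
    // new messages instead of returning when the queue is empty.
    public function consume(callable $callback): void
    {
        foreach ($this->pending as $tag => $body) {
            if ($callback($tag, $body, $this) === false) {
                return;
            }
        }
    }

    public function count(): int
    {
        return count($this->pending);
    }
}

$queue = new InMemoryQueue();
$queue->push('first');
$queue->push('second');
$queue->push('third');

$handled = [];
$queue->consume(function (int $tag, string $body, InMemoryQueue $queue) use (&$handled) {
    $handled[] = $body;
    $queue->ack($tag);

    // Keep consuming until two messages were handled, then return false.
    return count($handled) < 2;
});

echo implode(', ', $handled), PHP_EOL; // first, second
echo $queue->count(), PHP_EOL;         // 1
```

Because the callback returns false after the second message, the third message is never delivered and stays in the queue unacknowledged, which is the same stop condition the README example relies on.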
