Skip to content

Commit

Permalink
Merge pull request #28 from Vinelab/dispatcher
Browse files Browse the repository at this point in the history
Dispatcher
  • Loading branch information
Mulkave authored Jul 11, 2018
2 parents 222f935 + 2045d2a commit af798dc
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 11 deletions.
51 changes: 49 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,10 @@ Bowler provide a default Pub/Sub implementation, where the user doesn't need to

In short, publish with a `routingKey` and consume with matching `bindingKeys`.


#### 1. Publish the Message
In your Producer:

#### In your Producer

```php
// Initialize a Bowler object with the rabbitmq server ip and port
Expand All @@ -234,8 +236,14 @@ $bowlerPublisher->publish('warning', $data);
As you might have noted, here we instantiate a `Publisher` not a `Producer` object. Publisher is a Producer specification, it holds the default Pub/Sub **exchange** setup.

##### Signature
```php
publish($routingKey, $data = null);
```

#### 2. Consume the Message
In your Consumer:

#### In your Consumer

##### i. Register the queue and generate its message handler
In your Consumer; from the command line use the `bowler:make:subscriber` command.
Expand Down Expand Up @@ -264,6 +272,45 @@ The Pub/Sub implementation is meant to be used as-is. It is possible to consume
If you would like to manually do the configuration, you can surely do so by setting up the Producer and Consumer as explained [earlier](## Usage).

##### Signature
```php
Registrator::subscriber($queue, $className, array $bindingKeys, $exchangeName = 'pub-sub', $exchangeType = 'topic');
```

## Dispatch (Work Queue)
Similar to Pub/Sub, except you may define your own exchange and messages will be distributed according to the least busy
consumer (see [Work Queue - Fair Dispatch](https://www.rabbitmq.com/tutorials/tutorial-two-php.html)).

#### 1. Dispatch the Message
Dispatch messages on a specific exchange with a `routingKey` and consumer with matching `bindingKeys`.

```php
// Initialize a Bowler object with the rabbitmq server ip and port
$connection = new Bowler\Connection();

// Initialize a Dispatcher object with a connection
$dispatcher = new Dispatcher($connection);

// Publish the message and set its required exchange name and routingKey
$dispatcher->dispatch('my-custom-exchange', 'warning', $data);
```
##### Signature
```php
dispatch($exchangeName, $routingKey, $data = null, $exchangeType = 'topic')
```

#### 2. Consume the Message
Registering queue consumers is the same as Pub/Sub, except the exchange name in the registration needs to match.

```php
// catch all the cows in the "farm" exchange
Registrator::subscriber('monitoring', 'App\Messaging\Handlers\MonitoringMessageHandler', [
'*.cow.*',
], 'farm');
```

The above will catch all the messages in the `farm` exchange that match the routing key `*.cow.*`

### Testing
If you would like to silence the Producer/Publisher to restrict it from actually sending/publishing messages to an exchange, bind it to a mock, locally in your test or globally.

Expand Down
17 changes: 16 additions & 1 deletion src/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,22 @@ class Connection
*/
public function __construct($host = 'localhost', $port = 5672, $username = 'guest', $password = 'guest')
{
$this->connection = new AMQPStreamConnection($host, $port, $username, $password);
$this->connection = new AMQPStreamConnection(
$host,
$port,
$username,
$password,
$vhost = '/',
$insist = false,
$login_method = 'AMQPLAIN',
$login_response = null,
$locale = 'en_US',
$connection_timeout = 30,
$read_write_timeout = 30,
$context = null,
$keepalive = false,
$heartbeat = 15
);

$this->channel = $this->connection->channel();
}
Expand Down
26 changes: 26 additions & 0 deletions src/Dispatcher.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?php

namespace Vinelab\Bowler;

/**
* @author Abed Halawi <[email protected]>
*/
class Dispatcher extends Producer
{
/**
* Part of the fair dispatch implementation
* Allow setting the exchange name and type with default of `topic`.
*
* @param Vinelab\Bowler\Connection $connection
*/
public function __construct(Connection $connection)
{
parent::__construct($connection);
}

public function dispatch($exchangeName, $routingKey, $data = null, $exchangeType = 'topic')
{
$this->setup($exchangeName, $exchangeType);
$this->send($data, $routingKey);
}
}
16 changes: 8 additions & 8 deletions src/RegisterQueues.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public function queue($queue, $className, $options = [])
* @param string $className
* @param array $bindingKeys
*/
public function subscriber($queue, $className, array $bindingKeys)
public function subscriber($queue, $className, array $bindingKeys, $exchangeName = 'pub-sub', $exchangeType = 'topic')
{
if (empty($bindingKeys)) {
throw new InvalidSubscriberBindingException('Missing bindingKeys for Subscriber queue: '.$queue.'.');
Expand All @@ -46,13 +46,13 @@ public function subscriber($queue, $className, array $bindingKeys)
// Default pub/sub setup
// We only need the bindingKeys to enable key based pub/sub
$options = [
'exchangeName' => 'pub-sub',
'exchangeType' => 'topic',
'bindingKeys' => $bindingKeys,
'passive' => false,
'durable' => true,
'autoDelete' => false,
];
'exchangeName' => $exchangeName,
'exchangeType' => $exchangeType,
'bindingKeys' => $bindingKeys,
'passive' => false,
'durable' => true,
'autoDelete' => false,
];

$this->queue($queue, $className, $options);
}
Expand Down

0 comments on commit af798dc

Please sign in to comment.