Skip to content

Commit a6dca41

Browse files
committed
RabbitMQ implementation
1 parent 84e0621 commit a6dca41

File tree

3 files changed

+121
-0
lines changed

3 files changed

+121
-0
lines changed
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
<?php
2+
3+
namespace MyCLabs\Work\Dispatcher;
4+
5+
use MyCLabs\Work\Task\Task;
6+
use PhpAmqpLib\Channel\AMQPChannel;
7+
use PhpAmqpLib\Message\AMQPMessage;
8+
9+
/**
10+
* RabbitMQ implementation.
11+
*
12+
* @author Matthieu Napoli <[email protected]>
13+
*/
14+
class RabbitMQWorkDispatcher implements WorkDispatcher
15+
{
16+
/**
17+
* @var AMQPChannel
18+
*/
19+
private $channel;
20+
21+
/**
22+
* @var string
23+
*/
24+
private $queue;
25+
26+
/**
27+
* @param AMQPChannel $channel
28+
* @param string $queue
29+
*/
30+
public function __construct(AMQPChannel $channel, $queue)
31+
{
32+
$this->channel = $channel;
33+
$this->queue = $queue;
34+
}
35+
36+
/**
37+
* {@inheritdoc}
38+
*/
39+
public function runBackground(Task $task)
40+
{
41+
$message = new AMQPMessage(
42+
serialize($task),
43+
[
44+
'delivery_mode' => 2, // make message persistent
45+
]
46+
);
47+
48+
$this->channel->basic_publish($message, '', $this->queue);
49+
}
50+
}

src/MyCLabs/Work/Dispatcher/SimpleWorkDispatcher.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ public function __construct(SimpleWorker $simpleWorker)
3232

3333
/**
3434
* {@inheritdoc}
35+
* @return mixed This particular implementation can return the result since it's executed synchronously
3536
*/
3637
public function runBackground(Task $task)
3738
{
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
<?php
2+
3+
namespace MyCLabs\Work\Worker;
4+
5+
use PhpAmqpLib\Channel\AMQPChannel;
6+
7+
/**
8+
* RabbitMQ implementation.
9+
*
10+
* @author Matthieu Napoli <[email protected]>
11+
*/
12+
class RabbitMQWorker extends Worker
13+
{
14+
/**
15+
* @var AMQPChannel
16+
*/
17+
private $channel;
18+
19+
/**
20+
* @var string
21+
*/
22+
private $queue;
23+
24+
/**
25+
* @param AMQPChannel $channel
26+
* @param string $queue
27+
*/
28+
public function __construct(AMQPChannel $channel, $queue)
29+
{
30+
$this->channel = $channel;
31+
$this->queue = $queue;
32+
}
33+
34+
/**
35+
* {@inheritdoc}
36+
*/
37+
public function work()
38+
{
39+
$callback = function($message) {
40+
$this->workHandler($message);
41+
};
42+
43+
$this->channel->basic_qos(null, 1, null);
44+
$this->channel->basic_consume($this->queue, '', false, false, false, false, $callback);
45+
46+
// Loop infinitely to execute tasks
47+
while(count($this->channel->callbacks)) {
48+
$this->channel->wait();
49+
}
50+
}
51+
52+
/**
53+
* Handles a message
54+
*
55+
* @param mixed $message
56+
*/
57+
private function workHandler($message)
58+
{
59+
/** @var AMQPChannel $channel */
60+
$channel = $message->delivery_info['channel'];
61+
62+
$task = unserialize($message->body);
63+
64+
// Execute the task
65+
$this->getExecutor($task)->execute($task);
66+
67+
// Send ACK signaling the task execution is over
68+
$channel->basic_ack($message->delivery_info['delivery_tag']);
69+
}
70+
}

0 commit comments

Comments
 (0)