diff --git a/README.md b/README.md index fa6779c..d20e80e 100644 --- a/README.md +++ b/README.md @@ -34,7 +34,7 @@ Tools like the Rabbitmq *[Management](https://www.rabbitmq.com/management.html)* ### Configuration -In order to configure rabbitmq host, port, username and password, add the following inside the connections array in config/queue.php file: +In order to configure rabbitmq host, port, username, password and configure the timeout values too, add the following inside the connections array in config/queue.php file: ```php 'rabbitmq' => [ @@ -42,9 +42,14 @@ In order to configure rabbitmq host, port, username and password, add the follow 'port' => port, 'username' => 'username', 'password' => 'password', + 'connection_timeout' => 30, + 'read_write_timeout' => 30, + 'heartbeat' => 15, ], ``` +The default value for `connection_timeout` and `read_write_timeout` is set to 30 (seconds) and `heartbeat` is set to 15 (seconds). + And register the service provider by adding `Vinelab\Bowler\BowlerServiceProvider::class` to the providers array in `config/app`. ### Producer diff --git a/src/BowlerServiceProvider.php b/src/BowlerServiceProvider.php index 79de40e..4a18ba9 100644 --- a/src/BowlerServiceProvider.php +++ b/src/BowlerServiceProvider.php @@ -11,6 +11,7 @@ /** * @author Ali Issa * @author Kinane Domloje + * @author Charalampos Raftopoulos */ class BowlerServiceProvider extends ServiceProvider { @@ -29,13 +30,17 @@ public function register() return $app['vinelab.bowler.registrator']; }); - // Bind connection to env configuration - $rbmqHost = config('queue.connections.rabbitmq.host'); - $rbmqPort = config('queue.connections.rabbitmq.port'); - $rbmqUsername = config('queue.connections.rabbitmq.username'); - $rbmqPassword = config('queue.connections.rabbitmq.password'); - $this->app->bind(Connection::class, function () use ($rbmqHost, $rbmqPort, $rbmqUsername, $rbmqPassword) { - return new Connection($rbmqHost, $rbmqPort, $rbmqUsername, $rbmqPassword); + $this->app->bind(Connection::class, function () { + // Bind connection to env configuration + $rbmqHost = config('queue.connections.rabbitmq.host'); + $rbmqPort = config('queue.connections.rabbitmq.port'); + $rbmqUsername = config('queue.connections.rabbitmq.username'); + $rbmqPassword = config('queue.connections.rabbitmq.password'); + $rbmqConnectionTimeout = config('queue.connections.rabbitmq.connection_timeout') ? (int) config('queue.connections.rabbitmq.connection_timeout') : 30; + $rbmqReadWriteTimeout = config('queue.connections.rabbitmq.read_write_timeout') ? (int) config('queue.connections.rabbitmq.read_write_timeout') : 30; + $rbmqHeartbeat = config('queue.connections.rabbitmq.heartbeat') ? (int) config('queue.connections.rabbitmq.heartbeat') : 15; + + return new Connection($rbmqHost, $rbmqPort, $rbmqUsername, $rbmqPassword, $rbmqConnectionTimeout, $rbmqReadWriteTimeout, $rbmqHeartbeat); }); $this->app->bind( diff --git a/src/Connection.php b/src/Connection.php index 6309102..b4caf4b 100644 --- a/src/Connection.php +++ b/src/Connection.php @@ -58,35 +58,75 @@ class Connection */ private $password = 'guest'; + /** + * RabbitMQ connection timeout. + * + * @var int + */ + private $connectionTimeout = 30; + + /** + * RabbitMQ read/write timeout. + * + * @var int + */ + private $readWriteTimeout = 30; + + /** + * RabbitMQ heartbeat frequency. + * + * @var int + */ + private $heartbeat = 15; + /** * @param string $host the ip of the rabbitmq server, default: localhost * @param int $port. default: 5672 * @param string $username, default: guest * @param string $password, default: guest + * @param int $connectionTimeout, default: 30 + * @param int $readWriteTimeout, default: 30 + * @param int $heartbeat, default: 15 */ - public function __construct($host = 'localhost', $port = 5672, $username = 'guest', $password = 'guest') + public function __construct($host = 'localhost', $port = 5672, $username = 'guest', $password = 'guest', $connectionTimeout = 30, $readWriteTimeout = 30, $heartbeat = 15) { $this->host = $host; - $this->poart = $port; + $this->port = $port; $this->username = $username; $this->password = $password; + $this->connectionTimeout = $connectionTimeout; + $this->readWriteTimeout = $readWriteTimeout; + $this->heartbeat = $heartbeat; + + $this->initAMQPStreamConnection($host, $port, $username, $password, $connectionTimeout, $readWriteTimeout, $heartbeat); + } - $this->connection = new AMQPStreamConnection( - $host, - $port, - $username, - $password, - $vhost = '/', - $insist = false, - $login_method = 'AMQPLAIN', - $login_response = null, - $locale = 'en_US', - $connection_timeout = 30, - $read_write_timeout = 30, - $context = null, - $keepalive = false, - $heartbeat = 15 - ); + protected function initAMQPStreamConnection($host, $port, $username, $password, $connectionTimeout, $readWriteTimeout, $heartbeat, $vhost = '/',$insist = false, $login_method = 'AMQPLAIN', $login_response = null, $locale = 'en_US', $context = null, $keepalive = false) + { + $vhost = '/'; + $insist = false; + $login_method = 'AMQPLAIN'; + $login_response = null; + $locale = 'en_US'; + $context = null; + $keepalive = false; + + $this->connection = app()->makeWith(AMQPStreamConnection::class, [ + 'host' => $host, + 'port' => $port, + 'user' => $username, + 'password' => $password, + 'vhost' => $vhost, + 'insist' => $insist, + 'login_method' => $login_method, + 'login_response' => $login_response, + 'locale' => $locale, + 'connection_timeout' => $connectionTimeout, + 'read_write_timeout' => $readWriteTimeout, + 'context' => $context, + 'keepalive' => $keepalive, + 'heartbeat' => $heartbeat, + ]); $this->channel = $this->connection->channel(); } diff --git a/tests/ConnectionTest.php b/tests/ConnectionTest.php index 098519a..0cc3ddc 100644 --- a/tests/ConnectionTest.php +++ b/tests/ConnectionTest.php @@ -3,8 +3,13 @@ namespace Vinelab\Bowler\Tests; use Mockery as M; +use ReflectionClass; use Vinelab\Bowler\Connection; +use PhpAmqpLib\Wire\IO\StreamIO; +use Illuminate\Broadcasting\Channel; +use Illuminate\Support\Facades\Config; use Vinelab\Http\Client as HTTPClient; +use PhpAmqpLib\Connection\AMQPStreamConnection; /** * @author Abed Halawi @@ -21,7 +26,7 @@ public function test_fetching_consumers_default() $queueName = 'the-queue'; $mClient = M::mock(HTTPClient::class); $request = [ - 'url' => 'localhost:15672/api/queues/%2F/'.$queueName, + 'url' => 'localhost:15672/api/queues/%2F/' . $queueName, 'params' => ['columns' => 'consumer_details.consumer_tag'], 'auth' => [ 'username' => 'guest', @@ -46,4 +51,45 @@ public function test_fetching_consumers_default() $this->assertEquals('response', $response); } + + public function test_set_default_configurations_values() + { + $mConnection = $this->getMockBuilder(Connection::class) + ->disableOriginalConstructor() + ->getMock(); + + $this->app->bind(Connection::class, function () use ($mConnection) { + return $mConnection; + }); + $connection = $this->app[Connection::class]; + + $this->assertAttributeEquals(15, 'heartbeat', $connection); + $this->assertAttributeEquals(30, 'readWriteTimeout', $connection); + $this->assertAttributeEquals(30, 'connectionTimeout', $connection); + } + + public function test_set_altered_configurations_values() + { + Config::set('queue.connections.rabbitmq.host', 'notlocal'); + Config::set('queue.connections.rabbitmq.port', 6666); + Config::set('queue.connections.rabbitmq.read_write_timeout', 60); + Config::set('queue.connections.rabbitmq.connection_timeout', 60); + Config::set('queue.connections.rabbitmq.heartbeat', 30); + + $mAMQPStreamConnection = M::mock(AMQPStreamConnection::class); + $this->app->bind(AMQPStreamConnection::class, function () use ($mAMQPStreamConnection) { + return $mAMQPStreamConnection; + }); + + $mChannel = M::mock(Channel::class); + $mAMQPStreamConnection->shouldReceive('channel')->once()->withNoArgs()->andReturn($mChannel); + $mAMQPStreamConnection->shouldReceive('close')->once()->withNoArgs(); + $mChannel->shouldReceive('close')->once()->withNoArgs(); + + $connection = $this->app[Connection::class]; + + $this->assertAttributeEquals(30, 'heartbeat', $connection); + $this->assertAttributeEquals(60, 'readWriteTimeout', $connection); + $this->assertAttributeEquals(60, 'connectionTimeout', $connection); + } }