Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[1.x] Optionally uses control frames for handling ping and pong #253

Merged
merged 7 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use Laravel\Reverb\Concerns\GeneratesIdentifiers;
use Laravel\Reverb\Contracts\Connection as ConnectionContract;
use Laravel\Reverb\Events\MessageSent;
use Ratchet\RFC6455\Messaging\Frame;

class Connection extends ConnectionContract
{
Expand Down Expand Up @@ -50,6 +51,14 @@ public function send(string $message): void
MessageSent::dispatch($this, $message);
}

/**
* Send a control frame to the connection.
*/
public function control(string $type = Frame::OP_PING): void
{
$this->connection->send(new Frame('', opcode: $type));
}

/**
* Terminate a connection.
*/
Expand Down
29 changes: 29 additions & 0 deletions src/Contracts/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Laravel\Reverb\Contracts;

use Laravel\Reverb\Application;
use Ratchet\RFC6455\Messaging\Frame;

abstract class Connection
{
Expand All @@ -16,6 +17,11 @@ abstract class Connection
*/
protected $hasBeenPinged = false;

/**
* Indicates if the connection uses control frames.
*/
protected $usesControlFrames = false;

/**
* Create a new connection instance.
*/
Expand All @@ -39,6 +45,11 @@ abstract public function id(): string;
*/
abstract public function send(string $message): void;

/**
* Send a control frame to the connection.
*/
abstract public function control(string $type = Frame::OP_PING): void;

/**
* Terminate a connection.
*/
Expand Down Expand Up @@ -136,4 +147,22 @@ public function isStale(): bool
{
return $this->isInactive() && $this->hasBeenPinged;
}

/**
* Determine whether the connection uses control frames.
*/
public function usesControlFrames(): bool
{
return $this->usesControlFrames;
}

/**
* Mark the connection as using control frames to track activity.
*/
public function setUsesControlFrames(bool $usesControlFrames = true): Connection
{
$this->usesControlFrames = $usesControlFrames;

return $this;
}
}
4 changes: 3 additions & 1 deletion src/Protocols/Pusher/EventHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ public function pong(Connection $connection): void
*/
public function ping(Connection $connection): void
{
static::send($connection, 'ping');
$connection->usesControlFrames()
? $connection->control()
: static::send($connection, 'ping');

$connection->ping();
}
Expand Down
5 changes: 5 additions & 0 deletions src/Protocols/Pusher/Http/Controllers/PusherController.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use Laravel\Reverb\Protocols\Pusher\Server as PusherServer;
use Laravel\Reverb\Servers\Reverb\Connection;
use Psr\Http\Message\RequestInterface;
use Ratchet\RFC6455\Messaging\FrameInterface;

class PusherController
{
Expand All @@ -34,6 +35,10 @@ public function __invoke(RequestInterface $request, Connection $connection, stri
fn ($message) => $this->server->message($reverbConnection, (string) $message)
);

$connection->onControl(
fn (FrameInterface $message) => $this->server->control($reverbConnection, $message)
);

$connection->onClose(
fn () => $this->server->close($reverbConnection)
);
Expand Down
17 changes: 17 additions & 0 deletions src/Protocols/Pusher/Server.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
use Laravel\Reverb\Protocols\Pusher\Contracts\ChannelManager;
use Laravel\Reverb\Protocols\Pusher\Exceptions\InvalidOrigin;
use Laravel\Reverb\Protocols\Pusher\Exceptions\PusherException;
use Ratchet\RFC6455\Messaging\Frame;
use Ratchet\RFC6455\Messaging\FrameInterface;

class Server
{
Expand Down Expand Up @@ -70,6 +72,21 @@ public function message(Connection $from, string $message): void
}
}

/**
* Handle a low-level WebSocket control frame.
*/
public function control(Connection $from, FrameInterface $message): void
{
Log::info('Control Frame Received', $from->id());
Log::message($message);

$from->setUsesControlFrames();

if (in_array($message->getOpcode(), [Frame::OP_PING, Frame::OP_PONG], strict: true)) {
$from->touch();
}
}

/**
* Handle a client disconnection.
*/
Expand Down
19 changes: 19 additions & 0 deletions src/Servers/Reverb/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ class Connection extends EventEmitter implements WebSocketConnection
*/
protected $onMessage;

/**
* The control frame handler.
*
* @var ?callable
*/
protected $onControl;

/**
* The connection close handler.
*
Expand Down Expand Up @@ -83,6 +90,10 @@ public function send(mixed $message): void
*/
public function control(FrameInterface $message): void
{
if ($this->onControl) {
($this->onControl)($message);
}

match ($message->getOpcode()) {
Frame::OP_PING => $this->send(new Frame($message->getPayload(), opcode: Frame::OP_PONG)),
Frame::OP_PONG => fn () => null,
Expand All @@ -98,6 +109,14 @@ public function onMessage(callable $callback): void
$this->onMessage = $callback;
}

/**
* Set the control frame handler.
*/
public function onControl(callable $callback): void
{
$this->onControl = $callback;
}

/**
* Set the close handler.
*/
Expand Down
7 changes: 7 additions & 0 deletions tests/FakeConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Laravel\Reverb\Concerns\GeneratesIdentifiers;
use Laravel\Reverb\Contracts\ApplicationProvider;
use Laravel\Reverb\Contracts\Connection as BaseConnection;
use Ratchet\RFC6455\Messaging\Frame;

class FakeConnection extends BaseConnection
{
Expand Down Expand Up @@ -97,6 +98,12 @@ public function send(string $message): void
$this->messages[] = $message;
}

/**
* Send a control frame to the connection.
*/
public function control(string $type = Frame::OP_PING): void { }


/**
* Terminate a connection.
*/
Expand Down
55 changes: 55 additions & 0 deletions tests/Feature/Protocols/Pusher/Reverb/ServerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use Laravel\Reverb\Jobs\PingInactiveConnections;
use Laravel\Reverb\Jobs\PruneStaleConnections;
use Laravel\Reverb\Tests\ReverbTestCase;
use Ratchet\RFC6455\Messaging\Frame;
use React\Promise\Deferred;

use function Ratchet\Client\connect as wsConnect;
Expand Down Expand Up @@ -482,3 +483,57 @@
expect($response)->toContain('"ids\":[1]');
expect($response)->toContain('"hash\":{\"1\":{\"name\":\"Test User\"}}');
});

it('can handle a ping control frame', function () {
$connection = connect();
subscribe('test-channel', connection: $connection);
$channels = channels();
$managedConnection = Arr::first($channels->connections());
$subscribedAt = $managedConnection->lastSeenAt();
sleep(1);
$connection->send(new Frame('', opcode: Frame::OP_PING));

$connection->assertPonged();
expect($managedConnection->lastSeenAt())->toBeGreaterThan($subscribedAt);
});

it('can handle a pong control frame', function () {
$connection = connect();
subscribe('test-channel', connection: $connection);
$channels = channels();
$managedConnection = Arr::first($channels->connections());
$subscribedAt = $managedConnection->lastSeenAt();
sleep(1);
$connection->send(new Frame('', opcode: Frame::OP_PONG));

$connection->assertNotPinged();
$connection->assertNotPonged();
expect($managedConnection->lastSeenAt())->toBeGreaterThan($subscribedAt);
});

it('uses pusher control messages by default', function () {
$connection = connect();
subscribe('test-channel', connection: $connection);

$channels = channels();
Arr::first($channels->connections())->setLastSeenAt(time() - 60 * 10);

(new PingInactiveConnections)->handle($channels);

$connection->assertReceived('{"event":"pusher:ping"}');
$connection->assertNotPinged();
});

it('uses control frames when the client prefers', function () {
$connection = connect();
$connection->send(new Frame('', opcode: Frame::OP_PING));
subscribe('test-channel', connection: $connection);

$channels = channels();
Arr::first($channels->connections())->setLastSeenAt(time() - 60 * 10);

(new PingInactiveConnections)->handle($channels);

$connection->assertPinged();
$connection->assertNotReceived('{"event":"pusher:ping"}');
});
64 changes: 64 additions & 0 deletions tests/TestConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ class TestConnection
*/
public $receivedMessages = [];

public $wasPinged = false;

public $wasPonged = false;

/**
* Create a new test connection instance.
*/
Expand All @@ -34,6 +38,14 @@ public function __construct(public WebSocket $connection)
$this->receivedMessages[] = (string) $message;
});

$connection->on('ping', function () {
$this->wasPinged = true;
});

$connection->on('pong', function () {
$this->wasPonged = true;
});

$connection->on('close', function ($code, $message) {
$this->receivedMessages[] = (string) $message;
});
Expand Down Expand Up @@ -89,6 +101,58 @@ public function assertReceived(string $message, ?int $count = null): void
expect($this->receivedMessages)->toContain($message);
}

/**
* Assert that the connection did not receiv the given message.
*/
public function assertNotReceived(string $message): void
{
if (! in_array($message, $this->receivedMessages)) {
$this->await();
}

expect($this->receivedMessages)->not->toContain($message);
}

/**
* Assert the connection was pinged during the test.
*/
public function assertPinged(): void
{
$this->await();

expect($this->wasPinged)->toBeTrue();
}

/**
* Assert the connection was not pinged during the test.
*/
public function assertNotPinged(): void
{
$this->await();

expect($this->wasPinged)->toBeFalse();
}

/**
* Assert the connection was ponged during the test.
*/
public function assertPonged(): void
{
$this->await();

expect($this->wasPonged)->toBeTrue();
}

/**
* Assert the connection was not ponged during the test.
*/
public function assertNotPonged(): void
{
$this->await();

expect($this->wasPonged)->toBeFalse();
}

/**
* Proxy method calls to the connection.
*/
Expand Down