diff --git a/src/Connection.php b/src/Connection.php index 8cf9c176..6452f529 100644 --- a/src/Connection.php +++ b/src/Connection.php @@ -5,6 +5,7 @@ use Laravel\Reverb\Concerns\GeneratesIdentifiers; use Laravel\Reverb\Contracts\Connection as ConnectionContract; use Laravel\Reverb\Events\MessageSent; +use Ratchet\RFC6455\Messaging\Frame; class Connection extends ConnectionContract { @@ -50,6 +51,14 @@ public function send(string $message): void MessageSent::dispatch($this, $message); } + /** + * Send a control frame to the connection. + */ + public function control(string $type = Frame::OP_PING): void + { + $this->connection->send(new Frame('', opcode: $type)); + } + /** * Terminate a connection. */ diff --git a/src/Contracts/Connection.php b/src/Contracts/Connection.php index b670122e..4f729796 100644 --- a/src/Contracts/Connection.php +++ b/src/Contracts/Connection.php @@ -3,6 +3,7 @@ namespace Laravel\Reverb\Contracts; use Laravel\Reverb\Application; +use Ratchet\RFC6455\Messaging\Frame; abstract class Connection { @@ -16,6 +17,11 @@ abstract class Connection */ protected $hasBeenPinged = false; + /** + * Indicates if the connection uses control frames. + */ + protected $usesControlFrames = false; + /** * Create a new connection instance. */ @@ -39,6 +45,11 @@ abstract public function id(): string; */ abstract public function send(string $message): void; + /** + * Send a control frame to the connection. + */ + abstract public function control(string $type = Frame::OP_PING): void; + /** * Terminate a connection. */ @@ -136,4 +147,22 @@ public function isStale(): bool { return $this->isInactive() && $this->hasBeenPinged; } + + /** + * Determine whether the connection uses control frames. + */ + public function usesControlFrames(): bool + { + return $this->usesControlFrames; + } + + /** + * Mark the connection as using control frames to track activity. + */ + public function setUsesControlFrames(bool $usesControlFrames = true): Connection + { + $this->usesControlFrames = $usesControlFrames; + + return $this; + } } diff --git a/src/Protocols/Pusher/EventHandler.php b/src/Protocols/Pusher/EventHandler.php index c886e90e..a9c66857 100644 --- a/src/Protocols/Pusher/EventHandler.php +++ b/src/Protocols/Pusher/EventHandler.php @@ -117,7 +117,9 @@ public function pong(Connection $connection): void */ public function ping(Connection $connection): void { - static::send($connection, 'ping'); + $connection->usesControlFrames() + ? $connection->control() + : static::send($connection, 'ping'); $connection->ping(); } diff --git a/src/Protocols/Pusher/Http/Controllers/PusherController.php b/src/Protocols/Pusher/Http/Controllers/PusherController.php index 1819cec9..3a6b265b 100644 --- a/src/Protocols/Pusher/Http/Controllers/PusherController.php +++ b/src/Protocols/Pusher/Http/Controllers/PusherController.php @@ -8,6 +8,7 @@ use Laravel\Reverb\Protocols\Pusher\Server as PusherServer; use Laravel\Reverb\Servers\Reverb\Connection; use Psr\Http\Message\RequestInterface; +use Ratchet\RFC6455\Messaging\FrameInterface; class PusherController { @@ -34,6 +35,10 @@ public function __invoke(RequestInterface $request, Connection $connection, stri fn ($message) => $this->server->message($reverbConnection, (string) $message) ); + $connection->onControl( + fn (FrameInterface $message) => $this->server->control($reverbConnection, $message) + ); + $connection->onClose( fn () => $this->server->close($reverbConnection) ); diff --git a/src/Protocols/Pusher/Server.php b/src/Protocols/Pusher/Server.php index 223a63b1..f694b927 100644 --- a/src/Protocols/Pusher/Server.php +++ b/src/Protocols/Pusher/Server.php @@ -10,6 +10,8 @@ use Laravel\Reverb\Protocols\Pusher\Contracts\ChannelManager; use Laravel\Reverb\Protocols\Pusher\Exceptions\InvalidOrigin; use Laravel\Reverb\Protocols\Pusher\Exceptions\PusherException; +use Ratchet\RFC6455\Messaging\Frame; +use Ratchet\RFC6455\Messaging\FrameInterface; class Server { @@ -70,6 +72,21 @@ public function message(Connection $from, string $message): void } } + /** + * Handle a low-level WebSocket control frame. + */ + public function control(Connection $from, FrameInterface $message): void + { + Log::info('Control Frame Received', $from->id()); + Log::message($message); + + $from->setUsesControlFrames(); + + if (in_array($message->getOpcode(), [Frame::OP_PING, Frame::OP_PONG], strict: true)) { + $from->touch(); + } + } + /** * Handle a client disconnection. */ diff --git a/src/Servers/Reverb/Connection.php b/src/Servers/Reverb/Connection.php index 3167c9ee..3bb5ebd8 100644 --- a/src/Servers/Reverb/Connection.php +++ b/src/Servers/Reverb/Connection.php @@ -27,6 +27,13 @@ class Connection extends EventEmitter implements WebSocketConnection */ protected $onMessage; + /** + * The control frame handler. + * + * @var ?callable + */ + protected $onControl; + /** * The connection close handler. * @@ -83,6 +90,10 @@ public function send(mixed $message): void */ public function control(FrameInterface $message): void { + if ($this->onControl) { + ($this->onControl)($message); + } + match ($message->getOpcode()) { Frame::OP_PING => $this->send(new Frame($message->getPayload(), opcode: Frame::OP_PONG)), Frame::OP_PONG => fn () => null, @@ -98,6 +109,14 @@ public function onMessage(callable $callback): void $this->onMessage = $callback; } + /** + * Set the control frame handler. + */ + public function onControl(callable $callback): void + { + $this->onControl = $callback; + } + /** * Set the close handler. */ diff --git a/tests/FakeConnection.php b/tests/FakeConnection.php index ea7849f6..9795095f 100644 --- a/tests/FakeConnection.php +++ b/tests/FakeConnection.php @@ -7,6 +7,7 @@ use Laravel\Reverb\Concerns\GeneratesIdentifiers; use Laravel\Reverb\Contracts\ApplicationProvider; use Laravel\Reverb\Contracts\Connection as BaseConnection; +use Ratchet\RFC6455\Messaging\Frame; class FakeConnection extends BaseConnection { @@ -97,6 +98,12 @@ public function send(string $message): void $this->messages[] = $message; } + /** + * Send a control frame to the connection. + */ + public function control(string $type = Frame::OP_PING): void { } + + /** * Terminate a connection. */ diff --git a/tests/Feature/Protocols/Pusher/Reverb/ServerTest.php b/tests/Feature/Protocols/Pusher/Reverb/ServerTest.php index 3b7cb9fa..7388d09f 100644 --- a/tests/Feature/Protocols/Pusher/Reverb/ServerTest.php +++ b/tests/Feature/Protocols/Pusher/Reverb/ServerTest.php @@ -4,6 +4,7 @@ use Laravel\Reverb\Jobs\PingInactiveConnections; use Laravel\Reverb\Jobs\PruneStaleConnections; use Laravel\Reverb\Tests\ReverbTestCase; +use Ratchet\RFC6455\Messaging\Frame; use React\Promise\Deferred; use function Ratchet\Client\connect as wsConnect; @@ -482,3 +483,57 @@ expect($response)->toContain('"ids\":[1]'); expect($response)->toContain('"hash\":{\"1\":{\"name\":\"Test User\"}}'); }); + +it('can handle a ping control frame', function () { + $connection = connect(); + subscribe('test-channel', connection: $connection); + $channels = channels(); + $managedConnection = Arr::first($channels->connections()); + $subscribedAt = $managedConnection->lastSeenAt(); + sleep(1); + $connection->send(new Frame('', opcode: Frame::OP_PING)); + + $connection->assertPonged(); + expect($managedConnection->lastSeenAt())->toBeGreaterThan($subscribedAt); +}); + +it('can handle a pong control frame', function () { + $connection = connect(); + subscribe('test-channel', connection: $connection); + $channels = channels(); + $managedConnection = Arr::first($channels->connections()); + $subscribedAt = $managedConnection->lastSeenAt(); + sleep(1); + $connection->send(new Frame('', opcode: Frame::OP_PONG)); + + $connection->assertNotPinged(); + $connection->assertNotPonged(); + expect($managedConnection->lastSeenAt())->toBeGreaterThan($subscribedAt); +}); + +it('uses pusher control messages by default', function () { + $connection = connect(); + subscribe('test-channel', connection: $connection); + + $channels = channels(); + Arr::first($channels->connections())->setLastSeenAt(time() - 60 * 10); + + (new PingInactiveConnections)->handle($channels); + + $connection->assertReceived('{"event":"pusher:ping"}'); + $connection->assertNotPinged(); +}); + +it('uses control frames when the client prefers', function () { + $connection = connect(); + $connection->send(new Frame('', opcode: Frame::OP_PING)); + subscribe('test-channel', connection: $connection); + + $channels = channels(); + Arr::first($channels->connections())->setLastSeenAt(time() - 60 * 10); + + (new PingInactiveConnections)->handle($channels); + + $connection->assertPinged(); + $connection->assertNotReceived('{"event":"pusher:ping"}'); +}); diff --git a/tests/TestConnection.php b/tests/TestConnection.php index 79c19472..e9a0f144 100644 --- a/tests/TestConnection.php +++ b/tests/TestConnection.php @@ -25,6 +25,10 @@ class TestConnection */ public $receivedMessages = []; + public $wasPinged = false; + + public $wasPonged = false; + /** * Create a new test connection instance. */ @@ -34,6 +38,14 @@ public function __construct(public WebSocket $connection) $this->receivedMessages[] = (string) $message; }); + $connection->on('ping', function () { + $this->wasPinged = true; + }); + + $connection->on('pong', function () { + $this->wasPonged = true; + }); + $connection->on('close', function ($code, $message) { $this->receivedMessages[] = (string) $message; }); @@ -89,6 +101,58 @@ public function assertReceived(string $message, ?int $count = null): void expect($this->receivedMessages)->toContain($message); } + /** + * Assert that the connection did not receiv the given message. + */ + public function assertNotReceived(string $message): void + { + if (! in_array($message, $this->receivedMessages)) { + $this->await(); + } + + expect($this->receivedMessages)->not->toContain($message); + } + + /** + * Assert the connection was pinged during the test. + */ + public function assertPinged(): void + { + $this->await(); + + expect($this->wasPinged)->toBeTrue(); + } + + /** + * Assert the connection was not pinged during the test. + */ + public function assertNotPinged(): void + { + $this->await(); + + expect($this->wasPinged)->toBeFalse(); + } + + /** + * Assert the connection was ponged during the test. + */ + public function assertPonged(): void + { + $this->await(); + + expect($this->wasPonged)->toBeTrue(); + } + + /** + * Assert the connection was not ponged during the test. + */ + public function assertNotPonged(): void + { + $this->await(); + + expect($this->wasPonged)->toBeFalse(); + } + /** * Proxy method calls to the connection. */