diff --git a/src/Http/Middleware/WebSocketMiddleware.php b/src/Http/Middleware/WebSocketMiddleware.php new file mode 100644 index 00000000..afdc671c --- /dev/null +++ b/src/Http/Middleware/WebSocketMiddleware.php @@ -0,0 +1,27 @@ +isWebSocketRequest()) { + return $next($request); + } + + return $wsRequest->negotiate($request); + } +} diff --git a/src/Servers/Ratchet/Console/Commands/StartServer.php b/src/Servers/Ratchet/Console/Commands/StartServer.php index 54548512..9eff3bf7 100644 --- a/src/Servers/Ratchet/Console/Commands/StartServer.php +++ b/src/Servers/Ratchet/Console/Commands/StartServer.php @@ -50,11 +50,9 @@ public function handle(): void $this->subscribeToRedis($loop); $this->scheduleCleanup($loop); - $server = ServerFactory::make($host, $port, $loop); - $this->components->info("Starting server on {$host}:{$port}"); - $server->run(); + ServerFactory::make($host, $port, $loop); } /** diff --git a/src/Servers/Ratchet/Factory.php b/src/Servers/Ratchet/Factory.php index e6587338..44437a24 100644 --- a/src/Servers/Ratchet/Factory.php +++ b/src/Servers/Ratchet/Factory.php @@ -3,17 +3,19 @@ namespace Laravel\Reverb\Servers\Ratchet; use Illuminate\Support\Facades\App; +use Laravel\Reverb\Contracts\ApplicationProvider; +use Laravel\Reverb\Event; use Laravel\Reverb\Http\Controllers\EventController; use Laravel\Reverb\Http\Controllers\StatsController; -use Ratchet\Http\HttpServer; -use Ratchet\Http\Router; -use Ratchet\Server\IoServer; +use Laravel\Reverb\Http\Middleware\WebSocketMiddleware; +use Laravel\Reverb\Server; +use Psr\Http\Message\ServerRequestInterface; use Ratchet\WebSocket\WsServer; use React\EventLoop\Loop; use React\EventLoop\LoopInterface; +use React\Http\HttpServer; +use React\Http\Message\Response; use React\Socket\SocketServer; -use Symfony\Component\Routing\Matcher\UrlMatcher; -use Symfony\Component\Routing\RequestContext; use Symfony\Component\Routing\Route; use Symfony\Component\Routing\RouteCollection; @@ -28,17 +30,27 @@ public static function make(string $host = '0.0.0.0', string $port = '8080', Loo $socket = new SocketServer("{$host}:{$port}", [], $loop); + $server = new HttpServer( + $loop, + new WebSocketMiddleware(App::make(Server::class)), + function (ServerRequestInterface $request) { + $payload = json_decode($request->getBody()->getContents(), true); + parse_str($request->getUri()->getQuery(), $queryString); + $app = app(ApplicationProvider::class)->findById($queryString['appId']); - // $app = new Router( - // new UrlMatcher(static::routes(), new RequestContext) - // ); + Event::dispatch($app, [ + 'event' => $payload['name'], + 'channel' => $payload['channel'], + 'data' => $payload['data'], + ]); - // return new IoServer( - // new HttpServer($app), - // $socket, - // $loop - // ); + return Response::json([]); + } + ); + + $server->listen($socket); + $loop->run(); } /** diff --git a/src/WebSockets/Connection.php b/src/WebSockets/Connection.php new file mode 100644 index 00000000..a655205a --- /dev/null +++ b/src/WebSockets/Connection.php @@ -0,0 +1,86 @@ +buffer = new MessageBuffer( + new CloseFrameChecker, + onMessage: function (Message $message) { + App::make(Server::class)->message($this, $message->getPayload()); + }, + sender: [$connection->stream, 'write'] + ); + + App::make(ConnectionManager::class)->for($application)->resolve( + $connection->resourceId, + fn () => $this + ); + + App::make(Server::class)->open($this); + $connection->stream->on('data', [$this->buffer, 'onData']); + $connection->stream->on('close', function () { + App::make(Server::class)->close($this); + }); + } + + /** + * Get the raw socket connection identifier. + */ + public function identifier(): string + { + return (string) $this->connection->resourceId; + } + + /** + * Get the normalized socket ID. + */ + public function id(): string + { + if (! $this->id) { + $this->id = $this->generateId(); + } + + return $this->id; + } + + public static function make(WsConnection $connection, $application, $origin) + { + return new static($connection, $application, $origin); + } + + public function send(string $message): void + { + $this->buffer->sendMessage($message); + } + + public function terminate(): void + { + $this->connection->stream->close(); + } +} diff --git a/src/WebSockets/Request.php b/src/WebSockets/Request.php new file mode 100644 index 00000000..7df7f9d8 --- /dev/null +++ b/src/WebSockets/Request.php @@ -0,0 +1,76 @@ +request->getHeader('Upgrade')[0] ?? null; + + return $upgrade === 'websocket'; + } + + public function negotiate() + { + $negotiator = new ServerNegotiator(new RequestVerifier); + $response = $negotiator->handshake($this->request); + + if ($response->getStatusCode() != '101') { + return false; + } + + $inStream = new ThroughStream(); + $outStream = new ThroughStream(); + + $this->connect($inStream, $outStream); + + return new Response( + $response->getStatusCode(), + $response->getHeaders(), + new CompositeStream($outStream, $inStream) + ); + } + + public function connect($inStream, $outStream) + { + return $this->connection = Connection::make( + new WsConnection(new CompositeStream($inStream, $outStream)), + $this->application(), + $this->origin(), + ); + } + + public function connection() + { + return $this->connection; + } + + protected function application() + { + parse_str($this->request->getUri()->getQuery(), $queryString); + + return App::make(ApplicationProvider::class)->findByKey($queryString['appId']); + } + + protected function origin() + { + return $this->request->getHeader('Origin')[0] ?? null; + } +} diff --git a/src/WebSockets/WsConnection.php b/src/WebSockets/WsConnection.php new file mode 100644 index 00000000..ad972430 --- /dev/null +++ b/src/WebSockets/WsConnection.php @@ -0,0 +1,17 @@ +resourceId = (string) Str::uuid(); + } +} \ No newline at end of file