Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
joedixon committed Nov 14, 2023
1 parent 70eea9a commit 83e6615
Show file tree
Hide file tree
Showing 6 changed files with 232 additions and 16 deletions.
27 changes: 27 additions & 0 deletions src/Http/Middleware/WebSocketMiddleware.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<?php

namespace Laravel\Reverb\Http\Middleware;

use Closure;
use Laravel\Reverb\Server;
use Laravel\Reverb\WebSockets\Request as WebSocketRequest;
use Psr\Http\Message\ServerRequestInterface;

class WebSocketMiddleware
{
public function __construct(protected Server $server)
{

}

public function __invoke(ServerRequestInterface $request, Closure $next)
{
$wsRequest = new WebSocketRequest($request);

if (! $wsRequest->isWebSocketRequest()) {
return $next($request);
}

return $wsRequest->negotiate($request);
}
}
4 changes: 1 addition & 3 deletions src/Servers/Ratchet/Console/Commands/StartServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,9 @@ public function handle(): void
$this->subscribeToRedis($loop);
$this->scheduleCleanup($loop);

$server = ServerFactory::make($host, $port, $loop);

$this->components->info("Starting server on {$host}:{$port}");

$server->run();
ServerFactory::make($host, $port, $loop);
}

/**
Expand Down
38 changes: 25 additions & 13 deletions src/Servers/Ratchet/Factory.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,19 @@
namespace Laravel\Reverb\Servers\Ratchet;

use Illuminate\Support\Facades\App;
use Laravel\Reverb\Contracts\ApplicationProvider;
use Laravel\Reverb\Event;
use Laravel\Reverb\Http\Controllers\EventController;
use Laravel\Reverb\Http\Controllers\StatsController;
use Ratchet\Http\HttpServer;
use Ratchet\Http\Router;
use Ratchet\Server\IoServer;
use Laravel\Reverb\Http\Middleware\WebSocketMiddleware;
use Laravel\Reverb\Server;
use Psr\Http\Message\ServerRequestInterface;
use Ratchet\WebSocket\WsServer;
use React\EventLoop\Loop;
use React\EventLoop\LoopInterface;
use React\Http\HttpServer;
use React\Http\Message\Response;
use React\Socket\SocketServer;
use Symfony\Component\Routing\Matcher\UrlMatcher;
use Symfony\Component\Routing\RequestContext;
use Symfony\Component\Routing\Route;
use Symfony\Component\Routing\RouteCollection;

Expand All @@ -28,17 +30,27 @@ public static function make(string $host = '0.0.0.0', string $port = '8080', Loo

$socket = new SocketServer("{$host}:{$port}", [], $loop);

$server = new HttpServer(
$loop,
new WebSocketMiddleware(App::make(Server::class)),
function (ServerRequestInterface $request) {
$payload = json_decode($request->getBody()->getContents(), true);
parse_str($request->getUri()->getQuery(), $queryString);

$app = app(ApplicationProvider::class)->findById($queryString['appId']);

// $app = new Router(
// new UrlMatcher(static::routes(), new RequestContext)
// );
Event::dispatch($app, [
'event' => $payload['name'],
'channel' => $payload['channel'],
'data' => $payload['data'],
]);

// return new IoServer(
// new HttpServer($app),
// $socket,
// $loop
// );
return Response::json([]);
}
);

$server->listen($socket);
$loop->run();
}

/**
Expand Down
86 changes: 86 additions & 0 deletions src/WebSockets/Connection.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
<?php

namespace Laravel\Reverb\WebSockets;

use Illuminate\Support\Facades\App;
use Laravel\Reverb\Application;
use Laravel\Reverb\Concerns\GeneratesPusherIdentifiers;
use Laravel\Reverb\Connection as ReverbConnection;
use Laravel\Reverb\Contracts\ConnectionManager;
use Laravel\Reverb\Server;
use Ratchet\RFC6455\Messaging\CloseFrameChecker;
use Ratchet\RFC6455\Messaging\Message;
use Ratchet\RFC6455\Messaging\MessageBuffer;

class Connection extends ReverbConnection
{
use GeneratesPusherIdentifiers;

/**
* The normalized socket ID.
*
* @var string
*/
protected $id;

protected $buffer;

public function __construct(protected WsConnection $connection, Application $application, string $origin = null)
{
parent::__construct($application, $origin);

$this->buffer = new MessageBuffer(
new CloseFrameChecker,
onMessage: function (Message $message) {
App::make(Server::class)->message($this, $message->getPayload());
},
sender: [$connection->stream, 'write']
);

App::make(ConnectionManager::class)->for($application)->resolve(
$connection->resourceId,
fn () => $this
);

App::make(Server::class)->open($this);
$connection->stream->on('data', [$this->buffer, 'onData']);
$connection->stream->on('close', function () {
App::make(Server::class)->close($this);
});
}

/**
* Get the raw socket connection identifier.
*/
public function identifier(): string
{
return (string) $this->connection->resourceId;
}

/**
* Get the normalized socket ID.
*/
public function id(): string
{
if (! $this->id) {
$this->id = $this->generateId();
}

return $this->id;
}

public static function make(WsConnection $connection, $application, $origin)
{
return new static($connection, $application, $origin);
}

public function send(string $message): void
{
$this->buffer->sendMessage($message);
}

public function terminate(): void
{
$this->connection->stream->close();
}
}
76 changes: 76 additions & 0 deletions src/WebSockets/Request.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
<?php

namespace Laravel\Reverb\WebSockets;

use Illuminate\Support\Facades\App;
use Laravel\Reverb\Contracts\ApplicationProvider;
use Psr\Http\Message\ServerRequestInterface;
use Ratchet\RFC6455\Handshake\RequestVerifier;
use Ratchet\RFC6455\Handshake\ServerNegotiator;
use React\Http\Message\Response;
use React\Stream\CompositeStream;
use React\Stream\ThroughStream;

class Request
{
protected $connection;

public function __construct(protected ServerRequestInterface $request)
{

}

public function isWebSocketRequest()
{
$upgrade = $this->request->getHeader('Upgrade')[0] ?? null;

return $upgrade === 'websocket';
}

public function negotiate()
{
$negotiator = new ServerNegotiator(new RequestVerifier);
$response = $negotiator->handshake($this->request);

if ($response->getStatusCode() != '101') {
return false;
}

$inStream = new ThroughStream();
$outStream = new ThroughStream();

$this->connect($inStream, $outStream);

return new Response(
$response->getStatusCode(),
$response->getHeaders(),
new CompositeStream($outStream, $inStream)
);
}

public function connect($inStream, $outStream)
{
return $this->connection = Connection::make(
new WsConnection(new CompositeStream($inStream, $outStream)),
$this->application(),
$this->origin(),
);
}

public function connection()
{
return $this->connection;
}

protected function application()
{
parse_str($this->request->getUri()->getQuery(), $queryString);

return App::make(ApplicationProvider::class)->findByKey($queryString['appId']);
}

protected function origin()
{
return $this->request->getHeader('Origin')[0] ?? null;
}
}
17 changes: 17 additions & 0 deletions src/WebSockets/WsConnection.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php

namespace Laravel\Reverb\WebSockets;

use React\Stream\CompositeStream;
use Illuminate\Support\Str;
use React\Stream\DuplexStreamInterface;

class WsConnection
{
public string $resourceId;

public function __construct(public DuplexStreamInterface $stream)
{
$this->resourceId = (string) Str::uuid();
}
}

0 comments on commit 83e6615

Please sign in to comment.