Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
"cookie": "^1.0.1",
"cookie-parser": "^1.4.4",
"cors": "^2.8.5",
"emittery": "^1.2.0",
"escape-string-regexp": "^5.0.0",
"explain-error": "^1.0.4",
"express": "^5.0.0",
Expand Down
64 changes: 21 additions & 43 deletions src/SocketServer.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { promisify } from 'node:util';
import lodash from 'lodash';
import sjson from 'secure-json-parse';
import { WebSocketServer } from 'ws';
import Ajv from 'ajv';
import { stdSerializers } from 'pino';
Expand Down Expand Up @@ -96,8 +95,6 @@ class SocketServer {

#logger;

#redisSubscription;

#wss;

#closing = false;
Expand Down Expand Up @@ -134,6 +131,8 @@ class SocketServer {
*/
#serverActions;

#unsubscribe;

/**
* Create a socket server.
*
Expand All @@ -157,7 +156,6 @@ class SocketServer {
req: stdSerializers.req,
},
});
this.#redisSubscription = uw.redis.duplicate();

this.options = {
/** @type {(_socket: import('ws').WebSocket | undefined, err: Error) => void} */
Expand All @@ -176,11 +174,8 @@ class SocketServer {
port: options.server ? undefined : options.port,
});

uw.use(() => this.#redisSubscription.subscribe('uwave'));
this.#redisSubscription.on('message', (channel, command) => {
// this returns a promise, but we don't handle the error case:
// there is not much we can do, so just let node.js crash w/ an unhandled rejection
this.onServerMessage(channel, command);
this.#unsubscribe = uw.events.onAny((command, data) => {
this.#onServerMessage(command, data);
});

this.#wss.on('error', (error) => {
Expand Down Expand Up @@ -517,12 +512,12 @@ class SocketServer {
/**
* Get a LostConnection for a user, if one exists.
*
* @param {User} user
* @param {string} sessionID
* @private
*/
getLostConnection(user) {
getLostConnection(sessionID) {
return this.#connections.find((connection) => (
connection instanceof LostConnection && connection.user.id === user.id
connection instanceof LostConnection && connection.sessionID === sessionID
));
}

Expand All @@ -539,7 +534,7 @@ class SocketServer {
connection.on('close', () => {
this.remove(connection);
});
connection.on('authenticate', async (user, sessionID, lastEventID) => {
connection.on('authenticate', async ({ user, sessionID, lastEventID }) => {
const isReconnect = await connection.isReconnect(sessionID);
this.#logger.info({ userId: user.id, isReconnect, lastEventID }, 'authenticated socket');
if (isReconnect) {
Expand Down Expand Up @@ -580,11 +575,7 @@ class SocketServer {
});
connection.on(
'command',
/**
* @param {string} command
* @param {import('type-fest').JsonValue} data
*/
(command, data) => {
({ command, data }) => {
this.#logger.trace({ userId: user.id, command, data }, 'command');
if (has(this.#clientActions, command)) {
// Ignore incorrect input
Expand Down Expand Up @@ -678,33 +669,20 @@ class SocketServer {
}

/**
* Handle command messages coming in from Redis.
* Handle command messages coming in from elsewhere in the app.
* Some commands are intended to broadcast immediately to all connected
* clients, but others require special action.
*
* @param {string} channel
* @param {string} rawCommand
* @returns {Promise<void>}
* @private
* @template {keyof import('./redisMessages.js').ServerActionParameters} K
* @param {K} command
* @param {import('./redisMessages.js').ServerActionParameters[K]} data
*/
async onServerMessage(channel, rawCommand) {
/**
* @type {{ command: string, data: import('type-fest').JsonValue }|undefined}
*/
const json = sjson.safeParse(rawCommand);
if (!json) {
return;
}
const { command, data } = json;
#onServerMessage(command, data) {
this.#logger.trace({ channel: command, command, data }, 'server message');

this.#logger.trace({ channel, command, data }, 'server message');

if (has(this.#serverActions, command)) {
const action = this.#serverActions[command];
if (action !== undefined) { // the types for `ServerActions` allow undefined, so...
// @ts-expect-error TS2345 `data` is validated
action(data);
}
const action = this.#serverActions[command];
if (action !== undefined) {
action(data);
}
}

Expand All @@ -714,9 +692,10 @@ class SocketServer {
* @returns {Promise<void>}
*/
async destroy() {
clearInterval(this.#pinger);

this.#closing = true;

this.#unsubscribe();
clearInterval(this.#pinger);
clearInterval(this.#guestCountInterval);

for (const connection of this.#connections) {
Expand All @@ -725,7 +704,6 @@ class SocketServer {

const closeWsServer = promisify(this.#wss.close.bind(this.#wss));
await closeWsServer();
await this.#redisSubscription.quit();
}

/**
Expand Down
8 changes: 7 additions & 1 deletion src/Uwave.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import waitlist from './plugins/waitlist.js';
import passport from './plugins/passport.js';
import migrations from './plugins/migrations.js';
import { SqliteDateColumnsPlugin, connect as connectSqlite } from './utils/sqlite.js';
import Emittery from 'emittery';

const DEFAULT_SQLITE_PATH = './uwave.sqlite';
const DEFAULT_REDIS_URL = 'redis://localhost:6379';
Expand Down Expand Up @@ -125,6 +126,11 @@ class UwaveServer extends EventEmitter {
// @ts-expect-error TS2564 Definitely assigned in a plugin
socketServer;

/** @type {Emittery<import('./redisMessages.js').ServerActionParameters>} */
events = new Emittery({
debug: { name: 'u-wave-core' },
});

/**
* @type {Map<string, Source>}
*/
Expand Down Expand Up @@ -294,7 +300,7 @@ class UwaveServer extends EventEmitter {
* @param {import('./redisMessages.js').ServerActionParameters[CommandName]} data
*/
publish(command, data) {
return this.redis.publish('uwave', JSON.stringify({ command, data }));
return this.events.emit(command, data);
}

async listen() {
Expand Down
54 changes: 14 additions & 40 deletions src/plugins/configStore.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import EventEmitter from 'node:events';
import Ajv from 'ajv/dist/2019.js';
import formats from 'ajv-formats';
import jsonMergePatch from 'json-merge-patch';
import sjson from 'secure-json-parse';
import ValidationError from '../errors/ValidationError.js';
import { sql } from 'kysely';
import { fromJson, json, jsonb } from '../utils/sqlite.js';
Expand All @@ -29,12 +28,12 @@ class ConfigStore {

#logger;

#subscriber;

#ajv;

#emitter = new EventEmitter();

#unsubscribe;

/** @type {Map<string, import('ajv').ValidateFunction<unknown>>} */
#validators = new Map();

Expand All @@ -44,7 +43,6 @@ class ConfigStore {
constructor(uw) {
this.#uw = uw;
this.#logger = uw.logger.child({ ns: 'uwave:config' });
this.#subscriber = uw.redis.duplicate();
this.#ajv = new Ajv({
useDefaults: true,
// Allow unknown keywords (`uw:xyz`)
Expand All @@ -59,40 +57,16 @@ class ConfigStore {
fs.readFileSync(new URL('../schemas/definitions.json', import.meta.url), 'utf8'),
));

this.#subscriber.on('message', (_channel, command) => {
this.#onServerMessage(command);
});

uw.use(async () => this.#subscriber.subscribe('uwave'));
}

/**
* @param {string} rawCommand
*/
async #onServerMessage(rawCommand) {
/**
* @type {undefined|{
* command: string,
* data: import('../redisMessages.js').ServerActionParameters['configStore:update'],
* }}
*/
const json = sjson.safeParse(rawCommand);
if (!json) {
return;
}
const { command, data } = json;
if (command !== CONFIG_UPDATE_MESSAGE) {
return;
}

this.#logger.trace({ command, data }, 'handle config update');
this.#unsubscribe = uw.events.on(CONFIG_UPDATE_MESSAGE, async (data) => {
this.#logger.trace({ data }, 'handle config update');

try {
const updatedSettings = await this.get(data.key);
this.#emitter.emit(data.key, updatedSettings, data.user, data.patch);
} catch (error) {
this.#logger.error({ err: error }, 'could not retrieve settings after update');
}
try {
const updatedSettings = await this.get(data.key);
this.#emitter.emit(data.key, updatedSettings, data.user, data.patch);
} catch (error) {
this.#logger.error({ err: error }, 'could not retrieve settings after update');
}
});
}

/**
Expand Down Expand Up @@ -258,8 +232,8 @@ class ConfigStore {
};
}

async destroy() {
await this.#subscriber.quit();
destroy() {
this.#unsubscribe();
}
}

Expand All @@ -268,7 +242,7 @@ class ConfigStore {
*/
async function configStorePlugin(uw) {
uw.config = new ConfigStore(uw);
uw.onClose(() => uw.config.destroy());
uw.onClose(async () => uw.config.destroy());
}

export default configStorePlugin;
Expand Down
14 changes: 11 additions & 3 deletions src/sockets/AuthedConnection.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import EventEmitter from 'node:events';
import Emittery from 'emittery';
import Ultron from 'ultron';
import WebSocket from 'ws';
import sjson from 'secure-json-parse';
Expand All @@ -8,7 +8,13 @@ import { fromJson, json } from '../utils/sqlite.js';
const PING_TIMEOUT = 5_000;
const DEAD_TIMEOUT = 30_000;

class AuthedConnection extends EventEmitter {
/**
* @augments {Emittery<{
* command: { command: string, data: import('type-fest').JsonValue },
* close: { banned: boolean, lastEventID: string | null },
* }>}
*/
class AuthedConnection extends Emittery {
#events;

#logger;
Expand All @@ -20,6 +26,8 @@ class AuthedConnection extends EventEmitter {
/** @type {string|null} */
#lastEventID = null;

banned = false;

/**
* @param {import('../Uwave.js').default} uw
* @param {import('ws').WebSocket} socket
Expand Down Expand Up @@ -97,7 +105,7 @@ class AuthedConnection extends EventEmitter {
this.#lastMessage = Date.now();
const { command, data } = sjson.safeParse(raw) ?? {};
if (command) {
this.emit('command', command, data);
this.emit('command', { command, data });
}
}

Expand Down
12 changes: 9 additions & 3 deletions src/sockets/GuestConnection.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
import EventEmitter from 'node:events';
import Emittery from 'emittery';
import { ulid } from 'ulid';
import Ultron from 'ultron';
import WebSocket from 'ws';

const PING_TIMEOUT = 5_000;
const DEAD_TIMEOUT = 30_000;

class GuestConnection extends EventEmitter {
/**
* @augments {Emittery<{
* close: undefined,
* authenticate: { user: import('../schema.js').User, sessionID: string, lastEventID: string | null }

Check failure on line 12 in src/sockets/GuestConnection.js

View workflow job for this annotation

GitHub Actions / Code style

This line has a length of 102. Maximum allowed is 100
* }>}
*/
class GuestConnection extends Emittery {
#events;

#logger;
Expand Down Expand Up @@ -69,7 +75,7 @@
throw new Error('You have been banned');
}

this.emit('authenticate', userModel, sessionID, null);
await this.emit('authenticate', { user: userModel, sessionID, lastEventID: null });
}

/**
Expand Down
7 changes: 5 additions & 2 deletions src/sockets/LostConnection.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import EventEmitter from 'node:events';
import Emittery from 'emittery';

class LostConnection extends EventEmitter {
/**
* @augments {Emittery<{ close: undefined }>}
*/
class LostConnection extends Emittery {
#logger;

#expiresAt;
Expand Down
Loading