Skip to content

Commit

Permalink
refactor(cluster): add explicit types
Browse files Browse the repository at this point in the history
  • Loading branch information
darrachequesne committed Feb 20, 2024
1 parent d99a71b commit 2a6a215
Showing 1 changed file with 19 additions and 9 deletions.
28 changes: 19 additions & 9 deletions lib/cluster-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,16 @@ type DistributiveOmit<T, K extends keyof any> = T extends any
? Omit<T, K>
: never;

/**
* The unique ID of this server
*/
type ServerId = string;

/**
* The id of a message (for the Connection state recovery)
*/
type Offset = string;

export interface ClusterAdapterOptions {
/**
* The number of ms between two heartbeats.
Expand Down Expand Up @@ -49,7 +59,7 @@ export enum MessageType {
}

export type ClusterMessage = {
uid: string;
uid: ServerId;
nsp: string;
} & (
| {
Expand Down Expand Up @@ -106,7 +116,7 @@ interface ClusterRequest {
}

export type ClusterResponse = {
uid: string;
uid: ServerId;
nsp: string;
} & (
| {
Expand Down Expand Up @@ -167,7 +177,7 @@ function decodeOptions(opts): BroadcastOptions {
* - call {@link ClusterAdapter#onMessage} and {@link ClusterAdapter#onResponse}
*/
export abstract class ClusterAdapter extends Adapter {
protected readonly uid: string;
protected readonly uid: ServerId;

private requests: Map<string, ClusterRequest> = new Map();
private ackRequests: Map<string, ClusterAckRequest> = new Map();
Expand Down Expand Up @@ -418,7 +428,7 @@ export abstract class ClusterAdapter extends Adapter {
private addOffsetIfNecessary(
packet: any,
opts: BroadcastOptions,
offset: string
offset: Offset
) {
if (!this.nsp.server.opts.connectionStateRecovery) {
return;
Expand Down Expand Up @@ -646,10 +656,10 @@ export abstract class ClusterAdapter extends Adapter {
* @protected
* @return an offset, if applicable
*/
protected abstract doPublish(message: ClusterMessage): Promise<string>;
protected abstract doPublish(message: ClusterMessage): Promise<Offset>;

protected publishResponse(
requesterUid: string,
requesterUid: ServerId,
response: Omit<ClusterResponse, "nsp" | "uid">
) {
(response as ClusterResponse).uid = this.uid;
Expand All @@ -669,7 +679,7 @@ export abstract class ClusterAdapter extends Adapter {
* @protected
*/
protected abstract doPublishResponse(
requesterUid: string,
requesterUid: ServerId,
response: ClusterResponse
): Promise<void>;
}
Expand All @@ -686,7 +696,7 @@ export abstract class ClusterAdapterWithHeartbeat extends ClusterAdapter {
private readonly _opts: Required<ClusterAdapterOptions>;

private heartbeatTimer: NodeJS.Timeout;
private nodesMap: Map<string, number> = new Map(); // uid => timestamp of last message
private nodesMap: Map<ServerId, number> = new Map(); // uid => timestamp of last message
private readonly cleanupTimer: NodeJS.Timeout | undefined;
private customRequests: Map<string, CustomClusterRequest> = new Map();

Expand Down Expand Up @@ -941,7 +951,7 @@ export abstract class ClusterAdapterWithHeartbeat extends ClusterAdapter {
}
}

private removeNode(uid: string) {
private removeNode(uid: ServerId) {
this.customRequests.forEach((request, requestId) => {
request.missingUids.delete(uid);
if (request.missingUids.size === 0) {
Expand Down

0 comments on commit 2a6a215

Please sign in to comment.