Skip to content

Commit

Permalink
test(cluster): init the test suite for the clustered adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
darrachequesne committed Feb 21, 2024
1 parent 2a6a215 commit 8fce41b
Show file tree
Hide file tree
Showing 6 changed files with 924 additions and 42 deletions.
107 changes: 69 additions & 38 deletions lib/cluster-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ type DistributiveOmit<T, K extends keyof any> = T extends any
: never;

/**
* The unique ID of this server
* The unique ID of a server
*/
type ServerId = string;
export type ServerId = string;

/**
* The id of a message (for the Connection state recovery)
* The unique ID of a message (for the connection state recovery feature)
*/
type Offset = string;
export type Offset = string;

export interface ClusterAdapterOptions {
/**
Expand Down Expand Up @@ -101,7 +101,7 @@ export type ClusterMessage = {
type: MessageType.SERVER_SIDE_EMIT;
data: {
requestId?: string;
packet: unknown;
packet: any[];
};
}
);
Expand Down Expand Up @@ -196,10 +196,15 @@ export abstract class ClusterAdapter extends Adapter {
*/
protected async onMessage(message: ClusterMessage, offset?: string) {
if (message.uid === this.uid) {
return debug("ignore message from self");
return debug("[%s] ignore message from self", this.uid);
}

debug("new event of type %d from %s", message.type, message.uid);
debug(
"[%s] new event of type %d from %s",
this.uid,
message.type,
message.uid
);

switch (message.type) {
case MessageType.BROADCAST: {
Expand All @@ -209,7 +214,11 @@ export abstract class ClusterAdapter extends Adapter {
message.data.packet,
decodeOptions(message.data.opts),
(clientCount) => {
debug("waiting for %d client acknowledgements", clientCount);
debug(
"[%s] waiting for %d client acknowledgements",
this.uid,
clientCount
);
this.publishResponse(message.uid, {
type: MessageType.BROADCAST_CLIENT_COUNT,
data: {
Expand All @@ -219,7 +228,11 @@ export abstract class ClusterAdapter extends Adapter {
});
},
(arg) => {
debug("received acknowledgement with value %j", arg);
debug(
"[%s] received acknowledgement with value %j",
this.uid,
arg
);
this.publishResponse(message.uid, {
type: MessageType.BROADCAST_ACK,
data: {
Expand Down Expand Up @@ -256,7 +269,11 @@ export abstract class ClusterAdapter extends Adapter {
break;

case MessageType.FETCH_SOCKETS: {
debug("calling fetchSockets with opts %j", message.data.opts);
debug(
"[%s] calling fetchSockets with opts %j",
this.uid,
message.data.opts
);
const localSockets = await super.fetchSockets(
decodeOptions(message.data.opts)
);
Expand All @@ -281,7 +298,7 @@ export abstract class ClusterAdapter extends Adapter {
}

case MessageType.SERVER_SIDE_EMIT: {
const packet = message.data.packet as unknown[];
const packet = message.data.packet;
const withAck = message.data.requestId !== undefined;
if (!withAck) {
this.nsp._onServerSideEmit(packet);
Expand All @@ -294,7 +311,7 @@ export abstract class ClusterAdapter extends Adapter {
return;
}
called = true;
debug("calling acknowledgement with %j", arg);
debug("[%s] calling acknowledgement with %j", this.uid, arg);
this.publishResponse(message.uid, {
type: MessageType.SERVER_SIDE_EMIT_RESPONSE,
data: {
Expand All @@ -304,8 +321,7 @@ export abstract class ClusterAdapter extends Adapter {
});
};

packet.push(callback);
this.nsp._onServerSideEmit(packet);
this.nsp._onServerSideEmit([...packet, callback]);
break;
}

Expand All @@ -323,7 +339,7 @@ export abstract class ClusterAdapter extends Adapter {
break;

default:
debug("unknown message type: %s", message.type);
debug("[%s] unknown message type: %s", this.uid, message.type);
}
}

Expand All @@ -336,7 +352,12 @@ export abstract class ClusterAdapter extends Adapter {
protected onResponse(response: ClusterResponse) {
const requestId = response.data.requestId;

debug("received response %s to request %s", response.type, requestId);
debug(
"[%s] received response %s to request %s",
this.uid,
response.type,
requestId
);

switch (response.type) {
case MessageType.BROADCAST_CLIENT_COUNT: {
Expand Down Expand Up @@ -391,7 +412,7 @@ export abstract class ClusterAdapter extends Adapter {

default:
// @ts-ignore
debug("unknown response type: %s", response.type);
debug("[%s] unknown response type: %s", this.uid, response.type);
}
}

Expand All @@ -409,7 +430,11 @@ export abstract class ClusterAdapter extends Adapter {
});
this.addOffsetIfNecessary(packet, opts, offset);
} catch (e) {
return debug("error while broadcasting message: %s", e.message);
return debug(
"[%s] error while broadcasting message: %s",
this.uid,
e.message
);
}
}

Expand Down Expand Up @@ -454,6 +479,11 @@ export abstract class ClusterAdapter extends Adapter {
if (!onlyLocal) {
const requestId = randomId();

this.ackRequests.set(requestId, {
clientCountCallback,
ack,
});

this.publish({
type: MessageType.BROADCAST,
data: {
Expand All @@ -463,11 +493,6 @@ export abstract class ClusterAdapter extends Adapter {
},
});

this.ackRequests.set(requestId, {
clientCountCallback,
ack,
});

// we have no way to know at this level whether the server has received an acknowledgement from each client, so we
// will simply clean up the ackRequests map after the given delay
setTimeout(() => {
Expand Down Expand Up @@ -591,7 +616,8 @@ export abstract class ClusterAdapter extends Adapter {
const expectedResponseCount = (await this.serverCount()) - 1;

debug(
'waiting for %d responses to "serverSideEmit" request',
'[%s] waiting for %d responses to "serverSideEmit" request',
this.uid,
expectedResponseCount
);

Expand Down Expand Up @@ -637,7 +663,7 @@ export abstract class ClusterAdapter extends Adapter {
message: DistributiveOmit<ClusterMessage, "nsp" | "uid">
): void {
this.publishAndReturnOffset(message).catch((err) => {
debug("error while publishing message: %s", err);
debug("[%s] error while publishing message: %s", this.uid, err);
});
}

Expand Down Expand Up @@ -666,7 +692,7 @@ export abstract class ClusterAdapter extends Adapter {
(response as ClusterResponse).nsp = this.nsp.name;
this.doPublishResponse(requesterUid, response as ClusterResponse).catch(
(err) => {
debug("error while publishing response: %s", err);
debug("[%s] error while publishing response: %s", this.uid, err);
}
);
}
Expand Down Expand Up @@ -714,7 +740,7 @@ export abstract class ClusterAdapterWithHeartbeat extends ClusterAdapter {
this.nodesMap.forEach((lastSeen, uid) => {
const nodeSeemsDown = now - lastSeen > this._opts.heartbeatTimeout;
if (nodeSeemsDown) {
debug("node %s seems down", uid);
debug("[%s] node %s seems down", this.uid, uid);
this.removeNode(uid);
}
});
Expand Down Expand Up @@ -751,14 +777,21 @@ export abstract class ClusterAdapterWithHeartbeat extends ClusterAdapter {

override async onMessage(message: ClusterMessage, offset?: string) {
if (message.uid === this.uid) {
return debug("ignore message from self");
return debug("[%s] ignore message from self", this.uid);
}

if (message.uid && message.uid !== EMITTER_UID) {
// we track the UID of each sender, in order to know how many servers there are in the cluster
this.nodesMap.set(message.uid, Date.now());
}

debug(
"[%s] new event of type %d from %s",
this.uid,
message.type,
message.uid
);

switch (message.type) {
case MessageType.INITIAL_HEARTBEAT:
this.publish({
Expand All @@ -777,14 +810,6 @@ export abstract class ClusterAdapterWithHeartbeat extends ClusterAdapter {
}

override serverCount(): Promise<number> {
const now = Date.now();
this.nodesMap.forEach((lastSeen, uid) => {
const nodeSeemsDown = now - lastSeen > this._opts.heartbeatTimeout;
if (nodeSeemsDown) {
debug("node %s seems down", uid);
this.nodesMap.delete(uid);
}
});
return Promise.resolve(1 + this.nodesMap.size);
}

Expand All @@ -810,7 +835,8 @@ export abstract class ClusterAdapterWithHeartbeat extends ClusterAdapter {
const expectedResponseCount = this.nodesMap.size;

debug(
'waiting for %d responses to "serverSideEmit" request',
'[%s] waiting for %d responses to "serverSideEmit" request',
this.uid,
expectedResponseCount
);

Expand Down Expand Up @@ -905,7 +931,12 @@ export abstract class ClusterAdapterWithHeartbeat extends ClusterAdapter {
override onResponse(response: ClusterResponse) {
const requestId = response.data.requestId;

debug("received response %s to request %s", response.type, requestId);
debug(
"[%s] received response %s to request %s",
this.uid,
response.type,
requestId
);

switch (response.type) {
case MessageType.FETCH_SOCKETS_RESPONSE: {
Expand Down
9 changes: 8 additions & 1 deletion lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -506,4 +506,11 @@ function shouldIncludePacket(
return included && notExcluded;
}

export { ClusterAdapter } from "./cluster-adapter";
export {
ClusterAdapter,
ClusterAdapterWithHeartbeat,
ClusterMessage,
ClusterResponse,
ServerId,
Offset,
} from "./cluster-adapter";
Loading

0 comments on commit 8fce41b

Please sign in to comment.