Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding a heartbeat to the signalling protocol. #409

Merged
merged 9 commits into from
Jan 20, 2025
110 changes: 106 additions & 4 deletions Common/src/Protocol/SignallingProtocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,48 @@ import * as MessageHelpers from '../Messages/message_helpers';
* out:
* Emitted when sending a message out on the transport. Similar to 'message' but
* only for when messages are sent from this endpoint. Useful for debugging.
*
* timeout:
* Emitted when a ping fails to receive a pong in time and the connection is
* assumed to be dead. This is emitted after the connection is closed.
*/
export class SignallingProtocol extends EventEmitter {
static get SIGNALLING_VERSION(): string {
return '1.2.1';
}

static readonly PING_TIMEOUT: number = 30 * 1000;

// The transport in use by this protocol object.
transport: ITransport;

private _rtt: number = 0;

/**
* Get the current RTT time in milliseconds. Only valid for the connection initiator. ie. the
* end that called "connect".
*/
get RTT(): number {
return this._rtt;
}

constructor(transport: ITransport) {
super();
this.transport = transport;

this.initHeartbeat();

transport.onMessage = (msg: BaseMessage) => {
// auto handle ping messages
if (msg.type == Messages.ping.typeName) {
const pongMessage = MessageHelpers.createMessage(Messages.pong, {
time: new Date().getTime()
});
transport.sendMessage(pongMessage);
const pingMsg = msg as Messages.ping;
this.onHeartbeatPing(pingMsg.time);
}
if (msg.type == Messages.pong.typeName) {
const pongMsg = msg as Messages.pong;
this.onHeartbeatPong(pongMsg.time);
}
// allow the above to fall through so clients can still react to the ping and pong messages if needed

// call the handlers
transport.emit('message', msg); // emit this for listeners listening to any message
Expand Down Expand Up @@ -72,6 +93,7 @@ export class SignallingProtocol extends EventEmitter {
*/
disconnect(code?: number, reason?: string): void {
this.transport.disconnect(code, reason);
this.stopHeartbeat();
}

/**
Expand All @@ -90,4 +112,84 @@ export class SignallingProtocol extends EventEmitter {
this.transport.sendMessage(msg);
this.transport.emit('out', msg); // emit this for listeners listening to outgoing messages
}

private keepalive?: ReturnType<typeof setInterval>;
private alive: boolean = false;
private initiator: boolean = false;

private initHeartbeat(): void {
// if we're already connected we should receive heartbeats from the initiator
// otherwise we're the initiator and should start sending heartbeats
this.initiator = !this.transport.isConnected();
if (this.initiator) {
this.transport.on('open', this.startHeartbeat.bind(this));
} else {
this.heartbeatWait();
}
}

private startHeartbeat(): void {
// initiator only.
// just sets up an interval to send heartbeats
this.alive = true;
this.keepalive = setInterval(this.heartbeatInitiate.bind(this), SignallingProtocol.PING_TIMEOUT);
}

private stopHeartbeat(): void {
// stops either the heartbeat or the waiting for a heartbeat
if (this.initiator) {
clearInterval(this.keepalive);
} else {
clearTimeout(this.keepalive);
}
}

private heartbeatInitiate(): void {
// initiator only.
// if we never got a response from the last heartbeat, assume the connection is dead and timeout
if (this.alive === false) {
this.disconnect();
this.transport.emit('timeout');
return;
}

// mark the connection as temporarily dead until we get a response from the ping
this.alive = false;
this.sendMessage(MessageHelpers.createMessage(Messages.ping, { time: new Date().getTime() }));
}

private heartbeatWait(): void {
// non-initiator only.
// starts a timer to wait for heartbeat pings. we give it double the ping time to allow some lag
this.keepalive = setTimeout(this.heartbeatTimeout.bind(this), SignallingProtocol.PING_TIMEOUT * 2);
}

private heartbeatTimeout(): void {
// non-initiator only.
// called when we dont cancel the heartbeat timer because we got a ping. times out the connection.
this.disconnect();
this.transport.emit('timeout');
}

private onHeartbeatPing(time: number): void {
// non-initiator only.
// we got a ping from the initiator so mark the connection as live and clear the timeout timer.
this.alive = true;
clearTimeout(this.keepalive);

// respond with a pong
const pongMessage = MessageHelpers.createMessage(Messages.pong, { time: time });
this.transport.sendMessage(pongMessage);

// start waiting for the next one
this.heartbeatWait();
}

private onHeartbeatPong(time: number): void {
// initiator only
// we got a pong response from the other side, the connection is good.
// we also store the round trip time if anyone is curious
this._rtt = new Date().getTime() - time;
this.alive = true;
}
}
7 changes: 4 additions & 3 deletions Common/src/Transport/WebSocketTransport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,12 @@ export class WebSocketTransport extends EventEmitter implements ITransport {
return;
}

Logger.Info('received => \n' + JSON.stringify(JSON.parse(event.data as string), undefined, 4));

let parsedMessage: BaseMessage;
try {
parsedMessage = JSON.parse(event.data as string) as BaseMessage;
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
const parsedData = JSON.parse(event.data as string);
Logger.Info('received => \n' + JSON.stringify(parsedData, undefined, 4));
parsedMessage = parsedData as BaseMessage;
} catch (e: unknown) {
if (e instanceof Error) {
Logger.Error(`Error parsing message string ${event.data}.\n${e.message}`);
Expand Down
2 changes: 2 additions & 0 deletions Extras/JSStreamer/src/streamer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ export class Streamer extends EventEmitter {
this.protocol.addListener(Messages.iceCandidate.typeName, (msg: BaseMessage) =>
this.handleIceMessage(msg as Messages.iceCandidate)
);

this.transport.on('timeout', () => console.log('Streamer connection timeout'));
}

startStreaming(signallingURL: string, stream: MediaStream) {
Expand Down
4 changes: 4 additions & 0 deletions Frontend/library/src/WebRtcPlayer/WebRtcPlayerController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,10 @@ export class WebRtcPlayerController {
}
});

this.protocol.transport.addListener('timeout', () => {
Logger.Error(`Transport timeout`);
});

// set up the final webRtc player controller methods from within our application so a connection can be activated
this.sendMessageController = new SendMessageController(
this.dataChannelSender,
Expand Down
5 changes: 5 additions & 0 deletions Signalling/src/PlayerConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ export class PlayerConnection implements IPlayer, LogUtils.IMessageLogger {

this.transport.on('error', this.onTransportError.bind(this));
this.transport.on('close', this.onTransportClose.bind(this));
this.transport.on('timeout', this.onTransportTimeout.bind(this));

this.streamerIdChangeListener = this.onStreamerIdChanged.bind(this);
this.streamerDisconnectedListener = this.onStreamerDisconnected.bind(this);
Expand Down Expand Up @@ -204,6 +205,10 @@ export class PlayerConnection implements IPlayer, LogUtils.IMessageLogger {
this.disconnect();
}

private onTransportTimeout(): void {
Logger.debug('PlayerConnection transport timeout.');
}

private onSubscribeMessage(message: Messages.subscribe): void {
this.subscribe(message.streamerId);
}
Expand Down
5 changes: 5 additions & 0 deletions Signalling/src/SFUConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ export class SFUConnection extends EventEmitter implements IPlayer, IStreamer, L

this.transport.on('error', this.onTransportError.bind(this));
this.transport.on('close', this.onTransportClose.bind(this));
this.transport.on('timeout', this.onTransportTimeout.bind(this));

this.layerPreferenceListener = this.onLayerPreference.bind(this);
this.streamerIdChangeListener = this.onStreamerIdChanged.bind(this);
Expand Down Expand Up @@ -265,6 +266,10 @@ export class SFUConnection extends EventEmitter implements IPlayer, IStreamer, L
this.disconnect();
}

private onTransportTimeout(): void {
Logger.debug('SFUConnection transport timeout.');
}

private onSubscribeMessage(message: Messages.subscribe): void {
this.subscribe(message.streamerId);
}
Expand Down
11 changes: 5 additions & 6 deletions Signalling/src/StreamerConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import {
WebSocketTransportNJS,
BaseMessage,
Messages,
MessageHelpers,
EventEmitter
} from '@epicgames-ps/lib-pixelstreamingcommon-ue5.5';
import { IStreamer, IStreamerInfo } from './StreamerRegistry';
Expand Down Expand Up @@ -59,6 +58,7 @@ export class StreamerConnection extends EventEmitter implements IStreamer, LogUt

this.transport.on('error', this.onTransportError.bind(this));
this.transport.on('close', this.onTransportClose.bind(this));
this.transport.on('timeout', this.onTransportTimeout.bind(this));

this.registerMessageHandlers();

Expand Down Expand Up @@ -107,7 +107,6 @@ export class StreamerConnection extends EventEmitter implements IStreamer, LogUt
Messages.endpointId.typeName,
LogUtils.createHandlerListener(this, this.onEndpointId)
);
this.protocol.on(Messages.ping.typeName, LogUtils.createHandlerListener(this, this.onPing));
this.protocol.on(
Messages.disconnectPlayer.typeName,
LogUtils.createHandlerListener(this, this.onDisconnectPlayerRequest)
Expand Down Expand Up @@ -145,12 +144,12 @@ export class StreamerConnection extends EventEmitter implements IStreamer, LogUt
this.emit('disconnect');
}

private onEndpointId(_message: Messages.endpointId): void {
this.streaming = true; // we're ready to stream when we id ourselves
private onTransportTimeout(): void {
Logger.error(`Streamer '${this.streamerId}' - websocket timeout.`);
}

private onPing(message: Messages.ping): void {
this.sendMessage(MessageHelpers.createMessage(Messages.pong, { time: message.time }));
private onEndpointId(_message: Messages.endpointId): void {
this.streaming = true; // we're ready to stream when we id ourselves
}

private onDisconnectPlayerRequest(message: Messages.disconnectPlayer): void {
Expand Down
Loading