Skip to content

Commit

Permalink
Refactor analysis console to use SSE for real-time updates
Browse files Browse the repository at this point in the history
  • Loading branch information
mateuscardosodeveloper committed May 8, 2024
1 parent 5d62548 commit 07b64ce
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 43 deletions.
69 changes: 29 additions & 40 deletions src/commands/analysis/analysis-console.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,22 @@
import { connect } from "socket.io-client";

import EventSource from "eventsource";
import { Account } from "@tago-io/sdk";
import { AnalysisInfo } from "@tago-io/sdk/lib/types";

import { getEnvironmentConfig, IEnvironment } from "../../lib/config-file";
import { errorHandler, highlightMSG, infoMSG, successMSG } from "../../lib/messages";
import { searchName } from "../../lib/search-name";
import { pickAnalysisFromConfig } from "../../prompt/pick-analysis-from-config";

/**
* Creates a WebSocket connection to the TagoIO Realtime API.
* @param profileToken The user's profile token.
* @returns The WebSocket instance.
* Creates a new SSE connection to the TagoIO Realtime API.
* @param profileToken - The user's profile token.
* @param analysisID - The ID of the analysis script to connect to.
* @returns An EventSource instance connected to the TagoIO Realtime API.
*/
function apiSocket(profileToken: string) {
const socket = connect("wss://realtime.tago.io", {
reconnectionDelay: 10_000,
reconnection: true,
transports: ["websocket"],
query: {
token: profileToken,
},
});
function apiSSE(profileToken: string, analysisID: string) {
const sse = new EventSource(`https://sse.tago.io/events?channel=analysis_console.${analysisID}&token=${profileToken}`);

return socket;
return sse;
}

/**
Expand All @@ -43,34 +37,29 @@ async function getScriptObj(scriptName: string | void, analysisList: IEnvironmen
}
return scriptObj;
}

/**
* Sets up a socket connection to TagoIO and attaches to an analysis script.
* @param socket - The socket connection to TagoIO.
* @param scriptId - The ID of the analysis script to attach to.
* @param analysis_info - Information about the analysis script.
* Sets up the SSE connection and event listeners for device live inspection.
* @param sse - The SSE connection to TagoIO.
* @param deviceIdOrToken - The ID or token of the device to inspect.
* @param deviceInfo - Information about the device being inspected.
*/
function setupSocket(socket: ReturnType<typeof apiSocket>, scriptId: string, analysis_info: any) {
socket.on("connect", () => {
infoMSG("Connected to TagoIO, Getting analysis information...");
socket.emit("attach", "analysis", scriptId);
socket.emit("attach", {
resourceName: "analysis",
resourceID: scriptId,
});
});

socket.on("disconnect", () => console.info("Disconnected from TagoIO.\n\n"));
function setupSSE(sse: ReturnType<typeof apiSSE>, scriptId: string, analysis_info: AnalysisInfo) {
sse.onmessage = (event) => {
const scope = JSON.parse(event.data).payload;
console.log(`\x1b[35m${new Date(scope.timestamp).toISOString()} \x1b[0m ${scope.message}`);
};

socket.on("error", (e: Error) => {
sse.onerror = (error) => {
errorHandler("Connection error");
console.error(e);
});

socket.on("ready", () => successMSG(`Analysis [${highlightMSG(analysis_info.name)}] found succesfully. ${highlightMSG("Waiting for logs...")}`));
console.error(error);
};

socket.on("analysis::console", (scope: any) => {
console.log(`\x1b[35m${new Date(scope.timestamp).toISOString()} \x1b[0m ${scope.message}`);
});
sse.onopen = () => {
infoMSG("Connected to TagoIO, Getting analysis information...");
successMSG(`Analysis [${highlightMSG(analysis_info.name)}] found successfully.`);
successMSG(`Waiting for logs...`);
};
}

/**
Expand Down Expand Up @@ -99,8 +88,8 @@ async function connectAnalysisConsole(scriptName: string | void, options: { envi
return;
}

const socket = apiSocket(config.profileToken);
setupSocket(socket, scriptObj.id, analysis_info);
const sse = apiSSE(config.profileToken, analysis_info.id);
setupSSE(sse, scriptObj.id, analysis_info);
}

export { connectAnalysisConsole };
6 changes: 3 additions & 3 deletions src/commands/devices/device-live-inspector.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import EventSource from "eventsource";

import { Account, Device } from "@tago-io/sdk";
import { DeviceInfo } from "@tago-io/sdk/lib/types";

Expand All @@ -10,6 +9,7 @@ import { pickDeviceIDFromTagoIO } from "../../prompt/pick-device-id-from-tagoio"
/**
* Creates a new SSE connection to the TagoIO Realtime API.
* @param profileToken - The user's profile token.
* @param deviceID - The ID of the device to inspect.
* @returns An EventSource instance connected to the TagoIO Realtime API.
*/
function apiSSE(profileToken: string, deviceID: string) {
Expand Down Expand Up @@ -59,9 +59,9 @@ function setupSSE(sse: ReturnType<typeof apiSSE>, deviceIdOrToken: string, devic
}
};

sse.onerror = (_error) => {
sse.onerror = (error) => {
errorHandler("Connection error");
console.error(_error);
console.error(error);
};

sse.onopen = () => {
Expand Down

0 comments on commit 07b64ce

Please sign in to comment.