Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 19 additions & 16 deletions action-server/eslint.config.mjs
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
import { sheriff, tseslint } from 'eslint-config-sheriff';
import { sheriff, tseslint } from "eslint-config-sheriff";

const sheriffOptions = {
"react": false,
"lodash": false,
"remeda": false,
"next": false,
"astro": false,
"playwright": false,
"jest": false,
"vitest": false
react: false,
lodash: false,
remeda: false,
next: false,
astro: false,
playwright: false,
jest: false,
vitest: false,
};

export default tseslint.config(sheriff(sheriffOptions),
{
rules: {
"@typescript-eslint/no-explicit-any": ["off"]
},
},
);
export default tseslint.config(sheriff(sheriffOptions), {
rules: {
"@typescript-eslint/no-explicit-any": ["off"],
"@typescript-eslint/no-extraneous-class": "off",
"@typescript-eslint/naming-convention": "off",
"func-style": "off",
"no-restricted-syntax": "off",
"@typescript-eslint/no-dynamic-delete": "off",
},
});
10 changes: 10 additions & 0 deletions action-server/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { configuration } from "./config";
import { corsMiddleware, jsonErrorMiddleware } from "./middleware";
import { ActionWorkerPool } from "./threads/workerPool";
import { cleanup, setupListeners } from "./listeners/dbListeners";
import { ActionRunner } from "./type/actionRunner";

const port = configuration().PORT;

Expand Down Expand Up @@ -34,6 +35,15 @@ app.get("/health", async (req, res, next) => {
res.status(200).send();
});

app.post("/secrets", async (req, res, next) => {
const { action_run_id, secrets } = req.body;
const actionRunId = action_run_id as string;

ActionRunner.addActionSecret(actionRunId, secrets as Record<string, string>);

res.status(200).send({ success: true });
});

// handle termination signals
process.on("SIGINT", () => cleanup(server));
process.on("SIGTERM", () => cleanup(server));
50 changes: 31 additions & 19 deletions action-server/src/listeners/dbListeners.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,25 @@
import { readFile } from "node:fs/promises";
import type http from "node:http";
import * as path from "node:path";
import type { Pool, PoolClient } from "pg";
import type { PoolClient } from "pg";
import { configuration } from "../config";
import { ActionsDbManager } from "../db";
import { ActionWorkerPool } from "../threads/workerPool";
import type { ActionDefinitionInsertedPayload, ActionResponse, ActionRunInsertedPayload } from "../type/types";
import { extractSchemas } from "../utils/codeRunner";
import { createLogger, format, transports } from "winston";
import logger from "../utils/logger";
import { ActionRunCancellationRequestPayload } from "../type/types";
import { ActionRunner } from "../type/actionRunner";

let listenClient: PoolClient | undefined;

async function readFileFromStore(fileName: string): Promise<string> {
// read file from aerie file store and return [resolve] it as a string
const fileStoreBasePath = configuration().ACTION_LOCAL_STORE;
const filePath = path.join(fileStoreBasePath, fileName);

logger.info(`path is ${filePath}`);

return await readFile(filePath, "utf-8");
}

Expand All @@ -42,6 +44,7 @@ async function refreshActionDefinitionSchema(payload: ActionDefinitionInsertedPa
JSON.stringify(schemas.settingDefinitions),
payload.action_definition_id,
]);

logger.info("Updated action_definition:", res.rows[0]);
} catch (error) {
logger.error("Error updating row:", error);
Expand All @@ -52,31 +55,36 @@ async function cancelAction(payload: ActionRunCancellationRequestPayload) {
ActionWorkerPool.cancelTask(payload.action_run_id);
}

async function runAction(payload: ActionRunInsertedPayload) {
const actionRunId = payload.action_run_id;
const actionFilePath = payload.action_file_path;
logger.info(`action run ${actionRunId} inserted (${actionFilePath})`);
export async function runAction(payload: ActionRunInsertedPayload, actionSecrets?: Record<string, any>): Promise<void> {
const { action_file_path, action_run_id, parameters, settings, workspace_id } = payload;
const actionRunId = Number(action_run_id);

logger.info(`action run ${action_run_id} inserted (${action_file_path})`);

let taskError;

// event payload contains a file path for the action file which should be run
const actionJS = await readFileFromStore(actionFilePath);
const actionJS = await readFileFromStore(action_file_path);

// NOTE: Authentication tokens are unavailable in PostgreSQL Listen/Notify
// const authToken = req.header("authorization");
// if (!authToken) console.warn("No valid `authorization` header in action-run request");

const { parameters, settings } = payload;
const workspaceId = payload.workspace_id;
const pool = ActionsDbManager.getDb();
logger.info(`Submitting task to worker pool for action run ${actionRunId}`);

logger.info(`Submitting task to worker pool for action run ${action_run_id}`);
const start = performance.now();
let run, taskError;
let run;

try {
run = (await ActionWorkerPool.submitTask({
actionJS: actionJS,
action_run_id: actionRunId,
actionJS,
action_run_id,
message_port: null,
parameters: parameters,
settings: settings,
workspaceId: workspaceId,
parameters,
settings,
workspaceId: workspace_id,
secrets: actionSecrets,
})) satisfies ActionResponse;
} catch (error: any) {
if (error?.name === "AbortError") {
Expand All @@ -92,7 +100,6 @@ async function runAction(payload: ActionRunInsertedPayload) {
const status = taskError || run?.errors ? "failed" : "success";
logger.info(`Finished run ${actionRunId} in ${duration / 1000}s - ${status}`);
const errorValue = JSON.stringify(taskError || run?.errors || {});

const logStr = run ? run.console.join("\n") : "";

// update action_run row in DB with status/results/errors/logs
Expand All @@ -118,6 +125,7 @@ async function runAction(payload: ActionRunInsertedPayload) {
payload.action_run_id,
],
);

logger.info("Updated action_run:", res.rows[0]);
} catch (error) {
logger.error("Error updating row:", error);
Expand All @@ -141,25 +149,29 @@ export async function setupListeners() {

listenClient.on("notification", async (msg) => {
console.info(`PG notify event: ${JSON.stringify(msg, null, 2)}`);

if (!msg.payload) {
console.warn(`warning: PG event with no message or payload: ${JSON.stringify(msg, null, 2)}`);
return;
}

const payload = JSON.parse(msg.payload);

if (msg.channel === "action_definition_inserted") {
await refreshActionDefinitionSchema(payload);
} else if (msg.channel === "action_run_inserted") {
await runAction(payload);
await ActionRunner.addActionRun(payload as ActionRunInsertedPayload);
} else if (msg.channel === "action_run_cancel_requested") {
await cancelAction(payload);
}
});

logger.info("Initialized PG event listeners");
}

export function cleanup(server: http.Server) {
export function cleanup(server: http.Server): void {
console.log("shutting down...");

if (listenClient) {
listenClient.release();
}
Expand Down
4 changes: 3 additions & 1 deletion action-server/src/threads/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,16 @@ export async function runAction(task: ActionTask): Promise<ActionResponse> {
}

let jsRun: ActionResponse;

try {
jsRun = await jsExecute(task.actionJS, task.parameters, task.settings, task.auth, client, task.workspaceId);
jsRun = await jsExecute(task.actionJS, task.parameters, task.settings, task.auth, client, task.workspaceId, task.secrets);
logger.info(`[Action Run ${task.action_run_id}, Thread ${threadId}] done executing`);
await releaseDbPoolAndClient();
logger.info(`[Action Run ${task.action_run_id}, Thread ${threadId}] released DB connection`);
// Send "I'm finished" back to main thread:
task.message_port?.postMessage({ type: "finished" });
task.message_port?.close();

return jsRun;
} catch (e) {
logger.info(`[Action Run ${task.action_run_id}, Thread ${threadId}] Error while executing`);
Expand Down
3 changes: 1 addition & 2 deletions action-server/src/threads/workerPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,13 @@ export class ActionWorkerPool {
// Case 1. Worker has not yet started -> use abortcontroller to remove from piscina task queue
logger.info(`Action run ${action_run_id} has not yet started, removing it from the queue`);
const abortController = this.abortControllerForActionRun.get(action_run_id);
if(abortController) {
if (abortController) {
abortController.abort();
} else {
logger.warn(`No abort controller found for task ${action_run_id}`);
}
this.removeFromMaps(action_run_id);
return;

} else {
// Case 2. Worker has started, and is not completed -> ask it to close its database connection
const port = this.messagePortsForActionRun.get(action_run_id);
Expand Down
85 changes: 85 additions & 0 deletions action-server/src/type/actionRunner.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import { runAction } from "../listeners/dbListeners";
import logger from "../utils/logger";
import type { ActionRunInsertedPayload } from "./types";

export class ActionRunner {
// Wait up to 10 minutes for the action run associated with the secrets.
private static WAIT_FOR_ACTION_RUN_TIMEOUT = 600000;
// Wait up to 1 minute for the secrets associated with the action run.
private static WAIT_FOR_SECRET_TIMEOUT = 60000;

private static actionRuns: Record<string, ActionRunInsertedPayload> = {};
private static actionRunQueue: Map<string, (actionRunId: string) => Promise<void>> = new Map();
private static actionSecretsMap: Map<string, Record<string, string>> = new Map();

static async addActionRun(actionRun: ActionRunInsertedPayload): Promise<void> {
const actionRunId = actionRun.action_run_id;

this.actionRuns[actionRunId] = actionRun;
const actionRunFunc = async (runId: string) => {
try {
await ActionRunner.runAction(runId);
this.deleteActionRun(runId);
} catch (error) {
this.deleteActionRun(runId);
}
};

this.actionRunQueue.set(actionRunId, actionRunFunc);

// If there aren't any secrets execute the action run immediately.
if (!actionRun.has_secrets) {
await actionRunFunc(actionRunId);
} else {
logger.info(`Action Run: ${actionRunId} waiting for secrets...`);
}

setTimeout(() => {
if (this.actionRunQueue.get(actionRunId) !== null) {
logger.info(`Action Run: ${actionRunId} timed out waiting for the associated action secrets.`);
this.deleteActionRun(actionRunId);
}
}, this.WAIT_FOR_SECRET_TIMEOUT);
}

static async addActionSecret(actionRunId: string, actionSecrets: Record<string, string>): Promise<void> {
this.actionSecretsMap.set(actionRunId, actionSecrets);

logger.info(`Secret found for Action Run: ${actionRunId}, running action...`);

Check warning

Code scanning / CodeQL

Log injection Medium

Log entry depends on a
user-provided value
.

const actionRunFunc = this.actionRunQueue.get(actionRunId);

if (actionRunFunc) {
setTimeout(() => {
if (this.actionSecretsMap.get(actionRunId) !== null) {
logger.info(`Secret for Action Run: ${actionRunId} timed out waiting for the associated action run.`);

Check warning

Code scanning / CodeQL

Log injection Medium

Log entry depends on a
user-provided value
.
this.deleteActionSecret(actionRunId);
}
}, this.WAIT_FOR_ACTION_RUN_TIMEOUT);

await actionRunFunc(actionRunId);

Check failure

Code scanning / CodeQL

Unvalidated dynamic method call High

Invocation of method with
user-controlled
name may dispatch to unexpected target and cause an exception.
this.deleteActionSecret(actionRunId);
} else {
throw new Error(`Action Run ${actionRunId} not found in queue`);
}
}

static deleteActionRun(actionRunId: string): void {
delete this.actionRuns[actionRunId];
this.actionRunQueue.delete(actionRunId);
}

static deleteActionSecret(actionRunId: string): void {
this.actionSecretsMap.delete(actionRunId);
}

private static async runAction(actionRunId: string): Promise<void> {
const action = this.actionRuns[actionRunId];
const secret = action.has_secrets ? this.actionSecretsMap.get(actionRunId) : undefined;

this.deleteActionRun(actionRunId);
this.deleteActionSecret(actionRunId);

await runAction(action, secret);
}
}
3 changes: 3 additions & 0 deletions action-server/src/type/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ export type ConsoleOutput = {
export type ActionConfig = {
ACTION_FILE_STORE: string;
SEQUENCING_FILE_STORE: string;
SECRETS?: Record<string, string> | undefined;
WORKSPACE_BASE_URL: string;
HASURA_GRAPHQL_ADMIN_SECRET: string;
};
Expand All @@ -33,6 +34,7 @@ export type ActionTask = {
action_run_id: string;
parameters: Record<string, any>;
settings: Record<string, any>;
secrets?: Record<string, string>;
auth?: string;
workspaceId: number;
message_port: MessagePort | null;
Expand All @@ -50,6 +52,7 @@ export type ActionRunInsertedPayload = {
action_definition_id: number;
workspace_id: number;
action_file_path: string;
has_secrets: boolean;
};

export type ActionRunCancellationRequestPayload = {
Expand Down
16 changes: 10 additions & 6 deletions action-server/src/utils/codeRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,24 @@ const { ACTION_LOCAL_STORE, SEQUENCING_LOCAL_STORE, WORKSPACE_BASE_URL, HASURA_G

function injectLogger(oldConsole: any, logBuffer: string[], secrets?: Record<string, any> | undefined) {
// secrets may be passed as last argument, to be censored in the logs
secrets = secrets || {};
secrets['HASURA_GRAPHQL_ADMIN_SECRET'] = HASURA_GRAPHQL_ADMIN_SECRET;
const censoredSecrets = {
...(secrets || {}),
HASURA_GRAPHQL_ADMIN_SECRET
};

// inject a winston logger to be passed to the action VM, replacing its normal `console`,
// so we can capture the console outputs and return them with the action results
const logger = createLogger({
level: "debug", // todo allow user to set log level
format: format.combine(
format.timestamp(),
format.printf(({ level, message, timestamp }) => {

const logLine = `${timestamp} [${level.toUpperCase()}] `;
let output = message as string;

// If the action has secrets filter them out of the log.
if (secrets !== undefined && Object.keys(secrets).length > 0) {
const secretValues = Object.values(secrets);
if (Object.keys(censoredSecrets).length > 0) {
const secretValues = Object.values(censoredSecrets);

for (const secretValue of secretValues) {
output = output.replaceAll(secretValue, "*****");
Expand Down Expand Up @@ -78,14 +80,15 @@ export const jsExecute = async (
authToken: string | undefined,
client: PoolClient,
workspaceId: number,
secrets: Record<string, string> | undefined,
): Promise<ActionResponse> => {
// create a clone of the global object (including getters/setters/non-enumerable properties)
// to be passed to the context so it has access to eg. node built-ins
const aerieGlobal = getGlobals();
// inject custom logger to capture logs from action run
const logBuffer: string[] = [];

aerieGlobal.console = injectLogger(aerieGlobal.console, logBuffer);
aerieGlobal.console = injectLogger(aerieGlobal.console, logBuffer, secrets);

const context = vm.createContext(aerieGlobal);

Expand All @@ -95,6 +98,7 @@ export const jsExecute = async (
const actionConfig: ActionConfig = {
ACTION_FILE_STORE: ACTION_LOCAL_STORE,
SEQUENCING_FILE_STORE: SEQUENCING_LOCAL_STORE,
SECRETS: secrets,
WORKSPACE_BASE_URL: WORKSPACE_BASE_URL,
HASURA_GRAPHQL_ADMIN_SECRET: HASURA_GRAPHQL_ADMIN_SECRET
};
Expand Down
Loading
Loading