diff --git a/migrations/20241009150000_event_dispatch_kv.js b/migrations/20241009150000_event_dispatch_kv.js new file mode 100644 index 00000000..7a11eb09 --- /dev/null +++ b/migrations/20241009150000_event_dispatch_kv.js @@ -0,0 +1,17 @@ +exports.up = function(knex) { + return knex.schema.withSchema('dbos') + .createTable('event_dispatch_kv', function(table) { + table.text('service_name').notNullable(); + table.text('workflow_fn_name').notNullable(); + table.text('key').notNullable(); + table.text('value'); + table.decimal('update_seq', 38, 0); + table.decimal('update_time', 38, 15); + table.primary(['service_name','workflow_fn_name','key']); + }) +}; + +exports.down = function(knex) { + return knex.schema.withSchema('dbos') + .dropTableIfExists('event_dispatch_kv'); +}; diff --git a/schemas/system_db_schema.ts b/schemas/system_db_schema.ts index 4deb9955..cd67468c 100644 --- a/schemas/system_db_schema.ts +++ b/schemas/system_db_schema.ts @@ -39,9 +39,16 @@ export interface workflow_inputs { inputs: string; } -export interface scheduler_state { +export interface event_dispatch_kv { + // Key fields + service_name: string; workflow_fn_name: string; - last_run_time: number; // Time that has certainly been kicked off; others may have but OAOO will cover that + key: string; + + // Payload fields + value?: string; + update_time?: number; // Timestamp of record (for upsert) + update_seq?: bigint; // Sequence number of record (for upsert) } export interface workflow_queue { @@ -50,4 +57,4 @@ export interface workflow_queue { created_at_epoch_ms: number; // This time is provided by the database started_at_epoch_ms?: number; // This time is provided by the client completed_at_epoch_ms?: number; // This time is provided by the client -} \ No newline at end of file +} diff --git a/src/dbos-executor.ts b/src/dbos-executor.ts index 46ff4938..6ff0e927 100644 --- a/src/dbos-executor.ts +++ b/src/dbos-executor.ts @@ -49,6 +49,7 @@ import { DBOSEventReceiver, DBOSExecutorContext} from "."; import { get } from "lodash"; import { wfQueueRunner, WorkflowQueue } from "./wfqueue"; import { debugTriggerPoint, DEBUG_TRIGGER_WORKFLOW_ENQUEUE } from "./debugpoint"; +import { DBOSEventReceiverState, DBOSEventReceiverQuery, DBNotificationCallback, DBNotificationListener } from "./eventreceiver"; // eslint-disable-next-line @typescript-eslint/no-empty-object-type export interface DBOSNull { } @@ -896,6 +897,38 @@ export class DBOSExecutor implements DBOSExecutorContext { return new RetrievedHandle(this.systemDatabase, workflowUUID); } + async queryUserDB(sql: string, params?: unknown[]) { + if (params !== undefined) { + return await this.userDatabase.query(sql, ...params); + } + else { + return await this.userDatabase.query(sql); + } + } + + async userDBListen(channels: string[], callback: DBNotificationCallback): Promise { + const notificationsClient = await this.procedurePool.connect(); + for (const nname of channels) { + await notificationsClient.query(`LISTEN ${nname};`); + } + + notificationsClient.on("notification", callback); + + return { + close: async () => { + for (const nname of channels) { + try { + await notificationsClient.query(`UNLISTEN ${nname};`); + } + catch(e) { + this.logger.warn(e); + } + notificationsClient.release(); + } + } + } + } + /* INTERNAL HELPERS */ #generateUUID(): string { return uuidv4(); @@ -1006,6 +1039,17 @@ export class DBOSExecutor implements DBOSExecutorContext { return this.workflow(temp_workflow, { workflowUUID: workflowStartUUID, parentCtx: parentCtx ?? undefined, configuredInstance: clsinst, recovery: true, tempWfType, tempWfClass, tempWfName}, ...inputs); } + async getEventDispatchState(svc: string, wfn: string, key: string): Promise { + return await this.systemDatabase.getEventDispatchState(svc, wfn, key); + } + async queryEventDispatchState(query: DBOSEventReceiverQuery): Promise { + return await this.systemDatabase.queryEventDispatchState(query); + } + async upsertEventDispatchState(state: DBOSEventReceiverState): Promise { + return await this.systemDatabase.upsertEventDispatchState(state); + } + + // NOTE: this creates a new span, it does not inherit the span from the original workflow #getRecoveryContext(workflowUUID: string, status: WorkflowStatus): DBOSContextImpl { const span = this.tracer.startSpan(status.workflowName, { diff --git a/src/dbos-runtime/runtime.ts b/src/dbos-runtime/runtime.ts index c53e1d51..c11a00e6 100644 --- a/src/dbos-runtime/runtime.ts +++ b/src/dbos-runtime/runtime.ts @@ -57,6 +57,7 @@ export class DBOSRuntime { this.scheduler = new DBOSScheduler(this.dbosExec); this.scheduler.initScheduler(); this.scheduler.logRegisteredSchedulerEndpoints(); + wfQueueRunner.logRegisteredEndpoints(this.dbosExec); this.wfQueueRunner = wfQueueRunner.dispatchLoop(this.dbosExec); diff --git a/src/decorators.ts b/src/decorators.ts index 2a363016..749306b3 100644 --- a/src/decorators.ts +++ b/src/decorators.ts @@ -151,10 +151,12 @@ export interface MethodRegistrationBase { procConfig?: TransactionConfig; isInstance: boolean; -eventReceiverInfo: Map; + eventReceiverInfo: Map; // eslint-disable-next-line @typescript-eslint/ban-types registeredFunction: Function | undefined; + // eslint-disable-next-line @typescript-eslint/ban-types + origFunction: Function; invoke(pthis: unknown, args: unknown[]): unknown; } diff --git a/src/eventreceiver.ts b/src/eventreceiver.ts index cdd68bef..5296ca85 100644 --- a/src/eventreceiver.ts +++ b/src/eventreceiver.ts @@ -4,6 +4,19 @@ import { WorkflowFunction, WorkflowHandle, WorkflowParams } from './workflow'; import { TransactionFunction } from './transaction'; import { MethodRegistrationBase } from './decorators'; import { StepFunction } from './step'; +import { Notification } from "pg"; + +export type DBNotification = Notification; +export type DBNotificationCallback = (n: DBNotification) => void; +export interface DBNotificationListener { + close(): Promise; +} + +export interface DBOSEventReceiverRegistration { + methodConfig: unknown, + classConfig: unknown, + methodReg: MethodRegistrationBase +} /* * Info provided to an event receiver at initialization, @@ -26,7 +39,7 @@ export interface DBOSExecutorContext * classConfig: the class info the receiver stored * methodReg: the method registration (w/ workflow, transaction, function, and other info) */ - getRegistrationsFor(eri: DBOSEventReceiver) : {methodConfig: unknown, classConfig: unknown, methodReg: MethodRegistrationBase}[]; + getRegistrationsFor(eri: DBOSEventReceiver) : DBOSEventReceiverRegistration[]; transaction(txn: TransactionFunction, params: WorkflowParams, ...args: T): Promise; workflow(wf: WorkflowFunction, params: WorkflowParams, ...args: T): Promise>; @@ -35,6 +48,25 @@ export interface DBOSExecutorContext send(destinationUUID: string, message: T, topic?: string, idempotencyKey?: string): Promise; getEvent(workflowUUID: string, key: string, timeoutSeconds: number): Promise; retrieveWorkflow(workflowUUID: string): WorkflowHandle; + + // Event receiver state queries / updates + /* + * An event dispatcher may keep state in the system database + * The 'service' should be unique to the event receiver keeping state, to separate from others + * The 'workflowFnName' workflow function name should be the fully qualified / unique function name dispatched + * The 'key' field allows multiple records per service / workflow function + * The service+workflowFnName+key uniquely identifies the record, which is associated with: + * 'value' - a value set by the event receiver service; this string may be a JSON to keep complex details + * A version, either as a sequence number (long integer), or as a time (high precision floating point). + * If versions are in use, any upsert is discarded if the version field is less than what is already stored. + * The upsert returns the current record, which is useful if it is more recent. + */ + getEventDispatchState(service: string, workflowFnName: string, key: string): Promise; + upsertEventDispatchState(state: DBOSEventReceiverState): Promise; + + queryUserDB(sql: string, params?: unknown[]): Promise; + + userDBListen(channels: string[], callback: DBNotificationCallback): Promise; } /* @@ -51,4 +83,25 @@ export interface DBOSEventReceiver destroy() : Promise; initialize(executor: DBOSExecutorContext) : Promise; logRegisteredEndpoints() : void; +} + +export interface DBOSEventReceiverState +{ + service: string; + workflowFnName: string; + key: string; + value?: string; + updateTime?: number; + updateSeq?: bigint; +} + +export interface DBOSEventReceiverQuery +{ + service?: string; + workflowFnName?: string; + key?: string; + startTime?: number; + endTime?: number; + startSeq?: bigint; + endSeq?: bigint; } \ No newline at end of file diff --git a/src/foundationdb/fdb_system_database.ts b/src/foundationdb/fdb_system_database.ts index 690a4308..219ae900 100644 --- a/src/foundationdb/fdb_system_database.ts +++ b/src/foundationdb/fdb_system_database.ts @@ -9,6 +9,7 @@ import { DBOSWorkflowConflictUUIDError } from "../error"; import { NativeValue } from "foundationdb/dist/lib/native"; import { DBOSJSON, sleepms } from "../utils"; import { WorkflowQueue } from "../wfqueue"; +import { DBOSEventReceiverState, DBOSEventReceiverQuery } from "../eventreceiver"; interface OperationOutput { output: R; @@ -400,14 +401,17 @@ export class FoundationDBSystemDatabase implements SystemDatabase { await sleepms(durationMS); // TODO: Implement } - /* SCHEDULER */ - getLastScheduledTime(_wfn: string): Promise { - return Promise.resolve(null); + // Event dispatcher queries / updates + async getEventDispatchState(_svc: string, _wfn: string, _key: string): Promise { + return Promise.resolve(undefined); } - setLastScheduledTime(_wfn: string, _invtime: number): Promise { - return Promise.resolve(null); + async queryEventDispatchState(_query: DBOSEventReceiverQuery): Promise { + return Promise.resolve([]); } - + async upsertEventDispatchState(state: DBOSEventReceiverState): Promise { + return Promise.resolve(state); + } + getWorkflows(_input: GetWorkflowsInput): Promise { throw new Error("Method not implemented."); } diff --git a/src/index.ts b/src/index.ts index 6cf53b30..68a063d8 100644 --- a/src/index.ts +++ b/src/index.ts @@ -129,7 +129,12 @@ export { export { DBOSEventReceiver, + DBOSEventReceiverRegistration, DBOSExecutorContext, + DBNotification, + DBNotificationListener, + DBOSEventReceiverQuery, + DBOSEventReceiverState, } from "./eventreceiver"; export { diff --git a/src/scheduler/scheduler.ts b/src/scheduler/scheduler.ts index b623ad14..ab6156ab 100644 --- a/src/scheduler/scheduler.ts +++ b/src/scheduler/scheduler.ts @@ -1,4 +1,4 @@ -import { WorkflowContext } from ".."; +import { DBOSEventReceiverState, WorkflowContext } from ".."; import { DBOSExecutor } from "../dbos-executor"; import { MethodRegistrationBase, registerAndWrapFunction } from "../decorators"; import { TimeMatcher } from "./crontab"; @@ -97,6 +97,8 @@ export class DBOSScheduler{ } } +const SCHEDULER_EVENT_SERVICE_NAME = 'dbos.scheduler'; + class DetachableLoop { private isRunning: boolean = false; private interruptResolve?: () => void; @@ -117,9 +119,10 @@ class DetachableLoop { // See if the exec time is available in durable storage... if (this.schedMode === SchedulerMode.ExactlyOncePerInterval) { - const lasttm = await this.dbosExec.systemDatabase.getLastScheduledTime(this.scheduledMethodName); + const lastState = await this.dbosExec.systemDatabase.getEventDispatchState(SCHEDULER_EVENT_SERVICE_NAME, this.scheduledMethodName, 'lastState'); + const lasttm = lastState?.value; if (lasttm) { - this.lastExec = new Date(lasttm); + this.lastExec = new Date(parseFloat(lasttm)); } } @@ -176,7 +179,15 @@ class DetachableLoop { } // Record the time of the wf kicked off - const dbTime = await this.dbosExec.systemDatabase.setLastScheduledTime(this.scheduledMethodName, nextExecTime.getTime()); + const ers: DBOSEventReceiverState = { + service: SCHEDULER_EVENT_SERVICE_NAME, + workflowFnName: this.scheduledMethodName, + key: 'lastState', + value: `${nextExecTime.getTime()}`, + updateTime: nextExecTime.getTime(), + } + const updRec = await this.dbosExec.systemDatabase.upsertEventDispatchState(ers); + const dbTime = parseFloat(updRec.value!); if (dbTime && dbTime > nextExecTime.getTime()) nextExecTime.setTime(dbTime); this.lastExec = nextExecTime; } diff --git a/src/system_database.ts b/src/system_database.ts index 95518d7d..3f403b43 100644 --- a/src/system_database.ts +++ b/src/system_database.ts @@ -5,13 +5,22 @@ import { DBOSExecutor, dbosNull, DBOSNull } from "./dbos-executor"; import { DatabaseError, Pool, PoolClient, Notification, PoolConfig, Client } from "pg"; import { DBOSWorkflowConflictUUIDError, DBOSNonExistentWorkflowError, DBOSDeadLetterQueueError } from "./error"; import { GetWorkflowQueueInput, GetWorkflowQueueOutput, GetWorkflowsInput, GetWorkflowsOutput, StatusString, WorkflowStatus } from "./workflow"; -import { notifications, operation_outputs, workflow_status, workflow_events, workflow_inputs, scheduler_state, workflow_queue } from "../schemas/system_db_schema"; +import { + notifications, + operation_outputs, + workflow_status, + workflow_events, + workflow_inputs, + workflow_queue, + event_dispatch_kv, +} from "../schemas/system_db_schema"; import { sleepms, findPackageRoot, DBOSJSON } from "./utils"; import { HTTPRequest } from "./context"; import { GlobalLogger as Logger } from "./telemetry/logs"; import knex, { Knex } from "knex"; import path from "path"; import { WorkflowQueue } from "./wfqueue"; +import { DBOSEventReceiverQuery, DBOSEventReceiverState } from "./eventreceiver"; export interface SystemDatabase { init(): Promise; @@ -55,10 +64,19 @@ export interface SystemDatabase { timeoutFunctionID: number }): Promise; - // Scheduler queries - // These two maintain exactly once - make sure we kick off the workflow at least once, and wf unique ID does the rest - getLastScheduledTime(wfn: string): Promise; // Last workflow we are sure we invoked - setLastScheduledTime(wfn: string, invtime: number): Promise; // We are now sure we invoked another + // Event receiver state queries / updates + // An event dispatcher may keep state in the system database + // The 'service' should be unique to the event receiver keeping state, to separate from others + // The 'workflowFnName' workflow function name should be the fully qualified / unique function name dispatched + // The 'key' field allows multiple records per service / workflow function + // The service+workflowFnName+key uniquely identifies the record, which is associated with: + // 'value' - a value set by the event receiver service; this string may be a JSON to keep complex details + // A version, either as a sequence number (long integer), or as a time (high precision floating point). + // If versions are in use, any upsert is discarded if the version field is less than what is already stored. + // The upsert returns the current record, which is useful if it is more recent. + getEventDispatchState(service: string, workflowFnName: string, key: string): Promise; + queryEventDispatchState(query: DBOSEventReceiverQuery): Promise; + upsertEventDispatchState(state: DBOSEventReceiverState): Promise; // Workflow management getWorkflows(input: GetWorkflowsInput): Promise; @@ -799,31 +817,95 @@ export class PostgresSystemDatabase implements SystemDatabase { this.notificationsClient.on("notification", handler); } - /* SCHEDULER */ - async getLastScheduledTime(wfn: string): Promise { - const res = await this.pool.query(` - SELECT last_run_time - FROM ${DBOSExecutor.systemDBSchemaName}.scheduler_state - WHERE workflow_fn_name = $1; - `, [wfn]); + // Event dispatcher queries / updates + async getEventDispatchState(svc: string, wfn: string, key: string): Promise { + const res = await this.pool.query(` + SELECT * + FROM ${DBOSExecutor.systemDBSchemaName}.event_dispatch_kv + WHERE workflow_fn_name = $1 + AND service_name = $2 + AND key = $3; + `, [wfn, svc, key]); - let v = res.rows[0]?.last_run_time ?? null; - if (v !== null) v = parseInt(`${v}`); - return v; - } + if (res.rows.length === 0) return undefined; - async setLastScheduledTime(wfn: string, invtime: number): Promise { - const res = await this.pool.query(` - INSERT INTO ${DBOSExecutor.systemDBSchemaName}.scheduler_state (workflow_fn_name, last_run_time) - VALUES ($1, $2) - ON CONFLICT (workflow_fn_name) - DO UPDATE SET last_run_time = GREATEST(EXCLUDED.last_run_time, scheduler_state.last_run_time) - RETURNING last_run_time; - `, [wfn, invtime]); + return { + service: res.rows[0].service_name, + workflowFnName: res.rows[0].workflow_fn_name, + key: res.rows[0].key, + value: res.rows[0].value, + updateTime: res.rows[0].update_time, + updateSeq: (res.rows[0].update_seq !== null && res.rows[0].update_seq !== undefined ? BigInt(res.rows[0].update_seq) : undefined), + }; + } - return parseInt(`${res.rows[0].last_run_time}`); + async queryEventDispatchState(input: DBOSEventReceiverQuery): Promise { + let query = this.knexDB(`${DBOSExecutor.systemDBSchemaName}.event_dispatch_kv`); + if (input.service) { + query = query.where('service_name', input.service); + } + if (input.workflowFnName) { + query = query.where('workflow_fn_name', input.workflowFnName); + } + if (input.key) { + query = query.where('key', input.key); + } + if (input.startTime) { + query = query.where('update_time', '>=', new Date(input.startTime).getTime()); + } + if (input.endTime) { + query = query.where('update_time', '<=', new Date(input.endTime).getTime()); + } + if (input.startSeq) { + query = query.where('update_seq', '>=', input.startSeq); + } + if (input.endSeq) { + query = query.where('update_seq', '<=', input.endSeq); + } + const rows = await query.select(); + const ers = rows.map((row) => { return { + service: row.service_name, + workflowFnName: row.workflow_fn_name, + key: row.key, + value: row.value, + updateTime: row.update_time, + updateSeq: (row.update_seq !== undefined && row.update_seq !== null ? BigInt(row.update_seq) : undefined), + }}); + return ers; } + async upsertEventDispatchState(state: DBOSEventReceiverState): Promise { + const res = await this.pool.query(` + INSERT INTO ${DBOSExecutor.systemDBSchemaName}.event_dispatch_kv ( + service_name, workflow_fn_name, key, value, update_time, update_seq) + VALUES ($1, $2, $3, $4, $5, $6) + ON CONFLICT (service_name, workflow_fn_name, key) + DO UPDATE SET + update_time = GREATEST(EXCLUDED.update_time, event_dispatch_kv.update_time), + update_seq = GREATEST(EXCLUDED.update_seq, event_dispatch_kv.update_seq), + value = CASE WHEN (EXCLUDED.update_time > event_dispatch_kv.update_time OR EXCLUDED.update_seq > event_dispatch_kv.update_seq OR + (event_dispatch_kv.update_time IS NULL and event_dispatch_kv.update_seq IS NULL)) + THEN EXCLUDED.value ELSE event_dispatch_kv.value END + RETURNING value, update_time, update_seq; + `, [ + state.service, + state.workflowFnName, + state.key, + state.value, + state.updateTime, + state.updateSeq, + ]); + + return { + service: state.service, + workflowFnName: state.workflowFnName, + key: state.key, + value: res.rows[0].value, + updateTime: res.rows[0].update_time, + updateSeq: (res.rows[0].update_seq !== undefined && res.rows[0].update_seq !== null ? BigInt(res.rows[0].update_seq) : undefined), + }; + } + async getWorkflows(input: GetWorkflowsInput): Promise { let query = this.knexDB<{workflow_uuid: string}>(`${DBOSExecutor.systemDBSchemaName}.workflow_status`).orderBy('created_at', 'desc'); if (input.workflowName) { @@ -880,7 +962,7 @@ export class PostgresSystemDatabase implements SystemDatabase { } async enqueueWorkflow(workflowId: string, queue: WorkflowQueue): Promise { - const _res = await this.pool.query(` + const _res = await this.pool.query(` INSERT INTO ${DBOSExecutor.systemDBSchemaName}.workflow_queue (workflow_uuid, queue_name) VALUES ($1, $2) ON CONFLICT (workflow_uuid) diff --git a/src/testing/testing_runtime.ts b/src/testing/testing_runtime.ts index 5b5dbdaf..5852c345 100644 --- a/src/testing/testing_runtime.ts +++ b/src/testing/testing_runtime.ts @@ -95,6 +95,7 @@ export class TestingRuntimeImpl implements TestingRuntime { #wfQueueRunner: Promise | null = null; #applicationConfig: object = {}; #isInitialized = false; + #dbosExec: DBOSExecutor | null = null; /** * Initialize the testing runtime by loading user functions specified in classes and using the specified config. @@ -103,20 +104,25 @@ export class TestingRuntimeImpl implements TestingRuntime { async init(userClasses?: object[], testConfig?: DBOSConfig, systemDB?: SystemDatabase) { const dbosConfig = testConfig ? [testConfig] : parseConfigFile(); DBOS.dbosConfig = dbosConfig[0]; - const dbosExec = new DBOSExecutor(dbosConfig[0], systemDB); - DBOS.globalLogger = dbosExec.logger; - await dbosExec.init(userClasses); - this.#server = new DBOSHttpServer(dbosExec); - for (const evtRcvr of dbosExec.eventReceivers) { - await evtRcvr.initialize(dbosExec); - } - this.#scheduler = new DBOSScheduler(dbosExec); + this.#dbosExec = new DBOSExecutor(dbosConfig[0], systemDB); + this.#applicationConfig = this.#dbosExec.config.application ?? {}; + DBOS.globalLogger = this.#dbosExec.logger; + await this.#dbosExec.init(userClasses); + this.#server = new DBOSHttpServer(this.#dbosExec); + await this.initEventReceivers(); + this.#scheduler = new DBOSScheduler(this.#dbosExec); this.#scheduler.initScheduler(); - this.#wfQueueRunner = wfQueueRunner.dispatchLoop(dbosExec); - this.#applicationConfig = dbosExec.config.application ?? {}; + this.#wfQueueRunner = wfQueueRunner.dispatchLoop(this.#dbosExec); + this.#applicationConfig = this.#dbosExec.config.application ?? {}; this.#isInitialized = true; } + async initEventReceivers() { + for (const evtRcvr of this.#dbosExec!.eventReceivers) { + await evtRcvr.initialize(this.#dbosExec!); + } + } + /** * Release resources after tests. */ @@ -132,14 +138,18 @@ export class TestingRuntimeImpl implements TestingRuntime { this.#server?.dbosExec?.logger.warn(`Error destroying workflow queue runner: ${e.message}`); } await this.#scheduler?.destroyScheduler(); - for (const evtRcvr of this.#server?.dbosExec?.eventReceivers || []) { - await evtRcvr.destroy(); - } + await this.destroyEventReceivers(); await this.#server?.dbosExec.destroy(); this.#isInitialized = false; } } + async destroyEventReceivers() { + for (const evtRcvr of this.#server?.dbosExec?.eventReceivers || []) { + await evtRcvr.destroy(); + } + } + /** * Get Application Configuration. */ diff --git a/tests/eventreceiver.test.ts b/tests/eventreceiver.test.ts index 9fabe433..e973f115 100644 --- a/tests/eventreceiver.test.ts +++ b/tests/eventreceiver.test.ts @@ -1,4 +1,6 @@ import { + DBNotification, + DBOSConfig, DBOSContext, DBOSEventReceiver, DBOSExecutorContext, @@ -10,7 +12,7 @@ import { associateMethodWithEventReceiver, } from "../src" import { createInternalTestRuntime } from "../src/testing/testing_runtime"; -import { generateDBOSTestConfig } from "./helpers"; +import { generateDBOSTestConfig, setUpDBOSTestDb } from "./helpers"; export interface ERDefaults { classval?: string; @@ -40,9 +42,9 @@ class ERD implements DBOSEventReceiver } async destroy() {} + async initialize(executor: DBOSExecutorContext) { this.executor = executor; - const _dropPromise = this.deliver3Events(); return Promise.resolve(); } logRegisteredEndpoints() {} @@ -98,10 +100,14 @@ class MyEventReceiver { } describe("event-receiver-tests", () => { + let config: DBOSConfig; let testRuntime: TestingRuntime; - beforeAll(async () => {}); - + beforeAll(async () => { + config = generateDBOSTestConfig(); + await setUpDBOSTestDb(config); + }); + beforeEach(async () => { testRuntime = await createInternalTestRuntime(undefined, generateDBOSTestConfig()); }, 30000); @@ -111,6 +117,7 @@ describe("event-receiver-tests", () => { }, 30000); test("wf-event", async () => { + const _dropPromise = erd.deliver3Events(); // Things will naturally start happening // We simply wait until we get all the calls, 10 seconds tops for (let i=0; i<100; ++i) { @@ -118,6 +125,111 @@ describe("event-receiver-tests", () => { await sleepms(100); } expect(MyEventReceiver.callNumSum).toBe(66); + }, 30000); + + test("user-db-query-notify", async () => { + let msg = ""; + const handler = (n: DBNotification) => { + msg = n.payload!; + }; + + const cn = await erd.executor!.userDBListen(["channel"], handler); + await erd.executor!.queryUserDB("SELECT pg_notify('channel', 'Hello');"); + let n = 100; + while (msg !== 'Hello') { + await sleepms(10); + --n; + if (!n) break; + } + await cn.close(); + expect(msg).toBe("Hello"); + }, 30000); + + test("sysdb-el-state-time", async () => { + const r0 = await erd.executor!.getEventDispatchState('test','func','key0'); + expect(r0).toBeUndefined(); + const r1 = await erd.executor!.upsertEventDispatchState({ + service: 'test', + workflowFnName: 'func', + key: 'key0', + value: 'V1', + }); + expect(r1.value).toBe('V1'); + const r2 = await erd.executor!.upsertEventDispatchState({ + service: 'test', + workflowFnName: 'func', + key: 'key0', + value: 'V0', + }); + expect(r2.value).toBe('V0'); + const r3 = await erd.executor!.upsertEventDispatchState({ + service: 'test', + workflowFnName: 'func', + key: 'key0', + value: 'V2', + }); + expect(r3.value).toBe('V2'); + expect((await erd.executor!.getEventDispatchState('test','func','key0'))?.value).toBe('V2'); + }); + + test("sysdb-el-state-time", async () => { + const r0 = await erd.executor!.getEventDispatchState('test','func','key1'); + expect(r0).toBeUndefined(); + const r1 = await erd.executor!.upsertEventDispatchState({ + service: 'test', + workflowFnName: 'func', + key: 'key1', + value: 'V1', + updateTime: new Date().getTime(), + }); + expect(r1.value).toBe('V1'); + const r2 = await erd.executor!.upsertEventDispatchState({ + service: 'test', + workflowFnName: 'func', + key: 'key1', + value: 'V0', + updateTime: new Date().getTime() - 1000, + }); + expect(r2.value).toBe('V1'); + const r3 = await erd.executor!.upsertEventDispatchState({ + service: 'test', + workflowFnName: 'func', + key: 'key1', + value: 'V2', + updateTime: new Date().getTime()+1000, + }); + expect(r3.value).toBe('V2'); + expect((await erd.executor!.getEventDispatchState('test','func','key1'))?.value).toBe('V2'); + }); + + test("sysdb-el-state-seqn", async () => { + const r0 = await erd.executor!.getEventDispatchState('test','func','key2'); + expect(r0).toBeUndefined(); + const r1 = await erd.executor!.upsertEventDispatchState({ + service: 'test', + workflowFnName: 'func', + key: 'key2', + value: 'V1', + updateSeq: 111111111111111111111111111111n, + }); + expect(r1.value).toBe('V1'); + const r2 = await erd.executor!.upsertEventDispatchState({ + service: 'test', + workflowFnName: 'func', + key: 'key2', + value: 'V0', + updateSeq: 111111111111111111111111111110n, + }); + expect(r2.value).toBe('V1'); + const r3 = await erd.executor!.upsertEventDispatchState({ + service: 'test', + workflowFnName: 'func', + key: 'key2', + value: 'V2', + updateSeq: 211111111111111111111111111111n, + }); + expect(r3.value).toBe('V2'); + expect((await erd.executor!.getEventDispatchState('test','func','key2'))?.value).toBe('V2'); + expect((await erd.executor!.getEventDispatchState('test','func','key2'))?.updateSeq).toBe(211111111111111111111111111111n); }); }); -