Skip to content

Commit

Permalink
Restart workflow on create (#30)
Browse files Browse the repository at this point in the history
* Restart workflow on creator

Signed-off-by: Marcos Candeia <[email protected]>

* Add restart qs

Signed-off-by: Marcos Candeia <[email protected]>

* Add restart qs

Signed-off-by: Marcos Candeia <[email protected]>

---------

Signed-off-by: Marcos Candeia <[email protected]>
  • Loading branch information
mcandeia authored Nov 1, 2023
1 parent a987255 commit d137270
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 7 deletions.
1 change: 1 addition & 0 deletions api/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ export const getRouter = async (
...exec,
namespace: get("namespace"),
},
{ restart: new URL(req.url).searchParams.has("restart") },
Array.isArray(exec.input) ? exec.input : [exec.input],
),
);
Expand Down
4 changes: 4 additions & 0 deletions api/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import { HistoryEvent, newEvent } from "../runtime/core/events.ts";
import { JwtIssuer } from "../security/jwt.ts";
import { Arg } from "../types.ts";

export interface StartOptions {
restart?: boolean | null;
}
/**
* WorkflowCreationOptions is used for creating workflows of a given executionId.
*/
Expand Down Expand Up @@ -136,6 +139,7 @@ export class WorkflowService {
*/
public startExecution<TArgs extends Arg = Arg>(
_execution: WorkflowExecutionBase,
startOptions: StartOptions = {},
input?: [...TArgs],
): Promise<WorkflowExecution> {
const namespace = _execution.namespace;
Expand Down
5 changes: 4 additions & 1 deletion backends/backend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ export interface Events {
get(pagination?: PaginationParams): Promise<HistoryEvent[]>;
}

export interface CreateOptions {
restart?: boolean | null;
}
/**
* Execution is all operations that can be executed in a given execution.
*/
Expand All @@ -29,7 +32,7 @@ export interface Execution {
TResult = unknown,
TMetadata extends Metadata = Metadata,
>(): Promise<WorkflowExecution<TArgs, TResult, TMetadata> | undefined>;
create(execution: WorkflowExecution): Promise<void>;
create(execution: WorkflowExecution, options?: CreateOptions): Promise<void>;
update(execution: WorkflowExecution): Promise<void>;
/**
* withintransaction executes commands inside a transaction providing the ACID guarantees
Expand Down
20 changes: 18 additions & 2 deletions backends/durableObjects/connect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { HistoryEvent } from "../../runtime/core/events.ts";
import { Env } from "../../src/worker.ts";
import { Arg } from "../../types.ts";
import {
CreateOptions,
DB,
Events,
Execution,
Expand Down Expand Up @@ -81,8 +82,23 @@ const executionFor = (
},
pending: eventsFor({ signal, ...rest }, "/pending", durable),
history: eventsFor({ signal, ...rest }, "/history", durable),
update: useMethod("PUT"),
create: useMethod("POST"),
update: (workflow: WorkflowExecution) => {
return durable.fetch(withOrigin("/"), {
signal,
method: "PUT",
body: JSON.stringify(workflow),
}).then(parseOrThrow<void>());
},
create: (workflow: WorkflowExecution, createOpts?: CreateOptions) => {
return durable.fetch(
withOrigin(`/${createOpts?.restart ? "?restart" : ""}`),
{
signal,
method: "POST",
body: JSON.stringify(workflow),
},
).then(parseOrThrow<void>());
},
withinTransaction: async <T>(f: (db: Execution) => PromiseOrValue<T>) => {
return await f(executionFor({ signal, ...rest }, durable));
},
Expand Down
1 change: 0 additions & 1 deletion backends/durableObjects/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ export const durableExecution = (
}
return values;
},
del: async () => {}, // del is not supported on history
},
withinTransaction: async <T>(
f: (transactionalDb: Execution) => PromiseOrValue<T>,
Expand Down
3 changes: 2 additions & 1 deletion client/init.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,11 @@ export const start = async <
TMetadata extends Metadata = Metadata,
>(
exec: WorkflowExecutionBase,
restart?: boolean,
opts?: ClientOptions,
): Promise<WorkflowExecution<TArgs, TResult, TMetadata>> => {
return await fetchJSON<WorkflowExecution<TArgs, TResult, TMetadata>>(
`/executions`,
`/executions${restart ? "?restart" : ""}`,
opts,
{
body: JSON.stringify(exec),
Expand Down
21 changes: 19 additions & 2 deletions src/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,28 @@ export const buildRoutes = (wkflow: Workflow): Routes => {
return {
"/": {
"GET": async (_req: Request) => {
return Response.json(await wkflow.execution.get());
const result = await wkflow.execution.get();
if (!result) {
return new Response(null, { status: 404 });
}
return Response.json(result);
},
"POST": async (req: Request) => {
const body: WorkflowExecution = await req.json();
await wkflow.execution.create(body);
const shouldRestart = new URL(req.url).searchParams.has("restart");
const pendingAndHistory = shouldRestart
? Promise.all([
wkflow.execution.pending.get(),
wkflow.execution.history.get(),
])
: Promise.resolve([[], []]);
const createPromise = wkflow.execution.create(body);
const [pending, history] = await pendingAndHistory;
await Promise.all([
wkflow.execution.pending.del(...pending),
wkflow.execution.history.del(...history),
createPromise,
]);
wkflow.workflowExecution = body;
return new Response(JSON.stringify(body), { status: 201 });
},
Expand Down

0 comments on commit d137270

Please sign in to comment.