Inspired by the beauty of PocketFlow (https://github.com/The-Pocket/PocketFlow/) here's FocketPlow.
A flexible, type-safe framework for building multi-agent systems using flow-based programming patterns. It supports both synchronous and asynchronous execution with retry semantics, batch processing, and parallel execution capabilities.
npm install focketplow
The library is built around Nodes which are the fundamental building blocks of any flow. Each node has three main lifecycle phases:
- Prep: Prepare data and set up the node for execution
- Exec: Perform the actual work/business logic
- Post: Handle results and decide what action to take next
- Sync Flow: Execute nodes synchronously
- Async Flow: Execute nodes asynchronously with Promise support
- Batch Flow: Process multiple parameter bundles
- Parallel Flow: Process items in parallel for better performance
interface NodeGenerics<S, P extends Dict, Prep, Out> {
shared: S; // Immutable shared context
params: P; // Mutable parameters
prep: Prep; // Data from prep() phase
out: Out; // Return value from exec() phase
}type Action = string | undefineddefaultorundefined: Follow the default path- Custom strings: Select specific edges for conditional flows
The foundation of all nodes with basic lifecycle management.
setParams(params: P): Configure node parametersnext(node, action?): Connect successor nodeson(action).to(node): Fluent API for connecting nodesrun(shared): Execute node standalone
prep(shared: S): Prep | voidexec(prepRes: Prep): Out | voidpost(shared: S, prepRes: Prep, execRes: Out): Action
Extends BaseNode with retry capabilities.
{
maxRetries?: number, // default: 1
waitMs?: number // default: 0 (no delay)
}Process arrays of items by running exec over each item.
Orchestrates multiple nodes by using actions to determine the path.
start(node): Set the starting noderun(shared): Execute the entire flow
Asynchronous version of Node with Promise-based execution.
prepAsync(shared: S): Promise<Prep | void>execAsync(prepRes: Prep): Promise<Out | void>execFallbackAsync(prepRes: Prep, exc): Promise<Out | void>postAsync(shared: S, prepRes: Prep, execRes: Out): Promise<Action>
runAsync(shared: S): Promise<Action>
Asynchronous flow orchestration that automatically handles both sync and async nodes.
- BatchFlow: Sequential processing of parameter bundles
- AsyncBatchFlow: Async sequential processing
- AsyncParallelBatchFlow: Parallel async processing
const flow = new Flow()
.start(new NodeA())
.next(new NodeB())
.next(new NodeC());class MyBatchFlow extends BatchFlow<MyContext, MyParams> {
prep(shared: MyContext) {
return [{ id: 1 }, { id: 2 }, { id: 3 }]; // Parameter bundles
}
}class MyParallelFlow extends AsyncParallelBatchFlow<MyContext, MyParams> {
prep(shared: MyContext) {
return items.map(item => ({ id: item.id }));
}
}All nodes support retry semantics with configurable retry counts and delays. When max retries are exceeded, you can provide custom fallback behavior.
Use action values to create branching flows:
class DecisionNode extends Node<SharedContext, Params> {
post(shared: SharedContext, prep: any, out: any): Action {
return out.success ? 'success_path' : 'failure_path';
}
}
const flow = new Flow()
.start(decisionNode)
.on('success_path').to(successHandler)
.on('failure_path').to(errorHandler);The library provides full TypeScript type safety for all node parameters, shared context, preparation data, and output types.
interface Context {
username: string;
requestId: string;
}
interface Config {
rateLimit?: number;
timeout?: number;
}
class TypedNode extends Node<Context, Config, string, number> {
// type-safe parameters
exec(input: string): number {
return input.length;
}
}/**
* Focketplow Node Lifecycle:
* - prep(): Prepare phase - Extract data from shared context, perform setup
* - exec(): Execute phase - Main logic of the node, receives result from prep()
* - post(): Post-execute phase - Handle results, update shared context, determine next action
*/
import { Flow, Node } from 'focketplow';
class NodeA extends Node {
prep(shared) {
console.log('NodeA: prep phase');
return {};
}
exec(prepData) {
console.log('NodeA: exec phase');
return 1;
}
post(shared, prepData, output) {
console.log('NodeA: post phase - result:', output);
shared.results = shared.results || [];
shared.results.push(output);
return undefined;
}
}
class NodeB extends Node {
prep(shared) {
console.log('NodeB: prep phase');
return {};
}
exec(prepData) {
console.log('NodeB: exec phase');
return 2;
}
post(shared, prepData, output) {
console.log('NodeB: post phase - result:', output);
shared.results.push(output);
return undefined;
}
}
class NodeC extends Node {
prep(shared) {
console.log('NodeC: prep phase');
return {};
}
exec(prepData) {
console.log('NodeC: exec phase');
return 3;
}
post(shared, prepData, output) {
console.log('NodeC: post phase - result:', output);
shared.results.push(output);
return shared.results.join(',');
}
}
const nodeA = new NodeA();
const nodeB = new NodeB();
const nodeC = new NodeC();
nodeA.next(nodeB);
nodeB.next(nodeC);
const flow = new Flow();
flow.start(nodeA);
const sharedContext = {};
const result = flow.run(sharedContext);
console.log('Flow result:', result);
/**
* result:
NodeA: prep phase
NodeA: exec phase
NodeA: post phase - result: 1
NodeB: prep phase
NodeB: exec phase
NodeB: post phase - result: 2
NodeC: prep phase
NodeC: exec phase
NodeC: post phase - result: 3
Flow result: 1,2,3
*/