Skip to content

Commit

Permalink
copilot workspace translation of temporalio/samples-python#123
Browse files Browse the repository at this point in the history
  • Loading branch information
dandavison committed Jul 2, 2024
1 parent 7b2e902 commit f2bb721
Show file tree
Hide file tree
Showing 5 changed files with 200 additions and 0 deletions.
33 changes: 33 additions & 0 deletions updates_and_signals/safe_message_handlers/src/activities.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { proxyActivities } from '@temporalio/workflow';
import { ActivityInput as AllocateNodesToJobInput } from './interfaces';

// Activities with TypeScript syntax and Temporal TypeScript SDK specifics
const { allocateNodesToJob, deallocateNodesForJob, findBadNodes } = proxyActivities<{
allocateNodesToJob(input: AllocateNodesToJobInput): Promise<void>;
deallocateNodesForJob(input: DeallocateNodesForJobInput): Promise<void>;
findBadNodes(input: FindBadNodesInput): Promise<string[]>;
}>({
startToCloseTimeout: '1 minute',
});

export async function allocateNodesToJob(input: AllocateNodesToJobInput): Promise<void> {
console.log(`Assigning nodes ${input.nodes} to job ${input.jobName}`);
await new Promise(resolve => setTimeout(resolve, 100)); // Simulate async operation
}

export async function deallocateNodesForJob(input: DeallocateNodesForJobInput): Promise<void> {
console.log(`Deallocating nodes ${input.nodes} from job ${input.jobName}`);
await new Promise(resolve => setTimeout(resolve, 100)); // Simulate async operation
}

export async function findBadNodes(input: FindBadNodesInput): Promise<string[]> {
await new Promise(resolve => setTimeout(resolve, 100)); // Simulate async operation
const badNodes = input.nodesToCheck.filter(n => parseInt(n) % 5 === 0);
if (badNodes.length) {
console.log(`Found bad nodes: ${badNodes}`);
} else {
console.log("No new bad nodes found.");
}
return badNodes;
}

20 changes: 20 additions & 0 deletions updates_and_signals/safe_message_handlers/src/starter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { WorkflowClient } from '@temporalio/client';
import { ClusterManagerWorkflow } from './workflow';
import { doClusterLifecycle } from './utils';

async function main() {
const client = new WorkflowClient();

// Define the workflow handle
const workflow = client.createWorkflowHandle(ClusterManagerWorkflow, {
workflowId: 'cluster-management-workflow',
});

// Start the cluster lifecycle
await doClusterLifecycle(workflow);
}

main().catch((err) => {
console.error(err);
process.exit(1);
});
17 changes: 17 additions & 0 deletions updates_and_signals/safe_message_handlers/src/worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { Worker } from '@temporalio/worker';
import path from 'path';

async function run() {
const worker = await Worker.create({
workflowsPath: path.join(__dirname, './workflows'),
activitiesPath: path.join(__dirname, './activities'),
taskQueue: 'safe-message-handlers-task-queue',
});

await worker.run();
}

run().catch((err) => {
console.error(err);
process.exit(1);
});
102 changes: 102 additions & 0 deletions updates_and_signals/safe_message_handlers/src/workflow.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// This file contains the TypeScript port of the Python workflow for managing cluster updates and signals

import { proxyActivities, defineSignal, defineQuery, setHandler, condition, sleep, defineWorkflow } from '@temporalio/workflow';
import type { AllocateNodesToJobInput, DeallocateNodesForJobInput, FindBadNodesInput } from './interfaces';

// Define signals
const startClusterSignal = defineSignal('startCluster');
const shutdownClusterSignal = defineSignal('shutdownCluster');
const allocateNodesToJobSignal = defineSignal<[AllocateNodesToJobInput]>('allocateNodesToJob');
const deallocateNodesForJobSignal = defineSignal<[DeallocateNodesForJobInput]>('deallocateNodesForJob');

// Define queries
const getClusterStatusQuery = defineQuery<{}>('getClusterStatus');

// Define activities
const { allocateNodesToJob, deallocateNodesForJob, findBadNodes } = proxyActivities<{
allocateNodesToJob(input: AllocateNodesToJobInput): Promise<void>;
deallocateNodesForJob(input: DeallocateNodesForJobInput): Promise<void>;
findBadNodes(input: FindBadNodesInput): Promise<string[]>;
}>({
startToCloseTimeout: '1 minute',
});

// Define workflow interface
export interface ClusterManagerWorkflow {
run(input: ClusterManagerWorkflowInput): Promise<ClusterManagerWorkflowResult>;
}

// Define workflow input and result types
export interface ClusterManagerWorkflowInput {
testContinueAsNew: boolean;
}

export interface ClusterManagerWorkflowResult {
maxAssignedNodes: number;
numCurrentlyAssignedNodes: number;
numBadNodes: number;
}

// Workflow implementation
export const clusterManagerWorkflow: ClusterManagerWorkflow = defineWorkflow({
async run(input: ClusterManagerWorkflowInput) {
let state = {
clusterStarted: false,
clusterShutdown: false,
nodes: {} as Record<string, string | null>,
jobsAdded: new Set<string>(),
maxAssignedNodes: 0,
};

// Signal handlers
setHandler(startClusterSignal, () => {
state.clusterStarted = true;
for (let i = 0; i < 25; i++) {
state.nodes[i.toString()] = null;
}
});

setHandler(shutdownClusterSignal, () => {
state.clusterShutdown = true;
});

setHandler(allocateNodesToJobSignal, async (input: AllocateNodesToJobInput) => {
if (!state.clusterStarted || state.clusterShutdown) {
throw new Error('Cluster is not in a valid state for node allocation');
}
// Allocate nodes to job logic
});

setHandler(deallocateNodesForJobSignal, async (input: DeallocateNodesForJobInput) => {
if (!state.clusterStarted || state.clusterShutdown) {
throw new Error('Cluster is not in a valid state for node deallocation');
}
// Deallocate nodes from job logic
});

// Query handler
setHandler(getClusterStatusQuery, () => {
return {
clusterStarted: state.clusterStarted,
clusterShutdown: state.clusterShutdown,
numNodes: Object.keys(state.nodes).length,
numAssignedNodes: Object.values(state.nodes).filter(n => n !== null).length,
};
});

// Main workflow logic
await condition(() => state.clusterStarted, 'Waiting for cluster to start');
// Perform operations while cluster is active
while (!state.clusterShutdown) {
// Example: perform periodic health checks
await sleep(60000); // Sleep for 60 seconds
}

// Return workflow result
return {
maxAssignedNodes: state.maxAssignedNodes,
numCurrentlyAssignedNodes: Object.values(state.nodes).filter(n => n !== null).length,
numBadNodes: Object.values(state.nodes).filter(n => n === 'BAD').length,
};
},
});
28 changes: 28 additions & 0 deletions updates_and_signals/safe_message_handlers/src/workflow_test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { WorkflowClient } from '@temporalio/client';
import { ClusterManagerWorkflow } from './workflow';
import { v4 as uuidv4 } from 'uuid';

async function run() {
const client = new WorkflowClient();

// Define the workflow handle
const workflow = client.createWorkflowHandle(ClusterManagerWorkflow, {
workflowId: `cluster-management-workflow-${uuidv4()}`,
});

// Test workflow functionality
await workflow.start();
await workflow.signal.startCluster();
await workflow.executeUpdate('allocateNodesToJob', {
numNodes: 5,
jobName: 'job1',
});
await workflow.signal.shutdownCluster();
const result = await workflow.result();
console.log('Workflow result:', result);
}

run().catch((err) => {
console.error(err);
process.exit(1);
});

0 comments on commit f2bb721

Please sign in to comment.