Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DX-1510: Agents #51

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified bun.lockb
Binary file not shown.
7 changes: 0 additions & 7 deletions examples/nextjs/app/sleep/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,4 @@ export const { POST } = serve<string>(async (context) => {
console.log('step 2 input', result1, 'output', output)
return output
})

await context.sleep('sleep2', 2)

await context.run('step3', async () => {
const output = someWork(result2)
console.log('step 3 input', result2, 'output', output)
})
})
2 changes: 1 addition & 1 deletion examples/nextjs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,4 @@
"tailwindcss": "^3.4.1",
"typescript": "^5.6.2"
}
}
}
5 changes: 4 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,10 @@
"typescript-eslint": "^8.18.0"
},
"dependencies": {
"@upstash/qstash": "^2.7.20"
"@ai-sdk/openai": "^1.0.15",
"@upstash/qstash": "^2.7.20",
"ai": "^4.0.30",
"zod": "^3.24.1"
},
"directories": {
"example": "examples"
Expand Down
152 changes: 152 additions & 0 deletions src/agents/adapters.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
import { describe, test, expect } from "bun:test";
import { WorkflowContext } from "../context";
import { Client } from "@upstash/qstash";
import { MOCK_QSTASH_SERVER_URL, mockQStashServer, WORKFLOW_ENDPOINT } from "../test-utils";
import { getWorkflowRunId, nanoid } from "../utils";
import { wrapTools } from "./adapters";
import { tool } from "ai";
import { z } from "zod";
import { LangchainTool } from "./types";

describe("wrapTools", () => {
const token = getWorkflowRunId();
const workflowRunId = nanoid();
const createContext = () =>
new WorkflowContext({
headers: new Headers({}) as Headers,
initialPayload: "mock",
qstashClient: new Client({ baseUrl: MOCK_QSTASH_SERVER_URL, token }),
steps: [],
url: WORKFLOW_ENDPOINT,
workflowRunId,
});

const aiSDKToolDescription = "ai sdk tool";
const langChainToolDescription = "langchain sdk tool";
const parameters = z.object({ expression: z.string() });
const execute = async ({ expression }: { expression: string }) => expression;

const aiSDKTool = tool({
description: aiSDKToolDescription,
parameters,
execute,
});

const langChainTool: LangchainTool = {
description: langChainToolDescription,
schema: parameters,
invoke: execute,
};

test("should wrap AI SDK tool with execute", async () => {
const context = createContext();
const wrappedTools = wrapTools({ context, tools: { aiSDKTool } });

expect(Object.entries(wrappedTools).length).toBe(1);
const wrappedTool = wrappedTools["aiSDKTool"];
// @ts-expect-error description exists but can't resolve the type
expect(wrappedTool.description === aiSDKToolDescription).toBeTrue();

await mockQStashServer({
execute: () => {
const execute = wrappedTool.execute;
if (!execute) {
throw new Error("execute is missing.");
} else {
const throws = () => execute({ expression: "hello" }, { messages: [], toolCallId: "id" });
expect(throws).toThrowError(
`Aborting workflow after executing step 'Run tool aiSDKTool'`
);
}
},
responseFields: {
status: 200,
body: "msgId",
},
receivesRequest: {
method: "POST",
url: `${MOCK_QSTASH_SERVER_URL}/v2/batch`,
token,
body: [
{
body: '{"stepId":1,"stepName":"Run tool aiSDKTool","stepType":"Run","out":"\\"hello\\"","concurrent":1}',
destination: WORKFLOW_ENDPOINT,
headers: {
"content-type": "application/json",
"upstash-failure-callback-retries": "3",
"upstash-feature-set": "LazyFetch,InitialBody",
"upstash-forward-upstash-workflow-sdk-version": "1",
"upstash-method": "POST",
"upstash-retries": "3",
"upstash-workflow-init": "false",
"upstash-workflow-runid": workflowRunId,
"upstash-workflow-url": "https://requestcatcher.com/api",
},
},
],
},
});
});
test("should wrap LangChain tool with execute", async () => {
const context = createContext();
const wrappedTools = wrapTools({ context, tools: { langChainTool } });

expect(Object.entries(wrappedTools).length).toBe(1);
const wrappedTool = wrappedTools["langChainTool"];
// @ts-expect-error description exists but can't resolve the type
expect(wrappedTool.description === langChainToolDescription).toBeTrue();

await mockQStashServer({
execute: () => {
const execute = wrappedTool.execute;
if (!execute) {
throw new Error("execute is missing.");
} else {
const throws = () => execute({ expression: "hello" }, { messages: [], toolCallId: "id" });
expect(throws).toThrowError(
`Aborting workflow after executing step 'Run tool langChainTool'`
);
}
},
responseFields: {
status: 200,
body: "msgId",
},
receivesRequest: {
method: "POST",
url: `${MOCK_QSTASH_SERVER_URL}/v2/batch`,
token,
body: [
{
body: '{"stepId":1,"stepName":"Run tool langChainTool","stepType":"Run","out":"\\"hello\\"","concurrent":1}',
destination: WORKFLOW_ENDPOINT,
headers: {
"content-type": "application/json",
"upstash-failure-callback-retries": "3",
"upstash-feature-set": "LazyFetch,InitialBody",
"upstash-forward-upstash-workflow-sdk-version": "1",
"upstash-method": "POST",
"upstash-retries": "3",
"upstash-workflow-init": "false",
"upstash-workflow-runid": workflowRunId,
"upstash-workflow-url": "https://requestcatcher.com/api",
},
},
],
},
});
});
test("should wrap multiple tools", async () => {
const context = createContext();
const wrappedTools = wrapTools({ context, tools: { langChainTool, aiSDKTool } });

expect(Object.entries(wrappedTools).length).toBe(2);
const wrappedLangChainTool = wrappedTools["langChainTool"];
// @ts-expect-error description exists but can't resolve the type
expect(wrappedLangChainTool.description === langChainToolDescription).toBeTrue();

const wrappedAiSDKTool = wrappedTools["aiSDKTool"];
// @ts-expect-error description exists but can't resolve the type
expect(wrappedAiSDKTool.description === aiSDKToolDescription).toBeTrue();
});
});
129 changes: 129 additions & 0 deletions src/agents/adapters.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/**
* this file contains adapters which convert tools and models
* to workflow tools and models.
*/
import { createOpenAI } from "@ai-sdk/openai";
import { HTTPMethods } from "@upstash/qstash";
import { WorkflowContext } from "../context";
import { tool } from "ai";
import { AISDKTool, LangchainTool } from "./types";
import { AGENT_NAME_HEADER } from "./constants";

/**
* creates an AI SDK openai client with a custom
* fetch implementation which uses context.call.
*
* @param context workflow context
* @returns ai sdk openai
*/
export const createWorkflowOpenAI = (context: WorkflowContext) => {
return createOpenAI({
compatibility: "strict",
fetch: async (input, init) => {
try {
// Prepare headers from init.headers
const headers = init?.headers
? Object.fromEntries(new Headers(init.headers).entries())
: {};

// Prepare body from init.body
const body = init?.body ? JSON.parse(init.body as string) : undefined;

// create step name
const agentName = headers[AGENT_NAME_HEADER] as string | undefined;
const stepName = agentName ? `Call Agent ${agentName}` : "Call Agent";

// Make network call
const responseInfo = await context.call(stepName, {
url: input.toString(),
method: init?.method as HTTPMethods,
headers,
body,
});

// Construct headers for the response
const responseHeaders = new Headers(
Object.entries(responseInfo.header).reduce(
(acc, [key, values]) => {
acc[key] = values.join(", ");
fahreddinozcan marked this conversation as resolved.
Show resolved Hide resolved
return acc;
},
{} as Record<string, string>
)
);

// Return the constructed response
return new Response(JSON.stringify(responseInfo.body), {
status: responseInfo.status,
headers: responseHeaders,
});
} catch (error) {
if (error instanceof Error && error.name === "WorkflowAbort") {
throw error;
} else {
console.error("Error in fetch implementation:", error);
throw error; // Rethrow error for further handling
}
}
},
});
};

/**
* converts LangChain tools to AI SDK tools and updates
* the execute method of these tools by wrapping it with
* context.run.
*
* @param context workflow context
* @param tools map of AI SDK or LangChain tools and their names
* @returns
*/
export const wrapTools = ({
context,
tools,
}: {
context: WorkflowContext;
tools: Record<string, AISDKTool | LangchainTool>;
}): Record<string, AISDKTool> => {
return Object.fromEntries(
Object.entries(tools).map((toolInfo) => {
const [toolName, tool] = toolInfo;
const aiSDKTool: AISDKTool = convertToAISDKTool(tool);

const execute = aiSDKTool.execute;
if (execute) {
fahreddinozcan marked this conversation as resolved.
Show resolved Hide resolved
const wrappedExecute = (...params: Parameters<typeof execute>) => {
return context.run(`Run tool ${toolName}`, () => execute(...params));
};
aiSDKTool.execute = wrappedExecute;
}

return [toolName, aiSDKTool];
})
);
};

/**
* Converts tools to AI SDK tool if it already isn't
*
* @param tool LangChain or AI SDK Tool
* @returns AI SDK Tool
*/
const convertToAISDKTool = (tool: AISDKTool | LangchainTool): AISDKTool => {
const isLangchainTool = "invoke" in tool;
return isLangchainTool ? convertLangchainTool(tool as LangchainTool) : (tool as AISDKTool);
};

/**
* converts a langchain tool to AI SDK tool
*
* @param langchainTool
* @returns AI SDK Tool
*/
const convertLangchainTool = (langchainTool: LangchainTool): AISDKTool => {
return tool({
description: langchainTool.description,
parameters: langchainTool.schema,
execute: async (...param: unknown[]) => langchainTool.invoke(...param),
});
};
Loading
Loading