Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/durable_execution/temporal.md
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ Other than that, any agent and toolset will just work!

### Instructions Functions, Output Functions, and History Processors

Pydantic AI runs non-async [instructions](../agents.md#instructions) and [system prompt](../agents.md#system-prompts) functions, [history processors](../message-history.md#processing-message-history), [output functions](../output.md#output-functions), and [output validators](../output.md#output-validator-functions) in threads, which are not supported inside Temporal workflows and require an activity. Ensure that these functions are async instead.
Outside of Temporal, Pydantic AI runs non-async [instructions](../agents.md#instructions) and [system prompt](../agents.md#system-prompts) functions, [history processors](../message-history.md#processing-message-history), [output functions](../output.md#output-functions), and [output validators](../output.md#output-validator-functions) in threads. Threads are not available inside Temporal workflows, so [`TemporalAgent`][pydantic_ai.durable_exec.temporal.TemporalAgent] automatically enables a blocking execution mode in which these functions are called inline instead, allowing synchronous versions to work correctly. Because an inline synchronous call blocks the workflow's event loop while it runs, it's still recommended to use async functions where possible, for both consistency and performance.

Synchronous tool functions are supported, as tools are automatically run in activities unless this is [explicitly disabled](#activity-configuration). Still, it's recommended to make tool functions async as well to improve performance.

Expand Down
40 changes: 38 additions & 2 deletions pydantic_ai_slim/pydantic_ai/_utils.py
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,23 @@
import time
import uuid
from collections.abc import AsyncIterable, AsyncIterator, Awaitable, Callable, Iterable, Iterator
from contextlib import asynccontextmanager, suppress
from contextlib import asynccontextmanager, contextmanager, suppress
from contextvars import ContextVar
from dataclasses import dataclass, fields, is_dataclass
from datetime import datetime, timezone
from functools import partial
from types import GenericAlias
from typing import TYPE_CHECKING, Any, Generic, TypeAlias, TypeGuard, TypeVar, get_args, get_origin, overload
from typing import (
TYPE_CHECKING,
Any,
Generic,
TypeAlias,
TypeGuard,
TypeVar,
get_args,
get_origin,
overload,
)

from anyio.to_thread import run_sync
from pydantic import BaseModel, TypeAdapter
Expand Down Expand Up @@ -41,8 +52,33 @@
_P = ParamSpec('_P')
_R = TypeVar('_R')

# Consulted by `run_in_executor`: when True, sync functions are called inline
# instead of being dispatched to a worker thread. Off by default.
_prefer_blocking_execution: ContextVar[bool] = ContextVar('_prefer_blocking_execution', default=False)


@contextmanager
def blocking_execution() -> Iterator[None]:
    """Switch on blocking execution mode for the duration of the `with` block.

    While the context is active, sync functions are executed inline rather
    than being handed to a thread pool through
    [`anyio.to_thread.run_sync`][anyio.to_thread.run_sync].

    This is meant for environments where threading is restricted, such as
    Temporal workflows, which use a sandboxed event loop.

    The mode that was in effect before entering is restored on exit, even when
    the body raises, so nested uses unwind correctly.

    Yields:
        None
    """
    previous_mode = _prefer_blocking_execution.set(True)
    try:
        yield None
    finally:
        # Reset via the token so an enclosing `blocking_execution()` stays active.
        _prefer_blocking_execution.reset(previous_mode)


async def run_in_executor(func: Callable[_P, _R], *args: _P.args, **kwargs: _P.kwargs) -> _R:
    """Call a sync function from async code and return its result.

    By default the call is bound with its arguments and awaited through
    `anyio.to_thread.run_sync`. When blocking execution mode is active
    (see `_prefer_blocking_execution`), the function is instead called
    directly in the current coroutine.

    Args:
        func: The sync callable to invoke.
        *args: Positional arguments forwarded to `func`.
        **kwargs: Keyword arguments forwarded to `func`.

    Returns:
        Whatever `func` returns.
    """
    if not _prefer_blocking_execution.get():
        # `run_sync` does not accept keyword arguments for the target, so bind everything up front.
        return await run_sync(partial(func, *args, **kwargs))

    return func(*args, **kwargs)

Expand Down
11 changes: 8 additions & 3 deletions pydantic_ai_slim/pydantic_ai/durable_exec/temporal/_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,17 +243,22 @@ def temporal_activities(self) -> list[Callable[..., Any]]:

@contextmanager
def _temporal_overrides(self) -> Iterator[None]:
from pydantic_ai._utils import blocking_execution

# We reset tools here as the temporalized function toolset is already in self._toolsets.
with super().override(model=self._model, toolsets=self._toolsets, tools=[]):
token = self._temporal_overrides_active.set(True)
with (
super().override(model=self._model, toolsets=self._toolsets, tools=[]),
blocking_execution(),
):
temporal_active_token = self._temporal_overrides_active.set(True)
try:
yield
except PydanticSerializationError as e:
raise UserError(
"The `deps` object failed to be serialized. Temporal requires all objects that are passed to activities to be serializable using Pydantic's `TypeAdapter`."
) from e
finally:
self._temporal_overrides_active.reset(token)
self._temporal_overrides_active.reset(temporal_active_token)

@overload
async def run(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
interactions:
- request:
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate
connection:
- keep-alive
content-length:
- '105'
content-type:
- application/json
host:
- api.openai.com
method: POST
parsed_body:
messages:
- content: What is the capital of Mexico?
role: user
model: gpt-4o
stream: false
uri: https://api.openai.com/v1/chat/completions
response:
headers:
access-control-expose-headers:
- X-Request-ID
alt-svc:
- h3=":443"; ma=86400
connection:
- keep-alive
content-length:
- '838'
content-type:
- application/json
openai-organization:
- user-grnwlxd1653lxdzp921aoihz
openai-processing-ms:
- '324'
openai-project:
- proj_FYsIItHHgnSPdHBVMzhNBWGa
openai-version:
- '2020-10-01'
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
transfer-encoding:
- chunked
parsed_body:
choices:
- finish_reason: stop
index: 0
logprobs: null
message:
annotations: []
content: The capital of Mexico is Mexico City.
refusal: null
role: assistant
created: 1765424931
id: chatcmpl-ClRxbbqMv20jQuYMqU1BaFBftlZWS
model: gpt-4o-2024-08-06
object: chat.completion
service_tier: default
system_fingerprint: fp_83554c687e
usage:
completion_tokens: 8
completion_tokens_details:
accepted_prediction_tokens: 0
audio_tokens: 0
reasoning_tokens: 0
rejected_prediction_tokens: 0
prompt_tokens: 14
prompt_tokens_details:
audio_tokens: 0
cached_tokens: 0
total_tokens: 22
status:
code: 200
message: OK
version: 1
42 changes: 42 additions & 0 deletions tests/test_temporal.py
Original file line number Diff line number Diff line change
Expand Up @@ -1249,6 +1249,48 @@ async def test_temporal_agent_run_sync_in_workflow(allow_model_requests: None, c
)


def drop_first_message(msgs: list[ModelMessage]) -> list[ModelMessage]:
    """Sync history processor that removes the oldest message.

    A history with zero or one message is returned unchanged (the same list
    object); otherwise a new list without the first element is returned.
    """
    if len(msgs) <= 1:
        return msgs
    return msgs[1:]


# Agent whose only history processor is a plain (non-async) function, wrapped
# in a `TemporalAgent` so it can be run from inside a workflow.
agent_with_sync_history_processor = Agent(
    model,
    name='agent_with_sync_history_processor',
    history_processors=[drop_first_message],
)
temporal_agent_with_sync_history_processor = TemporalAgent(
    agent_with_sync_history_processor,
    activity_config=BASE_ACTIVITY_CONFIG,
)


@workflow.defn
class AgentWithSyncHistoryProcessorWorkflow:
    """Workflow that runs the agent configured with a sync history processor."""

    @workflow.run
    async def run(self, prompt: str) -> str:
        """Run the temporal agent on `prompt` and return its output."""
        return (await temporal_agent_with_sync_history_processor.run(prompt)).output


async def test_temporal_agent_with_sync_history_processor(allow_model_requests: None, client: Client):
    """Sync history processors must work when the agent runs inside a Temporal workflow.

    The run only succeeds if `TemporalAgent._temporal_overrides()` has set the
    `_prefer_blocking_execution` ContextVar, so that the sync history processor
    executes without triggering NotImplementedError from anyio.to_thread.run_sync.
    """
    workflow_cls = AgentWithSyncHistoryProcessorWorkflow
    worker = Worker(
        client,
        task_queue=TASK_QUEUE,
        workflows=[workflow_cls],
        plugins=[AgentPlugin(temporal_agent_with_sync_history_processor)],
    )
    async with worker:
        output = await client.execute_workflow(
            workflow_cls.run,
            args=['What is the capital of Mexico?'],
            id=workflow_cls.__name__,
            task_queue=TASK_QUEUE,
        )
        assert output == snapshot('The capital of Mexico is Mexico City.')


@workflow.defn
class SimpleAgentWorkflowWithRunStream:
@workflow.run
Expand Down
22 changes: 22 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,28 @@ async def test_run_in_executor_with_contextvars() -> None:
assert old_result != ctx_var.get()


async def test_run_in_executor_with_blocking_execution_enabled() -> None:
    """`run_in_executor` uses a worker thread by default and runs inline in blocking mode.

    Checking only the return value cannot tell the two modes apart, so the
    sync function records the thread it ran on and the test compares that
    against the thread driving the coroutine.
    """
    import threading

    from pydantic_ai._utils import blocking_execution

    caller_thread = threading.get_ident()
    seen_threads: list[int] = []

    def sync_func() -> str:
        seen_threads.append(threading.get_ident())
        return 'result'

    # Without blocking mode, the call is handed off to a worker thread.
    result = await run_in_executor(sync_func)
    assert result == 'result'
    assert len(seen_threads) == 1
    assert seen_threads[0] != caller_thread

    # With blocking mode enabled, the call executes directly on the caller's thread.
    seen_threads.clear()
    with blocking_execution():
        result = await run_in_executor(sync_func)
    assert result == 'result'
    assert seen_threads == [caller_thread]

    # Leaving the context restores the default threaded behaviour.
    seen_threads.clear()
    result = await run_in_executor(sync_func)
    assert result == 'result'
    assert len(seen_threads) == 1
    assert seen_threads[0] != caller_thread


def test_is_async_callable():
def sync_func(): ... # pragma: no branch

Expand Down