-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Fix sync history processors in Temporal workflows using ContextVar #3704
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
…oralAgent context management
Docs Preview
|
| with super().override(model=self._model, toolsets=self._toolsets, tools=[]): | ||
| token = self._temporal_overrides_active.set(True) | ||
| temporal_active_token = self._temporal_overrides_active.set(True) | ||
| blocking_token = _prefer_blocking_execution.set(True) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's make this a contextmanager exposed by the utils module, so that we don't deal with the contextvar directly here. See the current_run_context feature in #3537
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then we can add that contextmanager to the with statement above
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tests/test_temporal.py
Outdated
| return messages[1:] if len(messages) > 1 else messages | ||
|
|
||
|
|
||
| agent_with_sync_history_processor = Agent( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move this to just ahead of the workflow that uses it, to keep the test file consistently ordered
tests/test_temporal.py
Outdated
| simple_temporal_agent = TemporalAgent(simple_agent, activity_config=BASE_ACTIVITY_CONFIG) | ||
|
|
||
|
|
||
| def drop_first_message_sync(messages: list[ModelMessage]) -> list[ModelMessage]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's make this an inline lambda as it's very simple
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pyright always complains about lambdas, and the length causes a line break, so I'd prefer to keep the extra 1-2 lines with proper types and a nice name
tests/test_utils.py
Outdated
|
|
||
|
|
||
| async def test_run_in_executor_with_blocking_execution_enabled() -> None: | ||
| from pydantic_ai._utils import _prefer_blocking_execution # pyright: ignore[reportPrivateUsage] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above; let's not use the private var, but a new public contextmanager
Problem
Sync history processors fail inside Temporal workflows because
anyio.to_thread.run_synctries to create threads, which Temporal's sandboxed event loop doesn't support. This causesNotImplementedError.Solution
Use a ContextVar to control when
run_in_executor()should execute sync functions directly (blocking) vs using threading. This follows the approach discussed in the issue comments and approved by @DouweM.Changes
_utils.py: Added_prefer_blocking_executionContextVar, removed Temporal detection codetemporal/_agent.py: Set ContextVar in_temporal_overrides()context managertest_utils.py: Updated test to use ContextVar instead of mocking Temporal internalstest_temporal.py: Added integration test for sync history processors in workflowsTesting