fix(prefect-dbt): pre-compute cache keys for isolated node selections #20898
desertaxle wants to merge 6 commits into main
Conversation
When running `PrefectDbtOrchestrator` in PER_NODE mode with a `select=` filter, upstream dependencies outside the selection had no cache keys, silently disabling caching for the entire subgraph. Pre-compute cache keys for all executable nodes in topological order at the start of `_execute_per_node()`. Selective runs use a salted namespace (`:unexecuted`) so they never poison full-build caches. A persistent execution-state file tracks each node's last-executed key to detect upstream warehouse rebuilds between selective runs.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
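The pre-computation described above can be sketched as follows. This is an illustrative assumption of the approach, not the actual prefect-dbt code: the graph shape, checksum inputs, key format, and the way the `:unexecuted` salt is applied are all hypothetical.

```python
import hashlib
from graphlib import TopologicalSorter


def precompute_cache_keys(
    dag: dict[str, set[str]],        # node_id -> upstream node_ids
    node_checksums: dict[str, str],  # node_id -> content checksum
    selected_ids: set[str],          # nodes picked by the select= filter
) -> dict[str, str]:
    """Compute a cache key for every node, dependencies first."""
    keys: dict[str, str] = {}
    # TopologicalSorter yields each node after all of its predecessors,
    # so upstream keys are always available when a node is reached.
    for node_id in TopologicalSorter(dag).static_order():
        h = hashlib.sha256(node_checksums[node_id].encode())
        for upstream in sorted(dag.get(node_id, ())):
            h.update(keys[upstream].encode())
        key = h.hexdigest()
        # Nodes outside the selection will not actually run, so salt
        # their keys to keep selective runs from poisoning the caches
        # that a full build would populate.
        if node_id not in selected_ids:
            key += ":unexecuted"
        keys[node_id] = key
    return keys
```

Because salted upstream keys feed into downstream hashes, any node that depends on an unexecuted upstream automatically gets a key distinct from the full-build key.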
The execution state was recording the unsalted precomputed key for selectively-executed nodes, even when the node ran with salted upstream keys. This allowed downstream nodes in subsequent selective runs to incorrectly match the execution state and reuse stale cached results. Store `computed_cache_keys[node_id]` (the actual key used during execution) instead of `precomputed_cache_keys[node_id]`.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ted-nodes # Conflicts: # src/integrations/prefect-dbt/prefect_dbt/core/_orchestrator.py
- Extract `_make_precompute_orch` helper to remove 9 duplicate orchestrator construction blocks in `TestPrecomputeAllCacheKeys`
- Move `from dataclasses import replace` and `from unittest.mock import patch` to module-level imports (were repeated as local imports in 7+ test methods each)
- Remove stale blank lines left by the import hoisting
- Replace elif/pass branch with a trailing comment in `_precompute_all_cache_keys` (simpler control flow)
- Use `_` for unused `project_dir` captures in two tests

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: a3f1ff03ed
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you:
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review"
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
…istence

Execution state was only persisted when `cache_key_storage` was a local path (str/Path), silently degrading to always-salted keys when using block-backed `WritableFileSystem` storage. This meant selective runs after upstream rebuilds could never switch to unsalted cache keys, causing unnecessary cache misses. Use `WritableFileSystem.read_path`/`write_path` (via `run_coro_as_sync`) to persist execution state for non-local storage backends.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
💡 Codex Review
Reviewed commit: deb35abd9f
    if node_id in computed_cache_keys:
        execution_state[node_id] = computed_cache_keys[node_id]
Clear execution state when a node execution fails
This block only updates execution_state on fresh success, but the failure paths below never remove an existing entry for the same node. If a node had a prior successful key and then later fails after mutating warehouse data (for example, a non-atomic incremental/materialization failure), downstream selective runs can still treat that upstream as "current" (state == precomputed) and reuse unsalted cached descendants that were computed against different warehouse contents. Consider deleting execution_state[node_id] in the error handlers (or _mark_failed) to keep failures from preserving stale trust state.
If a node had prior successful execution state and then failed after partially mutating warehouse data, its stale state entry could cause downstream selective runs to treat it as "current" and reuse cached results built against pre-failure data. Pop the node's entry from `execution_state` in both error handlers so failures never preserve stale trust state.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
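The two state-tracking fixes above (record the key actually used on success, drop the entry on failure) can be sketched together in one wrapper. The names `run_node`, `execute`, and the dict shapes are hypothetical stand-ins, not the real `_execute_per_node` error handlers:

```python
from typing import Callable


def run_node(
    node_id: str,
    execute: Callable[[str], None],
    execution_state: dict[str, str],
    computed_cache_keys: dict[str, str],
) -> None:
    """Execute a node, updating execution state only on success."""
    try:
        execute(node_id)
    except Exception:
        # A failure may have partially mutated warehouse data, so any
        # previously recorded key can no longer be trusted; remove it
        # so downstream selective runs do not treat this node as current.
        execution_state.pop(node_id, None)
        raise
    # Record the key actually used during execution (possibly salted),
    # not the unsalted precomputed one.
    if node_id in computed_cache_keys:
        execution_state[node_id] = computed_cache_keys[node_id]
```

`dict.pop(key, None)` makes the cleanup a no-op when the node had no prior entry, so both error handlers can call it unconditionally.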
💡 Codex Review
Reviewed commit: d9df27afbc
    from prefect.utilities.asyncutils import run_coro_as_sync

    data = run_coro_as_sync(ks.read_path(self._EXECUTION_STATE_KEY))
    return _json.loads(data)
Persist execution state without assuming async filesystem APIs
In sync flow runs, many WritableFileSystem blocks (e.g. LocalFileSystem) dispatch read_path/write_path to synchronous implementations, so ks.read_path(...) and ks.write_path(...) return bytes/str immediately. Wrapping those return values in run_coro_as_sync(...) raises TypeError (it expects a coroutine), and this code then silently swallows the exception, so execution state is never loaded or saved for block-backed cache storage. That reverts selective-run safety for those users (state is always empty), allowing stale downstream cache reuse after upstream-only rebuilds in exactly the scenarios this change is trying to prevent.
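One way to tolerate both dispatch modes is to branch on whether the call actually returned an awaitable before driving it to completion. This standalone sketch substitutes `asyncio.run` and two stub filesystems for Prefect's `run_coro_as_sync` and block classes, so the stub names and storage layout are assumptions for illustration only:

```python
import asyncio
import inspect
import json


def resolve(value):
    """Return the result whether the underlying call was sync or async."""
    if inspect.isawaitable(value):
        # Stands in for prefect's run_coro_as_sync in this sketch.
        return asyncio.run(value)
    return value


class SyncFS:
    """Stub filesystem whose methods return results directly."""
    def __init__(self):
        self._store = {}

    def write_path(self, path, content):
        self._store[path] = content

    def read_path(self, path):
        return self._store[path]


class AsyncFS:
    """Stub filesystem whose methods return coroutines."""
    def __init__(self):
        self._store = {}

    async def write_path(self, path, content):
        self._store[path] = content

    async def read_path(self, path):
        return self._store[path]


def save_state(fs, key: str, state: dict) -> None:
    resolve(fs.write_path(key, json.dumps(state).encode()))


def load_state(fs, key: str) -> dict:
    data = resolve(fs.read_path(key))
    if isinstance(data, bytes):
        data = data.decode()
    return json.loads(data)
```

Guarding with `inspect.isawaitable` means the same persistence code works regardless of how a given block dispatches its filesystem methods, instead of assuming every return value is a coroutine.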
Summary
- When running `PrefectDbtOrchestrator` in PER_NODE mode with a `select=` filter, upstream dependencies outside the selection had no cache keys, silently disabling caching for the entire subgraph
- Cache keys are now pre-computed for all executable nodes in topological order at the start of `_execute_per_node()`, so downstream nodes always have upstream keys available
- Selective runs use a salted namespace (`:unexecuted`) to prevent cross-contamination with full-build caches, and a persistent execution-state file detects upstream warehouse rebuilds between selective runs

🤖 Generated with Claude Code