[refactor](recursive-cte) Replace in-place PFC reset with full recreation between recursion rounds #60812
BiteTheDDDDt wants to merge 3 commits into apache:master from
Conversation
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
run buildall
Pull request overview
This PR refactors the recursive CTE (Common Table Expression) mechanism by replacing the 5-stage in-place reset protocol with a 4-stage full recreation protocol. Instead of selectively resetting PipelineFragmentContext (PFC) members between recursive iterations, the old PFC is now fully destroyed and a new one is constructed from saved TPipelineFragmentParams.
Changes:
- Replaced 5-stage protocol (wait/release/rebuild/submit/close) with 4-stage protocol (wait_for_close/wait_for_destroy/recreate_and_submit/final_close)
- Introduced sentinel mechanism using shared_ptr with custom deleter to safely wait for external threads before destroying PFC
- Removed in-place reset methods (`RuntimeState::reset_to_rerun()`, `PipelineFragmentContext::rebuild()`, `PipelineFragmentContext::set_to_rerun()`) and ref/unref counting
Reviewed changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| gensrc/proto/internal_service.proto | Updated PRerunFragmentParams enum to reflect new 4-stage protocol (breaking protocol change) |
| be/src/vec/exec/scan/scanner_context.cpp | Removed manual ref/unref counting, now relies on sentinel mechanism |
| be/src/runtime/task_execution_context.h | Added sentinel mechanism infrastructure (RerunWaitContext, init_sentinel, wait_for_sentinel_destruction) |
| be/src/runtime/task_execution_context.cpp | Implemented sentinel mechanism with shared_ptr custom deleter |
| be/src/runtime/runtime_state.h | Removed reset_to_rerun method declaration |
| be/src/runtime/runtime_state.cpp | Removed reset_to_rerun implementation, moved global runtime filter cleanup to destructor |
| be/src/runtime/query_context.cpp | Changed insert to insert_or_assign to support PFC replacement during recreation |
| be/src/runtime/fragment_mgr.h | Added RerunableFragmentInfo struct and _rerunnable_params_map for saving fragment params |
| be/src/runtime/fragment_mgr.cpp | Refactored rerun_fragment to implement 4-stage protocol with full PFC recreation |
| be/src/pipeline/pipeline_fragment_context.h | Removed rebuild/set_to_rerun methods, made release_resource public |
| be/src/pipeline/pipeline_fragment_context.cpp | Moved _runtime_state cleanup into release_resource, added sentinel initialization in constructor |
| be/src/pipeline/exec/rec_cte_source_operator.h | Updated to use new 4-stage protocol opcodes |
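For the `QueryContext::set_pipeline_context` change, the relevant `std::map` behavior can be shown with a small standalone sketch (illustrative only, not the Doris code): `insert` is a no-op when the key already exists, so re-registering a recreated fragment context under the same fragment id would silently keep the stale pointer, while `insert_or_assign` replaces the mapped value.

```cpp
#include <map>
#include <string>

// Illustrative stand-in: fragment_id -> context. With plain insert, the
// second registration of key 1 has no effect and the stale "old_pfc"
// value survives; insert_or_assign overwrites it as intended.
bool demo_insert_or_assign() {
    std::map<int, std::string> ctx_map;
    ctx_map.insert({1, "old_pfc"});
    ctx_map.insert({1, "new_pfc"});             // no effect: key 1 already exists
    bool stale = (ctx_map.at(1) == "old_pfc");  // the stale context is still mapped
    ctx_map.insert_or_assign(1, "new_pfc");     // replaces the old context
    return stale && ctx_map.at(1) == "new_pfc";
}
```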
gensrc/proto/internal_service.proto
```proto
wait_for_close = 1; // wait for PFC close
wait_for_destroy = 2; // wait for external thread finished and destroy PFC
recreate_and_submit = 3; // recreate PFC from saved params + submit
final_close = 4; // close fragment (final round)
```
This is a breaking protocol change. The enum values have been renumbered (wait=1 is now wait_for_close=1, but they have completely different semantics). This means BE nodes running the new code cannot communicate with BE nodes running the old code for recursive CTE operations. During a rolling upgrade, if the FE sends rerun_fragment requests to a mix of old and new BE nodes, the old BE nodes will misinterpret the commands and execute incorrect operations.
Consider either:
- Adding new enum values (6, 7, 8, 9) for the new protocol while keeping old values deprecated
- Adding version checking to ensure all nodes are upgraded before enabling this feature
- Documenting that this requires a coordinated upgrade of all BE nodes
Suggested change:
```proto
// Legacy opcodes (old protocol). These numeric values are kept for
// backward compatibility and must not be reused with new semantics.
// New code should avoid using these and prefer the new opcodes below.
wait = 1;
wait_for_destroy_legacy = 2;
recreate_legacy = 3;
final_close_legacy = 4;
// New opcodes (new protocol). These use distinct numeric values to
// avoid collisions with the legacy protocol during rolling upgrades.
wait_for_close = 6; // wait for PFC close
wait_for_destroy = 7; // wait for external thread finished and destroy PFC
recreate_and_submit = 8; // recreate PFC from saved params + submit
final_close = 9; // close fragment (final round)
```
What problem does this PR solve?
This pull request introduces significant improvements to the management of recursive CTE (Common Table Expression) fragments in the pipeline execution engine. The changes focus on making recursive fragment reruns safer and more robust, improving resource cleanup, and clarifying the lifecycle of fragment contexts. The main areas of change are fragment rerun logic, resource and memory management, and runtime filter handling.
Recursive CTE fragment rerun and lifecycle management:
- Introduced a 4-stage rerun protocol (`wait_for_close`, `wait_for_destroy`, `recreate_and_submit`, and `final_close`) to ensure safe destruction and recreation of pipeline fragment contexts (`PipelineFragmentContext`). This prevents resource leaks and race conditions during recursive execution. [1] [2]
- Added a `RerunableFragmentInfo` struct and a `_rerunnable_params_map` in `FragmentMgr` to track the parameters and callbacks needed to safely recreate fragments between recursion rounds. This map is properly cleaned up to avoid memory leaks. [1] [2] [3] [4]
Resource and memory management improvements:
- Updated the `QueryContext::set_pipeline_context` method to use `insert_or_assign` so that fragment contexts can be safely overwritten between recursion rounds.
Runtime filter and task context cleanup:
- Replaced the `reset_to_rerun` logic with explicit `deregister_runtime_filter` methods in both `PipelineFragmentContext` and `RuntimeState`, ensuring runtime filters are properly removed during rerun and destruction. [1] [2] [3]
- Added `RerunWaitContext` with a sentinel mechanism to reliably wait for context destruction.
Other improvements:
These changes collectively make recursive pipeline execution safer, more predictable, and easier to debug, especially in complex query scenarios involving recursive CTEs.
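As a rough illustration of the control flow described above, one recursion round can be pictured as driving the protocol stages in order. This is a hypothetical sketch under the assumption that a non-final round walks wait-destroy-recreate while the final round only closes the fragment; the stage names mirror the proto opcodes, but the sequencing function itself is invented for illustration and is not the actual `FragmentMgr` code:

```cpp
#include <string>
#include <vector>

// Hypothetical sketch of the 4-stage rerun protocol. Returns the stage
// names the coordinator would drive for one recursion round (assumed
// ordering; the real implementation lives in FragmentMgr::rerun_fragment).
std::vector<std::string> stages_for_round(bool is_final_round) {
    if (is_final_round) {
        // Last round: the fragment is simply closed, no recreation.
        return {"final_close"};
    }
    return {
        "wait_for_close",       // wait for the current PFC to close
        "wait_for_destroy",     // wait for external threads (sentinel), destroy PFC
        "recreate_and_submit",  // rebuild PFC from saved TPipelineFragmentParams, submit
    };
}
```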
Check List (For Author)
Test
Behavior changed:
Does this need documentation?
Check List (For Reviewer who merges this PR)