Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): add structured trace #271

Merged
merged 2 commits into from
Jan 5, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions optd-core/src/cascades/memo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ pub trait Memo<T: NodeType>: 'static + Send + Sync {
/// The group id is volatile, depending on whether the groups are merged.
fn get_group_id(&self, expr_id: ExprId) -> GroupId;

/// Reduce the group ID to the merged group ID.
///
/// Takes a group ID that may since have been merged and returns the group ID
/// it currently maps to.
fn reduce_group(&self, group_id: GroupId) -> GroupId;

/// Get the memoized representation of a node.
fn get_expr_memoed(&self, expr_id: ExprId) -> ArcMemoPlanNode<T>;

Expand Down Expand Up @@ -315,6 +318,10 @@ impl<T: NodeType> Memo<T> for NaiveMemo<T> {
/// Estimate the plan space as the number of entries currently held in
/// `expr_id_to_expr_node`.
fn estimated_plan_space(&self) -> usize {
self.expr_id_to_expr_node.len()
}

/// Return the group ID that `merged_group_mapping` stores for `group_id`.
///
/// NOTE(review): the lookup uses `[]` indexing, which presumably panics when
/// `group_id` has no entry in the mapping -- confirm that every group ID that
/// can reach this method is registered there.
fn reduce_group(&self, group_id: GroupId) -> GroupId {
self.merged_group_mapping[&group_id]
}
}

impl<T: NodeType> NaiveMemo<T> {
Expand Down Expand Up @@ -396,10 +403,6 @@ impl<T: NodeType> NaiveMemo<T> {
}
}

/// Return the group ID that `merged_group_mapping` stores for `group_id`.
///
/// NOTE(review): `[]` indexing presumably panics on a missing key -- confirm
/// callers only pass group IDs present in the mapping.
fn reduce_group(&self, group_id: GroupId) -> GroupId {
self.merged_group_mapping[&group_id]
}

fn merge_group_inner(&mut self, merge_into: GroupId, merge_from: GroupId) {
if merge_into == merge_from {
return;
Expand Down
124 changes: 106 additions & 18 deletions optd-core/src/cascades/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ use std::pin::Pin;
use std::sync::Arc;

use anyhow::Result;
use itertools::Itertools;
use tracing::trace;

use super::memo::{ArcMemoPlanNode, GroupInfo, Memo};
use super::memo::{ArcMemoPlanNode, GroupInfo, Memo, WinnerInfo};
use super::NaiveMemo;
use crate::cascades::memo::Winner;
use crate::cascades::tasks2::{TaskContext, TaskDesc};
Expand All @@ -38,16 +39,92 @@ pub struct OptimizerContext {

/// Tunable switches for an optimizer run.
///
/// The derived `Default` leaves every flag `false` and every limit `None`.
#[derive(Clone, Debug, Default)]
pub struct OptimizerProperties {
    /// When set, the optimizer panics once the budget is reached. Used in planner tests.
    pub panic_on_budget: bool,
    /// Upper bound on the number of applied rules; once exceeded, logical rules are no
    /// longer applied.
    pub partial_explore_iter: Option<usize>,
    /// How many times the plan space may be expanded before logical rules are no longer
    /// applied.
    pub partial_explore_space: Option<usize>,
    /// When set, pruning is disabled during optimization.
    pub disable_pruning: bool,
    /// When set, tracing is enabled during optimization.
    pub enable_tracing: bool,
}

#[derive(Debug, Default)]
#[derive(Clone)]
/// A single structured trace event recorded during optimization.
///
/// Every variant carries the `stage` and `step` at which it was recorded and the
/// group it was recorded for.
pub enum OptimizerTrace {
/// A winner decision is made
DecideWinner {
/// The stage number when the trace is recorded
stage: usize,
/// The step number when the trace is recorded
step: usize,
/// The group ID when a trace is recorded
group_id: GroupId,
/// The proposed winner
proposed_winner_info: WinnerInfo,
/// The winning expressions of the children
children_winner: Vec<ExprId>,
},
/// A rule is applied to an expression, producing a new expression
ApplyRule {
/// The stage number when the trace is recorded
stage: usize,
/// The step number when the trace is recorded
step: usize,
/// The group ID when a trace is recorded
group_id: GroupId,
/// The expression being applied
applied_expr_id: ExprId,
/// The expression being produced
produced_expr_id: ExprId,
/// The rule ID
rule_id: usize,
},
}

impl OptimizerTrace {
pub fn stage_step(&self) -> (usize, usize) {
match self {
OptimizerTrace::DecideWinner { stage, step, .. } => (*stage, *step),
OptimizerTrace::ApplyRule { stage, step, .. } => (*stage, *step),
}
}
}

impl std::fmt::Display for OptimizerTrace {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
OptimizerTrace::DecideWinner {
stage,
step,
group_id,
proposed_winner_info,
children_winner,
} => {
write!(
f,
"step={}/{} decide_winner group_id={} proposed_winner_expr={} children_winner_exprs=[{}] total_weighted_cost={}",
stage, step, group_id, proposed_winner_info.expr_id, children_winner.iter().join(","), proposed_winner_info.total_weighted_cost
)
}
OptimizerTrace::ApplyRule {
stage,
step,
group_id,
applied_expr_id,
produced_expr_id,
rule_id,
} => {
write!(
f,
"step={}/{} apply_rule group_id={} applied_expr_id={} produced_expr_id={} rule_id={}",
stage, step, group_id, applied_expr_id, produced_expr_id, rule_id
)
}
}
}
}

#[derive(Default)]
pub struct CascadesStats {
pub rule_match_count: HashMap<usize, usize>,
pub rule_total_bindings: HashMap<usize, usize>,
Expand All @@ -56,6 +133,7 @@ pub struct CascadesStats {
pub optimize_expr_count: usize,
pub apply_rule_count: usize,
pub optimize_input_count: usize,
pub trace: HashMap<GroupId, Vec<OptimizerTrace>>,
}

pub struct CascadesOptimizer<T: NodeType, M: Memo<T> = NaiveMemo<T>> {
Expand All @@ -70,6 +148,7 @@ pub struct CascadesOptimizer<T: NodeType, M: Memo<T> = NaiveMemo<T>> {
logical_property_builders: Arc<[Box<dyn LogicalPropertyBuilderAny<T>>]>,
pub ctx: OptimizerContext,
pub prop: OptimizerProperties,
stage: usize,
}

/// `RelNode` only contains the representation of the plan nodes. Sometimes, we need more context,
Expand Down Expand Up @@ -137,6 +216,7 @@ impl<T: NodeType> CascadesOptimizer<T, NaiveMemo<T>> {
prop,
stats: CascadesStats::default(),
disabled_rules: HashSet::new(),
stage: 0,
}
}

Expand All @@ -163,14 +243,6 @@ impl<T: NodeType> CascadesOptimizer<T, NaiveMemo<T>> {
}

impl<T: NodeType, M: Memo<T>> CascadesOptimizer<T, M> {
/// Set the `panic_on_budget` property to `enabled`.
pub fn panic_on_explore_limit(&mut self, enabled: bool) {
self.prop.panic_on_budget = enabled;
}

/// Set the `disable_pruning` property to `enabled`; passing `true` turns pruning off.
pub fn disable_pruning(&mut self, enabled: bool) {
self.prop.disable_pruning = enabled;
}

/// Returns a new shared handle to the optimizer's cost model.
pub fn cost(&self) -> Arc<dyn CostModel<T, M>> {
    Arc::clone(&self.cost)
}
Expand Down Expand Up @@ -217,7 +289,7 @@ impl<T: NodeType, M: Memo<T>> CascadesOptimizer<T, M> {
self.disabled_rules.contains(&rule_id)
}

pub fn dump(&self) {
pub fn dump(&self, mut f: impl std::fmt::Write) -> std::fmt::Result {
for group_id in self.memo.get_all_group_ids() {
let winner_str = match &self.memo.get_group_info(group_id).winner {
Winner::Impossible => "winner=<impossible>".to_string(),
Expand All @@ -234,28 +306,41 @@ impl<T: NodeType, M: Memo<T>> CascadesOptimizer<T, M> {
)
}
};
println!("group_id={} {}", group_id, winner_str);
writeln!(f, "group_id={} {}", group_id, winner_str)?;
let group = self.memo.get_group(group_id);
for (id, property) in self.logical_property_builders.iter().enumerate() {
println!(
writeln!(
f,
" {}={}",
property.property_name(),
group.properties[id].as_ref()
)
)?;
}
let mut all_predicates = BTreeSet::new();
for expr_id in self.memo.get_all_exprs_in_group(group_id) {
let memo_node = self.memo.get_expr_memoed(expr_id);
for pred in &memo_node.predicates {
all_predicates.insert(*pred);
}
println!(" expr_id={} | {}", expr_id, memo_node);
writeln!(f, " expr_id={} | {}", expr_id, memo_node)?;
}
for pred in all_predicates {
println!(" {}={}", pred, self.memo.get_pred(pred));
writeln!(f, " {}={}", pred, self.memo.get_pred(pred))?;
}
let mut traces = Vec::new();
for (that_group_id, trace) in &self.stats.trace {
if self.memo.reduce_group(*that_group_id) == group_id {
traces.extend(trace.iter());
}
}
traces.sort_by_key(|x| x.stage_step());
for t in traces {
writeln!(f, " {}", t)?;
}
}
Ok(())
}

/// Optimize a `RelNode`.
pub fn step_optimize_rel(&mut self, root_rel: ArcPlanNode<T>) -> Result<GroupId> {
trace!(event = "step_optimize_rel", rel = %root_rel);
Expand Down Expand Up @@ -287,15 +372,18 @@ impl<T: NodeType, M: Memo<T>> CascadesOptimizer<T, M> {
}
});
if res.is_err() && cfg!(debug_assertions) {
self.dump();
let mut buf = String::new();
self.dump(&mut buf).unwrap();
eprintln!("{}", buf);
}
res
}

pub fn fire_optimize_tasks(&mut self, group_id: GroupId) -> Result<()> {
use pollster::FutureExt as _;
trace!(event = "fire_optimize_tasks", root_group_id = %group_id);
let mut task = TaskContext::new(self);
self.stage += 1;
let mut task = TaskContext::new(self, self.stage);
// 32MB stack for the optimization process, TODO: reduce memory footprint
stacker::grow(32 * 1024 * 1024, || {
let fut: Pin<Box<dyn Future<Output = ()>>> = Box::pin(task.fire_optimize(group_id));
Expand Down
Loading
Loading