Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
408 changes: 408 additions & 0 deletions doc/developer/design/20251215_query_tracker.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion misc/python/materialize/mzcompose/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ def get_variable_system_parameters(
),
VariableSystemParameter(
"enable_frontend_peek_sequencing",
"false",
"true",
["true", "false"],
),
VariableSystemParameter(
Expand Down
32 changes: 26 additions & 6 deletions src/adapter/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,8 @@ impl Client {
transient_id_gen,
optimizer_metrics,
persist_client,
statement_logging_frontend,
query_tracker,
} = response;

let peek_client = PeekClient::new(
Expand All @@ -272,6 +274,8 @@ impl Client {
transient_id_gen,
optimizer_metrics,
persist_client,
statement_logging_frontend,
query_tracker,
);

let mut client = SessionClient {
Expand Down Expand Up @@ -692,6 +696,9 @@ impl SessionClient {
/// Executes a previously-bound portal.
///
/// Note: the provided `cancel_future` must be cancel-safe as it's polled in a `select!` loop.
///
/// `outer_ctx_extra` is Some when we are executing as part of an outer statement, e.g., a FETCH
/// triggering the execution of the underlying query.
#[mz_ore::instrument(level = "debug")]
pub async fn execute(
&mut self,
Expand All @@ -704,11 +711,19 @@ impl SessionClient {
// Attempt peek sequencing in the session task.
// If unsupported, fall back to the Coordinator path.
// TODO(peek-seq): wire up cancel_future
if let Some(resp) = self.try_frontend_peek(&portal_name).await? {
let mut outer_ctx_extra = outer_ctx_extra;
if let Some(resp) = self
.try_frontend_peek(&portal_name, &mut outer_ctx_extra)
.await?
{
debug!("frontend peek succeeded");
// Frontend peek handled the execution and retired outer_ctx_extra if it existed.
// No additional work needed here.
return Ok((resp, execute_started));
} else {
debug!("frontend peek did not happen");
debug!("frontend peek did not happen, falling back to `Command::Execute`");
// If we bailed out, outer_ctx_extra is still present (if it was originally).
// `Command::Execute` will handle it.
}

let response = self
Expand Down Expand Up @@ -1020,7 +1035,8 @@ impl SessionClient {
| Command::StoreTransactionReadHolds { .. }
| Command::ExecuteSlowPathPeek { .. }
| Command::ExecuteCopyTo { .. }
| Command::ExecuteSideEffectingFunc { .. } => {}
| Command::ExecuteSideEffectingFunc { .. }
| Command::FrontendStatementLogging(..) => {}
};
cmd
});
Expand Down Expand Up @@ -1105,16 +1121,20 @@ impl SessionClient {

/// Attempt to sequence a peek from the session task.
///
/// Returns Some(response) if we handled the peek, or None to fall back to the Coordinator's
/// peek sequencing.
/// Returns `Ok(Some(response))` if we handled the peek, or `Ok(None)` to fall back to the
/// Coordinator's sequencing. If it returns an error, it should be returned to the user.
///
/// `outer_ctx_extra` is Some when we are executing as part of an outer statement, e.g., a FETCH
/// triggering the execution of the underlying query.
pub(crate) async fn try_frontend_peek(
&mut self,
portal_name: &str,
outer_ctx_extra: &mut Option<ExecuteContextExtra>,
) -> Result<Option<ExecuteResponse>, AdapterError> {
if self.enable_frontend_peek_sequencing {
let session = self.session.as_mut().expect("SessionClient invariant");
self.peek_client
.try_frontend_peek_inner(portal_name, session)
.try_frontend_peek(portal_name, session, outer_ctx_extra)
.await
} else {
Ok(None)
Expand Down
26 changes: 23 additions & 3 deletions src/adapter/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,13 @@ use crate::coord::consistency::CoordinatorInconsistencies;
use crate::coord::peek::{PeekDataflowPlan, PeekResponseUnary};
use crate::coord::timestamp_selection::TimestampDetermination;
use crate::error::AdapterError;
use crate::query_tracker;
use crate::session::{EndTransactionAction, RowBatchStream, Session};
use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy};
use crate::statement_logging::WatchSetCreation;
use crate::statement_logging::{
FrontendStatementLoggingEvent, StatementEndedExecutionReason, StatementExecutionStrategy,
StatementLoggingFrontend,
};
use crate::util::Transmittable;
use crate::webhook::AppendWebhookResponse;
use crate::{AdapterNotice, AppendWebhookError, ReadHolds};
Expand Down Expand Up @@ -210,6 +215,9 @@ pub enum Command {
conn_id: ConnectionId,
max_result_size: u64,
max_query_result_size: Option<u64>,
/// If statement logging is enabled, contains all info needed for installing watch sets
/// and logging the statement execution.
watch_set: Option<WatchSetCreation>,
tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
},

Expand All @@ -219,6 +227,9 @@ pub enum Command {
target_replica: Option<ReplicaId>,
source_ids: BTreeSet<GlobalId>,
conn_id: ConnectionId,
/// If statement logging is enabled, contains all info needed for installing watch sets
/// and logging the statement execution.
watch_set: Option<WatchSetCreation>,
tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
},

Expand All @@ -230,6 +241,10 @@ pub enum Command {
current_role: RoleId,
tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
},

/// Statement logging event from frontend peek sequencing.
/// No response channel needed - this is fire-and-forget.
FrontendStatementLogging(FrontendStatementLoggingEvent),
}

impl Command {
Expand Down Expand Up @@ -257,7 +272,8 @@ impl Command {
| Command::StoreTransactionReadHolds { .. }
| Command::ExecuteSlowPathPeek { .. }
| Command::ExecuteCopyTo { .. }
| Command::ExecuteSideEffectingFunc { .. } => None,
| Command::ExecuteSideEffectingFunc { .. }
| Command::FrontendStatementLogging(..) => None,
}
}

Expand Down Expand Up @@ -285,7 +301,8 @@ impl Command {
| Command::StoreTransactionReadHolds { .. }
| Command::ExecuteSlowPathPeek { .. }
| Command::ExecuteCopyTo { .. }
| Command::ExecuteSideEffectingFunc { .. } => None,
| Command::ExecuteSideEffectingFunc { .. }
| Command::FrontendStatementLogging(..) => None,
}
}
}
Expand Down Expand Up @@ -318,6 +335,9 @@ pub struct StartupResponse {
pub transient_id_gen: Arc<TransientIdGen>,
pub optimizer_metrics: OptimizerMetrics,
pub persist_client: PersistClient,
pub statement_logging_frontend: StatementLoggingFrontend,
#[derivative(Debug = "ignore")]
pub query_tracker: query_tracker::Handle,
}

/// The response to [`Client::authenticate`](crate::Client::authenticate).
Expand Down
Loading