From a2ed701a52078bb471466c0af939e5b0ce877600 Mon Sep 17 00:00:00 2001
From: Ash Kunda <18058966+akundaz@users.noreply.github.com>
Date: Thu, 8 Jan 2026 23:17:07 -0500
Subject: [PATCH 1/6] fix: remove unnecessary wakes
---
src/pipelines/exec/mod.rs | 132 ++++++++++++++++++--------------------
1 file changed, 64 insertions(+), 68 deletions(-)
diff --git a/src/pipelines/exec/mod.rs b/src/pipelines/exec/mod.rs
index 7f1f5d7..6297686 100644
--- a/src/pipelines/exec/mod.rs
+++ b/src/pipelines/exec/mod.rs
@@ -280,69 +280,63 @@ where
// The executor has not run any steps yet, it is invoking the `before_job`
// method of each step in the pipeline.
- if let Cursor::Initializing(ref mut future) = executor.cursor {
- if let Poll::Ready(output) = future.as_mut().poll_unpin(cx) {
- match output {
- Ok(checkpoint) => {
- trace!("{} initialized successfully", executor.pipeline);
- executor.cursor = executor.first_step(checkpoint);
- }
- Err(error) => {
- trace!(
- "{} initialization failed with error: {error:?}",
- executor.pipeline
- );
- // If the initialization failed, we immediately finalize the
- // pipeline with the error that occurred during initialization
- // and not attempt to run any steps.
- executor.cursor =
- Cursor::Finalizing(executor.finalize(Err(error.into())));
- }
+ if let Cursor::Initializing(ref mut future) = executor.cursor
+ && let Poll::Ready(output) = future.as_mut().poll_unpin(cx)
+ {
+ match output {
+ Ok(checkpoint) => {
+ trace!("{} initialized successfully", executor.pipeline);
+ executor.cursor = executor.first_step(checkpoint);
+ }
+ Err(error) => {
+ trace!(
+ "{} initialization failed with error: {error:?}",
+ executor.pipeline
+ );
+ // If the initialization failed, we immediately finalize the
+ // pipeline with the error that occurred during initialization
+ // and not attempt to run any steps.
+ executor.cursor =
+ Cursor::Finalizing(executor.finalize(Err(error.into())));
}
- trace!("{} initializing completed", executor.pipeline);
}
-
- // tell the async runtime to poll again because we are still initializing
- cx.waker().wake_by_ref();
+ trace!("{} initializing completed", executor.pipeline);
}
// the pipeline has completed executing all steps or encountered and error.
// Now we are running the `after_job` of each step in the pipeline.
- if let Cursor::Finalizing(ref mut future) = executor.cursor {
- if let Poll::Ready(output) = future.as_mut().poll_unpin(cx) {
- trace!("{} completed with output: {output:#?}", executor.pipeline);
-
- // Execution of this pipeline has completed, This resolves the
- // executor future with the final output of the pipeline. Also
- // emit an appropriate system event and record metrics.
-
- let payload_id = executor.block.payload_id();
- let events_bus = &executor.pipeline.events;
- let metrics = executor.service.metrics();
-
- match &output {
- Ok(built_payload) => {
- events_bus.publish(PayloadJobCompleted::
{
- payload_id,
- built_payload: built_payload.clone(),
- });
- metrics.jobs_completed.increment(1);
- metrics.record_payload::
(built_payload, &executor.block);
- }
- Err(error) => {
- events_bus.publish(PayloadJobFailed {
- payload_id,
- error: error.clone(),
- });
- metrics.jobs_failed.increment(1);
- }
+ if let Cursor::Finalizing(ref mut future) = executor.cursor
+ && let Poll::Ready(output) = future.as_mut().poll_unpin(cx)
+ {
+ trace!("{} completed with output: {output:#?}", executor.pipeline);
+
+ // Execution of this pipeline has completed, This resolves the
+ // executor future with the final output of the pipeline. Also
+ // emit an appropriate system event and record metrics.
+
+ let payload_id = executor.block.payload_id();
+ let events_bus = &executor.pipeline.events;
+ let metrics = executor.service.metrics();
+
+ match &output {
+ Ok(built_payload) => {
+ events_bus.publish(PayloadJobCompleted::
{
+ payload_id,
+ built_payload: built_payload.clone(),
+ });
+ metrics.jobs_completed.increment(1);
+ metrics.record_payload::
(built_payload, &executor.block);
+ }
+ Err(error) => {
+ events_bus.publish(PayloadJobFailed {
+ payload_id,
+ error: error.clone(),
+ });
+ metrics.jobs_failed.increment(1);
}
-
- return Poll::Ready(output);
}
- // tell the async runtime to poll again because we are still finalizing
- cx.waker().wake_by_ref();
+ return Poll::Ready(output);
}
if matches!(executor.cursor, Cursor::BeforeStep(_, _)) {
@@ -359,23 +353,25 @@ where
trace!("{} will execute step {path}", executor.pipeline);
let future = executor.execute_step(&path, input);
executor.cursor = Cursor::StepInProgress(path, future);
- cx.waker().wake_by_ref(); // tell the async runtime to poll again
- }
- if let Cursor::StepInProgress(ref path, ref mut future) = executor.cursor {
- // If the cursor is in the StepInProgress state, we to poll the
- // future instance of that step to see if it has completed.
- if let Poll::Ready(output) = future.as_mut().poll_unpin(cx) {
- trace!(
- "{} step {path:?} completed with output: {output:#?}",
- executor.pipeline
- );
+ // This wake is necessary because we have a new future/at a state
+ // transition boundary and directly return Poll::Pending without polling
+ // it first, so there is no registered waker to drive progress without it
+ cx.waker().wake_by_ref();
+ }
- // step has completed, we can advance the cursor
- executor.cursor = executor.advance_cursor(path, output);
- }
+ // If the cursor is in the StepInProgress state, we to poll the
+ // future instance of that step to see if it has completed.
+ if let Cursor::StepInProgress(ref path, ref mut future) = executor.cursor
+ && let Poll::Ready(output) = future.as_mut().poll_unpin(cx)
+ {
+ trace!(
+ "{} step {path:?} completed with output: {output:#?}",
+ executor.pipeline
+ );
- cx.waker().wake_by_ref(); // tell the async runtime to poll again
+ // step has completed, we can advance the cursor
+ executor.cursor = executor.advance_cursor(path, output);
}
Poll::Pending
From 52279ae428eb1e97e91c25dad48ac3e66715f889 Mon Sep 17 00:00:00 2001
From: Ash Kunda <18058966+akundaz@users.noreply.github.com>
Date: Wed, 14 Jan 2026 12:28:31 -0500
Subject: [PATCH 2/6] run a loop for the PipelineExecutor future
---
src/pipelines/exec/mod.rs | 218 +++++++++++++++++++-------------------
1 file changed, 111 insertions(+), 107 deletions(-)
diff --git a/src/pipelines/exec/mod.rs b/src/pipelines/exec/mod.rs
index 6297686..e16cf83 100644
--- a/src/pipelines/exec/mod.rs
+++ b/src/pipelines/exec/mod.rs
@@ -278,108 +278,128 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll {
let executor = self.get_mut();
- // The executor has not run any steps yet, it is invoking the `before_job`
- // method of each step in the pipeline.
- if let Cursor::Initializing(ref mut future) = executor.cursor
- && let Poll::Ready(output) = future.as_mut().poll_unpin(cx)
- {
- match output {
- Ok(checkpoint) => {
- trace!("{} initialized successfully", executor.pipeline);
- executor.cursor = executor.first_step(checkpoint);
+ loop {
+ match executor.cursor {
+ // The executor has not run any steps yet, it is invoking the
+ // `before_job` method of each step in the pipeline.
+ Cursor::Initializing(ref mut future) => {
+ match future.as_mut().poll_unpin(cx) {
+ Poll::Ready(Ok(checkpoint)) => {
+ trace!("{} initialized successfully", executor.pipeline);
+ executor.cursor = executor.first_step(checkpoint);
+ }
+ Poll::Ready(Err(error)) => {
+ trace!(
+ "{} initialization failed with error: {error:?}",
+ executor.pipeline
+ );
+ // If the initialization failed, we immediately finalize the
+ // pipeline with the error that occurred during initialization
+ // and not attempt to run any steps.
+ executor.cursor =
+ Cursor::Finalizing(executor.finalize(Err(error.into())));
+ }
+ Poll::Pending => return Poll::Pending,
+ }
}
- Err(error) => {
- trace!(
- "{} initialization failed with error: {error:?}",
- executor.pipeline
- );
- // If the initialization failed, we immediately finalize the
- // pipeline with the error that occurred during initialization
- // and not attempt to run any steps.
- executor.cursor =
- Cursor::Finalizing(executor.finalize(Err(error.into())));
+ // The pipeline has completed executing all steps or encountered an
+ // error. Now we are running the `after_job` of each step.
+ Cursor::Finalizing(ref mut future) => match future
+ .as_mut()
+ .poll_unpin(cx)
+ {
+ Poll::Ready(output) => {
+ trace!("{} completed with output: {output:#?}", executor.pipeline);
+
+ // Execution of this pipeline has completed, This resolves the
+ // executor future with the final output of the pipeline. Also
+ // emit an appropriate system event and record metrics.
+
+ let payload_id = executor.block.payload_id();
+ let events_bus = &executor.pipeline.events;
+ let metrics = executor.service.metrics();
+
+ // Record metrics for the payload job
+ match &output {
+ Ok(built_payload) => {
+ events_bus.publish(PayloadJobCompleted:: {
+ payload_id,
+ built_payload: built_payload.clone(),
+ });
+ metrics.jobs_completed.increment(1);
+ metrics.record_payload::
(built_payload, &executor.block);
+ }
+ Err(error) => {
+ events_bus.publish(PayloadJobFailed {
+ payload_id,
+ error: error.clone(),
+ });
+ metrics.jobs_failed.increment(1);
+ }
+ }
+
+ return Poll::Ready(output);
+ }
+ Poll::Pending => return Poll::Pending,
+ },
+ // If the cursor is in the BeforeStep state, we need to run the next
+ // step of the pipeline. Steps are async futures, so we need to store
+ // their instance while they are running and being polled until
+ // resolved.
+ Cursor::BeforeStep(_, _) => {
+ let Cursor::BeforeStep(path, input) =
+ std::mem::replace(&mut executor.cursor, Cursor::PreparingStep)
+ else {
+ unreachable!("bug in PipelineExecutor state machine");
+ };
+
+ trace!("{} will execute step {path}", executor.pipeline);
+ let future = executor.execute_step(&path, input);
+ executor.cursor = Cursor::StepInProgress(path, future);
}
- }
- trace!("{} initializing completed", executor.pipeline);
- }
-
- // the pipeline has completed executing all steps or encountered and error.
- // Now we are running the `after_job` of each step in the pipeline.
- if let Cursor::Finalizing(ref mut future) = executor.cursor
- && let Poll::Ready(output) = future.as_mut().poll_unpin(cx)
- {
- trace!("{} completed with output: {output:#?}", executor.pipeline);
-
- // Execution of this pipeline has completed, This resolves the
- // executor future with the final output of the pipeline. Also
- // emit an appropriate system event and record metrics.
-
- let payload_id = executor.block.payload_id();
- let events_bus = &executor.pipeline.events;
- let metrics = executor.service.metrics();
-
- match &output {
- Ok(built_payload) => {
- events_bus.publish(PayloadJobCompleted::
{
- payload_id,
- built_payload: built_payload.clone(),
- });
- metrics.jobs_completed.increment(1);
- metrics.record_payload::
(built_payload, &executor.block);
+ // If the cursor is in the StepInProgress state, poll the future
+ // instance of that step to see if it has completed.
+ Cursor::StepInProgress(ref path, ref mut future) => {
+ match future.as_mut().poll_unpin(cx) {
+ Poll::Ready(output) => {
+ trace!(
+ "{} step {path:?} completed with output: {output:#?}",
+ executor.pipeline
+ );
+
+ // step has completed, we can advance the cursor
+ executor.cursor = executor.advance_cursor(path, output);
+ }
+ Poll::Pending => return Poll::Pending,
+ }
}
- Err(error) => {
- events_bus.publish(PayloadJobFailed {
- payload_id,
- error: error.clone(),
- });
- metrics.jobs_failed.increment(1);
+ // Transient state that should never be observed here
+ Cursor::PreparingStep => {
+ unreachable!("bug in PipelineExecutor state machine")
}
}
-
- return Poll::Ready(output);
- }
-
- if matches!(executor.cursor, Cursor::BeforeStep(_, _)) {
- // If the cursor is in the BeforeStep state, we need to run the next
- // step of the pipeline. Steps are async futures, so we need to store
- // their instance while they are running and being polled until resolved.
-
- let Cursor::BeforeStep(path, input) =
- std::mem::replace(&mut executor.cursor, Cursor::PreparingStep)
- else {
- unreachable!("bug in PipelineExecutor state machine");
- };
-
- trace!("{} will execute step {path}", executor.pipeline);
- let future = executor.execute_step(&path, input);
- executor.cursor = Cursor::StepInProgress(path, future);
-
- // This wake is necessary because we have a new future/at a state
- // transition boundary and directly return Poll::Pending without polling
- // it first, so there is no registered waker to drive progress without it
- cx.waker().wake_by_ref();
}
-
- // If the cursor is in the StepInProgress state, we to poll the
- // future instance of that step to see if it has completed.
- if let Cursor::StepInProgress(ref path, ref mut future) = executor.cursor
- && let Poll::Ready(output) = future.as_mut().poll_unpin(cx)
- {
- trace!(
- "{} step {path:?} completed with output: {output:#?}",
- executor.pipeline
- );
-
- // step has completed, we can advance the cursor
- executor.cursor = executor.advance_cursor(path, output);
- }
-
- Poll::Pending
}
}
/// Keeps track of the current pipeline execution progress.
enum Cursor {
+ /// The pipeline is currently initializing all steps for a new payload job.
+ ///
+ /// This happens once before any step is executed, and it calls the
+ /// `before_job` method of each step in the pipeline.
+ Initializing(
+ Pin<
+ Box<
+ dyn Future