Avoid blocking in the PartitionProcessorManager when being in the control loop #2179

Merged — 1 commit, Nov 7, 2024

Conversation

@muhamadazmy (Contributor) commented Oct 28, 2024

Avoid blocking in the PartitionProcessorManager when being in the control loop

Summary:

  • Improve tracing of ControlProcessors commands
  • Run the spawn of partition processors concurrently
  • Avoid blocking partition processor manager main loop

While testing I noticed that creating an InvokerService takes around 25ms (on my machine) via InvokerService::from_options.

I traced the delay to a single line of code:

.with_native_roots()

My first thought was to cache the HttpsConnectorBuilder, but this won't work because HttpsConnectorBuilder is not clonable.

Fixes #2146


Stack created with Sapling. Best reviewed with ReviewStack.

Comment on lines 505 to 512
Some(control_processors) = self.incoming_update_processors.next() => {
    let span = debug_span!(parent: None, "control_processors");
    span.follows_from(Span::current());
    if let Err(err) = self.on_control_processors(control_processors).instrument(span).await {
Contributor Author:

This is so each control_processors message is handled in its own span instead of being a child of the processor loop span, which is really hard to view in Jaeger.

Contributor:

Let's capture this reasoning in a comment.

Contributor:

I don't fully understand the problem here. When I remove the extra span, I obtain the following trace which looks ok to me.

(screenshot: Jaeger trace)

Contributor Author:

The idea is that without the extra span, all on_control_processors spans become children of the partition processor manager loop span. That means ALL the on_control_processors calls show up in one trace, as in your screenshot.

With the extra span, each on_control_processors becomes a root trace (that links back to the partition processor manager run). Hence it's easier to sort them by time, find a single event that happens at a certain time, etc.

Contributor Author:

You can see that your view above will just keep getting longer with each ControlProcessors message received by this node.
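
A sketch of how this reasoning could be captured as a comment right at the span creation (based on the snippet quoted at the top of this thread; the comment text is only a suggestion):

Some(control_processors) = self.incoming_update_processors.next() => {
    // Handle each ControlProcessors message in its own root span instead of
    // nesting it under the long-lived manager loop span. Otherwise every
    // on_control_processors call piles up in one ever-growing trace, which is
    // hard to inspect in Jaeger. follows_from keeps a link back to the loop
    // so causality is still visible.
    let span = debug_span!(parent: None, "control_processors");
    span.follows_from(Span::current());
    if let Err(err) = self.on_control_processors(control_processors).instrument(span).await {
        // ...
    }
}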

@muhamadazmy (Contributor Author) commented Oct 28, 2024

You can see in the screenshot below that the actual spawning of the partition processors runs in parallel, except that there is still a ~24-25ms delay that happens in sequence. This delay is caused by InvokerService::from_options!

After investigating why this takes so long (it's not even an async function), it seems the delay is caused by this line:

.with_native_roots()

My first thought was to cache the HttpsConnectorBuilder, but this won't work because HttpsConnectorBuilder is not clonable. What I did instead is move the InvokerService creation into the async task and use a JoinSet.
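
As a rough sketch of that change (a standalone example with hypothetical names, not the PR's actual code): the expensive creation moves into the per-partition task, and the tasks are awaited through a JoinSet.

use tokio::task::JoinSet;

// Hypothetical stand-in for the per-partition startup work, including the
// ~25ms InvokerService::from_options call that previously ran sequentially
// on the manager loop.
async fn spawn_partition_processor(partition_id: u64) {
    let _ = partition_id;
}

async fn start_all(partition_ids: Vec<u64>) {
    let mut tasks = JoinSet::new();
    for partition_id in partition_ids {
        // Each startup runs in its own task, so the expensive creations
        // overlap instead of adding up one after another.
        tasks.spawn(spawn_partition_processor(partition_id));
    }
    while let Some(result) = tasks.join_next().await {
        // A JoinError here means the startup task panicked or was cancelled.
        result.expect("partition processor startup task failed");
    }
}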

(screenshot)

Final result after moving the InvokerService::from_options call to the async task:

(screenshot)

@muhamadazmy muhamadazmy marked this pull request as ready for review October 28, 2024 13:09
@tillrohrmann (Contributor) left a comment

Thanks for creating this PR @muhamadazmy. I think the parallelization helps improve the overall situation when spawning partition processors. For my environment it does not solve the underlying problem, though. Since we still await the completion of the parallel tasks, and I suspect a lock contention somewhere in Tokio, it takes 9s to start 128 partition processors (admittedly not many people will start that many PPs, but it illustrates the problem).


Comment on lines 640 to 645
while let Some(started) = tasks.join_next().await {
    let state = started.unwrap()?;
    self.invokers_status_reader
        .push(state.key_range.clone(), state.status_reader.clone());
    self.running_partition_processors
        .insert(state.partition_id, state);
Contributor:

Parallelizing the work speeds things up but it does not solve the underlying problem that we are blocking the PartitionProcessorManager from working on other messages. When I set the number of partitions to 128, then it takes roughly 9s for the on_control_processors call to complete on my machine. I suspect that there is some lock contention on the Tokio runtime when creating new runtimes + slow locks on Mac. On Linux the problem might be less severe because of faster futexes (maybe).

(screenshot: Jaeger trace)

What I was initially wondering is whether we can restructure the code such that we don't have to await the completion of spawning the partition processors in on_control_processors but rather react to the completion in the top-most event loop or when operating on the partition processor again? I assume that this would boil down to a small state machine representing the state of the partition processor (spawning, running (leader or follower)).
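
A sketch of what such a state machine could look like (type and variant names are illustrative only, not the PR's actual code):

// Illustrative only: per-partition state tracked by the manager so spawning
// can complete in the background while the event loop keeps handling messages.
enum ProcessorState {
    // The startup task has been spawned but has not reported back yet.
    Spawning,
    // The processor is up and running in the given mode.
    Running(RunMode),
}

enum RunMode {
    Leader,
    Follower,
}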

Contributor:

Maybe you could measure the time it takes on your Linux machine when starting a few more partition processors.

Contributor:

If we could incorporate waiting for the updated metadata version in the not-blocking the event loop part, then this would be kick ass :-)

Contributor Author:

Thank you @tillrohrmann for clearing up my misunderstanding. My impression was that the problem only occurs during startup because of a slow start of the partition processors. I will work on it again.

@muhamadazmy (Contributor Author) commented Oct 29, 2024

@tillrohrmann I have restructured the code to solve the root issue as you suggested. Please take another look and give it a try locally. I already tested it multiple times to make sure this does not block. In practice I never experienced that long delay even with 128 partitions. But in all cases the PPM main loop seems to be responsive.

There are still some operations that occur on the PPM main loop, like run_for_leader, but this is just a push to a channel and should not be blocking (unless the PP itself is blocked). Besides, promoting to a leader or demoting to a follower should respect the order in which the control messages were received, so maybe it's better to keep them like this.


for control_processor in control_processors.commands {
    effects.send(ManagerEffect {
        parent: Span::current(),
Contributor Author:

I was thinking that we should include the versions here to avoid races between multiple ControlProcessors messages, by filtering on the version before applying the effect.

Contributor:

I think this can happen and we need to handle it.

Probably it is enough to only apply the latest ControlProcessors message, because by the time we have received the updated metadata, the cluster controller could have made a different decision.

So what we could do is to remember the latest ControlProcessors message in an Option<ControlProcessors> field. Then we could have something like:

let mut partition_table_version_watcher = metadata.watch(MetadataKind::PartitionTable);
let mut logs_version_watcher = metadata.watch(MetadataKind::Logs);

tokio::select! {
    ...
    _ = partition_table_version_watcher.changed(), if self.control_processors.is_some() => {
        // check whether we can apply the control_processors message
    }
    _ = logs_version_watcher.changed(), if self.control_processors.is_some() => {
        // check whether we can apply the control_processors message
    }
    ...
}

in the main event loop to await the version update. At the same time we can update the self.control_processors field with newer control instructions.

Contributor Author:

That's a very good approach and is definitely better than tracking versions. Thank you!

@tillrohrmann (Contributor) left a comment

Thanks for creating this PR @muhamadazmy. I think this goes in a very promising direction :-) The one thing we probably need to handle is the case of concurrent ControlProcessors messages. I left a suggestion for how we could tackle this problem.

You are right that we still have the problem of potentially blocking metadata updates when transitioning from follower to leader. Maybe we can measure how big of a problem this is and based on this decide whether we need to do something about it. One idea that came up recently is to let the cluster controller decide on the leader epoch. So with this change, the problem might be gone as well.

Comment on lines 669 to 856
-    self.start_partition_processor(
+    let starting = self.start_partition_processor(
         partition_id,
-        partition_key_range,
+        partition_key_range.clone(),
         RunMode::Leader,
-    )
-    .await?;
+        events,
+    );
+    let task_id = self.task_center.spawn_child(
+        TaskKind::Disposable,
+        "starting-partition-processor",
+        Some(partition_id),
+        starting.instrument(
+            debug_span!("starting_partition_processor", partition_id=%partition_id),
+        ),
+    )?;
+
+    self.running_partition_processors
+        .insert(partition_id, ProcessorStatus::Starting(task_id));
Contributor:

This snippet looks quite similar to the one above. Maybe this can be deduplicated?

let networking = self.networking.clone();
let bifrost = self.bifrost.clone();
let node_id = self.metadata.my_node_id();
enum ProcessorStatus {
Contributor:

Are ProcessorStatus and ProcessorState maybe a bit too similar? What about renaming ProcessorState to StartedProcessorState/Status, for example?

Contributor Author:

Yup definitely.

Comment on lines 474 to 485
let (event_tx, mut event_rx) = mpsc::unbounded_channel();
let (effect_tx, mut effect_rx) = mpsc::unbounded_channel();
Contributor:

Event and effect are quite generic. Are there more descriptive names for those channels, maybe?

Contributor:

I assume that those channels don't have to be unbounded, right? Maybe we should add a limit to avoid excessive resource consumption.

@muhamadazmy (Contributor Author) commented Nov 4, 2024

I was worried about using bounded channels in case (for some reason) there were lots of updates (and possibly lots of effects/events pushed onto the channels), which could cause a deadlock: the loop can't process the queue because it's trying to push more objects onto a queue which is already full.

So I thought it would be safer to use unbounded channels in that case to avoid this deadlock; all messages should still get processed eventually, even at the cost of a temporary spike in memory usage.
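
A minimal illustration of the deadlock being avoided here (standalone example, not the PR's code): a single loop that both drains a bounded channel and pushes follow-up items into it can wedge itself once the buffer is full.

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Capacity 1, and the loop below is the only consumer.
    let (tx, mut rx) = mpsc::channel::<u32>(1);
    tx.send(0).await.unwrap();
    while let Some(item) = rx.recv().await {
        // Producing more items per iteration than we consume fills the buffer;
        // the second send then waits forever, because the only task that could
        // drain the channel is the one stuck awaiting the send. An unbounded
        // channel trades this deadlock for a temporary spike in memory usage.
        tx.send(item + 1).await.unwrap();
        tx.send(item + 2).await.unwrap(); // deadlocks here
    }
}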

Contributor:

Are both being pushed to from the main event loop?

Contributor Author:

No, only the effects channel is actually pushed to from the main event loop (seems I am not fully back from vacation 😅). The events channel can then become bounded.



@muhamadazmy (Contributor Author):

Thank you @tillrohrmann for the very helpful review. I applied all your comments and it's ready for another look.

@tillrohrmann (Contributor) left a comment

Thanks for updating this PR @muhamadazmy. I think it looks really nice. I left a few comments with some suggestions and questions.

}
}

fn on_processor_mgr_event(&mut self, event: ManagerEvent) {
Contributor:

Should we call this method maybe on_processor_started and convert ManagerEvent into ProcessorStarted{partition_id, state}?

Contributor Author:

The only reason ManagerEvent is an enum was to be future-proof in case other types of events arise later.

This is probably already needed for #2147, since one way to fix it is to raise an event if the partition processor returns with an error, which is then handled by removing the PP entry from the map.
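
A sketch of what that could look like (variant and field names are hypothetical; PartitionId and ProcessorError stand in for the real types):

// Hypothetical sketch: keeping ManagerEvent an enum leaves room for more
// event kinds later, e.g. reporting a processor that stopped with an error
// (see #2147) so the manager can remove its entry from the map.
type PartitionId = u64;
struct ProcessorError;

enum ManagerEvent {
    StartedProcessor { partition_id: PartitionId },
    // Possible future variant:
    // StoppedProcessor { partition_id: PartitionId, error: ProcessorError },
}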

Contributor:

Thanks for the clarification.

effects: mpsc::UnboundedSender<ManagerEffect>,
) -> Result<(), ShutdownError> {
self.control_processors = None;
// version already satisfied so we can directly apply th effects
Contributor:

Suggested change
- // version already satisfied so we can directly apply th effects
+ // version already satisfied so we can directly apply the effects

Comment on lines 775 to 755
for control_processor in control_processors.commands {
    effects
        .send(ManagerEffect {
            parent: Span::current(),
            effect: ProcessorEffect::ControlProcessor {
                control_processor,
                partition_table: Arc::clone(&partition_table),
            },
        })
        .map_err(|_| ShutdownError)?
}
Contributor:

Could we call on_control_processor directly here w/o going through the effects channel? W/ your changes, I assume that on_control_processor will be really fast to complete.

Contributor Author:

Yeah, it's definitely possible to avoid the effects channel altogether and make the call directly. I will change it.

@tillrohrmann (Contributor) left a comment

Thanks for updating the PR so quickly @muhamadazmy. I had a last suggestion. Once this has been resolved, +1 for merging :-)

Comment on lines 660 to 758
let control_processors = control_processor.into_body();
let Some(control_processors) = self.pending_control_processors.take() else {
return Ok(());
};

// version already satisfied so we can directly apply the effects
if control_processors.min_logs_table_version <= self.metadata.logs_version()
&& control_processors.min_partition_table_version
<= self.metadata.partition_table_version()
{
let partition_table = self.metadata.partition_table_snapshot();

self.metadata
.wait_for_version(
MetadataKind::Logs,
control_processors.min_logs_table_version,
)
.await?;
let partition_table = self
.metadata
.wait_for_partition_table(control_processors.min_partition_table_version)
.await?
.into_arc();

for control_processor in control_processors.commands {
self.on_control_processor(control_processor, &partition_table)
.await?;
for control_processor in control_processors.commands {
self.on_control_processor(control_processor, &partition_table, events.clone())
.await?;
}
} else {
// we have to wait until version catches up
self.pending_control_processors = Some(control_processors);
}

Ok(())
Contributor:

if self
            .pending_control_processors
            .as_ref()
            .is_some_and(|control_processors| {
                control_processors.min_logs_table_version <= self.metadata.logs_version()
                    && control_processors.min_partition_table_version
                        <= self.metadata.partition_table_version()
            })
        {
            let control_processors = self
                .pending_control_processors
                .take()
                .expect("must be some");
            let partition_table = self.metadata.partition_table_snapshot();

            for control_processor in control_processors.commands {
                self.on_control_processor(control_processor, &partition_table, events.clone())
                    .await?;
            }
        }

        Ok(())

could be a tad bit simpler.

@tillrohrmann (Contributor) left a comment

Thanks for updating this PR @muhamadazmy. I tested your changes on my machine and they don't fully seem to solve the observed problem for me. I think on my env, spawning many concurrent start PP tasks leads to a starvation of the event loop thread. Running these tasks outside of the Tokio thread seems to solve the problem for me.

.await?;
}
}
// otherwise if it's still starting we can't control it yet
Contributor:

Maybe add a debug log statement that we are ignoring this command because the PP is still starting.

Comment on lines 806 to 813
let task_id = self.task_center.spawn_child(
    TaskKind::Disposable,
    "starting-partition-processor",
    Some(partition_id),
    starting.instrument(
        debug_span!("starting_partition_processor", partition_id=%partition_id),
    ),
)?;
Contributor:

At least on my machine, where starting seems to contend on some Tokio lock when creating new runtimes, running the starting task on a Tokio thread is a problem: if enough PPs are started concurrently, they will block all Tokio threads and therefore lead to starvation of the event loop thread. What seems to work on my machine is to run the starting future on a blocking thread (spawn_blocking).
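
A plain-Tokio sketch of that idea (the code later uses the task center's spawn_blocking_unmanaged; this only shows the underlying pattern): drive the startup future to completion on a blocking thread so it cannot occupy the async worker threads.

use tokio::runtime::Handle;

// Sketch: run an async startup future on a dedicated blocking thread.
async fn start_on_blocking_thread<F, T>(starting: F) -> T
where
    F: std::future::Future<Output = T> + Send + 'static,
    T: Send + 'static,
{
    let handle = Handle::current();
    // spawn_blocking moves the work off the async worker threads; block_on
    // drives the future to completion on that blocking thread.
    tokio::task::spawn_blocking(move || handle.block_on(starting))
        .await
        .expect("starting task panicked")
}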

Contributor Author:

I think it's okay to use spawn_blocking instead. On my machine using spawn/spawn_child does not show any problem (either for startup time or with tokio-console).

None,
async move {
partition_processor_rpc
.into_outgoing(Err(PartitionProcessorRpcError::Starting))
Contributor:

Sounds good!

@slinkydeveloper (Contributor) left a comment

Looked at the changes for the ingress <-> PP communication, LGTM.

@tillrohrmann (Contributor) left a comment

Thanks for getting rid of the blocking in the PartitionProcessorManager @muhamadazmy. LGTM. +1 for merging :-)

@tillrohrmann (Contributor):

Maybe wait for a second with merging. I am trying to figure out whether I ran into a problem which is a tad bit more pronounced with these changes.

@tillrohrmann (Contributor) left a comment

If we cannot single out the problem with loading the certs, we could also limit how many partition processors are started concurrently. I've tried it here and it seems to work: tillrohrmann@d3d76a6.

let handle = self.task_center.spawn_blocking_unmanaged(
"starting-partition-processor",
Some(partition_id),
starting_task.run().instrument(
Contributor:

Maybe we can limit the concurrency of the number of starting tasks using a Semaphore to work around the problem I encountered on my machine?
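
A sketch of the semaphore idea (standalone example with hypothetical names; the concrete limit and the integration with the task center are left open):

use std::sync::Arc;
use tokio::sync::Semaphore;
use tokio::task::JoinSet;

// Hypothetical stand-in for the actual startup work.
async fn start_partition_processor(partition_id: u64) {
    let _ = partition_id;
}

// Cap how many partition processors are being started at the same time so a
// burst of startups cannot occupy every worker (or blocking) thread at once.
async fn start_with_limit(partition_ids: Vec<u64>, max_concurrent: usize) {
    let semaphore = Arc::new(Semaphore::new(max_concurrent));
    let mut tasks = JoinSet::new();
    for partition_id in partition_ids {
        let semaphore = semaphore.clone();
        tasks.spawn(async move {
            // The permit is held for the duration of the startup and released
            // when it is dropped at the end of the task.
            let _permit = semaphore.acquire_owned().await.expect("semaphore closed");
            start_partition_processor(partition_id).await;
        });
    }
    while let Some(result) = tasks.join_next().await {
        result.expect("startup task panicked");
    }
}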

…trol loop

Summary:
- Improve tracing of ControlProcessors commands
- Run the spawn of partition processors concurrently
- Avoid blocking partition processor manager main loop

While testing I noticed that creating an InvokerService takes around 25ms (on my machine) via `InvokerService::from_options`.

I traced the delay to a single line of code here
https://github.com/restatedev/restate/blob/1cb01203f18bfc6ddb5fe4c45f6697799a79c27e/crates/service-client/src/http.rs#L73

My first thought was to cache the `HttpsConnectorBuilder` but this won't work because `HttpsConnectorBuilder` is not
clonable.

Fixes restatedev#2146
if let ProcessorStatus::Started(state) = status {
    // if we error here, then the system is shutting down
    if run_mode == RunMode::Follower {
        state.step_down()?;
Contributor:

Just wanna verify something here that came to my mind when reading the ingress <-> PP code.

With this PR, do you make the "stopping"/"become follower" process asynchronous too? If that's the case, can we now run into a situation where network messages are in flight to the PP while the PP is stopping/becoming a follower, thus ending up with those messages essentially stuck between channels?

Contributor Author:

If I get your concern right, then the answer is no: stepping_down and running_for_leader both run on the main event loop, so no rpc messages are processed in the meantime. Only starting a new PP runs asynchronously.

That being said, both stepping down and running for leader happen by pushing a message to the PP handler channel. But that has always been the case.

Contributor:

@slinkydeveloper if the PP becomes a follower, it should eventually process all messages in the rpc_rx. However, if we stop the PP (e.g. due to a failure or a planned stop), we currently don't ensure that the rpc_rx channel is drained. That is something we probably should do to prevent the situation you are describing.


@muhamadazmy muhamadazmy merged commit c825714 into restatedev:main Nov 7, 2024
16 of 18 checks passed
@muhamadazmy muhamadazmy deleted the pr2179 branch November 7, 2024 13:32