Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
JonasKruckenberg committed Feb 3, 2025
2 parents 73a87d4 + bbac06a commit 85adad4
Show file tree
Hide file tree
Showing 14 changed files with 1,211 additions and 1,252 deletions.
9 changes: 5 additions & 4 deletions kernel/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,20 +78,21 @@
mod queue;
mod scheduler;
mod task;
mod wake_list;
mod yield_now;

use crate::executor::task::JoinHandle;
use core::future::Future;
use rand::RngCore;
use sync::OnceLock;
pub use task::JoinHandle;

static EXECUTOR: OnceLock<Executor> = OnceLock::new();

pub struct Executor {
/// Handle to the scheduler used by this runtime
// If we ever want to support multiple runtimes, this should become an enum over the different
// variants. For now, we only support the multithreaded scheduler.
scheduler: scheduler::multi_thread::Handle,
scheduler: scheduler::Handle,
}

/// Get a reference to the current executor.
Expand All @@ -109,7 +110,7 @@ pub fn current() -> &'static Executor {
pub fn init(num_cores: usize, rng: &mut impl RngCore, shutdown_on_idle: bool) -> &'static Executor {
#[expect(tail_expr_drop_order, reason = "")]
EXECUTOR.get_or_init(|| Executor {
scheduler: scheduler::multi_thread::Handle::new(num_cores, rng, shutdown_on_idle),
scheduler: scheduler::Handle::new(num_cores, rng, shutdown_on_idle),
})
}

Expand All @@ -118,7 +119,7 @@ pub fn init(num_cores: usize, rng: &mut impl RngCore, shutdown_on_idle: bool) ->
/// This function will not return until the runtime is shut down.
#[inline]
pub fn run(rt: &'static Executor, hartid: usize, initial: impl FnOnce()) -> Result<(), ()> {
scheduler::multi_thread::worker::run(&rt.scheduler, hartid, initial)
scheduler::worker::run(&rt.scheduler, hartid, initial)
}

impl Executor {
Expand Down
File renamed without changes.
114 changes: 113 additions & 1 deletion kernel/src/executor/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,116 @@
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.

pub mod multi_thread;
mod idle;
pub mod worker;

use crate::executor::scheduler::idle::Idle;
use crate::executor::task::{JoinHandle, OwnedTasks, TaskRef};
use crate::executor::{queue, task};
use crate::hart_local::HartLocal;
use crate::util::condvar::Condvar;
use crate::util::fast_rand::FastRand;
use crate::util::parking_spot::ParkingSpot;
use alloc::boxed::Box;
use alloc::vec::Vec;
use core::future::Future;
use core::sync::atomic::{AtomicBool, Ordering};
use core::task::Waker;
use rand::RngCore;
use sync::Mutex;
pub struct Handle {
shared: worker::Shared,
}

impl Handle {
#[expect(tail_expr_drop_order, reason = "")]
pub fn new(num_cores: usize, rand: &mut impl RngCore, shutdown_on_idle: bool) -> Self {
let mut cores = Vec::with_capacity(num_cores);
let mut remotes = Vec::with_capacity(num_cores);

for i in 0..num_cores {
let (steal, run_queue) = queue::new();

cores.push(Box::new(worker::Core {
index: i,
run_queue,
lifo_slot: None,
is_searching: false,
rand: FastRand::new(rand.next_u64()),
}));
remotes.push(worker::Remote { steal });
}

let (idle, idle_synced) = Idle::new(cores);

let stub = TaskRef::new_stub();
let run_queue = mpsc_queue::MpscQueue::new_with_stub(stub);

Self {
shared: worker::Shared {
shutdown: AtomicBool::new(false),
remotes: remotes.into_boxed_slice(),
owned: OwnedTasks::new(),
synced: Mutex::new(worker::Synced {
assigned_cores: (0..num_cores).map(|_| None).collect(),
idle: idle_synced,
shutdown_cores: Vec::with_capacity(num_cores),
}),
run_queue,
idle,
condvars: (0..num_cores).map(|_| Condvar::new()).collect(),
parking_spot: ParkingSpot::default(),
per_hart: HartLocal::with_capacity(num_cores),
shutdown_on_idle,
},
}
}

pub fn spawn<F>(&'static self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
let id = task::Id::next();
let (handle, maybe_task) = self.shared.owned.bind(future, self, id);

if let Some(task) = maybe_task {
self.shared.schedule_task(task, false);
}

handle
}

pub fn shutdown(&self) {
if !self.shared.shutdown.swap(true, Ordering::AcqRel) {
let mut synced = self.shared.synced.lock();

// Set the shutdown flag on all available cores
self.shared.idle.shutdown(&mut synced, &self.shared);

// Any unassigned cores need to be shutdown, but we have to first drop
// the lock
drop(synced);
self.shared.idle.shutdown_unassigned_cores(&self.shared);
}
}

#[inline]
pub(in crate::executor) fn defer(&self, waker: &Waker) {
self.shared.per_hart.get().unwrap().defer(waker);
}
}

impl task::Schedule for &'static Handle {
fn schedule(&self, task: TaskRef) {
self.shared.schedule_task(task, false);
}

fn release(&self, task: &TaskRef) -> Option<TaskRef> {
self.shared.owned.remove(task)
}

fn yield_now(&self, task: TaskRef) {
self.shared.schedule_task(task, true);
}
}
122 changes: 0 additions & 122 deletions kernel/src/executor/scheduler/multi_thread/mod.rs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.

//! A scheduler is initialized with a fixed number of workers. Each worker is
//! driven by a thread. Each worker has a "core" which contains data such as the
//! run queue and other state. When `block_in_place` is called, the worker's
//! "core" is handed off to a new thread allowing the scheduler to continue to
//! Scheduler worker implementation.
//!
//! A scheduler worker is a hart that is running the scheduling loop and which we therefore can
//! schedule work on. A scheduler is initialized with a fixed number of workers. Each worker has
//! a "core" which contains data such as the run queue and other state. When `block_in_place` is called,
//! the worker's "core" is handed off to a new thread allowing the scheduler to continue to
//! make progress while the originating thread blocks.
//!
//! # Shutdown
Expand Down Expand Up @@ -63,8 +65,8 @@
//! the global queue indefinitely. This would be a ref-count cycle and a memory
//! leak.
use super::{idle, Handle};
use crate::executor::queue::Overflow;
use crate::executor::scheduler::{idle, Handle};
use crate::executor::task::{OwnedTasks, TaskRef};
use crate::executor::{queue, task};
use crate::hart_local::HartLocal;
Expand Down Expand Up @@ -95,8 +97,8 @@ static NUM_NOTIFY_LOCAL: Counter = counter!("scheduler.num-notify-local");

/// A scheduler worker
///
/// Data is stack-allocated and never migrates threads
pub struct Worker {
/// Data is stack-allocated and never migrates harts.
struct Worker {
hartid: usize,
/// True if the scheduler is being shutdown
is_shutdown: bool,
Expand All @@ -113,9 +115,14 @@ pub struct Worker {

/// Core data
///
/// Data is heap-allocated and migrates threads.
/// Data is heap-allocated and migrates harts.
///
/// You can think of `Core`s and `Worker`s a bit like robots with pluggable batteries. Just like a
/// robot needs the battery to operate, a `Worker` needs a `Core` to operate. Workers are cooperative
/// and will give up their `Core` if they are done with their work or become blocked waiting for
/// interrupts. This allows other `Worker`s to pick up the `Core` and continue work.
#[repr(align(128))]
pub(super) struct Core {
pub struct Core {
/// Index holding this core's remote/shared state.
pub(super) index: usize,
/// The worker-local run queue.
Expand Down Expand Up @@ -150,7 +157,7 @@ pub(super) struct Shared {
/// Per-hart thread-local data. Logically this is part of the [`Worker`] struct, but placed here
/// into a TLS slot instead of stack allocated so we can access it from other places (i.e. we only
/// need access to the scheduler handle instead of access to the workers stack which wouldn't work).
pub(super) tls: HartLocal<Context>,
pub(super) per_hart: HartLocal<Context>,
/// Signal to workers that they should be shutting down.
pub(super) shutdown: AtomicBool,
/// Whether to shut down the executor when all tasks are processed, used in tests.
Expand Down Expand Up @@ -178,7 +185,10 @@ pub(super) struct Remote {
pub(super) steal: queue::Steal,
}

/// Thread-local context
/// Hart-local context
///
/// Logically this is part of the [`Worker`] struct, but is kept separate to allow access from
/// other parts of the code.
pub(super) struct Context {
/// Handle to the current scheduler
handle: &'static Handle,
Expand Down Expand Up @@ -208,7 +218,7 @@ pub fn run(handle: &'static Handle, hartid: usize, initial: impl FnOnce()) -> Re
};

#[expect(tail_expr_drop_order, reason = "")]
let cx = handle.shared.tls.get_or(|| Context {
let cx = handle.shared.per_hart.get_or(|| Context {
handle,
core: RefCell::new(None),
lifo_enabled: Cell::new(true),
Expand Down Expand Up @@ -518,6 +528,7 @@ impl Worker {

// Safety: we're parking only for a very small amount of time, this is fine
unsafe {
// TODO cleanup
log::trace!("spin stalling for {:?}", Duration::from_micros(i as u64));
arch::hart_park_timeout(Duration::from_micros(i as u64));
log::trace!("after spin stall");
Expand Down Expand Up @@ -740,7 +751,7 @@ impl Core {

impl Shared {
pub(in crate::executor) fn schedule_task(&self, task: TaskRef, is_yield: bool) {
if let Some(cx) = self.tls.get() {
if let Some(cx) = self.per_hart.get() {
// And the current thread still holds a core
if let Some(core) = cx.core.borrow_mut().as_mut() {
if is_yield {
Expand Down
3 changes: 2 additions & 1 deletion kernel/src/executor/task/join_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.

use super::raw::{Header, TaskRef};
use super::raw::Header;
use crate::executor::task::TaskRef;
use core::fmt;
use core::future::Future;
use core::marker::PhantomData;
Expand Down
Loading

0 comments on commit 85adad4

Please sign in to comment.