Skip to content

Latest commit

 

History

History
417 lines (369 loc) · 13.9 KB

07-ipc.org

File metadata and controls

417 lines (369 loc) · 13.9 KB

Inter-process Communication (IPC)

After the last section we can now run user programs with their own stack and heap memory allocation. They can print strings to screen, but not much else. Now we’re going to enable threads to read and write to other streams of data, enabling user programs to use the keyboard input to make our operating system interactive.

To keep things simple we’re going to try and treat reading and writing from files, devices like keyboards, networks etc. as communications between processes.

Communication will be over Rendezvous channels with no message buffering. A thread which sends to a channel blocks until another thread receives, and vice-versa. We can therefore have three states: The rendezvous can be empty, with no reader or writer; a thread can be sending and waiting for a receiver; or a thread is receiving and waiting for a sender. We can represent this with a Rust enum in a new file kernel/src/rendezvous.rs

use alloc::boxed::Box;
use crate::process::Thread;

pub enum Rendezvous {
    Empty,
    Sending(Box<Thread>),
    Receiving(Box<Thread>)
}

then add this new file to lib.rs:

pub mod allocator;
pub mod syscalls;
pub mod process;
pub mod rendezvous; // New

Every file that a process opens is going to correspond to a Rendezvous, which will be referred to by number like a file handle. Perhaps each process could have a HashMap of integer to Rendezvous, but HashMap is only in std, not in alloc::collections (see this open issue), and it’s probably easier to just store Rendezvous in a Vec inside each Process (process.rs):

struct Process {
    page_table_physaddr: u64,
    handles: Vec<Arc<RwLock<Rendezvous>>>, // New
}

We use an Arc because although a Rendezvous could be used to communicate between threads, they will likely be shared between two (or more) processes. We’ve now created opportunities for reference loops because Threads have Arc references to Processes, which have Arc references to Rendezvous, which can own Threads (in boxes).

Arc containers don’t allow obtaining mutable references the contents, so to allow mutation of the Rendezvous (i.e. sending and receiving messages) we use an RwLock to control access.

We’ve added another field to Process so have to update new_kernel_thread and new_user_thread e.g.:

process: Arc::new(Process {
    page_table_physaddr: 0, // kernel thread
    handles: Vec::new(), // New
}),

and we can add a Thread method to access these handles:

impl Thread {
    pub fn rendezvous(&self, id: u64)
                      -> Option<Arc<RwLock<Rendezvous>>> {
        self.process.handles
            .get(id as usize)
            .map(|rv| rv.clone())
    }
}

Sending and receiving

First we need to decide what we’re going to send and receive. To keep it general we can define a Message enum with a short kind and a long kind. The short kind might represent messages which fit in registers, while the long may involve transferring memory pages. In rendezvous.rs:

pub enum Message {
    Short(usize),
    Long,
}

When send is called with a Thread and the Message, the action to take depends on the state of the Rendezvous, which can be can be one of:

  1. Empty: Change to Sending state, storing the Thread and returning nothing
  2. Sending: Keep Sending state, return the calling Thread and signal an error to the caller because only one thread can be waiting to send.
  3. Receiving: Change to Empty, return both the receiving thread and sending thread

The send method can therefore return zero, one or two Threads, so we’ll use a return type of =

To modify the Rendezvous in place via the mutable reference (&mut self), we can use the same method that Option<> does, and use <a href=” https://doc.rust-lang.org/beta/core/mem/fn.replace.html”>core::mem::replace

use core::mem;

impl Rendezvous {
    pub fn send(&mut self, thread: Option<Box<Thread>>, message: Message)
                -> (Option<Box<Thread>>, Option<Box<Thread>>) {
        match &*self {
            Rendezvous::Empty => {
                *self = Rendezvous::Sending(thread, message);
                (None, None)
            }
            Rendezvous::Sending(_, _) => {
                if let Some(t) = &thread {
                    t.return_error(1);
                }
                (thread, None)
            }
            Rendezvous::Receiving(_) => {
                if let Rendezvous::Receiving(rec_thread) = mem::replace(self, Rendezvous::Empty) {
                    rec_thread.return_message(message);
                    if let Some(ref t) = thread {
                        t.return_error(0);
                    }
                    return (Some(rec_thread), thread);
                }
                (None, None) // This should never be reached
            }
        }
    }
}

/// /// 1. Empty -> Receiving, return (None, None) /// 2. Sending -> Empty, return (receiving thread, sending thread) /// 3. Receiving -> return (receiving thread, None) /// Error returned to thread /// /// Returns /// ------- /// /// Zero, one or two threads (thread1, thread2) /// /// thread1 should be started asap /// thread2 should be scheduled

impl Rendezvous {
    ...
    pub fn receive(&mut self, thread: Box<Thread>) -> (Option<Box<Thread>>, Option<Box<Thread>>) {
        match &*self {
            Rendezvous::Empty => {
                *self = Rendezvous::Receiving(thread);
                (None, None)
            }
            Rendezvous::Sending(_, _) => {
                if let Rendezvous::Sending(snd_thread, message) = mem::replace(self, Rendezvous::Empty) {
                    thread.return_message(message);
                    if let Some(ref t) = snd_thread {
                        t.return_error(0);
                    }
                    return (Some(thread), snd_thread);
                }
                (None, None) // This should never be reached
            }
            Rendezvous::Receiving(_) => {
                thread.return_error(2);
                (Some(thread), None)
            }
        }
    }
}

In send and receive the Thread methods return_error() and return_message are used to send data to the threads:

use crate::rendezvous::Message;

impl Thread {
    fn context_mut(&self) -> &mut Context {
        unsafe {&mut *(self.context as *mut Context)}
    }

    pub fn return_error(&self, error_code: usize) {
        self.context_mut().rax = error_code;
    }

    pub fn return_message(&self, message: Message) {
        let context = self.context_mut();
        context.rax = 0; // No error
        match message {
            Message::Short(value) => {
                context.rdi = value;
            },
            Message::Long => {
                context.rdi = 42;
            }
        }
    }
}

Note that we have to be a little careful with which registers are modified in return_message(): the sysret instruction takes the instruction pointer from RCX, and RFLAGS from R11. LLVM seems to reserve RBX.

Sending keyboard events

use alloc::sync::Arc;
use spin::RwLock;
use crate::rendezvous::{Rendezvous, Message};

lazy_static! {
    static ref KEYBOARD_RENDEZVOUS: Arc<RwLock<Rendezvous>> =
        Arc::new(RwLock::new(Rendezvous::Empty));
}

pub fn keyboard_rendezvous() -> Arc<RwLock<Rendezvous>> {
    KEYBOARD_RENDEZVOUS.clone()
}
extern "x86-interrupt" fn keyboard_interrupt_handler(
    _stack_frame: InterruptStackFrame)
{
    ...
        match key {
            DecodedKey::Unicode(character) => {
                print!("{}", character);
                let (thread1, thread2) =
                    KEYBOARD_RENDEZVOUS.write()
                    .send(None, Message::Short(character as usize));
                if let Some(t) = thread2 {
                    process::schedule_thread(t);
                }
                if let Some(t) = thread1 {
                    process::schedule_thread(t);
                }
            },
            DecodedKey::RawKey(key) => print!("{:?}", key),
        }
}

In process.rs:

pub fn schedule_thread(thread: Box<Thread>) {
    interrupts::without_interrupts(|| {
        RUNNING_QUEUE.write().push_front(thread);
    });
}
use crate::interrupts::{Context, INTERRUPT_CONTEXT_SIZE, keyboard_rendezvous};

...
    Box::new(Thread {
        tid: unique_id(),
        // Create a new process
        process: Arc::new(Process {
            page_table_physaddr: user_page_table_physaddr,
            handles: Vec::from([keyboard_rendezvous()]), // New
        }),
        ...

Adding a sys_receive syscall

In syscall.rs

 use crate::rendezvous;

2 => sys_write(arg1 as *const u8, arg2 as usize),
3 => sys_receive(context_ptr, arg1), // New
_ => println!("Unknown syscall {:?} {} {} {}",
              context_ptr, syscall_id, arg1, arg2)
pub const SYSCALL_ERROR_SEND_BLOCKING: usize = 1;
pub const SYSCALL_ERROR_RECV_BLOCKING: usize = 2;
pub const SYSCALL_ERROR_INVALID_HANDLE: usize = 3;

fn sys_receive(context_ptr: *mut Context, handle: u64) {
    // Extract the current thread
    if let Some(thread) = process::take_current_thread() {
        let current_tid = thread.tid();
        thread.set_context(context_ptr);

        // Get the Rendezvous and call
        if let Some(rdv) = thread.rendezvous(handle) {
            let (thread1, thread2) = rdv.write().receive(thread);
            // thread1 should be started asap
            // thread2 should be scheduled

            let mut returning = false;
            for maybe_thread in [thread1, thread2] {
                if let Some(t) = maybe_thread {
                    if t.tid() == current_tid {
                        // Same thread -> return
                        process::set_current_thread(t);
                        returning = true;
                    } else {
                        process::schedule_thread(t);
                    }
                }
            }

            if !returning {
                // Original thread is waiting.
                // Should switch to a different thread
                // For now just wait for the timer interrupt
                unsafe {
                    asm!("sti",
                         "2:",
                         "hlt",
                         "jmp 2b");
                }
            }
        } else {
            // Missing handle
            thread.return_error(SYSCALL_ERROR_INVALID_HANDLE);
            process::set_current_thread(thread);
        }
    }
}

which needs a few more utilities in process.rs, to interact with the scheduling and set the thread context:

impl Thread {
    /// Get the Thread ID
    pub fn tid(&self) -> u64 {
        self.tid
    }
    pub fn set_context(&mut self, context_ptr: *mut Context) {
      self.context = context_ptr as u64;
    }
    ...
}

/// Takes ownership of the current Thread
pub fn take_current_thread() -> Option<Box<Thread>> {
    CURRENT_THREAD.write().take()
}

/// Makes the given thread the current thread
/// If another thread was running schedule it
pub fn set_current_thread(thread: Box<Thread>) {
    let old_current = CURRENT_THREAD.write().replace(thread);
    if let Some(t) = old_current {
        schedule_thread(t);
    }
}

Calling from user space

In hello.rs

pub unsafe extern "sysv64" fn _start() -> ! {
    ...
    loop{
        let err: u64;
        let value: u64;
        asm!("mov rax, 3", // sys_receive
             "mov rdi, 0", // handle
             "syscall",
             lateout("rax") err,    // Error code
             lateout("rdi") value); // Short message value
        println!("Received: {} , {} => {}", err, value,
                 char::from_u32(value as u32).unwrap());
    }
}

Running this an pressing some keys should print the characters from this user thread! Try holding down a key so keyboard events are generated rapidly: That should check what happens if send is called on the Rendezvous before receive and so the sys_receive function returns via sysret rather than waiting for an interrupt.

There are still some problems:

  1. If we have more than one user thread, and they both try to read from the keyboard, one of them will get an error message. You can test this by creating two user threads in main.rs (the kernel_thread_main function): One of the threads will keep printing error code 2, which we set in Rendezvous.receive().
  2. There are unnecessary delays between an event occurring (e.g. a key press) and the message reaching the code that will respond (the hello program): In sys_receive if there is no message then we wait (call hlt) until the next timer interrupt. More seriously, when an event occurs (in keyboard_interrupt_handler) we have to schedule the receiving thread and wait until the next timer interrupt. That limits a sequence of messages to a maximum rate of one message per timer interrupt, about 10 per second. At that rate a chain of only a few message hops will introduce a noticeable delay.

In the next section we’ll look at fixing the second problem to make IPC faster.