Skip to content

Latest commit

 

History

History
526 lines (353 loc) · 18.3 KB

File metadata and controls

526 lines (353 loc) · 18.3 KB

Async Rust 03: Why shouldn't I hold a mutex guard across an await point?

假设我们想在异步代码中共享状态

Rust中的mutex和mutex guard

传统的互斥锁有两个方法(这里“传统”意味着与Rust不同):lock和unlock。代码锁定mutex,执行某些操作,然后解锁互斥锁。如果其他代码尝试锁定互斥锁,则会阻塞,直到互斥锁解锁。

我们可以通过下面的假想类型来想象rust中的流程:

fn exclusive_access(mutex: &MyMutex, protected: &MyObject) {
    mutex.lock();
    protected.do_something();
    mutex.unlock();
}

我们必须相信在访问MyObject的任何地方,都锁定了相同的mutex。

而且你可能会忘记在完成后解锁互斥锁。

更可怕的是,在 do_something() 出错时提前返回,这样永远也无法解锁互斥锁了。

Rust如何处理mutex?

在Rust中,mutex和它保护的对象是绑定在一起的,这就是 std::sync::Mutex

当你尝试锁定互斥锁时,你会得到一个MutexGuard。你可以通过它访问被保护的数据。

当MutexGuard超出作用范围时,互斥锁就自动解锁,这种行为叫做RAII。

这就意味着只有我们能成功锁定mutex时,才能得到一个MutexGuard。

我们用Rust标准库来实现上面代码的功能:

fn exclusive_access(mutex: &std::sync::Mutex<MyObject>) {
    let guard = mutex
        .lock()
        .expect("the mutex is poisoned");

    // MutexGuard会自动解引用,所以我们可以直接调用MyObject方法
    guard.do_something();
}

我们无法在没有锁定mutex的情况下意外访问MyObject,我们也无需考虑解锁mutex。

除非持有MutexGuard的线程发生了panic,那么当MutexGuard被释放时,mutex不会自动解锁,而会标记为“poisoning”。

我们不会深入讨论mutex poisoning的情况,但是你可以在这里找到更多信息:https://doc.rust-lang.org/std/sync/struct.Mutex.html#poisoning

mutex时序图

让我们想象两个线程访问我们保护的对象。

这里简化了一些细节。例如,我们不能直接在创建两个线程时传递一个mutex,而是用智能指针将它包装起来。不过这对本例而言不重要。

Alt text

现在,让我们回到在异步上下文中使用mutex保护(这里指错误地使用它)。

持有MutexGuard的异步函数

我们假设有一个例子,需要在await点持有一个mutex。

Note

await点指异步函数中遇到await时发生暂停的那个点

这可能包括

  • 读取共享计数器
  • 访问异步共享资源
  • 写入新的值到共享计数器

现在,我们用让出执行权给运行时来模拟异步共享资源。

这是我们的异步函数:

use std::sync::{Arc, Mutex};

async fn hold_mutex_guard(data: Arc<Mutex<u64>>) -> Result<(), DataAccessError> {
    let mut guard = data.lock().map_err(|_| DataAccessError {})?;
    println!("existing value: {}", *guard);

    tokio::task::yield_now().await;

    *guard = *guard + 1;
    println!("new value: {}", *guard);

    Ok(())
}
  1. 异步函数接受一个数据参数,它是一个 Arc<Mutex<u64>> 类型
  2. Mutex<u64> 表示有一个 u64 类型的数据被 Mutex 保护,只能通过锁进行访问
  3. Arc 表示这是一个原子引用计数的共享指针,可以在多个任务间共享 Mutex
  4. 首先我们锁定 Mutex 获取 MutexGuard
  5. 打印共享数据值
  6. 在await点让出执行权,模拟异步操作
  7. 修改共享函数的值
  8. 打印修改后的值
  9. 完成

that return type

锁定mutex锁可能会失败(mutex可能会poisoning),所以在这种情况下,我们应该返回一个错误。

我们为这种情况定义了一个新的类型:

use std::{error::Error, fmt};

#[derive(Debug)]
struct DataAccessError {}

impl fmt::Display for DataAccessError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "there was an error accessing the shared data")
    }
}

impl Error for DataAccessError {}

现在,让我们来运行这段代码看看。

运行持有MutexGuard的异步函数

让我们调用我们的future,或者说await它。

#[tokio::main]
async fn main() {
    let data = Arc::new(Mutex::new(0_u64));

    hold_mutex_guard(Arc::clone(&data))
        .await
        .expect("failed to perform operation");
}

输出:

existing value: 0
new value: 1

请记住,它在await点上持有一个互斥锁!我们需要更努力地尝试“做坏事”。

spawn

我们不能连续两次等待我们的异步函数,因为这样它会依次运行。

然而,有一种方法可以在异步运行时并发运行多个future.它被称为spawn。在tokio中,它是 tokio::spawn

使用 tokio::spawn 时,你不需要 .await。future将被设置为立即在新任务中执行。

但是,新任务可能不会立即被轮询。这取决于异步运行时工作线程的忙碌程度。

让我们创建一个简单的示例:

// `flavor`` has to be one of these values, not both. This code won't compile.
#[tokio::main(flavor = "current_thread|multi_thread")]
async fn main() {
    tokio::spawn(spawn_again());
    do_nothing().await;

    tokio::task::yield_now().await
    tokio::task::yield_now().await

    // ... Let's pretend there's more here and we're not returning yet.
}

async fn spawn_again() {
    tokio::spawn(do_nothing());
}

async fn do_nothing() {
    // There's nothing here
}

这里我们的async main使用 spawn_again 函数生成一个任务。

然后,它等待一个名为 do_nothing 的异步函数。

异步函数 spawn_again 使用 do_nothing 生成一个任务。

让我们看看这在不同的运行时调度器下会如何工作。

在当前线程上生成任务

一个异步运行时可能只有一个工作线程。例如,在tokio中,current thread scheduler就是这样的。然后我们可以在另一个任务内部启动给一个任务。但是,直到当前任务让出控制权给调度器,这个任务才会被轮询。

这是时序图:

Alt text

注意,被启动的任务需要等待运行时空闲后才能被轮询。当一个任务等待一个future时,并且没有新任务产生,它会立即被轮询。

  • spawn: 创建新任务,但可能要等待运行时轮询
  • await:在当前任务中执行,立即轮询,并暂停当前任务

在多线程上生成任务

另一种情况是,异步运行时有多个工作线程。例如,在tokio中,multi thread scheduler就是这样的。这意味着我们可以在另一个任务内部启动一个任务,而不必等待运行时空闲。

让我们以有两个工作线程的运行时为例,看看时序图会是什么样子。注意现在有了并行,所以操作的确切顺序可能有所不同。

Alt text

实际上,任务是在同一个工作线程上启动的,该工作线程正在运行生成任务的任务。如果另一个工作线程空闲,它可能会从第一个工作线程的队列中窃取任务(这部分内容超出本文讨论范围)。

wait for me to finish

启动操作返回一个连接句柄:tokio::task::JoinHandle。我们可以使用它来等待任务完成。它也可以来中止启动的任务。

现在让我们回到尝试破坏的过程中。

生成多个异步函数

让我们生成几个异步函数实例:

#[tokio::main]
async fn main() {
    let data = Arc::new(Mutex::new(0_u64));

    tokio::spawn(hold_mutex_guard(Arc::clone(&data)));
    tokio::spawn(hold_mutex_guard(Arc::clone(&data)));
}

这会编译失败,并出现大量错误。

error: future cannot be sent between threads safely
   --> resources/understanding-async-await/src/bin/mutex_guard_async.rs:5:18
    |
5   |     tokio::spawn(hold_mutex_guard(Arc::clone(&data)));
    |                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ future returned by `hold_mutex_guard` is not `Send`
    |
    = help: within `impl Future<Output = Result<(), DataAccessError>>`, the trait `Send` is not implemented for `std::sync::MutexGuard<'_, u64>`
note: future is not `Send` as this value is used across an await
   --> resources/understanding-async-await/src/bin/mutex_guard_async.rs:15:29
    |
12  |     let mut guard = data.lock().map_err(|_| DataAccessError {})?;
    |         --------- has type `std::sync::MutexGuard<'_, u64>` which is not `Send`
...
15  |     tokio::task::yield_now().await;
    |                             ^^^^^^ await occurs here, with `mut guard` maybe used later
...
21  | }
    | - `mut guard` is later dropped here
note: required by a bound in `tokio::spawn`
   --> /Users/stainsby/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.27.0/src/task/spawn.rs:163:21
    |
163 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `spawn`

其实只有一个错误:Future不能安全地在线程间传递。

生成的任务可能在任何工作线程上启动。即使是当前线程的运行时,也可能有来自其他线程的任务生成。

所以future需要能够在线程间传递,但为什么不能呢?

note1:

note: future is not `Send` as this value is used across an await

它指向了mut guard,并告诉我们它不是Send,然后指向我们yield的 .await 作为有问题的await点。

note2:

note: requited by a bound in `tokio::spawn`

这个Send的东西并不是魔法,它是由tokio在 tokio::spawn 中显式指定的。

tokio::spawn 的代码:

pub fn spawn<T>(future: T) -> JoinHandle<T::Output>
where
    T: Future + Send + 'static,
    T::Output: Send + 'static,
{
    // (we're skipping the actual implementation)
}

我们可以看到T必须满需三个条件:

  • 实现Future trait
  • 实现Send trait
  • 拥有 'static 生命周期

我们知道不能在线程见传递数据,那为什么future不可以呢?

那么,一个类型要怎么实现Send呢?

marker traits

Rust中有一些特殊的trait,它们不需要任何方法,只是用来标记类型。

这些trait被称为标记trait(marker traits)。

如果我们查看 std::marker::Send trait,我们会发现它是unsafe的!

它遵循了rust的约定:当编译器无法判断是否安全时,用unsafe来表示我们担保其安全性。

默认情况下,如果一个结构体里所有成员都是Send的,那么它也是Send的。所以,通常我们不用关心手动标记Send。但是,我们需要警惕在哪些地方不能使用非Send类型。

回到搞破坏的正轨

我们不需要并行运行非Send的异步函数,只需要从别的地方尝试锁定mutex就行。

所以,让我们创建一个可以被生成的异步函数。它和之前的函数类似,但是没有yield(所以没有await点,也就不算真正的异步函数):

async fn yieldless_mutex_access(data: Arc<Mutex<u64>>) -> Result<(), DataAccessError> {
    let mut guard = data.lock().map_err(|_| DataAccessError {})?;
    println!("existing value: {}", *guard);

    *guard = *guard + 1;
    println!("new value: {}", *guard);

    Ok(())
}

我们没有在await点上持有MutexGuard,所以它是Send的。

为了确保会出问题,我们还需要做出一个改动:使用当前线程运行时。这意味着任务不会并行运行,所以更容易制造某些情况:

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let data = Arc::new(Mutex::new(0_u64));

    tokio::spawn(yieldless_mutex_access(Arc::clone(&data)));
    hold_mutex_guard(Arc::clone(&data))
        .await
        .expect("failed to perform operation");
}

这里我们生成了Send的异步函数 yieldless_mutex_access,然后await了我们的坏函数 hold_mutex_guard

输出:

existing value: 0

然后它就卡住了。

我们成功制造了死锁。

持有MutexGuard的Future

我们将手动实现一个执行相同操作的future。

future通常以状态机的方式实现。我们需要一个初始状态,最好还有一个明确的完成状态,中间还需要一个执行过一次yield后的状态。

我们的Future可以如下:

use std::sync::{Arc, Mutex};

enum HoldMutexGuard<'a> {
    Init {
        data: Arc<Mutex<u64>>,
    },
    Yielded {
        guard: MutexGuard<'a, u64>,
        _data: Arc<Mutex<u64>>,
    },
    Done,
}

我们把即将成为Future的东西封装在一个函数中:

fn hold_mutex_guard(
    data: Arc<Mutex<u64>>,
) -> impl Future<Output = Result<(), DataAccessError>> {
    HoldMutexGuard::Init { data }
}

状态机如下:

Alt text

Future从Init状态开始,第一次轮询时返回 Poll::Pending 并转移到 Yielded 状态。第二次轮询时返回 Poll::Ready 并转移到 Done 状态。

然而,实现起来会稍微复杂一点。

实现 HoldMutexGuard Future

impl<'a> Future for HoldMutexGuard<'a> {
    type Output = Result<(), DataAccessError>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let state = &mut *self;
        match state {
            Self::Init { data } => {
                let guard = unsafe {
                    // SAFETY: We will hold on to the Arc containing the mutex as long
                    //         as we hold onto the guard.
                    std::mem::transmute::<MutexGuard<'_, u64>, MutexGuard<'static, u64>>(
                        data.lock().map_err(|_| DataAccessError {})?,
                    )
                };
                println!("existing value: {}", *guard);

                cx.waker().wake_by_ref();
                *state = Self::Yielded {
                    guard: guard,
                    _data: Arc::clone(data),
                };

                Poll::Pending
            }
            Self::Yielded { guard, _data } => {
                println!("new value: {}", *guard);

                *state = Self::Done;

                Poll::Ready(Ok(()))
            }
            Self::Done => panic!("Please stop polling me!"),
        }
    }
}

现在,我们看一下 poll() 的实现。

等等,这是什么?

let state = &mut *self;

借用检查器会对任何与Pin有关的东西进行严格检查。我们需要修改self,但它被pin了,我们也需要引用它的部分。所以我们解引用pinned的self,然后获取一个可变引用。

第一次被轮询时,我们处于Init状态,所以执行到async函数中的yield_now调用。

一点点unsafe

不幸的是,我们不能直接将MutexGuard存储到它所保护的Mutex中。这会导致循环引用结构。Rust强烈反对这种结构,我们不得不使用unsafe来实现我们想要的功能。

我们需要将MutexGuard转换为一个拥有'static生命周期的对象。我们可以使用 std::mem::transmute 来做到这一点。

holding onto that guard

一旦我们有了MutexGuard,我们就打印该值。现在我们要返回给运行时。 所以就像我们的 YieldNow future中一样,我们需要先唤醒我们的waker。 否则我们的future将永远不会被再次轮询。 然后我们设置下一个状态:Yielded。(使用那个有趣的 &mut *self)并返回 Poll::Pending

下次对我们的future轮询时,我们已经处于Yielded状态。我们将打印 MutexGuard 中的值。然后进入 Done 状态并返回 Poll::Ready。此时,MutexGuard 将被丢弃。至此,实现就结束了。

这里重要的一点是,在 Yielded 状态下,我们持有 MutexGuard 并返回。 这也是我们的异步函数正在做的事情。 但我们并没有看得那么清楚。 我们只是看到.await。 但每当你的异步函数包含一个await点时,那就是future潜在的返回。 在返回之前,它必须将所有范围内的局部变量存储在自身中。

hanging around again

让我们用我们的Future再次重现那个挂起的程序吧。

我们将生成相同的异步函数yieldless_mutex_access来帮助引发挂起(实际上不做任何异步事情),并直接展开 async main()

fn main() {
    let body = async {
        let data = Arc::new(Mutex::new(0_u64));

        tokio::spawn(yieldless_mutex_access(Arc::clone(&data)));
        hold_mutex_guard(Arc::clone(&data))
            .await
            .expect("failed to perform operation");
    };

    return tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("failed to build runtime")
        .block_on(body);
}

我们正在创建一个当前线程运行时,和之前一样。这使得触发悬挂行为更容易。

我们来看看时序图吧!

Alt text

重要的一点是两个Future。

首先生成 yieldless_mutex_access()

然后等待 HoldMutexGuard

正如我们在引入spawn时所看到的,新任务必须等待。运行时是单线程的,因此,使用 yieldless_mutex_access() 创建的新任务必须等待,直到当前任务向运行时转移执行权。

这意味着 HoldMutexGuard future 首先运行。它锁定互斥体并接收 MutexGuard。它唤醒了它的waker(所以返回 Poll::Pending 后会再次进行轮询),然后将状态更改为 Yielded,将 MutexGuard 存储在其自身中,然后返回 Poll::Pending,向运行时转移执行权。

现在运行时可以轮询下一个任务,即由 yieldless_mutex_access()产生的。此任务禅师锁定mutex,但mutex已经被锁定,因此它会阻塞,直到解锁为止。由于运行时只有一个线程,这会阻塞整个运行时,并造成死锁。

现在我们明白为什么了!

now what?

持有MutexGuard在await点上是安全的,因为它的 lock() 方法是异步的,在等待解锁时不会阻塞线程,这样其他持有锁的任务就可以继续执行。

然而,最好的方式是不使用mutex。应该将共享资源的完整所有权交给单个任务,并通过消息传递与该任务进行通信。

在第四部分,我们将研究消息传递和通道。