Skip to content

Commit

Permalink
refactor(turbopack): Rewrite CollectiblesSource callsites to use Oper…
Browse files Browse the repository at this point in the history
…ationVc (part 3/3) (#74173)

`OperationVc`s should be used with `CollectiblesSource` instead of `Vc`s because collectibles represent a side-effect or implicit extra return value of a function's execution.
  • Loading branch information
bgw authored Jan 14, 2025
1 parent 299d541 commit 7627f1d
Show file tree
Hide file tree
Showing 9 changed files with 132 additions and 128 deletions.
129 changes: 69 additions & 60 deletions turbopack/crates/turbo-tasks-testing/tests/collectibles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ static REGISTRATION: Registration = register!();
#[tokio::test]
async fn transitive_emitting() {
run(&REGISTRATION, || async {
let result = my_transitive_emitting_function("".into(), "".into());
result.strongly_consistent().await?;
let list = result.peek_collectibles::<Box<dyn ValueToString>>();
let result_op = my_transitive_emitting_function("".into(), "".into());
let result_val = result_op.connect().strongly_consistent().await?;
let list = result_op.peek_collectibles::<Box<dyn ValueToString>>();
assert_eq!(list.len(), 2);
let mut expected = ["123", "42"].into_iter().collect::<HashSet<_>>();
for collectible in list {
assert!(expected.remove(collectible.to_string().await?.as_str()))
}
assert_eq!(result.await?.0, 0);
assert_eq!(result_val.0, 0);
anyhow::Ok(())
})
.await
Expand All @@ -34,15 +34,15 @@ async fn transitive_emitting() {
#[tokio::test]
async fn transitive_emitting_indirect() {
run(&REGISTRATION, || async {
let result = my_transitive_emitting_function("".into(), "".into());
let collectibles = my_transitive_emitting_function_collectibles("".into(), "".into());
let list = collectibles.strongly_consistent().await?;
let result_op = my_transitive_emitting_function("".into(), "".into());
let collectibles_op = my_transitive_emitting_function_collectibles("".into(), "".into());
let list = collectibles_op.connect().strongly_consistent().await?;
assert_eq!(list.len(), 2);
let mut expected = ["123", "42"].into_iter().collect::<HashSet<_>>();
for collectible in list.iter() {
assert!(expected.remove(collectible.to_string().await?.as_str()))
}
assert_eq!(result.await?.0, 0);
assert_eq!(result_op.connect().await?.0, 0);
anyhow::Ok(())
})
.await
Expand All @@ -52,15 +52,15 @@ async fn transitive_emitting_indirect() {
#[tokio::test]
async fn multi_emitting() {
run(&REGISTRATION, || async {
let result = my_multi_emitting_function();
result.strongly_consistent().await?;
let list = result.peek_collectibles::<Box<dyn ValueToString>>();
let result_op = my_multi_emitting_function();
let result_val = result_op.connect().strongly_consistent().await?;
let list = result_op.peek_collectibles::<Box<dyn ValueToString>>();
assert_eq!(list.len(), 2);
let mut expected = ["123", "42"].into_iter().collect::<HashSet<_>>();
for collectible in list {
assert!(expected.remove(collectible.to_string().await?.as_str()))
}
assert_eq!(result.await?.0, 0);
assert_eq!(result_val.0, 0);
anyhow::Ok(())
})
.await
Expand All @@ -70,12 +70,13 @@ async fn multi_emitting() {
#[tokio::test]
async fn taking_collectibles() {
run(&REGISTRATION, || async {
let result = my_collecting_function();
let list = result.take_collectibles::<Box<dyn ValueToString>>();
let result_op = my_collecting_function();
let result_val = result_op.connect().strongly_consistent().await?;
let list = result_op.take_collectibles::<Box<dyn ValueToString>>();
// my_collecting_function already processed the collectibles so the list should
// be empty
assert!(list.is_empty());
assert_eq!(result.await?.0, 0);
assert_eq!(result_val.0, 0);
anyhow::Ok(())
})
.await
Expand All @@ -85,13 +86,13 @@ async fn taking_collectibles() {
#[tokio::test]
async fn taking_collectibles_extra_layer() {
run(&REGISTRATION, || async {
let result = my_collecting_function_indirect();
result.strongly_consistent().await?;
let list = result.take_collectibles::<Box<dyn ValueToString>>();
let result_op = my_collecting_function_indirect();
let result_val = result_op.connect().strongly_consistent().await?;
let list = result_op.take_collectibles::<Box<dyn ValueToString>>();
// my_collecting_function already processed the collectibles so the list should
// be empty
assert!(list.is_empty());
assert_eq!(result.await?.0, 0);
assert_eq!(result_val.0, 0);
anyhow::Ok(())
})
.await
Expand All @@ -101,38 +102,38 @@ async fn taking_collectibles_extra_layer() {
#[tokio::test]
async fn taking_collectibles_parallel() {
run(&REGISTRATION, || async {
let result = my_transitive_emitting_function("".into(), "a".into());
result.strongly_consistent().await?;
let list = result.take_collectibles::<Box<dyn ValueToString>>();
let result_op = my_transitive_emitting_function("".into(), "a".into());
let result_val = result_op.connect().strongly_consistent().await?;
let list = result_op.take_collectibles::<Box<dyn ValueToString>>();
assert_eq!(list.len(), 2);
assert_eq!(result.await?.0, 0);
assert_eq!(result_val.0, 0);

let result = my_transitive_emitting_function("".into(), "b".into());
result.strongly_consistent().await?;
let list = result.take_collectibles::<Box<dyn ValueToString>>();
let result_op = my_transitive_emitting_function("".into(), "b".into());
let result_val = result_op.connect().strongly_consistent().await?;
let list = result_op.take_collectibles::<Box<dyn ValueToString>>();
assert_eq!(list.len(), 2);
assert_eq!(result.await?.0, 0);
assert_eq!(result_val.0, 0);

let result =
let result_op =
my_transitive_emitting_function_with_child_scope("".into(), "b".into(), "1".into());
result.strongly_consistent().await?;
let list = result.take_collectibles::<Box<dyn ValueToString>>();
let result_val = result_op.connect().strongly_consistent().await?;
let list = result_op.take_collectibles::<Box<dyn ValueToString>>();
assert_eq!(list.len(), 2);
assert_eq!(result.await?.0, 0);
assert_eq!(result_val.0, 0);

let result =
let result_op =
my_transitive_emitting_function_with_child_scope("".into(), "b".into(), "2".into());
result.strongly_consistent().await?;
let list = result.take_collectibles::<Box<dyn ValueToString>>();
let result_val = result_op.connect().strongly_consistent().await?;
let list = result_op.take_collectibles::<Box<dyn ValueToString>>();
assert_eq!(list.len(), 2);
assert_eq!(result.await?.0, 0);
assert_eq!(result_val.0, 0);

let result =
let result_op =
my_transitive_emitting_function_with_child_scope("".into(), "c".into(), "3".into());
result.strongly_consistent().await?;
let list = result.take_collectibles::<Box<dyn ValueToString>>();
let result_val = result_op.connect().strongly_consistent().await?;
let list = result_op.take_collectibles::<Box<dyn ValueToString>>();
assert_eq!(list.len(), 2);
assert_eq!(result.await?.0, 0);
assert_eq!(result_val.0, 0);

anyhow::Ok(())
})
Expand All @@ -143,46 +144,53 @@ async fn taking_collectibles_parallel() {
#[turbo_tasks::value(transparent)]
struct Collectibles(AutoSet<ResolvedVc<Box<dyn ValueToString>>>);

#[turbo_tasks::function]
#[turbo_tasks::function(operation)]
async fn my_collecting_function() -> Result<Vc<Thing>> {
let result = my_transitive_emitting_function("".into(), "".into());
result.take_collectibles::<Box<dyn ValueToString>>();
Ok(result)
let result_op = my_transitive_emitting_function("".into(), "".into());
let result_vc = result_op.connect();
result_vc.await?;
result_op.take_collectibles::<Box<dyn ValueToString>>();
Ok(result_vc)
}

#[turbo_tasks::function]
#[turbo_tasks::function(operation)]
async fn my_collecting_function_indirect() -> Result<Vc<Thing>> {
let result = my_collecting_function();
result.strongly_consistent().await?;
let list = result.peek_collectibles::<Box<dyn ValueToString>>();
let result_op = my_collecting_function();
let result_vc = result_op.connect();
result_vc.strongly_consistent().await?;
let list = result_op.peek_collectibles::<Box<dyn ValueToString>>();
// my_collecting_function already processed the collectibles so the list should
// be empty
assert!(list.is_empty());
Ok(result)
Ok(result_vc)
}

#[turbo_tasks::function]
#[turbo_tasks::function(operation)]
async fn my_multi_emitting_function() -> Result<Vc<Thing>> {
my_transitive_emitting_function("".into(), "a".into()).await?;
my_transitive_emitting_function("".into(), "b".into()).await?;
my_transitive_emitting_function("".into(), "a".into())
.connect()
.await?;
my_transitive_emitting_function("".into(), "b".into())
.connect()
.await?;
my_emitting_function("".into()).await?;
Ok(Thing::cell(Thing(0)))
}

#[turbo_tasks::function]
#[turbo_tasks::function(operation)]
async fn my_transitive_emitting_function(key: RcStr, _key2: RcStr) -> Result<Vc<Thing>> {
my_emitting_function(key).await?;
Ok(Thing::cell(Thing(0)))
}

#[turbo_tasks::function]
#[turbo_tasks::function(operation)]
async fn my_transitive_emitting_function_collectibles(
key: RcStr,
key2: RcStr,
) -> Result<Vc<Collectibles>> {
let result = my_transitive_emitting_function(key, key2);
let result_op = my_transitive_emitting_function(key, key2);
Ok(Vc::cell(
result
result_op
.peek_collectibles::<Box<dyn ValueToString>>()
.into_iter()
.map(|v| v.to_resolved())
Expand All @@ -193,17 +201,18 @@ async fn my_transitive_emitting_function_collectibles(
))
}

#[turbo_tasks::function]
#[turbo_tasks::function(operation)]
async fn my_transitive_emitting_function_with_child_scope(
key: RcStr,
key2: RcStr,
_key3: RcStr,
) -> Result<Vc<Thing>> {
let thing = my_transitive_emitting_function(key, key2);
thing.strongly_consistent().await?;
let list = thing.peek_collectibles::<Box<dyn ValueToString>>();
let thing_op = my_transitive_emitting_function(key, key2);
let thing_vc = thing_op.connect();
thing_vc.await?;
let list = thing_op.peek_collectibles::<Box<dyn ValueToString>>();
assert_eq!(list.len(), 2);
Ok(thing)
Ok(thing_vc)
}

#[turbo_tasks::function]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ impl ValueToString for Collectible {
}
}

#[turbo_tasks::function]
async fn inner_compute(input: Vc<ChangingInput>) -> Result<Vc<u32>> {
#[turbo_tasks::function(operation)]
async fn inner_compute(input: ResolvedVc<ChangingInput>) -> Result<Vc<u32>> {
println!("start inner_compute");
let value = *input.await?.state.get();
tokio::time::sleep(Duration::from_millis(200)).await;
Expand All @@ -90,10 +90,10 @@ async fn inner_compute(input: Vc<ChangingInput>) -> Result<Vc<u32>> {
}

#[turbo_tasks::function]
async fn compute(input: Vc<ChangingInput>) -> Result<Vc<Output>> {
async fn compute(input: ResolvedVc<ChangingInput>) -> Result<Vc<Output>> {
println!("start compute");
let operation = inner_compute(input);
let value = *operation.await?;
let value = *operation.connect().await?;
let collectibles = operation.peek_collectibles::<Box<dyn ValueToString>>();
if collectibles.len() > 1 {
bail!("expected 0..1 collectible, found {}", collectibles.len());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ impl ValueToString for Collectible {
}
}

#[turbo_tasks::function]
fn inner_compute(input: Vc<ChangingInput>) -> Vc<u32> {
inner_compute2(input, 1000)
#[turbo_tasks::function(operation)]
fn inner_compute(input: ResolvedVc<ChangingInput>) -> Vc<u32> {
inner_compute2(*input, 1000)
}

#[turbo_tasks::function]
Expand All @@ -79,12 +79,12 @@ async fn inner_compute2(input: Vc<ChangingInput>, innerness: u32) -> Result<Vc<u
}

#[turbo_tasks::function]
async fn compute(input: Vc<ChangingInput>, innerness: u32) -> Result<Vc<Output>> {
async fn compute(input: ResolvedVc<ChangingInput>, innerness: u32) -> Result<Vc<Output>> {
if innerness > 0 {
return Ok(compute(input, innerness - 1));
return Ok(compute(*input, innerness - 1));
}
let operation = inner_compute(input);
let value = *operation.await?;
let value = *operation.connect().await?;
let collectibles = operation.peek_collectibles::<Box<dyn ValueToString>>();
if collectibles.len() != 1 {
bail!("expected 1 collectible, found {}", collectibles.len());
Expand Down
21 changes: 1 addition & 20 deletions turbopack/crates/turbo-tasks/src/vc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use std::{
};

use anyhow::Result;
use auto_hash_map::AutoSet;
use serde::{Deserialize, Serialize};

pub use self::{
Expand All @@ -35,7 +34,7 @@ use crate::{
manager::{create_local_cell, try_get_function_meta},
registry,
trace::{TraceRawVcs, TraceRawVcsContext},
CellId, CollectiblesSource, RawVc, ResolveTypeError, SharedReference, ShrinkToFit,
CellId, RawVc, ResolveTypeError, SharedReference, ShrinkToFit,
};

/// A "Value Cell" (`Vc` for short) is a reference to a memoized computation result stored on the
Expand Down Expand Up @@ -425,11 +424,6 @@ impl<T> Vc<T>
where
T: ?Sized,
{
/// Connects the operation pointed to by this `Vc` to the current task.
pub fn connect(vc: Self) {
vc.node.connect()
}

/// Returns a debug identifier for this `Vc`.
pub async fn debug_identifier(vc: Self) -> Result<String> {
let resolved = vc.resolve().await?;
Expand Down Expand Up @@ -587,19 +581,6 @@ where
}
}

impl<T> CollectiblesSource for Vc<T>
where
T: ?Sized,
{
fn take_collectibles<Vt: VcValueTrait>(self) -> AutoSet<Vc<Vt>> {
self.node.take_collectibles()
}

fn peek_collectibles<Vt: VcValueTrait>(self) -> AutoSet<Vc<Vt>> {
self.node.peek_collectibles()
}
}

impl<T> From<RawVc> for Vc<T>
where
T: ?Sized,
Expand Down
Loading

0 comments on commit 7627f1d

Please sign in to comment.