From 7627f1dc481f0242a0adf312c2870a1fb4422c82 Mon Sep 17 00:00:00 2001 From: Benjamin Woodruff Date: Mon, 13 Jan 2025 19:01:07 -0800 Subject: [PATCH] refactor(turbopack): Rewrite CollectiblesSource callsites to use OperationVc (part 3/3) (#74173) `OperationVc`s should be used with `CollectiblesSource` instead of `Vc`s because collectibles represent a side-effect or implicit extra return value of a function's execution. --- .../turbo-tasks-testing/tests/collectibles.rs | 129 ++++++++++-------- .../tests/dirty_in_progress.rs | 8 +- .../tests/recompute_collectibles.rs | 12 +- turbopack/crates/turbo-tasks/src/vc/mod.rs | 21 +-- .../crates/turbopack-tests/tests/execution.rs | 29 ++-- .../crates/turbopack-tests/tests/snapshot.rs | 23 ++-- .../turbopack/benches/node_file_trace.rs | 17 ++- turbopack/crates/turbopack/src/lib.rs | 8 ++ .../crates/turbopack/tests/node-file-trace.rs | 13 +- 9 files changed, 132 insertions(+), 128 deletions(-) diff --git a/turbopack/crates/turbo-tasks-testing/tests/collectibles.rs b/turbopack/crates/turbo-tasks-testing/tests/collectibles.rs index b8d04bdaf61ef..dd95eb49cc463 100644 --- a/turbopack/crates/turbo-tasks-testing/tests/collectibles.rs +++ b/turbopack/crates/turbo-tasks-testing/tests/collectibles.rs @@ -16,15 +16,15 @@ static REGISTRATION: Registration = register!(); #[tokio::test] async fn transitive_emitting() { run(®ISTRATION, || async { - let result = my_transitive_emitting_function("".into(), "".into()); - result.strongly_consistent().await?; - let list = result.peek_collectibles::>(); + let result_op = my_transitive_emitting_function("".into(), "".into()); + let result_val = result_op.connect().strongly_consistent().await?; + let list = result_op.peek_collectibles::>(); assert_eq!(list.len(), 2); let mut expected = ["123", "42"].into_iter().collect::>(); for collectible in list { assert!(expected.remove(collectible.to_string().await?.as_str())) } - assert_eq!(result.await?.0, 0); + assert_eq!(result_val.0, 0); anyhow::Ok(()) }) .await @@ -34,15 +34,15 @@ async fn transitive_emitting() { #[tokio::test] async fn transitive_emitting_indirect() { run(®ISTRATION, || async { - let result = my_transitive_emitting_function("".into(), "".into()); - let collectibles = my_transitive_emitting_function_collectibles("".into(), "".into()); - let list = collectibles.strongly_consistent().await?; + let result_op = my_transitive_emitting_function("".into(), "".into()); + let collectibles_op = my_transitive_emitting_function_collectibles("".into(), "".into()); + let list = collectibles_op.connect().strongly_consistent().await?; assert_eq!(list.len(), 2); let mut expected = ["123", "42"].into_iter().collect::>(); for collectible in list.iter() { assert!(expected.remove(collectible.to_string().await?.as_str())) } - assert_eq!(result.await?.0, 0); + assert_eq!(result_op.connect().await?.0, 0); anyhow::Ok(()) }) .await @@ -52,15 +52,15 @@ async fn transitive_emitting_indirect() { #[tokio::test] async fn multi_emitting() { run(®ISTRATION, || async { - let result = my_multi_emitting_function(); - result.strongly_consistent().await?; - let list = result.peek_collectibles::>(); + let result_op = my_multi_emitting_function(); + let result_val = result_op.connect().strongly_consistent().await?; + let list = result_op.peek_collectibles::>(); assert_eq!(list.len(), 2); let mut expected = ["123", "42"].into_iter().collect::>(); for collectible in list { assert!(expected.remove(collectible.to_string().await?.as_str())) } - assert_eq!(result.await?.0, 0); + assert_eq!(result_val.0, 0); anyhow::Ok(()) }) .await @@ -70,12 +70,13 @@ async fn multi_emitting() { #[tokio::test] async fn taking_collectibles() { run(®ISTRATION, || async { - let result = my_collecting_function(); - let list = result.take_collectibles::>(); + let result_op = my_collecting_function(); + let result_val = result_op.connect().strongly_consistent().await?; + let list = result_op.take_collectibles::>(); // my_collecting_function already processed the collectibles so the list should // be empty assert!(list.is_empty()); - assert_eq!(result.await?.0, 0); + assert_eq!(result_val.0, 0); anyhow::Ok(()) }) .await @@ -85,13 +86,13 @@ async fn taking_collectibles() { #[tokio::test] async fn taking_collectibles_extra_layer() { run(®ISTRATION, || async { - let result = my_collecting_function_indirect(); - result.strongly_consistent().await?; - let list = result.take_collectibles::>(); + let result_op = my_collecting_function_indirect(); + let result_val = result_op.connect().strongly_consistent().await?; + let list = result_op.take_collectibles::>(); // my_collecting_function already processed the collectibles so the list should // be empty assert!(list.is_empty()); - assert_eq!(result.await?.0, 0); + assert_eq!(result_val.0, 0); anyhow::Ok(()) }) .await @@ -101,38 +102,38 @@ async fn taking_collectibles_extra_layer() { #[tokio::test] async fn taking_collectibles_parallel() { run(®ISTRATION, || async { - let result = my_transitive_emitting_function("".into(), "a".into()); - result.strongly_consistent().await?; - let list = result.take_collectibles::>(); + let result_op = my_transitive_emitting_function("".into(), "a".into()); + let result_val = result_op.connect().strongly_consistent().await?; + let list = result_op.take_collectibles::>(); assert_eq!(list.len(), 2); - assert_eq!(result.await?.0, 0); + assert_eq!(result_val.0, 0); - let result = my_transitive_emitting_function("".into(), "b".into()); - result.strongly_consistent().await?; - let list = result.take_collectibles::>(); + let result_op = my_transitive_emitting_function("".into(), "b".into()); + let result_val = result_op.connect().strongly_consistent().await?; + let list = result_op.take_collectibles::>(); assert_eq!(list.len(), 2); - assert_eq!(result.await?.0, 0); + assert_eq!(result_val.0, 0); - let result = + let result_op = my_transitive_emitting_function_with_child_scope("".into(), "b".into(), "1".into()); - result.strongly_consistent().await?; - let list = result.take_collectibles::>(); + let result_val = result_op.connect().strongly_consistent().await?; + let list = result_op.take_collectibles::>(); assert_eq!(list.len(), 2); - assert_eq!(result.await?.0, 0); + assert_eq!(result_val.0, 0); - let result = + let result_op = my_transitive_emitting_function_with_child_scope("".into(), "b".into(), "2".into()); - result.strongly_consistent().await?; - let list = result.take_collectibles::>(); + let result_val = result_op.connect().strongly_consistent().await?; + let list = result_op.take_collectibles::>(); assert_eq!(list.len(), 2); - assert_eq!(result.await?.0, 0); + assert_eq!(result_val.0, 0); - let result = + let result_op = my_transitive_emitting_function_with_child_scope("".into(), "c".into(), "3".into()); - result.strongly_consistent().await?; - let list = result.take_collectibles::>(); + let result_val = result_op.connect().strongly_consistent().await?; + let list = result_op.take_collectibles::>(); assert_eq!(list.len(), 2); - assert_eq!(result.await?.0, 0); + assert_eq!(result_val.0, 0); anyhow::Ok(()) }) @@ -143,46 +144,53 @@ async fn taking_collectibles_parallel() { #[turbo_tasks::value(transparent)] struct Collectibles(AutoSet>>); -#[turbo_tasks::function] +#[turbo_tasks::function(operation)] async fn my_collecting_function() -> Result> { - let result = my_transitive_emitting_function("".into(), "".into()); - result.take_collectibles::>(); - Ok(result) + let result_op = my_transitive_emitting_function("".into(), "".into()); + let result_vc = result_op.connect(); + result_vc.await?; + result_op.take_collectibles::>(); + Ok(result_vc) } -#[turbo_tasks::function] +#[turbo_tasks::function(operation)] async fn my_collecting_function_indirect() -> Result> { - let result = my_collecting_function(); - result.strongly_consistent().await?; - let list = result.peek_collectibles::>(); + let result_op = my_collecting_function(); + let result_vc = result_op.connect(); + result_vc.strongly_consistent().await?; + let list = result_op.peek_collectibles::>(); // my_collecting_function already processed the collectibles so the list should // be empty assert!(list.is_empty()); - Ok(result) + Ok(result_vc) } -#[turbo_tasks::function] +#[turbo_tasks::function(operation)] async fn my_multi_emitting_function() -> Result> { - my_transitive_emitting_function("".into(), "a".into()).await?; - my_transitive_emitting_function("".into(), "b".into()).await?; + my_transitive_emitting_function("".into(), "a".into()) + .connect() + .await?; + my_transitive_emitting_function("".into(), "b".into()) + .connect() + .await?; my_emitting_function("".into()).await?; Ok(Thing::cell(Thing(0))) } -#[turbo_tasks::function] +#[turbo_tasks::function(operation)] async fn my_transitive_emitting_function(key: RcStr, _key2: RcStr) -> Result> { my_emitting_function(key).await?; Ok(Thing::cell(Thing(0))) } -#[turbo_tasks::function] +#[turbo_tasks::function(operation)] async fn my_transitive_emitting_function_collectibles( key: RcStr, key2: RcStr, ) -> Result> { - let result = my_transitive_emitting_function(key, key2); + let result_op = my_transitive_emitting_function(key, key2); Ok(Vc::cell( - result + result_op .peek_collectibles::>() .into_iter() .map(|v| v.to_resolved()) @@ -193,17 +201,18 @@ async fn my_transitive_emitting_function_collectibles( )) } -#[turbo_tasks::function] +#[turbo_tasks::function(operation)] async fn my_transitive_emitting_function_with_child_scope( key: RcStr, key2: RcStr, _key3: RcStr, ) -> Result> { - let thing = my_transitive_emitting_function(key, key2); - thing.strongly_consistent().await?; - let list = thing.peek_collectibles::>(); + let thing_op = my_transitive_emitting_function(key, key2); + let thing_vc = thing_op.connect(); + thing_vc.await?; + let list = thing_op.peek_collectibles::>(); assert_eq!(list.len(), 2); - Ok(thing) + Ok(thing_vc) } #[turbo_tasks::function] diff --git a/turbopack/crates/turbo-tasks-testing/tests/dirty_in_progress.rs b/turbopack/crates/turbo-tasks-testing/tests/dirty_in_progress.rs index 6ea1877472aeb..9053abcde93fe 100644 --- a/turbopack/crates/turbo-tasks-testing/tests/dirty_in_progress.rs +++ b/turbopack/crates/turbo-tasks-testing/tests/dirty_in_progress.rs @@ -71,8 +71,8 @@ impl ValueToString for Collectible { } } -#[turbo_tasks::function] -async fn inner_compute(input: Vc) -> Result> { +#[turbo_tasks::function(operation)] +async fn inner_compute(input: ResolvedVc) -> Result> { println!("start inner_compute"); let value = *input.await?.state.get(); tokio::time::sleep(Duration::from_millis(200)).await; @@ -90,10 +90,10 @@ async fn inner_compute(input: Vc) -> Result> { } #[turbo_tasks::function] -async fn compute(input: Vc) -> Result> { +async fn compute(input: ResolvedVc) -> Result> { println!("start compute"); let operation = inner_compute(input); - let value = *operation.await?; + let value = *operation.connect().await?; let collectibles = operation.peek_collectibles::>(); if collectibles.len() > 1 { bail!("expected 0..1 collectible, found {}", collectibles.len()); diff --git a/turbopack/crates/turbo-tasks-testing/tests/recompute_collectibles.rs b/turbopack/crates/turbo-tasks-testing/tests/recompute_collectibles.rs index 7f05bdb856aee..215974944d3a8 100644 --- a/turbopack/crates/turbo-tasks-testing/tests/recompute_collectibles.rs +++ b/turbopack/crates/turbo-tasks-testing/tests/recompute_collectibles.rs @@ -57,9 +57,9 @@ impl ValueToString for Collectible { } } -#[turbo_tasks::function] -fn inner_compute(input: Vc) -> Vc { - inner_compute2(input, 1000) +#[turbo_tasks::function(operation)] +fn inner_compute(input: ResolvedVc) -> Vc { + inner_compute2(*input, 1000) } #[turbo_tasks::function] @@ -79,12 +79,12 @@ async fn inner_compute2(input: Vc, innerness: u32) -> Result, innerness: u32) -> Result> { +async fn compute(input: ResolvedVc, innerness: u32) -> Result> { if innerness > 0 { - return Ok(compute(input, innerness - 1)); + return Ok(compute(*input, innerness - 1)); } let operation = inner_compute(input); - let value = *operation.await?; + let value = *operation.connect().await?; let collectibles = operation.peek_collectibles::>(); if collectibles.len() != 1 { bail!("expected 1 collectible, found {}", collectibles.len()); diff --git a/turbopack/crates/turbo-tasks/src/vc/mod.rs b/turbopack/crates/turbo-tasks/src/vc/mod.rs index b36dcc3bdce17..81729ea3397f8 100644 --- a/turbopack/crates/turbo-tasks/src/vc/mod.rs +++ b/turbopack/crates/turbo-tasks/src/vc/mod.rs @@ -17,7 +17,6 @@ use std::{ }; use anyhow::Result; -use auto_hash_map::AutoSet; use serde::{Deserialize, Serialize}; pub use self::{ @@ -35,7 +34,7 @@ use crate::{ manager::{create_local_cell, try_get_function_meta}, registry, trace::{TraceRawVcs, TraceRawVcsContext}, - CellId, CollectiblesSource, RawVc, ResolveTypeError, SharedReference, ShrinkToFit, + CellId, RawVc, ResolveTypeError, SharedReference, ShrinkToFit, }; /// A "Value Cell" (`Vc` for short) is a reference to a memoized computation result stored on the @@ -425,11 +424,6 @@ impl Vc where T: ?Sized, { - /// Connects the operation pointed to by this `Vc` to the current task. - pub fn connect(vc: Self) { - vc.node.connect() - } - /// Returns a debug identifier for this `Vc`. pub async fn debug_identifier(vc: Self) -> Result { let resolved = vc.resolve().await?; @@ -587,19 +581,6 @@ where } } -impl CollectiblesSource for Vc -where - T: ?Sized, -{ - fn take_collectibles(self) -> AutoSet> { - self.node.take_collectibles() - } - - fn peek_collectibles(self) -> AutoSet> { - self.node.peek_collectibles() - } -} - impl From for Vc where T: ?Sized, diff --git a/turbopack/crates/turbopack-tests/tests/execution.rs b/turbopack/crates/turbopack-tests/tests/execution.rs index 7f5558a2f7322..d954007e2cbc6 100644 --- a/turbopack/crates/turbopack-tests/tests/execution.rs +++ b/turbopack/crates/turbopack-tests/tests/execution.rs @@ -13,7 +13,7 @@ use serde::{Deserialize, Serialize}; use turbo_rcstr::RcStr; use turbo_tasks::{ apply_effects, debug::ValueDebugFormat, fxindexmap, trace::TraceRawVcs, Completion, - NonLocalValue, ResolvedVc, TryJoinIterExt, TurboTasks, Value, Vc, + NonLocalValue, OperationVc, ResolvedVc, TryJoinIterExt, TurboTasks, Value, Vc, }; use turbo_tasks_bytes::stream::SingleValue; use turbo_tasks_env::CommandLineProcessEnv; @@ -170,27 +170,28 @@ async fn run(resource: PathBuf, snapshot_mode: IssueSnapshotMode) -> Result, ) -> Result> { - let prepared_test = prepare_test(resource); - let run_result = run_test(prepared_test); + let prepared_test = prepare_test(resource).to_resolved().await?; + let run_result_op = run_test_operation(prepared_test); if *snapshot_mode == IssueSnapshotMode::Snapshots { - snapshot_issues(prepared_test, run_result).await?; + snapshot_issues(*prepared_test, run_result_op).await?; } - Ok(*run_result.await?.js_result) + Ok(*run_result_op.connect().await?.js_result) } #[derive( @@ -264,8 +265,8 @@ async fn prepare_test(resource: RcStr) -> Result> { .cell()) } -#[turbo_tasks::function] -async fn run_test(prepared_test: Vc) -> Result> { +#[turbo_tasks::function(operation)] +async fn run_test_operation(prepared_test: ResolvedVc) -> Result> { let PreparedTest { path, project_path, @@ -451,10 +452,10 @@ async fn run_test(prepared_test: Vc) -> Result> #[turbo_tasks::function] async fn snapshot_issues( prepared_test: Vc, - run_result: Vc, + run_result: OperationVc, ) -> Result> { let PreparedTest { path, .. } = *prepared_test.await?; - let _ = run_result.resolve_strongly_consistent().await; + let _ = run_result.connect().resolve_strongly_consistent().await; let captured_issues = run_result.peek_issues_with_path().await?; diff --git a/turbopack/crates/turbopack-tests/tests/snapshot.rs b/turbopack/crates/turbopack-tests/tests/snapshot.rs index 5848296216db2..fce3442c2b9ef 100644 --- a/turbopack/crates/turbopack-tests/tests/snapshot.rs +++ b/turbopack/crates/turbopack-tests/tests/snapshot.rs @@ -157,9 +157,9 @@ async fn run(resource: PathBuf) -> Result<()> { let tt = TurboTasks::new(MemoryBackend::default()); let task = tt.spawn_once_task(async move { - let emit = run_inner(resource.to_str().unwrap().into()); - emit.strongly_consistent().await?; - apply_effects(emit).await?; + let emit_op = run_inner_operation(resource.to_str().unwrap().into()); + emit_op.connect().strongly_consistent().await?; + apply_effects(emit_op).await?; Ok(Vc::<()>::default()) }); @@ -169,11 +169,12 @@ async fn run(resource: PathBuf) -> Result<()> { Ok(()) } -#[turbo_tasks::function] -async fn run_inner(resource: RcStr) -> Result<()> { - let out = run_test(resource); - let _ = out.resolve_strongly_consistent().await?; - let captured_issues = out.peek_issues_with_path().await?; +#[turbo_tasks::function(operation)] +async fn run_inner_operation(resource: RcStr) -> Result<()> { + let out_op = run_test_operation(resource); + let out_vc = out_op.connect(); + let _ = out_vc.resolve_strongly_consistent().await?; + let captured_issues = out_op.peek_issues_with_path().await?; let plain_issues = captured_issues .iter_with_shortest_path() @@ -181,15 +182,15 @@ async fn run_inner(resource: RcStr) -> Result<()> { .try_join() .await?; - snapshot_issues(plain_issues, out.join("issues".into()), &REPO_ROOT) + snapshot_issues(plain_issues, out_vc.join("issues".into()), &REPO_ROOT) .await .context("Unable to handle issues")?; Ok(()) } -#[turbo_tasks::function] -async fn run_test(resource: RcStr) -> Result> { +#[turbo_tasks::function(operation)] +async fn run_test_operation(resource: RcStr) -> Result> { let test_path = canonicalize(&resource)?; assert!(test_path.exists(), "{} does not exist", resource); assert!( diff --git a/turbopack/crates/turbopack/benches/node_file_trace.rs b/turbopack/crates/turbopack/benches/node_file_trace.rs index 36c9f9abb071a..efdc73c20a151 100644 --- a/turbopack/crates/turbopack/benches/node_file_trace.rs +++ b/turbopack/crates/turbopack/benches/node_file_trace.rs @@ -3,11 +3,11 @@ use std::{fs, path::PathBuf}; use criterion::{Bencher, BenchmarkId, Criterion}; use regex::Regex; use turbo_rcstr::RcStr; -use turbo_tasks::{apply_effects, ReadConsistency, TurboTasks, Value, Vc}; +use turbo_tasks::{apply_effects, ReadConsistency, ResolvedVc, TurboTasks, Value, Vc}; use turbo_tasks_fs::{DiskFileSystem, FileSystem, NullFileSystem}; use turbo_tasks_memory::MemoryBackend; use turbopack::{ - emit_with_completion, + emit_with_completion_operation, module_options::{EcmascriptOptionsContext, ModuleOptionsContext}, register, ModuleAssetContext, }; @@ -80,7 +80,7 @@ fn bench_emit(b: &mut Bencher, bench_input: &BenchInput) { let input_dir = input.parent().parent(); let output_fs: Vc = NullFileSystem.into(); - let output_dir = output_fs.root(); + let output_dir = output_fs.root().to_resolved().await?; let source = FileSource::new(input); let compile_time_info = CompileTimeInfo::builder( @@ -115,11 +115,14 @@ fn bench_emit(b: &mut Bencher, bench_input: &BenchInput) { let module = module_asset_context .process(Vc::upcast(source), Value::new(ReferenceType::Undefined)) .module(); - let rebased = RebasedAsset::new(Vc::upcast(module), input_dir, output_dir); + let rebased = RebasedAsset::new(Vc::upcast(module), input_dir, *output_dir) + .to_resolved() + .await?; - let emit = emit_with_completion(Vc::upcast(rebased), output_dir); - emit.strongly_consistent().await?; - apply_effects(emit).await?; + let emit_op = + emit_with_completion_operation(ResolvedVc::upcast(rebased), output_dir); + emit_op.connect().strongly_consistent().await?; + apply_effects(emit_op).await?; Ok::, _>(Default::default()) }); diff --git a/turbopack/crates/turbopack/src/lib.rs b/turbopack/crates/turbopack/src/lib.rs index 786c76d2affc2..bbd5af77b2ce2 100644 --- a/turbopack/crates/turbopack/src/lib.rs +++ b/turbopack/crates/turbopack/src/lib.rs @@ -904,6 +904,14 @@ pub fn emit_with_completion(asset: Vc>, output_dir: Vc>, + output_dir: ResolvedVc, +) -> Vc<()> { + emit_with_completion(*asset, *output_dir) +} + #[turbo_tasks::function] fn emit_assets_aggregated(asset: Vc>, output_dir: Vc) { let aggregated = aggregate(asset); diff --git a/turbopack/crates/turbopack/tests/node-file-trace.rs b/turbopack/crates/turbopack/tests/node-file-trace.rs index 22798d69a3871..499485a734dcd 100644 --- a/turbopack/crates/turbopack/tests/node-file-trace.rs +++ b/turbopack/crates/turbopack/tests/node-file-trace.rs @@ -36,7 +36,7 @@ use turbo_tasks::{ use turbo_tasks_fs::{DiskFileSystem, FileSystem, FileSystemPath}; use turbo_tasks_memory::MemoryBackend; use turbopack::{ - emit_with_completion, + emit_with_completion_operation, module_options::{CssOptionsContext, EcmascriptOptionsContext, ModuleOptionsContext}, register, ModuleAssetContext, }; @@ -420,7 +420,7 @@ fn node_file_trace( let original_output = exec_node(package_root, input); let output_fs = DiskFileSystem::new("output".into(), directory.clone(), vec![]); - let output_dir = output_fs.root(); + let output_dir = output_fs.root().to_resolved().await?; let source = FileSource::new(input); let module_asset_context = ModuleAssetContext::new( @@ -457,7 +457,7 @@ fn node_file_trace( let module = module_asset_context .process(Vc::upcast(source), Value::new(ReferenceType::Undefined)) .module(); - let rebased = RebasedAsset::new(Vc::upcast(module), *input_dir, output_dir) + let rebased = RebasedAsset::new(Vc::upcast(module), *input_dir, *output_dir) .to_resolved() .await?; @@ -466,9 +466,10 @@ fn node_file_trace( print_graph(ResolvedVc::upcast(rebased)).await?; - let emit = emit_with_completion(*ResolvedVc::upcast(rebased), output_dir); - emit.strongly_consistent().await?; - apply_effects(emit).await?; + let emit_op = + emit_with_completion_operation(ResolvedVc::upcast(rebased), output_dir); + emit_op.connect().strongly_consistent().await?; + apply_effects(emit_op).await?; #[cfg(not(feature = "bench_against_node_nft"))] {