From 48484183d3f6486c9125df2104f5b2b6004a6128 Mon Sep 17 00:00:00 2001 From: Myriad-Dreamin Date: Sat, 14 Dec 2024 14:39:36 +0800 Subject: [PATCH] dev: remember state of compilation --- crates/tinymist/src/actor/typ_client.rs | 63 ++-- crates/tinymist/src/actor/typ_server.rs | 382 ++++++++++++++++++------ 2 files changed, 335 insertions(+), 110 deletions(-) diff --git a/crates/tinymist/src/actor/typ_client.rs b/crates/tinymist/src/actor/typ_client.rs index 1cda14f69..42de4e426 100644 --- a/crates/tinymist/src/actor/typ_client.rs +++ b/crates/tinymist/src/actor/typ_client.rs @@ -179,6 +179,36 @@ impl CompileHandler { } false } + + fn notify_compile_inner(&self, snap: &CompiledArtifact) { + self.notify_diagnostics( + &snap.world, + snap.doc.clone().err().unwrap_or_default(), + snap.warnings.clone(), + ); + + self.export.signal(snap, snap.signal); + + self.editor_tx + .send(EditorRequest::Status( + self.diag_group.clone(), + if snap.doc.is_ok() { + TinymistCompileStatusEnum::CompileSuccess + } else { + TinymistCompileStatusEnum::CompileError + }, + )) + .unwrap(); + + #[cfg(feature = "preview")] + if let Some(inner) = self.inner.read().as_ref() { + let res = snap + .doc + .clone() + .map_err(|_| typst_preview::CompileStatus::CompileError); + inner.notify_compile(res, snap.signal.by_fs_events, snap.signal.by_entry_update); + } + } } impl CompilationHandle for CompileHandler { @@ -232,33 +262,16 @@ impl CompilationHandle for CompileHandler { *n_rev = snap.world.revision().get(); } - self.notify_diagnostics( - &snap.world, - snap.doc.clone().err().unwrap_or_default(), - snap.warnings.clone(), - ); - - self.export.signal(snap, snap.signal); - - self.editor_tx - .send(EditorRequest::Status( - self.diag_group.clone(), - if snap.doc.is_ok() { - TinymistCompileStatusEnum::CompileSuccess - } else { - TinymistCompileStatusEnum::CompileError - }, - )) - .unwrap(); + self.notify_compile_inner(snap); + } - #[cfg(feature = "preview")] - if let Some(inner) = self.inner.read().as_ref() { - let res = snap - .doc - .clone() - .map_err(|_| typst_preview::CompileStatus::CompileError); - inner.notify_compile(res, snap.signal.by_fs_events, snap.signal.by_entry_update); + fn restore_compile(&self, snap: &CompiledArtifact, _rep: CompileReport) { + { + let mut n_rev = self.notified_revision.lock(); + *n_rev = snap.world.revision().get(); } + + self.notify_compile_inner(snap); } } diff --git a/crates/tinymist/src/actor/typ_server.rs b/crates/tinymist/src/actor/typ_server.rs index 065e55c75..6070b0786 100644 --- a/crates/tinymist/src/actor/typ_server.rs +++ b/crates/tinymist/src/actor/typ_server.rs @@ -2,13 +2,13 @@ //! //! Please check `tinymist::actor::typ_client` for architecture details. +use parking_lot::Mutex; use std::{ collections::HashSet, ops::Deref, path::Path, sync::{Arc, OnceLock}, }; - use tokio::sync::{mpsc, oneshot}; use reflexo_typst::{ @@ -17,17 +17,14 @@ use reflexo_typst::{ vfs::notify::{FilesystemEvent, MemoryEvent, NotifyMessage, UpstreamUpdateEvent}, watch_deps, world::{CompilerFeat, CompilerUniverse, CompilerWorld}, - CompileEnv, CompileReport, Compiler, ConsoleDiagReporter, EntryReader, GenericExporter, - Revising, TaskInputs, TypstDocument, WorldDeps, + CompileEnv, CompileReport, Compiler, ConsoleDiagReporter, EntryReader, EntryState, + GenericExporter, LazyHash, Revising, TaskInputs, TypstDict, TypstDocument, WorldDeps, }; -use typst::diag::{SourceDiagnostic, SourceResult, Warned}; +use typst::diag::{SourceDiagnostic, SourceResult}; use typst_shim::utils::Deferred; use crate::task::CacheTask; -type CompileRawResult = Deferred<(SourceResult>>, CompileEnv)>; -type DocState = std::sync::OnceLock; - /// A signal that possibly triggers an export. /// /// Whether to export depends on the current state of the document and the user @@ -50,21 +47,47 @@ pub struct CompileSnapshot { /// Using world pub world: Arc>, /// Compiling the document. - doc_state: Arc, + compiled_doc: Arc>>>, + doc_state: Arc>>>, + /// Compiling the document. /// The last successfully compiled document. pub success_doc: Option>, } impl CompileSnapshot { - fn start(&self) -> &CompileRawResult { + fn start( + &self, + before: impl FnOnce(&Self) + Send + Sync + 'static, + after: impl FnOnce(&CompiledArtifact) + Send + Sync + 'static, + ) -> &Deferred> { self.doc_state.get_or_init(|| { - let w = self.world.clone(); - let mut env = self.env.clone(); + let this = self.clone(); Deferred::new(move || { - let w = w.as_ref(); + let mut this = this; + before(&this); + let w = this.world.as_ref(); let mut c = std::marker::PhantomData; - let res = c.compile(w, &mut env); - (res, env) + let doc = c.compile(w, &mut this.env); + + // let (doc, env) = self.start(f).wait().clone(); + let (doc, warnings) = match doc { + Ok(doc) => (Ok(doc.output), doc.warnings), + Err(err) => (Err(err), EcoVec::default()), + }; + let res = CompiledArtifact { + signal: this.flags, + world: this.world.clone(), + env: this.env, + doc, + warnings, + success_doc: this.success_doc.clone(), + }; + + log::info!("CompileSnapshot: compiled doc"); + this.compiled_doc.lock().clone_from(&Some(res.clone())); + after(&res); + + res }) }) } @@ -92,19 +115,15 @@ impl CompileSnapshot { } pub fn compile(&self) -> CompiledArtifact { - let (doc, env) = self.start().wait().clone(); - let (doc, warnings) = match doc { - Ok(doc) => (Ok(doc.output), doc.warnings), - Err(err) => (Err(err), EcoVec::default()), - }; - CompiledArtifact { - signal: self.flags, - world: self.world.clone(), - env, - doc, - warnings, - success_doc: self.success_doc.clone(), - } + self.start(|_| {}, |_| {}).wait().clone() + } + + pub fn compile_with( + &self, + before: impl FnOnce(&Self) + Send + Sync + 'static, + after: impl FnOnce(&CompiledArtifact) + Send + Sync + 'static, + ) -> CompiledArtifact { + self.start(before, after).wait().clone() } } @@ -115,6 +134,7 @@ impl Clone for CompileSnapshot { env: self.env.clone(), world: self.world.clone(), doc_state: self.doc_state.clone(), + compiled_doc: self.compiled_doc.clone(), success_doc: self.success_doc.clone(), } } @@ -161,6 +181,7 @@ impl CompiledArtifact { pub trait CompilationHandle: Send + Sync + 'static { fn status(&self, revision: usize, rep: CompileReport); fn notify_compile(&self, res: &CompiledArtifact, rep: CompileReport); + fn restore_compile(&self, res: &CompiledArtifact, rep: CompileReport); } impl CompilationHandle @@ -168,6 +189,7 @@ impl CompilationHandle { fn status(&self, _revision: usize, _: CompileReport) {} fn notify_compile(&self, _: &CompiledArtifact, _: CompileReport) {} + fn restore_compile(&self, _: &CompiledArtifact, _: CompileReport) {} } pub enum SucceededArtifact { @@ -229,12 +251,19 @@ struct CompileReasons { } impl CompileReasons { - fn see(&mut self, reason: CompileReasons) { + /// Merge two reasons. + fn merge(&mut self, reason: CompileReasons) { self.by_memory_events |= reason.by_memory_events; self.by_fs_events |= reason.by_fs_events; self.by_entry_update |= reason.by_entry_update; } + /// Whether the behind reason is "file changed". + fn file_changed(&self) -> bool { + self.by_memory_events || self.by_fs_events + } + + /// Whether we should compile for any reason. fn any(&self) -> bool { self.by_memory_events || self.by_fs_events || self.by_entry_update } @@ -289,6 +318,56 @@ impl Default for CompileServerOpts { } } +#[derive(Clone)] +struct CompileState { + /// The revision of the state. + compiled_at: usize, + /// The snapshot for watching mode. + watch_snap: OnceLock>, + /// The compiled document. + pub(crate) doc: Option>, + /// The successly compiled document. + success_doc: Option>, + /// The reason why the compiler is suspended. + suspended_reason: CompileReasons, +} + +impl Default for CompileState { + fn default() -> Self { + Self { + compiled_at: 0, + watch_snap: OnceLock::new(), + doc: None, + success_doc: None, + suspended_reason: CompileReasons::default(), + } + } +} + +#[derive(Debug, Clone, Default, PartialEq)] +struct InputState { + inputs: Arc>, + /// The used entry. + entry: EntryState, +} + +#[derive(Clone)] +struct CompileStateHistory { + /// The used input. + input: InputState, + /// The state. + state: CompileState, +} + +impl Default for CompileStateHistory { + fn default() -> Self { + Self { + input: Default::default(), + state: Default::default(), + } + } +} + /// The compiler actor. pub struct CompileServerActor { /// The underlying universe. @@ -302,13 +381,14 @@ pub struct CompileServerActor { logical_tick: usize, /// Last logical tick when invalidation is caused by shadow update. dirty_shadow_logical_tick: usize, - + /// The latest compilation state. + latest: CompileState, + /// The latest file changes at (revision). + latest_file_changes_at: usize, + /// The compilation state history. + history: Vec>, /// Estimated latest set of shadow files. estimated_shadow_files: HashSet>, - /// The latest compiled document. - pub(crate) latest_doc: Option>, - /// The latest successly compiled document. - latest_success_doc: Option>, /// feature set for compile_once mode. once_feature_set: Arc, /// Shared feature set for watch mode. @@ -321,10 +401,8 @@ pub struct CompileServerActor { /// Shared cache evict task. cache: CacheTask, - watch_snap: OnceLock>, suspended: bool, compiling: bool, - suspended_reason: CompileReasons, committed_revision: usize, } @@ -349,10 +427,11 @@ impl CompileServerActor { compile_handle, enable_watch: false, dirty_shadow_logical_tick: 0, + latest_file_changes_at: 0, estimated_shadow_files: Default::default(), - latest_doc: None, - latest_success_doc: None, + latest: CompileState::default(), + history: vec![], once_feature_set: Arc::new(feature_set.clone()), watch_feature_set: Arc::new( feature_set.configure(&WITH_COMPILING_STATUS_FEATURE, true), @@ -362,10 +441,8 @@ impl CompileServerActor { intr_rx, cache: cache_evict, - watch_snap: OnceLock::new(), suspended: entry.is_inactive(), compiling: false, - suspended_reason: no_reason(), committed_revision: 0, } } @@ -417,7 +494,7 @@ impl CompileServerActor { if let Interrupt::CurrentRead(event) = event { curr_reads.push(event); } else { - comp_reason.see(self.process(event, |res: CompilerResponse| match res { + comp_reason.merge(self.process(event, |res: CompilerResponse| match res { CompilerResponse::Notify(msg) => { log_send_error("compile_deps", dep_tx.send(msg)); } @@ -459,7 +536,8 @@ impl CompileServerActor { by_fs_events: reason.by_fs_events, }, doc_state: Arc::new(OnceLock::new()), - success_doc: self.latest_success_doc.clone(), + compiled_doc: Arc::new(Mutex::new(None)), + success_doc: self.latest.success_doc.clone(), } } @@ -476,16 +554,22 @@ impl CompileServerActor { curr_reads: &mut Vec>>, is_once: bool, ) -> Option> { - self.suspended_reason.see(reason); - let reason = std::mem::take(&mut self.suspended_reason); + self.latest.suspended_reason.merge(reason); + let reason = std::mem::take(&mut self.latest.suspended_reason); let start = reflexo::time::now(); - let compiling = self.snapshot(is_once, reason); - self.watch_snap = OnceLock::new(); - self.watch_snap.get_or_init(|| compiling.clone()); + let compiling = if is_once { + self.snapshot(true, reason) + } else { + if reason.any() { + self.latest.watch_snap = OnceLock::new(); + } + let compiling = self.snapshot(false, reason); + self.latest.watch_snap.get_or_init(|| compiling).clone() + }; if self.suspended { - self.suspended_reason.see(reason); + self.latest.suspended_reason.merge(reason); for reader in curr_reads.drain(..) { let _ = reader.send(SucceededArtifact::Suspend(compiling.clone())); @@ -494,7 +578,7 @@ impl CompileServerActor { } if self.compiling { - self.suspended_reason.see(reason); + self.latest.suspended_reason.merge(reason); return None; } @@ -503,32 +587,56 @@ impl CompileServerActor { let h = self.compile_handle.clone(); let curr_reads = std::mem::take(curr_reads); - // todo unwrap main id - let id = compiling.world.main_id().unwrap(); - let revision = compiling.world.revision().get(); + let compile = move || { + let handle = tokio::runtime::Handle::current(); + let h_before = h.clone(); + let compiled = compiling.compile_with( + move |compiling| { + // todo unwrap main id + h_before.status( + compiling.world.revision().get(), + CompileReport::Stage( + compiling.world.main_id().unwrap(), + "compiling", + start, + ), + ); + }, + move |compiled| { + // todo unwrap main id + let id = compiled.world.main_id().unwrap(); + + // Set the runtime handle if it is not set. + let _enter = if tokio::runtime::Handle::try_current().is_err() { + Some(handle.enter()) + } else { + None + }; - h.status(revision, CompileReport::Stage(id, "compiling", start)); + let elapsed = start.elapsed().unwrap_or_default(); - let compile = move || { - let compiled = compiling.compile(); + // log::trace!("CompileServerActor: compile reason: {:?}", compiled.signal); - for reader in curr_reads { - let _ = reader.send(SucceededArtifact::Compiled(compiled.clone())); - } + let rep = match &compiled.doc { + Ok(..) => { + CompileReport::CompileSuccess(id, compiled.warnings.clone(), elapsed) + } + Err(err) => CompileReport::CompileError(id, err.clone(), elapsed), + }; - let elapsed = start.elapsed().unwrap_or_default(); - let rep = match &compiled.doc { - Ok(..) => CompileReport::CompileSuccess(id, compiled.warnings.clone(), elapsed), - Err(err) => CompileReport::CompileError(id, err.clone(), elapsed), - }; + let _ = ConsoleDiagReporter::default().export( + compiled.world.deref(), + Arc::new((compiled.env.features.clone(), rep.clone())), + ); - let _ = ConsoleDiagReporter::default().export( - compiled.world.deref(), - Arc::new((compiled.env.features.clone(), rep.clone())), + // todo: we need to check revision for really concurrent compilation + h.notify_compile(compiled, rep); + }, ); - // todo: we need to check revision for really concurrent compilation - h.notify_compile(&compiled, rep); + for reader in curr_reads { + let _ = reader.send(SucceededArtifact::Compiled(compiled.clone())); + } compiled }; @@ -559,9 +667,10 @@ impl CompileServerActor { // Update state. self.committed_revision = compiled_revision; - self.latest_doc.clone_from(&doc); + self.latest.compiled_at = compiled_revision; + self.latest.doc.clone_from(&doc); if doc.is_some() { - self.latest_success_doc.clone_from(&self.latest_doc); + self.latest.success_doc.clone_from(&self.latest.doc); } // Notify the new file dependencies. @@ -581,7 +690,7 @@ impl CompileServerActor { fn process(&mut self, event: Interrupt, send: impl Fn(CompilerResponse)) -> CompileReasons { use CompilerResponse::*; - match event { + let reason = match event { Interrupt::Compile => { // Increment the revision anyway. self.verse.increment_revision(|_| {}); @@ -591,15 +700,19 @@ impl CompileServerActor { Interrupt::SnapshotRead(task) => { log::debug!("CompileServerActor: take snapshot"); if self + .latest .watch_snap .get() .is_some_and(|e| e.world.revision() < *self.verse.revision.read()) { - self.watch_snap = OnceLock::new(); + log::info!("CompileServerActor: watch snap is outdated"); + self.latest.watch_snap = OnceLock::new(); } let _ = task.send( - self.watch_snap + // todo: suspicious no reason + self.latest + .watch_snap .get_or_init(|| self.snapshot(false, no_reason())) .clone(), ); @@ -609,16 +722,25 @@ impl CompileServerActor { unreachable!() } Interrupt::ChangeTask(change) => { - self.verse.increment_revision(|verse| { + let prev_state = self.verse.increment_revision(|verse| { + let prev_inputs = verse.inputs().clone(); if let Some(inputs) = change.inputs { verse.set_inputs(inputs); } - if let Some(entry) = change.entry.clone() { - let res = verse.mutate_entry(entry); - if let Err(err) = res { - log::error!("CompileServerActor: change entry error: {err:?}"); + if let Some(entry) = change.entry.as_ref() { + match verse.mutate_entry(entry.clone()) { + Ok(entry) => Some(InputState { + inputs: prev_inputs, + entry, + }), + Err(err) => { + log::error!("CompileServerActor: change entry error: {err:?}"); + None + } } + } else { + None } }); @@ -631,13 +753,16 @@ impl CompileServerActor { .status(self.verse.revision.get_mut().get(), CompileReport::Suspend); } - // Reset the watch state and document state. - self.latest_doc = None; - self.latest_success_doc = None; - self.suspended_reason = no_reason(); + self.switch_state( + InputState { + inputs: self.verse.inputs().clone(), + entry, + }, + prev_state, + ) + } else { + reason_by_entry_change() } - - reason_by_entry_change() } Interrupt::Compiled(artifact) => { self.process_compile(artifact, send); @@ -703,13 +828,16 @@ impl CompileServerActor { reason } Interrupt::Settle(_) => unreachable!(), - } + }; + + self.check_reason_for_state(reason); + reason } /// Process reason after each compilation. fn process_lagged_compile(&mut self) -> CompileReasons { // The reason which is kept but not used. - std::mem::take(&mut self.suspended_reason) + std::mem::take(&mut self.latest.suspended_reason) } /// Apply delayed memory changes to underlying compiler. @@ -761,6 +889,90 @@ impl CompileServerActor { } } } + + fn check_reason_for_state(&mut self, reason: CompileReasons) { + if reason.file_changed() { + self.latest_file_changes_at = self.verse.revision.get_mut().get(); + } + } + + /// Switches the watch state and document state. + fn switch_state( + &mut self, + state: InputState, + prev_state: Option, + ) -> CompileReasons { + if Some(&state) == prev_state.as_ref() { + return no_reason(); + } + + let mut history = None; + self.history.retain_mut(|h| { + if h.input == state { + history = Some(std::mem::take(h)); + false + } else { + true + } + }); + + if let Some(prev_state) = prev_state { + self.history.push(CompileStateHistory { + input: prev_state, + state: std::mem::take(&mut self.latest), + }); + self.history + .sort_by(|a, b| a.state.compiled_at.cmp(&b.state.compiled_at)); + + // Only keeps the latest history, because we have race condition... + const NUM_OF_HISTORY: usize = 1; + if self.history.len() > NUM_OF_HISTORY { + self.history.drain(0..self.history.len() - NUM_OF_HISTORY); + } + } + + if let Some(history) = history { + if history.state.compiled_at >= self.latest_file_changes_at { + log::info!( + "CompileServerActor: restore state from history: {:?} {:?}", + history.input, + history.state.watch_snap.get().is_some() + ); + self.latest = history.state; + + // todo: race condition + if let Some(compiling) = self.latest.watch_snap.get() { + log::info!( + "CompileServerActor: check restore diag {}", + compiling.compiled_doc.lock().as_ref().is_some() + ); + if let Some(compiled) = compiling.compiled_doc.lock().as_ref() { + log::info!("CompileServerActor: restore diag"); + + let id = compiled.world.main_id().unwrap(); + let elapsed = Default::default(); + + let rep = match &compiled.doc { + Ok(..) => CompileReport::CompileSuccess( + id, + compiled.warnings.clone(), + elapsed, + ), + Err(err) => CompileReport::CompileError(id, err.clone(), elapsed), + }; + + self.compile_handle.restore_compile(compiled, rep); + + // Restore the reason. + return std::mem::take(&mut self.latest.suspended_reason); + } + } + } + } + + self.latest = CompileState::default(); + reason_by_entry_change() + } } #[inline]