diff --git a/.typos.toml b/.typos.toml index ace1d88f63a..82205b67f03 100644 --- a/.typos.toml +++ b/.typos.toml @@ -19,6 +19,7 @@ WeeChat = "WeeChat" [default.extend-words] # all of these are valid words, but should never appear in this repo bellow = "below" +stat = "state" sing = "sign" singed = "signed" singing = "signing" diff --git a/benchmarks/benches/room_bench.rs b/benchmarks/benches/room_bench.rs index d9a11227e22..cdd92b55736 100644 --- a/benchmarks/benches/room_bench.rs +++ b/benchmarks/benches/room_bench.rs @@ -188,10 +188,7 @@ pub fn load_pinned_events_benchmark(c: &mut Criterion) { .unwrap(); let timeline = TimelineBuilder::new(&room) - .with_focus(TimelineFocus::PinnedEvents { - max_events_to_load: 100, - max_concurrent_requests: 10, - }) + .with_focus(TimelineFocus::PinnedEvents) .build() .await .expect("Could not create timeline"); diff --git a/bindings/matrix-sdk-ffi/src/timeline/configuration.rs b/bindings/matrix-sdk-ffi/src/timeline/configuration.rs index 496432a9a68..c0ef06562b9 100644 --- a/bindings/matrix-sdk-ffi/src/timeline/configuration.rs +++ b/bindings/matrix-sdk-ffi/src/timeline/configuration.rs @@ -127,10 +127,7 @@ pub enum TimelineFocus { /// The thread root event ID to focus on. root_event_id: String, }, - PinnedEvents { - max_events_to_load: u16, - max_concurrent_requests: u16, - }, + PinnedEvents, } impl TryFrom for matrix_sdk_ui::timeline::TimelineFocus { @@ -160,9 +157,7 @@ impl TryFrom for matrix_sdk_ui::timeline::TimelineFocus { Ok(Self::Thread { root_event_id: parsed_root_event_id }) } - TimelineFocus::PinnedEvents { max_events_to_load, max_concurrent_requests } => { - Ok(Self::PinnedEvents { max_events_to_load, max_concurrent_requests }) - } + TimelineFocus::PinnedEvents => Ok(Self::PinnedEvents), } } } diff --git a/crates/matrix-sdk-common/src/linked_chunk/mod.rs b/crates/matrix-sdk-common/src/linked_chunk/mod.rs index be13ea4ada6..e13134e1c2c 100644 --- a/crates/matrix-sdk-common/src/linked_chunk/mod.rs +++ b/crates/matrix-sdk-common/src/linked_chunk/mod.rs @@ -113,8 +113,12 @@ pub use updates::*; /// An identifier for a linked chunk; borrowed variant. #[derive(Debug, Clone, Copy, PartialEq)] pub enum LinkedChunkId<'a> { + /// A room's unthreaded timeline. Room(&'a RoomId), + /// A room's thread. Thread(&'a RoomId, &'a EventId), + /// A room's list of pinned events. 
+ PinnedEvents(&'a RoomId), } impl Display for LinkedChunkId<'_> { @@ -124,6 +128,9 @@ impl Display for LinkedChunkId<'_> { Self::Thread(room_id, thread_root) => { write!(f, "{room_id}:thread:{thread_root}") } + Self::PinnedEvents(room_id) => { + write!(f, "{room_id}:pinned") + } } } } @@ -133,6 +140,7 @@ impl LinkedChunkId<'_> { match self { LinkedChunkId::Room(room_id) => room_id.to_string(), LinkedChunkId::Thread(room_id, event_id) => format!("t:{room_id}:{event_id}"), + LinkedChunkId::PinnedEvents(room_id) => format!("pinned:{room_id}"), } } @@ -142,6 +150,9 @@ impl LinkedChunkId<'_> { LinkedChunkId::Thread(room_id, event_id) => { OwnedLinkedChunkId::Thread((*room_id).to_owned(), (*event_id).to_owned()) } + LinkedChunkId::PinnedEvents(room_id) => { + OwnedLinkedChunkId::PinnedEvents((*room_id).to_owned()) + } } } } @@ -156,11 +167,11 @@ impl PartialEq<&OwnedLinkedChunkId> for LinkedChunkId<'_> { fn eq(&self, other: &&OwnedLinkedChunkId) -> bool { match (self, other) { (LinkedChunkId::Room(a), OwnedLinkedChunkId::Room(b)) => *a == b, + (LinkedChunkId::PinnedEvents(a), OwnedLinkedChunkId::PinnedEvents(b)) => *a == b, (LinkedChunkId::Thread(r, ev), OwnedLinkedChunkId::Thread(r2, ev2)) => { r == r2 && ev == ev2 } - (LinkedChunkId::Room(..), OwnedLinkedChunkId::Thread(..)) - | (LinkedChunkId::Thread(..), OwnedLinkedChunkId::Room(..)) => false, + _ => false, } } } @@ -176,6 +187,7 @@ impl PartialEq> for OwnedLinkedChunkId { pub enum OwnedLinkedChunkId { Room(OwnedRoomId), Thread(OwnedRoomId, OwnedEventId), + PinnedEvents(OwnedRoomId), } impl Display for OwnedLinkedChunkId { @@ -191,13 +203,17 @@ impl OwnedLinkedChunkId { OwnedLinkedChunkId::Thread(room_id, event_id) => { LinkedChunkId::Thread(room_id.as_ref(), event_id.as_ref()) } + OwnedLinkedChunkId::PinnedEvents(room_id) => { + LinkedChunkId::PinnedEvents(room_id.as_ref()) + } } } pub fn room_id(&self) -> &RoomId { match self { - OwnedLinkedChunkId::Room(room_id) => room_id, - OwnedLinkedChunkId::Thread(room_id, ..) => room_id, + OwnedLinkedChunkId::Room(room_id) + | OwnedLinkedChunkId::Thread(room_id, ..) + | OwnedLinkedChunkId::PinnedEvents(room_id, ..) => room_id, } } } diff --git a/crates/matrix-sdk-ui/CHANGELOG.md b/crates/matrix-sdk-ui/CHANGELOG.md index e3a2245cc29..747b4f5cd45 100644 --- a/crates/matrix-sdk-ui/CHANGELOG.md +++ b/crates/matrix-sdk-ui/CHANGELOG.md @@ -256,7 +256,7 @@ All notable changes to this project will be documented in this file. - Don't consider rooms in the banned state to be non-left rooms. This bug was introduced due to the introduction of the banned state for rooms, and the - non-left room filter did not take the new room stat into account. + non-left room filter did not take the new room state into account. ([#4448](https://github.com/matrix-org/matrix-rust-sdk/pull/4448)) - Fix `EventTimelineItem::latest_edit_json()` when it is populated by a live diff --git a/crates/matrix-sdk-ui/src/timeline/builder.rs b/crates/matrix-sdk-ui/src/timeline/builder.rs index 3d0d77043e6..0a655e04fde 100644 --- a/crates/matrix-sdk-ui/src/timeline/builder.rs +++ b/crates/matrix-sdk-ui/src/timeline/builder.rs @@ -185,8 +185,15 @@ impl TimelineBuilder { let has_events = controller.init_focus(&focus, &room_event_cache).await?; - let pinned_events_join_handle = if matches!(focus, TimelineFocus::PinnedEvents { .. 
}) { - Some(spawn(pinned_events_task(room.pinned_event_ids_stream(), controller.clone()))) + let pinned_events_join_handle = if matches!(focus, TimelineFocus::PinnedEvents) { + let (_initial_events, pinned_events_recv) = + room_event_cache.subscribe_to_pinned_events().await?; + + Some(spawn(pinned_events_task( + room_event_cache.clone(), + controller.clone(), + pinned_events_recv, + ))) } else { None }; diff --git a/crates/matrix-sdk-ui/src/timeline/controller/mod.rs b/crates/matrix-sdk-ui/src/timeline/controller/mod.rs index a02766ba4ea..8abd859fc40 100644 --- a/crates/matrix-sdk-ui/src/timeline/controller/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/controller/mod.rs @@ -75,9 +75,7 @@ use crate::{ MsgLikeContent, MsgLikeKind, Room, TimelineEventFilterFn, TimelineEventFocusThreadMode, algorithms::rfind_event_by_item_id, controller::decryption_retry_task::compute_redecryption_candidates, - date_dividers::DateDividerAdjuster, - event_item::TimelineItemHandle, - pinned_events_loader::{PinnedEventsLoader, PinnedEventsLoaderError}, + date_dividers::DateDividerAdjuster, event_item::TimelineItemHandle, }, unable_to_decrypt_hook::UtdHookManager, }; @@ -121,9 +119,7 @@ pub(in crate::timeline) enum TimelineFocusKind { root_event_id: OwnedEventId, }, - PinnedEvents { - loader: PinnedEventsLoader, - }, + PinnedEvents, } #[derive(Debug)] @@ -225,7 +221,7 @@ impl TimelineFocusKind

{ TimelineFocusKind::Event { paginator } => { paginator.get().is_some_and(|paginator| paginator.hide_threaded_events()) } - TimelineFocusKind::Thread { .. } | TimelineFocusKind::PinnedEvents { .. } => false, + TimelineFocusKind::Thread { .. } | TimelineFocusKind::PinnedEvents => false, } } @@ -241,7 +237,7 @@ impl TimelineFocusKind

{ TimelineFocusKind::Event { paginator, .. } => { paginator.get().and_then(|paginator| paginator.thread_root()) } - TimelineFocusKind::Live { .. } | TimelineFocusKind::PinnedEvents { .. } => None, + TimelineFocusKind::Live { .. } | TimelineFocusKind::PinnedEvents => None, TimelineFocusKind::Thread { root_event_id } => Some(root_event_id), } } @@ -404,15 +400,7 @@ impl TimelineController

{ TimelineFocusKind::Thread { root_event_id } } - TimelineFocus::PinnedEvents { max_events_to_load, max_concurrent_requests } => { - TimelineFocusKind::PinnedEvents { - loader: PinnedEventsLoader::new( - Arc::new(room_data_provider.clone()), - max_events_to_load as usize, - max_concurrent_requests as usize, - ), - } - } + TimelineFocus::PinnedEvents => TimelineFocusKind::PinnedEvents, }; let focus = Arc::new(focus); @@ -623,23 +611,14 @@ impl TimelineController

{ Ok(has_events) } - TimelineFocus::PinnedEvents { .. } => { - let TimelineFocusKind::PinnedEvents { loader } = &*self.focus else { - // NOTE: this is sync'd with code in the ctor. - unreachable!(); - }; + TimelineFocus::PinnedEvents => { + let (initial_events, _update_receiver) = + room_event_cache.subscribe_to_pinned_events().await?; - let Some(loaded_events) = - loader.load_events().await.map_err(Error::PinnedEventsError)? - else { - // There wasn't any events. - return Ok(false); - }; - - let has_events = !loaded_events.is_empty(); + let has_events = !initial_events.is_empty(); self.replace_with_initial_remote_events( - loaded_events, + initial_events, RemoteEventOrigin::Pagination, ) .await; @@ -680,16 +659,6 @@ impl TimelineController

{ } } - pub(crate) async fn reload_pinned_events( - &self, - ) -> Result>, PinnedEventsLoaderError> { - if let TimelineFocusKind::PinnedEvents { loader } = &*self.focus { - loader.load_events().await - } else { - Err(PinnedEventsLoaderError::TimelineFocusNotPinnedEvents) - } - } - /// Run a lazy backwards pagination (in live mode). /// /// It adjusts the `count` value of the `Skip` higher-order stream so that @@ -723,7 +692,7 @@ impl TimelineController

{ ) -> Result { let PaginationResult { events, hit_end_of_timeline } = match &*self.focus { TimelineFocusKind::Live { .. } - | TimelineFocusKind::PinnedEvents { .. } + | TimelineFocusKind::PinnedEvents | TimelineFocusKind::Thread { .. } => { return Err(PaginationError::NotSupported); } @@ -756,7 +725,7 @@ impl TimelineController

{ ) -> Result { let PaginationResult { events, hit_end_of_timeline } = match &*self.focus { TimelineFocusKind::Live { .. } - | TimelineFocusKind::PinnedEvents { .. } + | TimelineFocusKind::PinnedEvents | TimelineFocusKind::Thread { .. } => return Err(PaginationError::NotSupported), TimelineFocusKind::Event { paginator, .. } => paginator diff --git a/crates/matrix-sdk-ui/src/timeline/controller/state.rs b/crates/matrix-sdk-ui/src/timeline/controller/state.rs index 47838fb5a54..0ac07119610 100644 --- a/crates/matrix-sdk-ui/src/timeline/controller/state.rs +++ b/crates/matrix-sdk-ui/src/timeline/controller/state.rs @@ -170,7 +170,7 @@ impl TimelineState

{ TimelineFocusKind::Thread { root_event_id, .. } => { thread_root.as_ref().is_some_and(|r| r == root_event_id) } - TimelineFocusKind::Event { .. } | TimelineFocusKind::PinnedEvents { .. } => { + TimelineFocusKind::Event { .. } | TimelineFocusKind::PinnedEvents => { // Don't add new items to these timelines; aggregations are added independently // of the `should_add_new_items` value. false diff --git a/crates/matrix-sdk-ui/src/timeline/controller/state_transaction.rs b/crates/matrix-sdk-ui/src/timeline/controller/state_transaction.rs index c7bfef05438..a59d7f9d37e 100644 --- a/crates/matrix-sdk-ui/src/timeline/controller/state_transaction.rs +++ b/crates/matrix-sdk-ui/src/timeline/controller/state_transaction.rs @@ -480,9 +480,9 @@ impl<'a, P: RoomDataProvider> TimelineStateTransaction<'a, P> { } match &self.focus { - TimelineFocusKind::PinnedEvents { .. } => { - // Only add pinned events for the pinned events timeline. - room_data_provider.is_pinned_event(event.event_id()) + TimelineFocusKind::PinnedEvents => { + // The pinned events timeline only receives updates for, well, pinned events. + true } TimelineFocusKind::Event { paginator } => { diff --git a/crates/matrix-sdk-ui/src/timeline/error.rs b/crates/matrix-sdk-ui/src/timeline/error.rs index 2f40cd70880..36713959585 100644 --- a/crates/matrix-sdk-ui/src/timeline/error.rs +++ b/crates/matrix-sdk-ui/src/timeline/error.rs @@ -18,7 +18,7 @@ use matrix_sdk::{ }; use thiserror::Error; -use crate::timeline::{TimelineEventItemId, pinned_events_loader::PinnedEventsLoaderError}; +use crate::timeline::TimelineEventItemId; /// Errors specific to the timeline. #[derive(Error, Debug)] @@ -60,10 +60,6 @@ pub enum Error { #[error(transparent)] PaginationError(#[from] PaginationError), - /// An error happened during pagination. - #[error(transparent)] - PinnedEventsError(#[from] PinnedEventsLoaderError), - /// An error happened while operating the room's send queue. #[error(transparent)] SendQueueError(#[from] RoomSendQueueError), diff --git a/crates/matrix-sdk-ui/src/timeline/mod.rs b/crates/matrix-sdk-ui/src/timeline/mod.rs index d949bdc91bc..4de82b953ea 100644 --- a/crates/matrix-sdk-ui/src/timeline/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/mod.rs @@ -75,7 +75,6 @@ pub mod futures; mod item; mod latest_event; mod pagination; -mod pinned_events_loader; mod subscriber; mod tasks; #[cfg(test)] @@ -146,7 +145,7 @@ pub enum TimelineFocus { Thread { root_event_id: OwnedEventId }, /// Only show pinned events. - PinnedEvents { max_events_to_load: u16, max_concurrent_requests: u16 }, + PinnedEvents, } /// Options for controlling the behaviour of [`TimelineFocus::Event`] @@ -180,7 +179,7 @@ impl TimelineFocus { TimelineFocus::Live { .. } => "live".to_owned(), TimelineFocus::Event { target, .. } => format!("permalink:{target}"), TimelineFocus::Thread { root_event_id, .. } => format!("thread:{root_event_id}"), - TimelineFocus::PinnedEvents { .. } => "pinned-events".to_owned(), + TimelineFocus::PinnedEvents => "pinned-events".to_owned(), } } } diff --git a/crates/matrix-sdk-ui/src/timeline/pagination.rs b/crates/matrix-sdk-ui/src/timeline/pagination.rs index 7ab016f65ef..6c8bd0b9ab8 100644 --- a/crates/matrix-sdk-ui/src/timeline/pagination.rs +++ b/crates/matrix-sdk-ui/src/timeline/pagination.rs @@ -57,7 +57,7 @@ impl super::Timeline { .event_cache .paginate_thread_backwards(root_event_id.to_owned(), num_events) .await?), - TimelineFocusKind::PinnedEvents { .. 
} => Err(Error::PaginationError(NotSupported)), + TimelineFocusKind::PinnedEvents => Err(Error::PaginationError(NotSupported)), } } diff --git a/crates/matrix-sdk-ui/src/timeline/pinned_events_loader.rs b/crates/matrix-sdk-ui/src/timeline/pinned_events_loader.rs deleted file mode 100644 index 6a1bdc82fca..00000000000 --- a/crates/matrix-sdk-ui/src/timeline/pinned_events_loader.rs +++ /dev/null @@ -1,209 +0,0 @@ -// Copyright 2024 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::{fmt::Formatter, sync::Arc}; - -use futures_util::{StreamExt, stream}; -use matrix_sdk::{BoxFuture, Room, SendOutsideWasm, SyncOutsideWasm, config::RequestConfig}; -use matrix_sdk_base::deserialized_responses::TimelineEvent; -use ruma::{EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, events::relation::RelationType}; -use thiserror::Error; -use tokio::sync::Mutex; -use tracing::warn; - -/// Utility to load the pinned events in a room. -pub struct PinnedEventsLoader { - /// Backend to load pinned events. - room: Arc, - - /// A list of pinned event ids we've observed previously. - /// - /// Starts as an empty vector and is updated when the list of pinned events - /// is updated. - previous_pinned_event_ids: Mutex>, - - /// Maximum number of pinned events to load (either from network or the - /// cache). - max_events_to_load: usize, - - /// Number of requests to load pinned events that can run concurrently. This - /// is used to avoid overwhelming a home server with dozens or hundreds - /// of concurrent requests. - max_concurrent_requests: usize, -} - -impl PinnedEventsLoader { - /// Creates a new `PinnedEventsLoader` instance. - pub fn new( - room: Arc, - max_events_to_load: usize, - max_concurrent_requests: usize, - ) -> Self { - Self { - room, - max_events_to_load, - max_concurrent_requests, - previous_pinned_event_ids: Mutex::new(Vec::new()), - } - } - - /// Loads the pinned events in this room, using the cache first and then - /// requesting the event from the homeserver if it couldn't be found. - /// This method will perform as many concurrent requests for events as - /// `max_concurrent_requests` allows, to avoid overwhelming the server. - /// - /// Returns `None` if the list of pinned events hasn't changed since the - /// previous time we loaded them. May return an error if there was an - /// issue fetching the full events. - pub async fn load_events(&self) -> Result>, PinnedEventsLoaderError> { - let pinned_event_ids: Vec = self - .room - .pinned_event_ids() - .unwrap_or_default() - .into_iter() - .rev() - .take(self.max_events_to_load) - .rev() - .collect(); - - // Check if the list of pinned events has changed since the last time. 
- if pinned_event_ids == *self.previous_pinned_event_ids.lock().await { - return Ok(None); - } - - if pinned_event_ids.is_empty() { - *self.previous_pinned_event_ids.lock().await = Vec::new(); - return Ok(Some(Vec::new())); - } - - let request_config = Some(RequestConfig::default().retry_limit(3)); - - let mut loaded_events: Vec = - stream::iter(pinned_event_ids.clone().into_iter().map(|event_id| { - let provider = self.room.clone(); - let relations_filter = - Some(vec![RelationType::Annotation, RelationType::Replacement]); - async move { - match provider - .load_event_with_relations(&event_id, request_config, relations_filter) - .await - { - Ok((event, related_events)) => { - let mut events = vec![event]; - events.extend(related_events); - Some(events) - } - Err(err) => { - warn!("error when loading pinned event: {err}"); - None - } - } - } - })) - .buffer_unordered(self.max_concurrent_requests) - // Get only the `Some>` results - .flat_map(stream::iter) - // Flatten the `Vec`s into a single one containing all their items - .flat_map(stream::iter) - .collect() - .await; - - if loaded_events.is_empty() { - return Err(PinnedEventsLoaderError::TimelineReloadFailed); - } - - // Sort using chronological ordering (oldest -> newest) - loaded_events.sort_by_key(|item| { - item.raw() - .deserialize() - .map(|e| e.origin_server_ts()) - .unwrap_or_else(|_| MilliSecondsSinceUnixEpoch::now()) - }); - - // We've successfully loaded *some* pinned events, so we can update the list of - // previously seen pinned events. - *self.previous_pinned_event_ids.lock().await = pinned_event_ids; - - Ok(Some(loaded_events)) - } -} - -pub trait PinnedEventsRoom: SendOutsideWasm + SyncOutsideWasm { - /// Load a single room event using the cache or network and any events - /// related to it, if they are cached. - /// - /// You can control which types of related events are retrieved using - /// `related_event_filters`. A `None` value will retrieve any type of - /// related event. - fn load_event_with_relations<'a>( - &'a self, - event_id: &'a EventId, - request_config: Option, - related_event_filters: Option>, - ) -> BoxFuture<'a, Result<(TimelineEvent, Vec), matrix_sdk::Error>>; - - /// Get the pinned event ids for a room. - fn pinned_event_ids(&self) -> Option>; - - /// Checks whether an event id is pinned in this room. - /// - /// It avoids having to clone the whole list of event ids to check a single - /// value. - fn is_pinned_event(&self, event_id: &EventId) -> bool; -} - -impl PinnedEventsRoom for Room { - fn load_event_with_relations<'a>( - &'a self, - event_id: &'a EventId, - request_config: Option, - related_event_filters: Option>, - ) -> BoxFuture<'a, Result<(TimelineEvent, Vec), matrix_sdk::Error>> { - Box::pin(self.load_or_fetch_event_with_relations( - event_id, - related_event_filters, - request_config, - )) - } - - fn pinned_event_ids(&self) -> Option> { - self.clone_info().pinned_event_ids() - } - - fn is_pinned_event(&self, event_id: &EventId) -> bool { - self.clone_info().is_pinned_event(event_id) - } -} - -#[cfg(not(tarpaulin_include))] -impl std::fmt::Debug for PinnedEventsLoader { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("PinnedEventsLoader") - .field("max_events_to_load", &self.max_events_to_load) - .finish() - } -} - -/// Errors related to `PinnedEventsLoader` usage. 
-#[derive(Error, Debug)] -pub enum PinnedEventsLoaderError { - #[error("No event found for the given event id.")] - EventNotFound(OwnedEventId), - - #[error("Timeline focus is not pinned events.")] - TimelineFocusNotPinnedEvents, - - #[error("Could not load pinned events.")] - TimelineReloadFailed, -} diff --git a/crates/matrix-sdk-ui/src/timeline/tasks.rs b/crates/matrix-sdk-ui/src/timeline/tasks.rs index 5f1455ba2f4..cffb13be98e 100644 --- a/crates/matrix-sdk-ui/src/timeline/tasks.rs +++ b/crates/matrix-sdk-ui/src/timeline/tasks.rs @@ -16,8 +16,6 @@ use std::collections::BTreeSet; -use futures_core::Stream; -use futures_util::pin_mut; use matrix_sdk::{ event_cache::{ EventsOrigin, RoomEventCache, RoomEventCacheSubscriber, RoomEventCacheUpdate, @@ -27,7 +25,6 @@ use matrix_sdk::{ }; use ruma::OwnedEventId; use tokio::sync::broadcast::{Receiver, error::RecvError}; -use tokio_stream::StreamExt as _; use tracing::{error, instrument, trace, warn}; use crate::timeline::{TimelineController, TimelineFocus, event_item::RemoteEventOrigin}; @@ -40,32 +37,66 @@ use crate::timeline::{TimelineController, TimelineFocus, event_item::RemoteEvent room_id = %timeline_controller.room().room_id(), ) )] -pub(in crate::timeline) async fn pinned_events_task( - pinned_event_ids_stream: S, +pub(in crate::timeline) async fn pinned_events_task( + room_event_cache: RoomEventCache, timeline_controller: TimelineController, -) where - S: Stream>, -{ - pin_mut!(pinned_event_ids_stream); + mut pinned_events_recv: Receiver, +) { + loop { + trace!("Waiting for an event."); + + let update = match pinned_events_recv.recv().await { + Ok(up) => up, + Err(RecvError::Closed) => break, + Err(RecvError::Lagged(num_skipped)) => { + warn!(num_skipped, "Lagged behind event cache updates, resetting timeline"); - while pinned_event_ids_stream.next().await.is_some() { - trace!("received a pinned events update"); + // The updates might have lagged, but the room event cache might have + // events, so retrieve them and add them back again to the timeline, + // after clearing it. + let (initial_events, _) = match room_event_cache.subscribe_to_pinned_events().await + { + Ok(initial_events) => initial_events, + Err(err) => { + error!( + ?err, + "Failed to replace the initial remote events in the event cache" + ); + break; + } + }; - match timeline_controller.reload_pinned_events().await { - Ok(Some(events)) => { - trace!("successfully reloaded pinned events"); timeline_controller - .replace_with_initial_remote_events(events, RemoteEventOrigin::Pagination) + .replace_with_initial_remote_events(initial_events, RemoteEventOrigin::Cache) .await; + + continue; } + }; + + match update { + RoomEventCacheUpdate::UpdateTimelineEvents { diffs, origin } => { + trace!("Received new timeline events diffs"); + let origin = match origin { + EventsOrigin::Sync => RemoteEventOrigin::Sync, + EventsOrigin::Pagination => RemoteEventOrigin::Pagination, + EventsOrigin::Cache => RemoteEventOrigin::Cache, + }; + + let has_diffs = !diffs.is_empty(); + + timeline_controller.handle_remote_events_with_diffs(diffs, origin).await; - Ok(None) => { - // The list of pinned events hasn't changed since the previous - // time. + if has_diffs && matches!(origin, RemoteEventOrigin::Cache) { + timeline_controller.retry_event_decryption(None).await; + } } - Err(err) => { - warn!("Failed to reload pinned events: {err}"); + RoomEventCacheUpdate::MoveReadMarkerTo { .. } + | RoomEventCacheUpdate::AddEphemeralEvents { .. } + | RoomEventCacheUpdate::UpdateMembers { .. 
} => { + // Nothing to do; these shouldn't happen for a pinned event sub. + // TODO(bnjbvr): then use a different type :) } } } @@ -189,8 +220,9 @@ pub(in crate::timeline) async fn room_event_cache_updates_task( if matches!(timeline_focus, TimelineFocus::Live { .. }) { timeline_controller.handle_remote_events_with_diffs(diffs, origin).await; - } else { - // Only handle the remote aggregation for a non-live timeline. + } else if !matches!(timeline_focus, TimelineFocus::PinnedEvents) { + // Only handle the remote aggregation for a non-live timeline, that's not the + // pinned events one (since this one handles its own remote events diffs). timeline_controller.handle_remote_aggregations(diffs, origin).await; } diff --git a/crates/matrix-sdk-ui/src/timeline/tests/mod.rs b/crates/matrix-sdk-ui/src/timeline/tests/mod.rs index 657d545e06c..4932bfc0d3d 100644 --- a/crates/matrix-sdk-ui/src/timeline/tests/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/tests/mod.rs @@ -55,9 +55,7 @@ use super::{ TimelineItem, algorithms::rfind_event_by_item_id, controller::TimelineSettings, event_item::RemoteEventOrigin, traits::RoomDataProvider, }; -use crate::{ - timeline::pinned_events_loader::PinnedEventsRoom, unable_to_decrypt_hook::UtdHookManager, -}; +use crate::unable_to_decrypt_hook::UtdHookManager; mod basic; mod echo; @@ -301,25 +299,6 @@ impl PaginableThread for TestRoomDataProvider { } } -impl PinnedEventsRoom for TestRoomDataProvider { - fn load_event_with_relations<'a>( - &'a self, - _event_id: &'a EventId, - _request_config: Option, - _related_event_filters: Option>, - ) -> BoxFuture<'a, Result<(TimelineEvent, Vec), matrix_sdk::Error>> { - unimplemented!(); - } - - fn pinned_event_ids(&self) -> Option> { - unimplemented!(); - } - - fn is_pinned_event(&self, _event_id: &EventId) -> bool { - unimplemented!(); - } -} - impl RoomDataProvider for TestRoomDataProvider { fn own_user_id(&self) -> &UserId { self.own_user_id.as_deref().unwrap_or(&ALICE) @@ -403,4 +382,13 @@ impl RoomDataProvider for TestRoomDataProvider { async fn load_event<'a>(&'a self, _event_id: &'a EventId) -> matrix_sdk::Result { unimplemented!(); } + + fn load_event_with_relations<'a>( + &'a self, + _event_id: &'a EventId, + _request_config: Option, + _related_event_filters: Option>, + ) -> BoxFuture<'a, Result<(TimelineEvent, Vec), matrix_sdk::Error>> { + unimplemented!(); + } } diff --git a/crates/matrix-sdk-ui/src/timeline/traits.rs b/crates/matrix-sdk-ui/src/timeline/traits.rs index 677f94644c4..9130b7640dd 100644 --- a/crates/matrix-sdk-ui/src/timeline/traits.rs +++ b/crates/matrix-sdk-ui/src/timeline/traits.rs @@ -17,7 +17,8 @@ use std::future::Future; use eyeball::Subscriber; use indexmap::IndexMap; use matrix_sdk::{ - Result, Room, SendOutsideWasm, + BoxFuture, Result, Room, SendOutsideWasm, + config::RequestConfig, deserialized_responses::TimelineEvent, paginators::{PaginableRoom, thread::PaginableThread}, }; @@ -28,6 +29,7 @@ use ruma::{ AnyMessageLikeEventContent, fully_read::FullyReadEventContent, receipt::{Receipt, ReceiptThread, ReceiptType}, + relation::RelationType, }, room_version_rules::RoomVersionRules, }; @@ -36,7 +38,6 @@ use tracing::error; use super::{Profile, RedactError, TimelineBuilder}; use crate::timeline::{ self, Timeline, TimelineReadReceiptTracking, latest_event::LatestEventValue, - pinned_events_loader::PinnedEventsRoom, }; pub trait RoomExt { @@ -85,7 +86,7 @@ impl RoomExt for Room { } pub(super) trait RoomDataProvider: - Clone + PaginableRoom + PaginableThread + PinnedEventsRoom + 'static + Clone + 
PaginableRoom + PaginableThread + 'static { fn own_user_id(&self) -> &UserId; fn room_version_rules(&self) -> RoomVersionRules; @@ -137,6 +138,19 @@ pub(super) trait RoomDataProvider: &'a self, event_id: &'a EventId, ) -> impl Future> + SendOutsideWasm + 'a; + + /// Load a single room event using the cache or network and any events + /// related to it, if they are cached. + /// + /// You can control which types of related events are retrieved using + /// `related_event_filters`. A `None` value will retrieve any type of + /// related event. + fn load_event_with_relations<'a>( + &'a self, + event_id: &'a EventId, + request_config: Option, + related_event_filters: Option>, + ) -> BoxFuture<'a, Result<(TimelineEvent, Vec), matrix_sdk::Error>>; } impl RoomDataProvider for Room { @@ -245,4 +259,17 @@ impl RoomDataProvider for Room { async fn load_event<'a>(&'a self, event_id: &'a EventId) -> Result { self.load_or_fetch_event(event_id, None).await } + + fn load_event_with_relations<'a>( + &'a self, + event_id: &'a EventId, + request_config: Option, + related_event_filters: Option>, + ) -> BoxFuture<'a, Result<(TimelineEvent, Vec), matrix_sdk::Error>> { + Box::pin(self.load_or_fetch_event_with_relations( + event_id, + related_event_filters, + request_config, + )) + } } diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/mod.rs b/crates/matrix-sdk-ui/tests/integration/timeline/mod.rs index ceb4f849cd0..06f7f7daf4a 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/mod.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/mod.rs @@ -284,10 +284,7 @@ async fn test_timeline_is_threaded() { { // A pinned events timeline isn't threaded. let timeline = TimelineBuilder::new(&room) - .with_focus(TimelineFocus::PinnedEvents { - max_events_to_load: 0, - max_concurrent_requests: 10, - }) + .with_focus(TimelineFocus::PinnedEvents) .build() .await .unwrap(); diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/pinned_event.rs b/crates/matrix-sdk-ui/tests/integration/timeline/pinned_event.rs index d07a478b604..a2ab0050297 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/pinned_event.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/pinned_event.rs @@ -4,12 +4,13 @@ use assert_matches2::assert_let; use eyeball_im::VectorDiff; use futures_util::StreamExt as _; use matrix_sdk::{ - Client, Room, + Client, Room, assert_let_timeout, config::SyncSettings, test_utils::{ logged_in_client_with_server, mocks::{MatrixMockServer, RoomMessagesResponseTemplate, RoomRelationsResponseTemplate}, }, + timeout::timeout, }; use matrix_sdk_base::deserialized_responses::TimelineEvent; use matrix_sdk_common::executor::spawn; @@ -80,7 +81,7 @@ async fn test_new_pinned_events_are_not_added_on_sync() { .await .expect("Room should be synced"); let timeline = - TimelineBuilder::new(&room).with_focus(pinned_events_focus(100)).build().await.unwrap(); + TimelineBuilder::new(&room).with_focus(TimelineFocus::PinnedEvents).build().await.unwrap(); assert!( timeline.live_back_pagination_status().await.is_none(), @@ -88,7 +89,14 @@ async fn test_new_pinned_events_are_not_added_on_sync() { ); // Load timeline items - let (items, mut timeline_stream) = timeline.subscribe().await; + let (mut items, mut timeline_stream) = timeline.subscribe().await; + + if items.is_empty() { + assert_let_timeout!(Some(updates) = timeline_stream.next()); + for up in updates { + up.apply(&mut items); + } + } assert_eq!(items.len(), 1 + 1); // event item + a date divider assert!(items[0].is_date_divider()); @@ -150,7 +158,7 
@@ async fn test_pinned_event_with_reaction() { .await .expect("Room should be synced"); let timeline = - TimelineBuilder::new(&room).with_focus(pinned_events_focus(100)).build().await.unwrap(); + TimelineBuilder::new(&room).with_focus(TimelineFocus::PinnedEvents).build().await.unwrap(); assert!( timeline.live_back_pagination_status().await.is_none(), @@ -158,7 +166,14 @@ async fn test_pinned_event_with_reaction() { ); // Load timeline items - let (items, mut timeline_stream) = timeline.subscribe().await; + let (mut items, mut timeline_stream) = timeline.subscribe().await; + + if items.is_empty() { + assert_let_timeout!(Some(updates) = timeline_stream.next()); + for up in updates { + up.apply(&mut items); + } + } // Verify that the timeline contains the pinned event and its reaction. assert_eq!(items.len(), 1 + 1); // event item + a date divider @@ -222,7 +237,7 @@ async fn test_pinned_event_with_paginated_reactions() { .await .expect("Room should be synced"); let timeline = - TimelineBuilder::new(&room).with_focus(pinned_events_focus(100)).build().await.unwrap(); + TimelineBuilder::new(&room).with_focus(TimelineFocus::PinnedEvents).build().await.unwrap(); assert!( timeline.live_back_pagination_status().await.is_none(), @@ -230,7 +245,14 @@ async fn test_pinned_event_with_paginated_reactions() { ); // Load timeline items - let (items, mut timeline_stream) = timeline.subscribe().await; + let (mut items, mut timeline_stream) = timeline.subscribe().await; + + if items.is_empty() { + assert_let_timeout!(Some(updates) = timeline_stream.next()); + for up in updates { + up.apply(&mut items); + } + } // Verify that the timeline contains the pinned event and its reactions. assert_eq!(items.len(), 1 + 1); // event item + a date divider @@ -299,14 +321,23 @@ async fn test_new_pinned_event_ids_reload_the_timeline() { .expect("Room should be synced"); let timeline = - TimelineBuilder::new(&room).with_focus(pinned_events_focus(100)).build().await.unwrap(); + TimelineBuilder::new(&room).with_focus(TimelineFocus::PinnedEvents).build().await.unwrap(); assert!( timeline.live_back_pagination_status().await.is_none(), "there should be no live back-pagination status for a focused timeline" ); - let (items, mut timeline_stream) = timeline.subscribe().await; + let (mut items, mut timeline_stream) = timeline.subscribe().await; + + // Wait for the event cache to handle initialization of the pinned events + // timeline. 
+ if items.is_empty() { + assert_let_timeout!(Some(updates) = timeline_stream.next()); + for up in updates { + up.apply(&mut items); + } + } assert_eq!(items.len(), 1 + 1); // event item + a date divider assert!(items[0].is_date_divider()); @@ -321,7 +352,10 @@ async fn test_new_pinned_event_ids_reload_the_timeline() { .await .expect("Sync failed"); - assert_let!(Some(timeline_updates) = timeline_stream.next().await); + assert_let_timeout!( + Duration::from_millis(300), + Some(timeline_updates) = timeline_stream.next() + ); assert_eq!(timeline_updates.len(), 4); assert_let!(VectorDiff::Clear = &timeline_updates[0]); @@ -345,7 +379,7 @@ async fn test_new_pinned_event_ids_reload_the_timeline() { .await .expect("Sync failed"); - assert_let!(Some(timeline_updates) = timeline_stream.next().await); + assert_let_timeout!(Some(timeline_updates) = timeline_stream.next()); assert_eq!(timeline_updates.len(), 1); assert_let!(VectorDiff::Clear = &timeline_updates[0]); @@ -358,6 +392,8 @@ async fn test_max_events_to_load_is_honored() { let client = server.client_builder().build().await; let room_id = room_id!("!test:localhost"); + client.event_cache().config_mut().await.max_pinned_events_to_load = 1; + let f = EventFactory::new().room(room_id).sender(*BOB); let pinned_event = f .text_msg("in the end") @@ -374,11 +410,16 @@ async fn test_max_events_to_load_is_honored() { .await .expect("Sync failed"); - let ret = TimelineBuilder::new(&room).with_focus(pinned_events_focus(1)).build().await; + let timeline = + TimelineBuilder::new(&room).with_focus(TimelineFocus::PinnedEvents).build().await.unwrap(); // We're only taking the last event id, `$2`, and it's not available so the - // timeline fails to initialise. - assert!(ret.is_err()); + // timeline fails to fill itself with events. 
+ let (items, mut stream) = timeline.subscribe().await; + + assert!(items.is_empty()); + sleep(Duration::from_millis(100)).await; + assert_pending!(stream); } #[async_test] @@ -422,16 +463,22 @@ async fn test_cached_events_are_kept_for_different_room_instances() { let (room_cache, _drop_handles) = room.event_cache().await.unwrap(); let timeline = - TimelineBuilder::new(&room).with_focus(pinned_events_focus(2)).build().await.unwrap(); + TimelineBuilder::new(&room).with_focus(TimelineFocus::PinnedEvents).build().await.unwrap(); assert!( timeline.live_back_pagination_status().await.is_none(), "there should be no live back-pagination status for a focused timeline" ); - let (items, mut timeline_stream) = timeline.subscribe().await; + let (mut items, mut timeline_stream) = timeline.subscribe().await; + + if items.is_empty() { + assert_let_timeout!(Some(updates) = timeline_stream.next()); + for up in updates { + up.apply(&mut items); + } + } - assert!(!items.is_empty()); // We just loaded some events assert_pending!(timeline_stream); assert!(room_cache.find_event(event_id!("$1")).await.unwrap().is_some()); @@ -451,7 +498,7 @@ async fn test_cached_events_are_kept_for_different_room_instances() { // And a new timeline one let timeline = - TimelineBuilder::new(&room).with_focus(pinned_events_focus(2)).build().await.unwrap(); + TimelineBuilder::new(&room).with_focus(TimelineFocus::PinnedEvents).build().await.unwrap(); let (items, _) = timeline.subscribe().await; assert!(!items.is_empty()); // These events came from the cache @@ -476,10 +523,17 @@ async fn test_pinned_timeline_with_pinned_event_ids_and_empty_result_fails() { .mock_and_sync(&client, &server) .await .expect("Sync failed"); - let ret = TimelineBuilder::new(&room).with_focus(pinned_events_focus(1)).build().await; - // The timeline couldn't load any events so it fails to initialise - assert!(ret.is_err()); + // While the timeline gets initialized, it will fail to be updated by the + // background pinned cache task, after one second. 
+ let timeline = + TimelineBuilder::new(&room).with_focus(TimelineFocus::PinnedEvents).build().await.unwrap(); + + let (_, mut timeline_stream) = timeline.subscribe().await; + + let result = timeout(timeline_stream.next(), Duration::from_secs(1)).await; + + assert!(result.is_err()); } #[async_test] @@ -495,7 +549,7 @@ async fn test_pinned_timeline_with_no_pinned_event_ids_is_just_empty() { .await .expect("Sync failed"); let timeline = - TimelineBuilder::new(&room).with_focus(pinned_events_focus(1)).build().await.unwrap(); + TimelineBuilder::new(&room).with_focus(TimelineFocus::PinnedEvents).build().await.unwrap(); // The timeline couldn't load any events, but it expected none, so it just // returns an empty list @@ -523,7 +577,7 @@ async fn test_pinned_timeline_with_no_pinned_events_and_an_utd_on_sync_is_just_e mock_events_endpoint(&server, room_id, vec![utd_event]).await; let timeline = - TimelineBuilder::new(&room).with_focus(pinned_events_focus(1)).build().await.unwrap(); + TimelineBuilder::new(&room).with_focus(TimelineFocus::PinnedEvents).build().await.unwrap(); // The timeline couldn't load any events, but it expected none, so it just // returns an empty list @@ -546,7 +600,7 @@ async fn test_pinned_timeline_with_no_pinned_events_on_pagination_is_just_empty( .await .expect("Sync failed"); let pinned_timeline = - TimelineBuilder::new(&room).with_focus(pinned_events_focus(1)).build().await.unwrap(); + TimelineBuilder::new(&room).with_focus(TimelineFocus::PinnedEvents).build().await.unwrap(); // The timeline couldn't load any events, but it expected none, so it just // returns an empty list @@ -615,10 +669,18 @@ async fn test_pinned_timeline_with_pinned_utd_on_sync_contains_it() { .await; let timeline = - TimelineBuilder::new(&room).with_focus(pinned_events_focus(1)).build().await.unwrap(); + TimelineBuilder::new(&room).with_focus(TimelineFocus::PinnedEvents).build().await.unwrap(); // The timeline loaded with just a day divider and the pinned UTD - let (items, _) = timeline.subscribe().await; + let (mut items, mut stream) = timeline.subscribe().await; + + if items.is_empty() { + assert_let_timeout!(Some(updates) = stream.next()); + for up in updates { + up.apply(&mut items); + } + } + assert_eq!(items.len(), 2); let pinned_utd_event = items.last().unwrap().as_event().unwrap(); assert_eq!(pinned_utd_event.event_id().unwrap(), event_id); @@ -659,7 +721,7 @@ async fn test_edited_events_are_reflected_in_sync() { .await .expect("Sync failed"); let timeline = - TimelineBuilder::new(&room).with_focus(pinned_events_focus(100)).build().await.unwrap(); + TimelineBuilder::new(&room).with_focus(TimelineFocus::PinnedEvents).build().await.unwrap(); assert!( timeline.live_back_pagination_status().await.is_none(), @@ -667,7 +729,14 @@ async fn test_edited_events_are_reflected_in_sync() { ); // Load timeline items. 
- let (items, mut timeline_stream) = timeline.subscribe().await; + let (mut items, mut timeline_stream) = timeline.subscribe().await; + + if items.is_empty() { + assert_let_timeout!(Some(updates) = timeline_stream.next()); + for up in updates { + up.apply(&mut items); + } + } assert_eq!(items.len(), 1 + 1); // event item + a date divider assert!(items[0].is_date_divider()); @@ -694,7 +763,7 @@ async fn test_edited_events_are_reflected_in_sync() { .await .expect("Sync failed"); - assert_let!(Some(timeline_updates) = timeline_stream.next().await); + assert_let_timeout!(Some(timeline_updates) = timeline_stream.next()); assert_eq!(timeline_updates.len(), 1); // The edit does replace the original event. @@ -742,7 +811,7 @@ async fn test_redacted_events_are_reflected_in_sync() { .await .expect("Sync failed"); let timeline = - TimelineBuilder::new(&room).with_focus(pinned_events_focus(100)).build().await.unwrap(); + TimelineBuilder::new(&room).with_focus(TimelineFocus::PinnedEvents).build().await.unwrap(); assert!( timeline.live_back_pagination_status().await.is_none(), @@ -750,7 +819,14 @@ async fn test_redacted_events_are_reflected_in_sync() { ); // Load timeline items - let (items, mut timeline_stream) = timeline.subscribe().await; + let (mut items, mut timeline_stream) = timeline.subscribe().await; + + if items.is_empty() { + assert_let_timeout!(Some(updates) = timeline_stream.next()); + for up in updates { + up.apply(&mut items); + } + } assert_eq!(items.len(), 1 + 1); // event item + a date divider assert!(items[0].is_date_divider()); @@ -773,7 +849,7 @@ async fn test_redacted_events_are_reflected_in_sync() { .await .expect("Sync failed"); - assert_let!(Some(timeline_updates) = timeline_stream.next().await); + assert_let_timeout!(Some(timeline_updates) = timeline_stream.next()); assert_eq!(timeline_updates.len(), 1); // The redaction takes place. @@ -792,7 +868,11 @@ async fn test_ensure_max_concurrency_is_observed() { let pinned_event_ids: Vec = (0..100).map(|idx| format!("${idx}")).collect(); - let max_concurrent_requests: u16 = 10; + let max_concurrent_requests = 10; + + // Define the max concurrent requests allowed for the event cache. + client.event_cache().config_mut().await.max_pinned_events_concurrent_requests = + max_concurrent_requests; let joined_room_builder = JoinedRoomBuilder::new(&room_id) // Set up encryption @@ -840,7 +920,7 @@ async fn test_ensure_max_concurrency_is_observed() { // Start loading the pinned event timeline asynchronously. 
let handle = spawn({ - let timeline_builder = room.timeline_builder().with_focus(pinned_events_focus(100)); + let timeline_builder = room.timeline_builder().with_focus(TimelineFocus::PinnedEvents); async { let _ = timeline_builder.build().await; } @@ -961,7 +1041,3 @@ fn create_utd( .event_id(event_id) .into_raw_sync() } - -fn pinned_events_focus(max_events_to_load: u16) -> TimelineFocus { - TimelineFocus::PinnedEvents { max_events_to_load, max_concurrent_requests: 10 } -} diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index 61b49d81e58..06149610f41 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -30,6 +30,7 @@ use std::{ collections::{BTreeMap, HashMap}, fmt, + ops::{Deref, DerefMut}, sync::{Arc, OnceLock, Weak}, }; @@ -129,6 +130,12 @@ pub enum EventCacheError { #[error(transparent)] LinkedChunkLoader(#[from] LazyLoaderError), + /// An error happened when trying to load pinned events; none of them could + /// be loaded, which would otherwise result in an empty pinned events + /// list, incorrectly. + #[error("Unable to load any of the pinned events.")] + UnableToLoadPinnedEvents, + /// An error happened when reading the metadata of a linked chunk, upon /// reload. #[error("the linked chunk metadata is invalid: {details}")] @@ -224,6 +231,7 @@ impl EventCache { Self { inner: Arc::new(EventCacheInner { client: weak_client, + config: RwLock::new(EventCacheConfig::default()), store: event_cache_store, multiple_room_updates_lock: Default::default(), by_room: Default::default(), @@ -241,6 +249,17 @@ impl EventCache { } } + /// Get a read-only handle to the global configuration of the + /// [`EventCache`]. + pub async fn config(&self) -> impl Deref + '_ { + self.inner.config.read().await + } + + /// Get a writable handle to the global configuration of the [`EventCache`]. + pub async fn config_mut(&self) -> impl DerefMut + '_ { + self.inner.config.write().await + } + /// Subscribes to updates that a thread subscription has been sent. /// /// For testing purposes only. @@ -819,11 +838,42 @@ impl EventCache { } } +/// Global configuration for the [`EventCache`], applied to every single room. +#[derive(Clone, Copy, Debug)] +pub struct EventCacheConfig { + /// Maximum number of concurrent /event requests when loading pinned events. + pub max_pinned_events_concurrent_requests: usize, + + /// Maximum number of pinned events to load, for any room. + pub max_pinned_events_to_load: usize, +} + +impl EventCacheConfig { + /// The default maximum number of pinned events to load. + const DEFAULT_MAX_EVENTS_TO_LOAD: usize = 128; + + /// The default maximum number of concurrent requests to perform when + /// loading the pinned events. + const DEFAULT_MAX_CONCURRENT_REQUESTS: usize = 8; +} + +impl Default for EventCacheConfig { + fn default() -> Self { + Self { + max_pinned_events_concurrent_requests: Self::DEFAULT_MAX_CONCURRENT_REQUESTS, + max_pinned_events_to_load: Self::DEFAULT_MAX_EVENTS_TO_LOAD, + } + } +} + struct EventCacheInner { /// A weak reference to the inner client, useful when trying to get a handle /// on the owning client. client: WeakClient, + /// Global configuration for the event cache. + config: RwLock, + /// Reference to the underlying store. 
store: EventCacheStoreLock, diff --git a/crates/matrix-sdk/src/event_cache/redecryptor.rs b/crates/matrix-sdk/src/event_cache/redecryptor.rs index 078f73c530a..2a59550612e 100644 --- a/crates/matrix-sdk/src/event_cache/redecryptor.rs +++ b/crates/matrix-sdk/src/event_cache/redecryptor.rs @@ -167,7 +167,8 @@ type OwnedSessionId = String; type EventIdAndUtd = (OwnedEventId, Raw); type EventIdAndEvent = (OwnedEventId, DecryptedRoomEvent); -type ResolvedUtd = (OwnedEventId, DecryptedRoomEvent, Option>); +pub(in crate::event_cache) type ResolvedUtd = + (OwnedEventId, DecryptedRoomEvent, Option>); /// The information sent across the channel to the long-running task requesting /// that the supplied set of sessions be retried. @@ -372,6 +373,12 @@ impl EventCache { events.iter().cloned().map(|(event_id, _, _)| event_id).collect(); let mut new_events = Vec::with_capacity(events.len()); + // Consider the pinned event linked chunk, if it's been initialized. + if let Some(pinned_cache) = state.pinned_event_cache() { + pinned_cache.replace_utds(&events).await?; + } + + // Consider the room linked chunk. for (event_id, decrypted, actions) in events { // The event isn't in the cache, nothing to replace. Realistically this can't // happen since we retrieved the list of events from the cache itself and diff --git a/crates/matrix-sdk/src/event_cache/room/mod.rs b/crates/matrix-sdk/src/event_cache/room/mod.rs index 6d6fa524fd0..e75e408ab8d 100644 --- a/crates/matrix-sdk/src/event_cache/room/mod.rs +++ b/crates/matrix-sdk/src/event_cache/room/mod.rs @@ -57,6 +57,7 @@ use crate::{ }; pub(super) mod events; +mod pinned_events; mod threads; pub use threads::ThreadEventCacheUpdate; @@ -235,6 +236,24 @@ impl RoomEventCache { Ok(state.subscribe_to_thread(thread_root)) } + /// Get the pinned event cache for this room. + /// + /// This is a persisted view over the pinned events of a room. The pinned + /// events will be initially loaded from a network request to fetch the + /// latest pinned events will be performed, to update it as needed. The + /// list of pinned events will also be kept up-to-date as new events are + /// pinned, and new related events show up from sync or backpagination. + /// + /// This requires that the room's event cache be initialized. + pub async fn subscribe_to_pinned_events( + &self, + ) -> Result<(Vec, Receiver)> { + let room = self.inner.weak_room.get().ok_or(EventCacheError::ClientDropped)?; + let mut state = self.inner.state.write().await?; + + state.subscribe_to_pinned_events(room).await + } + /// Paginate backwards in a thread, given its root event ID. /// /// Returns whether we've hit the start of the thread, in which case the @@ -664,7 +683,7 @@ mod private { use std::{ collections::{BTreeMap, HashMap, HashSet}, sync::{ - Arc, + Arc, OnceLock, atomic::{AtomicBool, AtomicUsize, Ordering}, }, }; @@ -714,6 +733,7 @@ mod private { events::EventLinkedChunk, sort_positions_descending, }; + use crate::{Room, event_cache::room::pinned_events::PinnedEventCache}; /// State for a single room's event cache. /// @@ -747,6 +767,9 @@ mod private { /// Keyed by the thread root event ID. threads: HashMap, + /// Cache for pinned events in this room, initialized on-demand. + pinned_event_cache: OnceLock, + pagination_status: SharedObservable, /// A clone of [`super::RoomEventCacheInner::update_sender`]. 
@@ -884,6 +907,7 @@ mod private { room_version_rules, waited_for_initial_prev_token, subscriber_count: Default::default(), + pinned_event_cache: OnceLock::new(), }), read_lock_acquisition: Mutex::new(()), }) @@ -1199,6 +1223,13 @@ mod private { &mut self.state.room_linked_chunk } + /// Get a reference to the [`pinned_event_cache`] if it has been + /// initialized. + #[cfg(any(feature = "e2e-encryption", test))] + pub fn pinned_event_cache(&self) -> Option<&PinnedEventCache> { + self.state.pinned_event_cache.get() + } + /// Get a reference to the `waited_for_initial_prev_token` atomic bool. pub fn waited_for_initial_prev_token(&self) -> &Arc { &self.state.waited_for_initial_prev_token @@ -1823,6 +1854,32 @@ mod private { self.get_or_reload_thread(root).subscribe() } + /// Subscribe to the lazily initialized pinned event cache for this + /// room. + /// + /// This is a persisted view over the pinned events of a room. The + /// pinned events will be initially loaded from a network + /// request to fetch the latest pinned events will be performed, + /// to update it as needed. The list of pinned events will also + /// be kept up-to-date as new events are pinned, and new related + /// events show up from sync or backpagination. + /// + /// This requires that the room's event cache be initialized. + pub async fn subscribe_to_pinned_events( + &mut self, + room: Room, + ) -> Result<(Vec, Receiver), EventCacheError> { + let pinned_event_cache = self.state.pinned_event_cache.get_or_init(|| { + PinnedEventCache::new( + room, + self.state.linked_chunk_update_sender.clone(), + self.state.store.clone(), + ) + }); + + pinned_event_cache.subscribe().await + } + /// Back paginate in the given thread. /// /// Will always start from the end, unless we previously paginated. @@ -1859,6 +1916,16 @@ mod private { // Update the store before doing the post-processing. self.propagate_changes().await?; + // Need an explicit re-borrow to avoid a deref vs deref-mut borrowck conflict + // below. + let state = &mut *self.state; + + if let Some(pinned_event_cache) = state.pinned_event_cache.get_mut() { + pinned_event_cache + .maybe_add_live_related_events(&events, &state.room_version_rules.redaction) + .await?; + } + let mut new_events_by_thread: BTreeMap<_, Vec<_>> = BTreeMap::new(); for event in events { diff --git a/crates/matrix-sdk/src/event_cache/room/pinned_events.rs b/crates/matrix-sdk/src/event_cache/room/pinned_events.rs new file mode 100644 index 00000000000..51928456c41 --- /dev/null +++ b/crates/matrix-sdk/src/event_cache/room/pinned_events.rs @@ -0,0 +1,820 @@ +// Copyright 2026 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::{collections::BTreeSet, sync::Arc}; + +use futures_util::{StreamExt as _, stream}; +use matrix_sdk_base::{ + deserialized_responses::TimelineEventKind, + event_cache::{ + Event, Gap, + store::{EventCacheStoreLock, EventCacheStoreLockGuard, EventCacheStoreLockState}, + }, + linked_chunk::{LinkedChunkId, OwnedLinkedChunkId, Position, Update}, +}; +use ruma::{ + EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, + events::{ + AnySyncMessageLikeEvent, AnySyncTimelineEvent, MessageLikeEventType, relation::RelationType, + }, + room_version_rules::RedactionRules, + serde::Raw, +}; +use serde::Deserialize; +use tokio::sync::{ + Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard, + broadcast::{Receiver, Sender}, +}; +use tracing::{debug, instrument, trace, warn}; + +#[cfg(feature = "e2e-encryption")] +use crate::event_cache::redecryptor::ResolvedUtd; +use crate::{ + Room, + client::WeakClient, + config::RequestConfig, + event_cache::{ + EventCacheError, EventsOrigin, Result, RoomEventCacheLinkedChunkUpdate, + RoomEventCacheUpdate, room::events::EventLinkedChunk, + }, + executor::{JoinHandle, spawn}, + room::WeakRoom, +}; + +struct PinnedEventCacheState { + /// The ID of the room owning this list of pinned events. + room_id: OwnedRoomId, + + /// A sender for live events updates in this room's pinned events list. + sender: Sender, + + /// The linked chunk representing this room's pinned events. + /// + /// Does not contain related events by default. + chunk: EventLinkedChunk, + + /// Reference to the underlying backing store. + // TODO: can be removed? + store: EventCacheStoreLock, + + /// A sender for the globally observable linked chunk updates that happened + /// during a sync or a back-pagination. + /// + /// See also [`super::super::EventCacheInner::linked_chunk_update_sender`]. + linked_chunk_update_sender: Sender, +} + +#[cfg(not(tarpaulin_include))] +impl std::fmt::Debug for PinnedEventCacheState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PinnedEventCacheState") + .field("room_id", &self.room_id) + .field("chunk", &self.chunk) + .finish_non_exhaustive() + } +} + +struct PinnedEventCacheStateLock { + /// The per-thread lock around the real state. + locked_state: RwLock, + + /// A lock to guard against races when upgrading a read lock into a write + /// lock, after noticing the cross-process lock has been dirtied. + state_lock_upgrade_mutex: Mutex<()>, +} + +impl PinnedEventCacheStateLock { + /// Lock this [`PinnedEventCacheStateLock`] with per-thread shared access. + /// + /// This method locks the per-thread lock over the state, and then locks + /// the cross-process lock over the store. It returns an RAII guard + /// which will drop the read access to the state and to the store when + /// dropped. + /// + /// If the cross-process lock over the store is dirty (see + /// [`EventCacheStoreLockState`]), the state is reset to the last chunk. + async fn read(&self) -> Result> { + // Se comment in [`RoomEventCacheStateLock::read`] for explanation. + let _state_lock_upgrade_guard = self.state_lock_upgrade_mutex.lock().await; + + // Obtain a read lock. + let state_guard = self.locked_state.read().await; + + match state_guard.store.lock().await? { + EventCacheStoreLockState::Clean(store_guard) => { + Ok(PinnedEventCacheStateLockReadGuard { state: state_guard, _store: store_guard }) + } + + EventCacheStoreLockState::Dirty(store_guard) => { + // Drop the read lock, and take a write lock to modify the state. 
+ // This is safe because only one reader at a time (see + // `Self::state_lock_upgrade_mutex`) is allowed. + drop(state_guard); + let state_guard = self.locked_state.write().await; + + let mut guard = + PinnedEventCacheStateLockWriteGuard { state: state_guard, store: store_guard }; + + // Force to reload by shrinking to the last chunk. + guard.reload_from_storage().await?; + + // All good now, mark the cross-process lock as non-dirty. + EventCacheStoreLockGuard::clear_dirty(&guard.store); + + // Downgrade the guard as soon as possible. + let guard = guard.downgrade(); + + Ok(guard) + } + } + } + + /// Lock this [`PinnedEventCacheStateLock`] with exclusive per-thread + /// write access. + /// + /// This method locks the per-thread lock over the state, and then locks + /// the cross-process lock over the store. It returns an RAII guard + /// which will drop the write access to the state and to the store when + /// dropped. + /// + /// If the cross-process lock over the store is dirty (see + /// [`EventCacheStoreLockState`]), the state is reset to the last chunk. + async fn write(&self) -> Result> { + let state_guard = self.locked_state.write().await; + + match state_guard.store.lock().await? { + EventCacheStoreLockState::Clean(store_guard) => { + Ok(PinnedEventCacheStateLockWriteGuard { state: state_guard, store: store_guard }) + } + + EventCacheStoreLockState::Dirty(store_guard) => { + let mut guard = + PinnedEventCacheStateLockWriteGuard { state: state_guard, store: store_guard }; + + // Reload the full pinned events list from the store. + guard.reload_from_storage().await?; + + // All good now, mark the cross-process lock as non-dirty. + EventCacheStoreLockGuard::clear_dirty(&guard.store); + + Ok(guard) + } + } + } +} + +#[cfg(not(tarpaulin_include))] +impl std::fmt::Debug for PinnedEventCacheStateLock { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PinnedEventCacheStateLock") + .field("locked_state", &self.locked_state) + .finish_non_exhaustive() + } +} + +/// The read lock guard returned by [`PinnedEventCacheStateLock::read`]. +pub struct PinnedEventCacheStateLockReadGuard<'a> { + /// The per-thread read lock guard over the + /// [`PinnedEventCacheState`]. + state: RwLockReadGuard<'a, PinnedEventCacheState>, + + /// The cross-process lock guard over the store. + _store: EventCacheStoreLockGuard, +} + +/// The write lock guard return by [`PinnedEventCacheStateLock::write`]. +struct PinnedEventCacheStateLockWriteGuard<'a> { + /// The per-thread write lock guard over the + /// [`PinnedEventCacheState`]. + state: RwLockWriteGuard<'a, PinnedEventCacheState>, + + /// The cross-process lock guard over the store. + store: EventCacheStoreLockGuard, +} + +impl<'a> PinnedEventCacheStateLockWriteGuard<'a> { + /// Synchronously downgrades a write lock into a read lock. + /// + /// The per-thread/state lock is downgraded atomically, without allowing + /// any writers to take exclusive access of the lock in the meantime. + /// + /// It returns an RAII guard which will drop the write access to the + /// state and to the store when dropped. + fn downgrade(self) -> PinnedEventCacheStateLockReadGuard<'a> { + PinnedEventCacheStateLockReadGuard { state: self.state.downgrade(), _store: self.store } + } + + /// Reload all the pinned events from storage, replacing the current linked + /// chunk. 
+ async fn reload_from_storage(&mut self) -> Result<()> {
+ let room_id = self.state.room_id.clone();
+ let linked_chunk_id = LinkedChunkId::PinnedEvents(&room_id);
+
+ let (last_chunk, chunk_id_gen) = self.store.load_last_chunk(linked_chunk_id).await?;
+
+ let Some(last_chunk) = last_chunk else {
+ // No pinned events stored, make sure the in-memory linked chunk is sync'd (i.e.
+ // empty), and return.
+ if self.state.chunk.events().next().is_some() {
+ self.state.chunk.reset();
+
+ let diffs = self.state.chunk.updates_as_vector_diffs();
+ if !diffs.is_empty() {
+ let _ = self.state.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
+ diffs,
+ origin: EventsOrigin::Sync,
+ });
+ }
+ }
+
+ return Ok(());
+ };
+
+ let mut previous = last_chunk.previous;
+ self.state.chunk.replace_with(Some(last_chunk), chunk_id_gen)?;
+
+ // Reload all the previous chunks, from the most recent to the oldest.
+ while let Some(previous_chunk_id) = previous {
+ let prev = self.store.load_previous_chunk(linked_chunk_id, previous_chunk_id).await?;
+ if let Some(prev_chunk) = prev {
+ previous = prev_chunk.previous;
+ self.state.chunk.insert_new_chunk_as_first(prev_chunk)?;
+ } else {
+ // The store doesn't know about this chunk; stop walking backwards,
+ // otherwise this loop would never terminate.
+ break;
+ }
+ }
+
+ // Empty store updates, since we just reloaded from storage.
+ self.state.chunk.store_updates().take();
+
+ // Let observers know about it.
+ let diffs = self.state.chunk.updates_as_vector_diffs();
+ if !diffs.is_empty() {
+ let _ = self.state.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
+ diffs,
+ origin: EventsOrigin::Sync,
+ });
+ }
+
+ Ok(())
+ }
+
+ async fn replace_all_events(&mut self, new_events: Vec<Event>) -> Result<()> {
+ trace!("resetting all pinned events in linked chunk");
+
+ let previous_pinned_event_ids = self.state.current_event_ids();
+
+ if new_events.iter().filter_map(|e| e.event_id()).collect::<BTreeSet<_>>()
+ == previous_pinned_event_ids.iter().cloned().collect()
+ {
+ // No change in the list of pinned events.
+ return Ok(());
+ }
+
+ if self.state.chunk.events().next().is_some() {
+ self.state.chunk.reset();
+ }
+ self.state.chunk.push_live_events(None, &new_events);
+
+ self.propagate_changes().await?;
+
+ let diffs = self.state.chunk.updates_as_vector_diffs();
+ if !diffs.is_empty() {
+ let _ = self.state.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
+ diffs,
+ origin: EventsOrigin::Sync,
+ });
+ }
+
+ Ok(())
+ }
+
+ /// Propagate the changes in this linked chunk to observers, and save the
+ /// changes on disk.
+ async fn propagate_changes(&mut self) -> Result<()> {
+ let updates = self.state.chunk.store_updates().take();
+ self.send_updates_to_store(updates).await
+ }
+
+ // NOTE: copy/paste
+ async fn send_updates_to_store(&mut self, mut updates: Vec<Update<Event, Gap>>) -> Result<()> {
+ if updates.is_empty() {
+ return Ok(());
+ }
+
+ // Strip relations from updates which insert or replace items.
+ for update in updates.iter_mut() {
+ match update {
+ Update::PushItems { items, .. } => Self::strip_relations_from_events(items),
+ Update::ReplaceItem { item, .. } => Self::strip_relations_from_event(item),
+ // Other update kinds don't involve adding new events.
+ Update::NewItemsChunk { .. }
+ | Update::NewGapChunk { .. }
+ | Update::RemoveChunk(_)
+ | Update::RemoveItem { .. }
+ | Update::DetachLastItems { .. }
+ | Update::StartReattachItems
+ | Update::EndReattachItems
+ | Update::Clear => {}
+ }
+ }
+
+ // Spawn a task to make sure that all the changes are effectively forwarded to
+ // the store, even if the call to this method gets aborted.
+ //
+ // The store cross-process locking involves an actual mutex, which ensures that
+ // storing updates happens in the expected order.
+
+ let store = self.store.clone();
+ let room_id = self.state.room_id.clone();
+ let cloned_updates = updates.clone();
+
+ spawn(async move {
+ trace!(updates = ?cloned_updates, "sending linked chunk updates to the store");
+ let linked_chunk_id = LinkedChunkId::PinnedEvents(&room_id);
+
+ store.handle_linked_chunk_updates(linked_chunk_id, cloned_updates).await?;
+ trace!("linked chunk updates applied");
+
+ Result::Ok(())
+ })
+ .await
+ .expect("joining failed")?;
+
+ // Forward that the store got updated to observers.
+ let _ = self.state.linked_chunk_update_sender.send(RoomEventCacheLinkedChunkUpdate {
+ linked_chunk_id: OwnedLinkedChunkId::PinnedEvents(self.state.room_id.clone()),
+ updates,
+ });
+
+ Ok(())
+ }
+
+ // NOTE: copy/paste
+ fn strip_relations_from_event(ev: &mut Event) {
+ match &mut ev.kind {
+ TimelineEventKind::Decrypted(decrypted) => {
+ // Remove all information about encryption info for
+ // the bundled events.
+ decrypted.unsigned_encryption_info = None;
+
+ // Remove the `unsigned`/`m.relations` field, if need be.
+ Self::strip_relations_if_present(&mut decrypted.event);
+ }
+
+ TimelineEventKind::UnableToDecrypt { event, .. }
+ | TimelineEventKind::PlainText { event } => {
+ Self::strip_relations_if_present(event);
+ }
+ }
+ }
+
+ /// Strips the bundled relations from a collection of events.
+ // NOTE: copy/paste
+ fn strip_relations_from_events(items: &mut [Event]) {
+ for ev in items.iter_mut() {
+ Self::strip_relations_from_event(ev);
+ }
+ }
+
+ /// Removes the bundled relations from an event, if they were present.
+ ///
+ /// Only replaces the raw event if it contained bundled relations.
+ // NOTE: copy/paste
+ fn strip_relations_if_present<T>(event: &mut Raw<T>) {
+ // We're going to get rid of the `unsigned`/`m.relations` field, if it's
+ // present.
+ // Use a closure that returns an option so we can quickly short-circuit.
+ let mut closure = || -> Option<()> {
+ let mut val: serde_json::Value = event.deserialize_as().ok()?;
+ let unsigned = val.get_mut("unsigned")?;
+ let unsigned_obj = unsigned.as_object_mut()?;
+ if unsigned_obj.remove("m.relations").is_some() {
+ *event = Raw::new(&val).ok()?.cast_unchecked();
+ }
+ None
+ };
+ let _ = closure();
+ }
+}
+
+impl PinnedEventCacheState {
+ /// Return a list of the current event IDs in this linked chunk.
+ fn current_event_ids(&self) -> Vec<OwnedEventId> {
+ self.chunk.events().filter_map(|(_position, event)| event.event_id()).collect()
+ }
+
+ /// Find an event in the linked chunk by its event ID, and return its
+ /// location.
+ ///
+ /// Note: the in-memory content is always the same as the one in the store,
+ /// since the store is updated synchronously with changes in the linked
+ /// chunk, so we can afford to only look for the event in the in-memory
+ /// linked chunk.
+ fn find_event(&self, event_id: &EventId) -> Option<(Position, Event)> {
+ for (position, event) in self.chunk.revents() {
+ if event.event_id().as_deref() == Some(event_id) {
+ return Some((position, event.clone()));
+ }
+ }
+
+ None
+ }
+}
+
+/// All the information related to a room's pinned events cache.
+pub struct PinnedEventCache {
+ state: Arc<PinnedEventCacheStateLock>,
+
+ /// The task handling the refreshing of pinned events for this specific
+ /// room.
+ // TODO(bnjbvr): use the background job handle for this, when
+ // available in `main`.
+ _task: Arc<JoinHandle<()>>,
+}
+
+#[cfg(not(tarpaulin_include))]
+impl std::fmt::Debug for PinnedEventCache {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("PinnedEventCache").field("state", &self.state).finish_non_exhaustive()
+ }
+}
+
+impl PinnedEventCache {
+ /// Creates a new [`PinnedEventCache`] for the given room.
+ pub(super) fn new(
+ room: Room,
+ linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
+ store: EventCacheStoreLock,
+ ) -> Self {
+ let sender = Sender::new(32);
+
+ let room_id = room.room_id().to_owned();
+
+ let chunk = EventLinkedChunk::new();
+
+ let state =
+ PinnedEventCacheState { room_id, chunk, sender, linked_chunk_update_sender, store };
+ let state = Arc::new(PinnedEventCacheStateLock {
+ locked_state: RwLock::new(state),
+ state_lock_upgrade_mutex: Mutex::new(()),
+ });
+
+ let task = Arc::new(spawn(Self::pinned_event_listener_task(room, state.clone())));
+
+ Self { state, _task: task }
+ }
+
+ /// Subscribe to live events from this room's pinned events cache.
+ pub async fn subscribe(&self) -> Result<(Vec<Event>, Receiver<RoomEventCacheUpdate>)> {
+ let guard = self.state.read().await?;
+ let events = guard.state.chunk.events().map(|(_position, item)| item.clone()).collect();
+
+ let recv = guard.state.sender.subscribe();
+
+ Ok((events, recv))
+ }
+
+ /// Try to locate the events in the linked chunk corresponding to the given
+ /// list of decrypted events, and replace them, while alerting observers
+ /// about the update.
+ #[cfg(feature = "e2e-encryption")]
+ pub(in crate::event_cache) async fn replace_utds(&self, events: &[ResolvedUtd]) -> Result<()> {
+ let mut guard = self.state.write().await?;
+
+ let pinned_events_set =
+ guard.state.current_event_ids().into_iter().collect::<BTreeSet<_>>();
+
+ let mut replaced_some = false;
+
+ for (event_id, decrypted, actions) in events {
+ // As a performance optimization, do a lookup in the current pinned events
+ // set, before looking for the event in the linked chunk.
+ if !pinned_events_set.contains(event_id) {
+ continue;
+ }
+
+ // The event should be in the linked chunk.
+ let Some((position, mut target_event)) = guard.state.find_event(event_id) else {
+ continue;
+ };
+
+ target_event.kind = TimelineEventKind::Decrypted(decrypted.clone());
+
+ if let Some(actions) = actions {
+ target_event.set_push_actions(actions.clone());
+ }
+
+ guard
+ .state
+ .chunk
+ .replace_event_at(position, target_event.clone())
+ .expect("position should be valid");
+
+ replaced_some = true;
+ }
+
+ if replaced_some {
+ guard.propagate_changes().await?;
+
+ let diffs = guard.state.chunk.updates_as_vector_diffs();
+ let _ = guard.state.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
+ diffs,
+ origin: EventsOrigin::Cache,
+ });
+ }
+
+ Ok(())
+ }
+
+ /// Given a raw event, try to extract the target event ID of a relation as
+ /// defined with `m.relates_to`.
+ fn extract_relation_target(raw: &Raw<AnySyncTimelineEvent>) -> Option<OwnedEventId> {
+ #[derive(Deserialize)]
+ struct SimplifiedRelation {
+ event_id: OwnedEventId,
+ }
+
+ #[derive(Deserialize)]
+ struct SimplifiedContent {
+ #[serde(rename = "m.relates_to")]
+ relates_to: Option<SimplifiedRelation>,
+ }
+
+ let SimplifiedContent { relates_to: Some(relation) } =
+ raw.get_field::<SimplifiedContent>("content").ok()??
+ else {
+ return None;
+ };
+
+ Some(relation.event_id)
+ }
+
+ /// Given a raw event, try to extract the target event ID of a live
+ /// redaction.
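+ ///
+ /// Returns `None` if the event is not an `m.room.redaction` event, or if
+ /// the redaction doesn't carry a target event ID.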
+ fn extract_redaction_target(
+ raw: &Raw<AnySyncTimelineEvent>,
+ room_redaction_rules: &RedactionRules,
+ ) -> Option<OwnedEventId> {
+ // Try to find a redaction, but do not deserialize the entire event if we aren't
+ // certain it's an `m.room.redaction`.
+ if raw.get_field::<MessageLikeEventType>("type").ok()??
+ != MessageLikeEventType::RoomRedaction
+ {
+ return None;
+ }
+
+ let AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(redaction)) =
+ raw.deserialize().ok()?
+ else {
+ return None;
+ };
+
+ redaction.redacts(room_redaction_rules).map(ToOwned::to_owned).or_else(|| {
+ warn!("missing target event id from the redaction event");
+ None
+ })
+ }
+
+ /// Check if any of the given events relate to an event in the pinned events
+ /// linked chunk, and if so, append them.
+ pub(super) async fn maybe_add_live_related_events(
+ &mut self,
+ events: &[Event],
+ room_redaction_rules: &RedactionRules,
+ ) -> Result<()> {
+ trace!("checking live events for relations to pinned events");
+ let mut guard = self.state.write().await?;
+
+ let pinned_event_ids: BTreeSet<OwnedEventId> =
+ guard.state.current_event_ids().into_iter().collect();
+
+ if pinned_event_ids.is_empty() {
+ return Ok(());
+ }
+
+ let mut new_relations = Vec::new();
+
+ // For all events that relate to an event in the pinned events chunk, push this
+ // event to the linked chunk, and propagate changes to observers.
+ for ev in events {
+ // Try to find a regular relation in ev.
+ if let Some(relation_target) = Self::extract_relation_target(ev.raw())
+ && pinned_event_ids.contains(&relation_target)
+ {
+ new_relations.push(ev.clone());
+ continue;
+ }
+
+ // Try to find a redaction in ev.
+ if let Some(redaction_target) =
+ Self::extract_redaction_target(ev.raw(), room_redaction_rules)
+ && pinned_event_ids.contains(&redaction_target)
+ {
+ new_relations.push(ev.clone());
+ continue;
+ }
+ }
+
+ if !new_relations.is_empty() {
+ trace!("found {} new events related to pinned events", new_relations.len());
+
+ // We've found new relations; append them to the linked chunk.
+ guard.state.chunk.push_live_events(None, &new_relations);
+
+ guard.propagate_changes().await?;
+
+ let diffs = guard.state.chunk.updates_as_vector_diffs();
+ if !diffs.is_empty() {
+ let _ = guard.state.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
+ diffs,
+ origin: EventsOrigin::Sync,
+ });
+ }
+ }
+
+ Ok(())
+ }
+
+ #[instrument(fields(room_id = %room.room_id()), skip(room, state))]
+ async fn pinned_event_listener_task(room: Room, state: Arc<PinnedEventCacheStateLock>) {
+ debug!("pinned events listener task started");
+
+ let reload_from_network = async |room: Room| {
+ let events = match Self::reload_pinned_events(room).await {
+ Ok(Some(events)) => events,
+ Ok(None) => Vec::new(),
+ Err(err) => {
+ warn!("error when loading pinned events: {err}");
+ return;
+ }
+ };
+
+ // Replace the whole linked chunk with those new events, and propagate updates
+ // to the observers.
+ match state.write().await {
+ Ok(mut guard) => {
+ guard.replace_all_events(events).await.unwrap_or_else(|err| {
+ warn!("error when replacing pinned events: {err}");
+ });
+ }
+
+ Err(err) => {
+ warn!("error when acquiring write lock to replace pinned events: {err}");
+ }
+ }
+ };
+
+ // Reload the pinned events from the storage first.
+ match state.write().await {
+ Ok(mut guard) => {
+ // On startup, reload the pinned events from storage.
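+ // If the stored list turns out to be stale (i.e. it doesn't match the
+ // room's `m.room.pinned_events` state), fall back to a network reload
+ // just below.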
+ guard.reload_from_storage().await.unwrap_or_else(|err| {
+ warn!("error when reloading pinned events from storage, at start: {err}");
+ });
+
+ // Compare the initial list of pinned events to the one in the linked chunk.
+ let actual_pinned_events = room.pinned_event_ids().unwrap_or_default();
+ let reloaded_set =
+ guard.state.current_event_ids().into_iter().collect::<BTreeSet<_>>();
+
+ if actual_pinned_events.len() != reloaded_set.len()
+ || actual_pinned_events.iter().any(|event_id| !reloaded_set.contains(event_id))
+ {
+ // Reload the list of pinned events from the network.
+ drop(guard);
+ reload_from_network(room.clone()).await;
+ }
+ }
+
+ Err(err) => {
+ warn!("error when acquiring write lock to initialize pinned events: {err}");
+ }
+ }
+
+ let weak_room =
+ WeakRoom::new(WeakClient::from_client(&room.client()), room.room_id().to_owned());
+
+ let mut stream = room.pinned_event_ids_stream();
+
+ drop(room);
+
+ // Whenever the list of pinned events changes, reload it.
+ while let Some(new_list) = stream.next().await {
+ trace!("handling update");
+
+ let guard = match state.read().await {
+ Ok(guard) => guard,
+ Err(err) => {
+ warn!("error when acquiring read lock to handle pinned events update: {err}");
+ break;
+ }
+ };
+
+ // Compare to the current linked chunk.
+ let current_set = guard.state.current_event_ids().into_iter().collect::<BTreeSet<_>>();
+
+ if !new_list.is_empty()
+ && new_list.iter().all(|event_id| current_set.contains(event_id))
+ {
+ // All the events in the pinned list are the same, don't reload.
+ continue;
+ }
+
+ let Some(room) = weak_room.get() else {
+ debug!("room has been dropped, ending pinned events listener task");
+ break;
+ };
+
+ drop(guard);
+
+ // Event IDs differ, so reload all the pinned events.
+ reload_from_network(room).await;
+ }
+
+ debug!("pinned events listener task ended");
+ }
+
+ /// Loads the pinned events in this room, using the cache first and then
+ /// requesting the events from the homeserver if they couldn't be found.
+ /// This method will perform as many concurrent requests for events as
+ /// `max_concurrent_requests` allows, to avoid overwhelming the server.
+ ///
+ /// Returns `None` if the list of pinned events hasn't changed since the
+ /// previous time we loaded them. May return an error if there was an
+ /// issue fetching the full events.
+ async fn reload_pinned_events(room: Room) -> Result<Option<Vec<Event>>> {
+ let (max_events_to_load, max_concurrent_requests) = {
+ let client = room.client();
+ let config = client.event_cache().config().await;
+ (config.max_pinned_events_to_load, config.max_pinned_events_concurrent_requests)
+ };
+
+ let pinned_event_ids: Vec<OwnedEventId> = room
+ .pinned_event_ids()
+ .unwrap_or_default()
+ .into_iter()
+ .rev()
+ .take(max_events_to_load)
+ .rev()
+ .collect();
+
+ if pinned_event_ids.is_empty() {
+ return Ok(Some(Vec::new()));
+ }
+
+ let mut loaded_events: Vec<Event> =
+ stream::iter(pinned_event_ids.clone().into_iter().map(|event_id| {
+ let room = room.clone();
+ let filter = vec![RelationType::Annotation, RelationType::Replacement];
+ let request_config = RequestConfig::default().retry_limit(3);
+
+ async move {
+ let (target, mut relations) = room
+ .load_or_fetch_event_with_relations(
+ &event_id,
+ Some(filter),
+ Some(request_config),
+ )
+ .await?;
+
+ relations.insert(0, target);
+ Ok::<_, crate::Error>(relations)
+ }
+ }))
+ .buffer_unordered(max_concurrent_requests)
+ // Flatten all the vectors.
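+ // The first `flat_map` iterates over each `Result` (silently dropping the
+ // requests that failed), the second one flattens each `Vec` of events.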
+ .flat_map(stream::iter)
+ .flat_map(stream::iter)
+ .collect()
+ .await;
+
+ if loaded_events.is_empty() {
+ // If the list of loaded events is empty, we ran into an error while loading
+ // *all* the pinned events, which needs to be reported to the caller.
+ return Err(EventCacheError::UnableToLoadPinnedEvents);
+ }
+
+ // Now that we have all the events and their related events, we can't sort them
+ // nicely: we've lost all ordering information by using /event or /relations.
+ // Resort to sorting using chronological ordering (oldest -> newest).
+ loaded_events.sort_by_key(|item| {
+ item.raw()
+ .deserialize()
+ .map(|e| e.origin_server_ts())
+ .unwrap_or_else(|_| MilliSecondsSinceUnixEpoch::now())
+ });
+
+ Ok(Some(loaded_events))
+ }
+}
diff --git a/crates/matrix-sdk/tests/integration/room/pinned_events.rs b/crates/matrix-sdk/tests/integration/room/pinned_events.rs
index f636be5c204..6e43ac7af99 100644
--- a/crates/matrix-sdk/tests/integration/room/pinned_events.rs
+++ b/crates/matrix-sdk/tests/integration/room/pinned_events.rs
@@ -1,9 +1,18 @@
-use std::ops::Not as _;
-
-use matrix_sdk::{Room, test_utils::mocks::MatrixMockServer};
+use std::{ops::Not as _, sync::Arc};
+
+use matrix_sdk::{
+ Room,
+ event_cache::RoomEventCacheUpdate,
+ linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update},
+ store::StoreConfig,
+ test_utils::mocks::MatrixMockServer,
+ timeout::timeout,
+};
+use matrix_sdk_base::event_cache::store::{EventCacheStore, MemoryStore};
 use matrix_sdk_test::{JoinedRoomBuilder, StateTestEvent, async_test, event_factory::EventFactory};
 use ruma::{EventId, event_id, owned_event_id, room_id, user_id};
 use serde_json::json;
+use tokio::time::Duration;
 use wiremock::{
 Mock, ResponseTemplate,
 matchers::{header, method, path_regex},
@@ -120,3 +129,93 @@ async fn test_unpin_event_is_returning_an_error() {
 assert!(setup.room.unpin_event(setup.event_id).await.is_err());
 }
+
+#[async_test]
+async fn test_pinned_events_are_reloaded_from_storage() {
+ let room_id = room_id!("!galette:saucisse.bzh");
+ let pinned_event_id = event_id!("$pinned_event");
+
+ let f = EventFactory::new().room(room_id).sender(user_id!("@alice:example.org"));
+
+ // Create the pinned event.
+ let pinned_event = f.text_msg("I'm pinned!").event_id(pinned_event_id).into_event();
+
+ // Create an event cache store, pre-populated with the pinned event.
+ let event_cache_store = Arc::new(MemoryStore::new());
+ event_cache_store
+ .handle_linked_chunk_updates(
+ LinkedChunkId::PinnedEvents(room_id),
+ vec![
+ Update::NewItemsChunk { previous: None, new: ChunkIdentifier::new(0), next: None },
+ Update::PushItems {
+ at: Position::new(ChunkIdentifier::new(0), 0),
+ items: vec![pinned_event.clone()],
+ },
+ ],
+ )
+ .await
+ .unwrap();
+
+ // Create a client with the pre-populated store.
+ let server = MatrixMockServer::new().await;
+ let client = server
+ .client_builder()
+ .on_builder(|builder| {
+ builder.store_config(
+ StoreConfig::new("test_store".to_owned()).event_cache_store(event_cache_store),
+ )
+ })
+ .build()
+ .await;
+
+ // Subscribe the event cache to sync updates.
+ client.event_cache().subscribe().unwrap();
+
+ // Sync the room with the pinned event ID in the room state.
+ //
+ // This is important: the pinned events list must include our event ID,
+ // otherwise the initial reload from network will clear the storage-loaded
+ // events.
+ let pinned_events_state = StateTestEvent::Custom(json!({ + "content": { + "pinned": [pinned_event_id] + }, + "event_id": "$pinned_events_state", + "origin_server_ts": 151393755, + "sender": "@example:localhost", + "state_key": "", + "type": "m.room.pinned_events", + })); + + let room = server + .sync_room(&client, JoinedRoomBuilder::new(room_id).add_state_event(pinned_events_state)) + .await; + + // Get the room event cache and subscribe to pinned events. + let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); + + // Subscribe to pinned events - this triggers PinnedEventCache::new() which + // spawns a task that calls reload_from_storage() first. + let (events, mut subscriber) = room_event_cache.subscribe_to_pinned_events().await.unwrap(); + let mut events = events.into(); + + // Wait for the background task to reload the events. + while let Ok(Ok(up)) = timeout(subscriber.recv(), Duration::from_millis(300)).await { + if let RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } = up { + for diff in diffs { + diff.apply(&mut events); + } + } + if !events.is_empty() { + break; + } + } + + // Verify the pinned event was reloaded from storage. + assert_eq!(events.len(), 1, "Expected pinned events to be loaded"); + assert_eq!( + events[0].event_id().unwrap(), + pinned_event_id, + "The pinned event should have been loaded" + ); +} diff --git a/testing/matrix-sdk-integration-testing/src/tests/timeline.rs b/testing/matrix-sdk-integration-testing/src/tests/timeline.rs index e8c88133c29..27c8829f613 100644 --- a/testing/matrix-sdk-integration-testing/src/tests/timeline.rs +++ b/testing/matrix-sdk-integration-testing/src/tests/timeline.rs @@ -1083,6 +1083,10 @@ async fn test_local_echo_to_send_event_has_encryption_info() -> TestResult { Ok(()) } +/// Setup a pinned events test. +/// +/// Let Alice create an encrypted room, send an event that is immediately +/// pinned, and a number of normal events as well. async fn prepare_room_with_pinned_events( alice: &Client, recovery_passphrase: &str, @@ -1163,9 +1167,9 @@ async fn test_pinned_events_are_decrypted_after_recovering_with_event_count( let sync_service = SyncService::builder(another_alice.clone()).build().await?; // We need to subscribe to the room, otherwise we won't request the - // `m.room.pinned_events` stat event. + // `m.room.pinned_events` state event. // - // Additionally if we subscribe to the room after we already synced, we'll won't + // Additionally if we subscribe to the room after we already synced, we won't // receive the event, likely due to a Synapse bug. sync_service.room_list_service().subscribe_to_rooms(&[&room_id]).await; sync_service.start().await; @@ -1188,20 +1192,16 @@ async fn test_pinned_events_are_decrypted_after_recovering_with_event_count( assert!(event.kind.is_utd()); // Alright, let's now get to the timeline with a PinnedEvents focus. - let pinned_timeline = room - .timeline_builder() - .with_focus(TimelineFocus::PinnedEvents { - max_events_to_load: 100, - max_concurrent_requests: 10, - }) - .build() - .await?; + let pinned_timeline = + room.timeline_builder().with_focus(TimelineFocus::PinnedEvents).build().await?; let (items, mut stream) = pinned_timeline.subscribe_filter_map(|i| i.as_event().cloned()).await; // If we don't have any items as of yet, wait on the stream. if items.is_empty() { - let _ = assert_next_with_timeout!(stream, 5000); + while let Ok(Some(_)) = timeout(Duration::from_secs(5), stream.next()).await { + // Wait until the timeline stabilizes. 
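+ // (Keep draining updates until none arrive within the 5-second timeout.)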
+ } } // Alright, let's get the event from the timeline. @@ -1256,13 +1256,10 @@ async fn test_pinned_events_are_decrypted_after_recovering_with_event_in_timelin } /// Test that pinned UTD events, once decrypted by R2D2 (the redecryptor), get -/// replaced in the timeline with the decrypted variant even if the pinened UTD +/// replaced in the timeline with the decrypted variant even if the pinned UTD /// event isn't part of the main timeline and thus wasn't put into the event /// cache by the main timeline backpaginating. #[tokio::test(flavor = "multi_thread", worker_threads = 4)] -// FIXME: This test is ignored because R2D2 can't decrypt this pinned event as it's never put into -// the event cache. -#[ignore] async fn test_pinned_events_are_decrypted_after_recovering_with_event_not_in_timeline() -> TestResult { test_pinned_events_are_decrypted_after_recovering_with_event_count(30).await