From 10be3274d90ba96bf933b598040ed64a73de72dd Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Tue, 29 Oct 2024 17:59:41 +0100 Subject: [PATCH 1/6] chore(event cache store): Support multiple parent key types for dependent requests This makes it possible to have different kinds of *parent key*, to update a dependent request. A dependent request waits for the parent key to be set, before it can be acted upon; before, it could only be an event id, because a dependent request would only wait for an event to be sent. In a soon future, we're going to support uploading medias as requests, and some subsequent requests will depend on this, but won't be able to rely on an event id (since an upload doesn't return an event/event id). Since this changes the format of `DependentQueuedRequest`, which is directly serialized into the state stores, I've also cleared the table, to not have to migrate the data in there. Dependent requests are supposed to be transient anyways, so it would be a bug if they were transient. Since a migration was needed anyways, I've also removed the `rename` annotations (that supported a previous format) for the `DependentQueuedRequestKind` enum. --- .../src/store/integration_tests.rs | 18 ++++-- .../matrix-sdk-base/src/store/memory_store.rs | 8 +-- crates/matrix-sdk-base/src/store/mod.rs | 2 +- .../matrix-sdk-base/src/store/send_queue.rs | 26 +++++--- crates/matrix-sdk-base/src/store/traits.rs | 19 +++--- .../src/state_store/migrations.rs | 22 ++++++- .../src/state_store/mod.rs | 10 ++-- .../migrations/state_store/008_send_queue.sql | 7 +++ crates/matrix-sdk-sqlite/src/state_store.rs | 26 +++++--- crates/matrix-sdk/src/send_queue.rs | 59 ++++++++++++++----- 10 files changed, 143 insertions(+), 54 deletions(-) create mode 100644 crates/matrix-sdk-sqlite/migrations/state_store/008_send_queue.sql diff --git a/crates/matrix-sdk-base/src/store/integration_tests.rs b/crates/matrix-sdk-base/src/store/integration_tests.rs index d83eef9793e..a49f4d79492 100644 --- a/crates/matrix-sdk-base/src/store/integration_tests.rs +++ b/crates/matrix-sdk-base/src/store/integration_tests.rs @@ -33,7 +33,9 @@ use ruma::{ }; use serde_json::{json, value::Value as JsonValue}; -use super::{DependentQueuedRequestKind, DynStateStore, ServerCapabilities}; +use super::{ + send_queue::SentRequestKey, DependentQueuedRequestKind, DynStateStore, ServerCapabilities, +}; use crate::{ deserialized_responses::MemberEvent, store::{ChildTransactionId, QueueWedgeError, Result, SerializableEventContent, StateStoreExt}, @@ -1384,13 +1386,19 @@ impl StateStoreIntegrationTests for DynStateStore { assert_eq!(dependents.len(), 1); assert_eq!(dependents[0].parent_transaction_id, txn0); assert_eq!(dependents[0].own_transaction_id, child_txn); - assert!(dependents[0].event_id.is_none()); + assert!(dependents[0].parent_key.is_none()); assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent); // Update the event id. let event_id = owned_event_id!("$1"); - let num_updated = - self.update_dependent_queued_request(room_id, &txn0, event_id.clone()).await.unwrap(); + let num_updated = self + .update_dependent_queued_request( + room_id, + &txn0, + SentRequestKey::Event(event_id.clone()), + ) + .await + .unwrap(); assert_eq!(num_updated, 1); // It worked. @@ -1398,7 +1406,7 @@ impl StateStoreIntegrationTests for DynStateStore { assert_eq!(dependents.len(), 1); assert_eq!(dependents[0].parent_transaction_id, txn0); assert_eq!(dependents[0].own_transaction_id, child_txn); - assert_eq!(dependents[0].event_id.as_ref(), Some(&event_id)); + assert_eq!(dependents[0].parent_key.as_ref(), Some(&SentRequestKey::Event(event_id))); assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent); // Now remove it. diff --git a/crates/matrix-sdk-base/src/store/memory_store.rs b/crates/matrix-sdk-base/src/store/memory_store.rs index e55ec14caf3..aaeda6e3d9d 100644 --- a/crates/matrix-sdk-base/src/store/memory_store.rs +++ b/crates/matrix-sdk-base/src/store/memory_store.rs @@ -36,7 +36,7 @@ use ruma::{ use tracing::{debug, instrument, trace, warn}; use super::{ - send_queue::{ChildTransactionId, QueuedRequest, SerializableEventContent}, + send_queue::{ChildTransactionId, QueuedRequest, SentRequestKey, SerializableEventContent}, traits::{ComposerDraft, ServerCapabilities}, DependentQueuedRequest, DependentQueuedRequestKind, QueuedRequestKind, Result, RoomInfo, StateChanges, StateStore, StoreError, @@ -907,7 +907,7 @@ impl StateStore for MemoryStore { kind: content, parent_transaction_id: parent_transaction_id.to_owned(), own_transaction_id, - event_id: None, + parent_key: None, }, ); Ok(()) @@ -917,13 +917,13 @@ impl StateStore for MemoryStore { &self, room: &RoomId, parent_txn_id: &TransactionId, - event_id: OwnedEventId, + sent_parent_key: SentRequestKey, ) -> Result { let mut dependent_send_queue_events = self.dependent_send_queue_events.write().unwrap(); let dependents = dependent_send_queue_events.entry(room.to_owned()).or_default(); let mut num_updated = 0; for d in dependents.iter_mut().filter(|item| item.parent_transaction_id == parent_txn_id) { - d.event_id = Some(event_id.clone()); + d.parent_key = Some(sent_parent_key.clone()); num_updated += 1; } Ok(num_updated) diff --git a/crates/matrix-sdk-base/src/store/mod.rs b/crates/matrix-sdk-base/src/store/mod.rs index 392dd6e5bb8..cd4b7c67cd1 100644 --- a/crates/matrix-sdk-base/src/store/mod.rs +++ b/crates/matrix-sdk-base/src/store/mod.rs @@ -76,7 +76,7 @@ pub use self::{ memory_store::MemoryStore, send_queue::{ ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, QueueWedgeError, - QueuedRequest, QueuedRequestKind, SerializableEventContent, + QueuedRequest, QueuedRequestKind, SentRequestKey, SerializableEventContent, }, traits::{ ComposerDraft, ComposerDraftType, DynStateStore, IntoStateStore, ServerCapabilities, diff --git a/crates/matrix-sdk-base/src/store/send_queue.rs b/crates/matrix-sdk-base/src/store/send_queue.rs index 9a70c579641..6a695c62959 100644 --- a/crates/matrix-sdk-base/src/store/send_queue.rs +++ b/crates/matrix-sdk-base/src/store/send_queue.rs @@ -150,18 +150,15 @@ pub enum QueueWedgeError { #[derive(Clone, Debug, Serialize, Deserialize)] pub enum DependentQueuedRequestKind { /// The event should be edited. - #[serde(rename = "Edit")] EditEvent { /// The new event for the content. new_content: SerializableEventContent, }, /// The event should be redacted/aborted/removed. - #[serde(rename = "Redact")] RedactEvent, /// The event should be reacted to, with the given key. - #[serde(rename = "React")] ReactEvent { /// Key used for the reaction. key: String, @@ -207,6 +204,23 @@ impl From for OwnedTransactionId { } } +/// A unique key (identifier) indicating that a transaction has been +/// successfully sent to the server. +/// +/// The owning child transactions can now be resolved. +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub enum SentRequestKey { + /// The parent transaction returned an event when it succeeded. + Event(OwnedEventId), +} + +impl SentRequestKey { + /// Converts the current parent key into an event id, if possible. + pub fn into_event_id(self) -> Option { + as_variant!(self, Self::Event) + } +} + /// A request to be sent, depending on a [`QueuedRequest`] to be sent first. /// /// Depending on whether the parent request has been sent or not, this will @@ -231,11 +245,7 @@ pub struct DependentQueuedRequest { /// If the parent request has been sent, the parent's request identifier /// returned by the server once the local echo has been sent out. - /// - /// Note: this is the event id used for the depended-on event after it's - /// been sent, not for a possible event that could have been sent - /// because of this [`DependentQueuedRequest`]. - pub event_id: Option, + pub parent_key: Option, } #[cfg(not(tarpaulin_include))] diff --git a/crates/matrix-sdk-base/src/store/traits.rs b/crates/matrix-sdk-base/src/store/traits.rs index e3ef3c9de59..e1bfd517432 100644 --- a/crates/matrix-sdk-base/src/store/traits.rs +++ b/crates/matrix-sdk-base/src/store/traits.rs @@ -41,8 +41,9 @@ use ruma::{ use serde::{Deserialize, Serialize}; use super::{ - ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, QueueWedgeError, - QueuedRequest, SerializableEventContent, StateChanges, StoreError, + send_queue::SentRequestKey, ChildTransactionId, DependentQueuedRequest, + DependentQueuedRequestKind, QueueWedgeError, QueuedRequest, SerializableEventContent, + StateChanges, StoreError, }; use crate::{ deserialized_responses::{RawAnySyncOrStrippedState, RawMemberEvent, RawSyncOrStrippedState}, @@ -416,15 +417,19 @@ pub trait StateStore: AsyncTraitDeps { content: DependentQueuedRequestKind, ) -> Result<(), Self::Error>; - /// Update a set of dependent send queue requests with an event id, - /// effectively marking them as ready. + /// Update a set of dependent send queue requests with a key identifying the + /// homeserver's response, effectively marking them as ready. + /// + /// ⚠ Beware! There's no verification applied that the parent key type is + /// compatible with the dependent event type. The invalid state may be + /// lazily filtered out in `load_dependent_queued_requests`. /// /// Returns the number of updated requests. async fn update_dependent_queued_request( &self, room_id: &RoomId, parent_txn_id: &TransactionId, - event_id: OwnedEventId, + sent_parent_key: SentRequestKey, ) -> Result; /// Remove a specific dependent send queue request by id. @@ -697,10 +702,10 @@ impl StateStore for EraseStateStoreError { &self, room_id: &RoomId, parent_txn_id: &TransactionId, - event_id: OwnedEventId, + sent_parent_key: SentRequestKey, ) -> Result { self.0 - .update_dependent_queued_request(room_id, parent_txn_id, event_id) + .update_dependent_queued_request(room_id, parent_txn_id, sent_parent_key) .await .map_err(Into::into) } diff --git a/crates/matrix-sdk-indexeddb/src/state_store/migrations.rs b/crates/matrix-sdk-indexeddb/src/state_store/migrations.rs index 98b55daea22..8e559cfea9d 100644 --- a/crates/matrix-sdk-indexeddb/src/state_store/migrations.rs +++ b/crates/matrix-sdk-indexeddb/src/state_store/migrations.rs @@ -46,7 +46,7 @@ use super::{ }; use crate::IndexeddbStateStoreError; -const CURRENT_DB_VERSION: u32 = 11; +const CURRENT_DB_VERSION: u32 = 12; const CURRENT_META_DB_VERSION: u32 = 2; /// Sometimes Migrations can't proceed without having to drop existing @@ -235,6 +235,9 @@ pub async fn upgrade_inner_db( if old_version < 11 { db = migrate_to_v11(db).await?; } + if old_version < 12 { + db = migrate_to_v12(db).await?; + } } db.close(); @@ -771,6 +774,23 @@ async fn migrate_to_v11(db: IdbDatabase) -> Result { apply_migration(db, 11, migration).await } +/// Drop entries from the [`keys::DEPENDENT_SEND_QUEUE`] table. +async fn migrate_to_v12(db: IdbDatabase) -> Result { + let tx = + db.transaction_on_one_with_mode(keys::DEPENDENT_SEND_QUEUE, IdbTransactionMode::Readwrite)?; + + let store = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?; + store.clear()?; + + tx.await.into_result()?; + + let name = db.name(); + db.close(); + + // Update the version of the database. + Ok(IdbDatabase::open_u32(&name, 12)?.await?) +} + #[cfg(all(test, target_arch = "wasm32"))] mod tests { wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser); diff --git a/crates/matrix-sdk-indexeddb/src/state_store/mod.rs b/crates/matrix-sdk-indexeddb/src/state_store/mod.rs index 3dd7fab7a6b..f8ac92af8f0 100644 --- a/crates/matrix-sdk-indexeddb/src/state_store/mod.rs +++ b/crates/matrix-sdk-indexeddb/src/state_store/mod.rs @@ -26,8 +26,8 @@ use matrix_sdk_base::{ deserialized_responses::RawAnySyncOrStrippedState, store::{ ChildTransactionId, ComposerDraft, DependentQueuedRequest, DependentQueuedRequestKind, - QueuedRequest, QueuedRequestKind, SerializableEventContent, ServerCapabilities, - StateChanges, StateStore, StoreError, + QueuedRequest, QueuedRequestKind, SentRequestKey, SerializableEventContent, + ServerCapabilities, StateChanges, StateStore, StoreError, }, MinimalRoomMemberEvent, RoomInfo, RoomMemberships, StateStoreDataKey, StateStoreDataValue, }; @@ -1548,7 +1548,7 @@ impl_state_store!({ kind: content, parent_transaction_id: parent_txn_id.to_owned(), own_transaction_id: own_txn_id, - event_id: None, + parent_key: None, }); // Save the new vector into db. @@ -1563,7 +1563,7 @@ impl_state_store!({ &self, room_id: &RoomId, parent_txn_id: &TransactionId, - event_id: OwnedEventId, + parent_key: SentRequestKey, ) -> Result { let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id); @@ -1586,7 +1586,7 @@ impl_state_store!({ // Modify all requests that match. let mut num_updated = 0; for entry in prev.iter_mut().filter(|entry| entry.parent_transaction_id == parent_txn_id) { - entry.event_id = Some(event_id.clone()); + entry.parent_key = Some(parent_key.clone()); num_updated += 1; } diff --git a/crates/matrix-sdk-sqlite/migrations/state_store/008_send_queue.sql b/crates/matrix-sdk-sqlite/migrations/state_store/008_send_queue.sql new file mode 100644 index 00000000000..d1bec740d87 --- /dev/null +++ b/crates/matrix-sdk-sqlite/migrations/state_store/008_send_queue.sql @@ -0,0 +1,7 @@ +-- Delete all previous entries in the dependent send queue table, because the format changed. +DELETE FROM "dependent_send_queue_events"; + +-- Rename its "event_id" column to "parent_key", while we're at it. +ALTER TABLE "dependent_send_queue_events" + RENAME COLUMN "event_id" + TO "parent_key"; diff --git a/crates/matrix-sdk-sqlite/src/state_store.rs b/crates/matrix-sdk-sqlite/src/state_store.rs index 86f2b95d525..7c951d4ffdf 100644 --- a/crates/matrix-sdk-sqlite/src/state_store.rs +++ b/crates/matrix-sdk-sqlite/src/state_store.rs @@ -13,7 +13,7 @@ use matrix_sdk_base::{ store::{ migration_helpers::RoomInfoV1, ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, QueueWedgeError, QueuedRequest, QueuedRequestKind, - SerializableEventContent, + SentRequestKey, SerializableEventContent, }, MinimalRoomMemberEvent, RoomInfo, RoomMemberships, RoomState, StateChanges, StateStore, StateStoreDataKey, StateStoreDataValue, @@ -69,7 +69,7 @@ mod keys { /// This is used to figure whether the sqlite database requires a migration. /// Every new SQL migration should imply a bump of this number, and changes in /// the [`SqliteStateStore::run_migrations`] function.. -const DATABASE_VERSION: u8 = 8; +const DATABASE_VERSION: u8 = 9; /// A sqlite based cryptostore. #[derive(Clone)] @@ -297,6 +297,16 @@ impl SqliteStateStore { }) .await?; } + + if from < 9 && to >= 9 { + conn.with_transaction(move |txn| { + // Run the migration. + txn.execute_batch(include_str!("../migrations/state_store/008_send_queue.sql"))?; + txn.set_db_version(9) + }) + .await?; + } + Ok(()) } @@ -1854,10 +1864,10 @@ impl StateStore for SqliteStateStore { &self, room_id: &RoomId, parent_txn_id: &TransactionId, - event_id: OwnedEventId, + parent_key: SentRequestKey, ) -> Result { let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id); - let event_id = self.serialize_value(&event_id)?; + let parent_key = self.serialize_value(&parent_key)?; // See comment in `save_send_queue_event`. let parent_txn_id = parent_txn_id.to_string(); @@ -1866,9 +1876,9 @@ impl StateStore for SqliteStateStore { .await? .with_transaction(move |txn| { Ok(txn.prepare_cached( - "UPDATE dependent_send_queue_events SET event_id = ? WHERE parent_transaction_id = ? and room_id = ?", + "UPDATE dependent_send_queue_events SET parent_key = ? WHERE parent_transaction_id = ? and room_id = ?", )? - .execute((event_id, parent_txn_id, room_id))?) + .execute((parent_key, parent_txn_id, room_id))?) }) .await } @@ -1908,7 +1918,7 @@ impl StateStore for SqliteStateStore { .acquire() .await? .prepare( - "SELECT own_transaction_id, parent_transaction_id, event_id, content FROM dependent_send_queue_events WHERE room_id = ? ORDER BY ROWID", + "SELECT own_transaction_id, parent_transaction_id, parent_key, content FROM dependent_send_queue_events WHERE room_id = ? ORDER BY ROWID", |mut stmt| { stmt.query((room_id,))? .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?))) @@ -1922,7 +1932,7 @@ impl StateStore for SqliteStateStore { dependent_events.push(DependentQueuedRequest { own_transaction_id: entry.0.into(), parent_transaction_id: entry.1.into(), - event_id: entry.2.map(|bytes| self.deserialize_value(&bytes)).transpose()?, + parent_key: entry.2.map(|bytes| self.deserialize_value(&bytes)).transpose()?, kind: self.deserialize_json(&entry.3)?, }); } diff --git a/crates/matrix-sdk/src/send_queue.rs b/crates/matrix-sdk/src/send_queue.rs index fcee608eed2..1c570c1cdbf 100644 --- a/crates/matrix-sdk/src/send_queue.rs +++ b/crates/matrix-sdk/src/send_queue.rs @@ -54,7 +54,7 @@ use std::{ use matrix_sdk_base::{ store::{ ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, QueueWedgeError, - QueuedRequest, QueuedRequestKind, SerializableEventContent, + QueuedRequest, QueuedRequestKind, SentRequestKey, SerializableEventContent, }, RoomState, StoreError, }; @@ -772,7 +772,11 @@ impl QueueStorage { // Update all dependent requests. store - .update_dependent_queued_request(&self.room_id, transaction_id, event_id.to_owned()) + .update_dependent_queued_request( + &self.room_id, + transaction_id, + SentRequestKey::Event(event_id.to_owned()), + ) .await?; let removed = store.remove_send_queue_request(&self.room_id, transaction_id).await?; @@ -953,9 +957,17 @@ impl QueueStorage { ) -> Result { let store = client.store(); + let parent_key = de.parent_key; + match de.kind { DependentQueuedRequestKind::EditEvent { new_content } => { - if let Some(event_id) = de.event_id { + if let Some(parent_key) = parent_key { + let Some(event_id) = parent_key.into_event_id() else { + return Err(RoomSendQueueError::StorageError( + RoomSendQueueStorageError::InvalidParentKey, + )); + }; + // The parent event has been sent, so send an edit event. let room = client .get_room(&self.room_id) @@ -1030,7 +1042,13 @@ impl QueueStorage { } DependentQueuedRequestKind::RedactEvent => { - if let Some(event_id) = de.event_id { + if let Some(parent_key) = parent_key { + let Some(event_id) = parent_key.into_event_id() else { + return Err(RoomSendQueueError::StorageError( + RoomSendQueueStorageError::InvalidParentKey, + )); + }; + // The parent event has been sent; send a redaction. let room = client .get_room(&self.room_id) @@ -1064,10 +1082,16 @@ impl QueueStorage { } DependentQueuedRequestKind::ReactEvent { key } => { - if let Some(event_id) = de.event_id { + if let Some(parent_key) = parent_key { + let Some(parent_event_id) = parent_key.into_event_id() else { + return Err(RoomSendQueueError::StorageError( + RoomSendQueueStorageError::InvalidParentKey, + )); + }; + // Queue the reaction event in the send queue 🧠. let react_event = - ReactionEventContent::new(Annotation::new(event_id, key)).into(); + ReactionEventContent::new(Annotation::new(parent_event_id, key)).into(); let serializable = SerializableEventContent::from_raw( Raw::new(&react_event) .map_err(RoomSendQueueStorageError::JsonSerialization)?, @@ -1303,6 +1327,11 @@ pub enum RoomSendQueueStorageError { #[error(transparent)] JsonSerialization(#[from] serde_json::Error), + /// A parent key was expected to be of a certain type, and it was another + /// type instead. + #[error("a dependent event had an invalid parent key type")] + InvalidParentKey, + /// The client is shutting down. #[error("The client is shutting down.")] ClientShuttingDown, @@ -1599,14 +1628,14 @@ mod tests { ) .unwrap(), }, - event_id: None, + parent_key: None, }; let res = canonicalize_dependent_requests(&[edit]); assert_eq!(res.len(), 1); assert_matches!(&res[0].kind, DependentQueuedRequestKind::EditEvent { .. }); assert_eq!(res[0].parent_transaction_id, txn); - assert!(res[0].event_id.is_none()); + assert!(res[0].parent_key.is_none()); } #[test] @@ -1619,7 +1648,7 @@ mod tests { own_transaction_id: ChildTransactionId::new(), parent_transaction_id: txn.clone(), kind: DependentQueuedRequestKind::RedactEvent, - event_id: None, + parent_key: None, }; let edit = DependentQueuedRequest { @@ -1631,7 +1660,7 @@ mod tests { ) .unwrap(), }, - event_id: None, + parent_key: None, }; inputs.push({ @@ -1670,7 +1699,7 @@ mod tests { ) .unwrap(), }, - event_id: None, + parent_key: None, }) .collect::>(); @@ -1701,7 +1730,7 @@ mod tests { own_transaction_id: child1.clone(), kind: DependentQueuedRequestKind::RedactEvent, parent_transaction_id: txn1.clone(), - event_id: None, + parent_key: None, }, // This one pertains to txn2. DependentQueuedRequest { @@ -1713,7 +1742,7 @@ mod tests { .unwrap(), }, parent_transaction_id: txn2.clone(), - event_id: None, + parent_key: None, }, ]; @@ -1743,7 +1772,7 @@ mod tests { own_transaction_id: react_id.clone(), kind: DependentQueuedRequestKind::ReactEvent { key: "🧠".to_owned() }, parent_transaction_id: txn.clone(), - event_id: None, + parent_key: None, }; let edit_id = ChildTransactionId::new(); @@ -1756,7 +1785,7 @@ mod tests { .unwrap(), }, parent_transaction_id: txn, - event_id: None, + parent_key: None, }; let res = canonicalize_dependent_requests(&[react, edit]); From 6adc615302c5cce4aa955290341b1aea98c083ad Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Tue, 29 Oct 2024 19:24:27 +0100 Subject: [PATCH 2/6] chore(send queue): move sending of an event to an helper function --- crates/matrix-sdk/src/send_queue.rs | 65 ++++++++++++++++------------- 1 file changed, 36 insertions(+), 29 deletions(-) diff --git a/crates/matrix-sdk/src/send_queue.rs b/crates/matrix-sdk/src/send_queue.rs index 1c570c1cdbf..b34853d13d6 100644 --- a/crates/matrix-sdk/src/send_queue.rs +++ b/crates/matrix-sdk/src/send_queue.rs @@ -65,7 +65,7 @@ use ruma::{ EventContent as _, }, serde::Raw, - EventId, OwnedEventId, OwnedRoomId, OwnedTransactionId, TransactionId, + OwnedEventId, OwnedRoomId, OwnedTransactionId, TransactionId, }; use tokio::sync::{broadcast, Notify, RwLock}; use tracing::{debug, error, info, instrument, trace, warn}; @@ -464,32 +464,24 @@ impl RoomSendQueue { continue; }; - let (event, event_type) = match &queued_request.kind { - QueuedRequestKind::Event { content } => content.raw(), - }; - - match room - .send_raw(event_type, event) - .with_transaction_id(&queued_request.transaction_id) - .with_request_config(RequestConfig::short_retry()) - .await - { - Ok(res) => { - trace!(txn_id = %queued_request.transaction_id, event_id = %res.event_id, "successfully sent"); - - match queue.mark_as_sent(&queued_request.transaction_id, &res.event_id).await { - Ok(()) => { + match Self::handle_request(&room, &queued_request).await { + Ok(parent_key) => match queue + .mark_as_sent(&queued_request.transaction_id, parent_key.clone()) + .await + { + Ok(()) => match parent_key { + SentRequestKey::Event(event_id) => { let _ = updates.send(RoomSendQueueUpdate::SentEvent { transaction_id: queued_request.transaction_id, - event_id: res.event_id, + event_id, }); } + }, - Err(err) => { - warn!("unable to mark queued event as sent: {err}"); - } + Err(err) => { + warn!("unable to mark queued request as sent: {err}"); } - } + }, Err(err) => { let is_recoverable = match err { @@ -563,6 +555,27 @@ impl RoomSendQueue { info!("exited sending task"); } + /// Handles a single request and returns the [`SentRequestKey`] on success. + async fn handle_request( + room: &Room, + request: &QueuedRequest, + ) -> Result { + match &request.kind { + QueuedRequestKind::Event { content } => { + let (event, event_type) = content.raw(); + + let res = room + .send_raw(event_type, event) + .with_transaction_id(&request.transaction_id) + .with_request_config(RequestConfig::short_retry()) + .await?; + + trace!(txn_id = %request.transaction_id, event_id = %res.event_id, "event successfully sent"); + Ok(SentRequestKey::Event(res.event_id)) + } + } + } + /// Returns whether the room is enabled, at the room level. pub fn is_enabled(&self) -> bool { self.inner.locally_enabled.load(Ordering::SeqCst) @@ -761,7 +774,7 @@ impl QueueStorage { async fn mark_as_sent( &self, transaction_id: &TransactionId, - event_id: &EventId, + parent_key: SentRequestKey, ) -> Result<(), RoomSendQueueStorageError> { // Keep the lock until we're done touching the storage. let mut being_sent = self.being_sent.write().await; @@ -771,13 +784,7 @@ impl QueueStorage { let store = client.store(); // Update all dependent requests. - store - .update_dependent_queued_request( - &self.room_id, - transaction_id, - SentRequestKey::Event(event_id.to_owned()), - ) - .await?; + store.update_dependent_queued_request(&self.room_id, transaction_id, parent_key).await?; let removed = store.remove_send_queue_request(&self.room_id, transaction_id).await?; From 5d3ed08f29a24618ddb70cd636c040fd059302f7 Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Tue, 29 Oct 2024 19:56:29 +0100 Subject: [PATCH 3/6] refactor!(event cache store): store the serialized `QueuedRequestKind`, not a raw event Changelog: The send queue will now store a serialized `QueuedRequestKind` instead of a raw event, which breaks the format. As a result, all send queues have been emptied. --- .../src/store/integration_tests.rs | 14 ++++++------- .../matrix-sdk-base/src/store/memory_store.rs | 21 +++++++++---------- .../matrix-sdk-base/src/store/send_queue.rs | 6 ++++++ crates/matrix-sdk-base/src/store/traits.rs | 12 +++++------ .../src/state_store/migrations.rs | 13 +++++++----- .../src/state_store/mod.rs | 8 +++---- .../migrations/state_store/008_send_queue.sql | 3 +++ crates/matrix-sdk-sqlite/src/state_store.rs | 8 +++---- crates/matrix-sdk/src/send_queue.rs | 14 ++++++------- 9 files changed, 55 insertions(+), 44 deletions(-) diff --git a/crates/matrix-sdk-base/src/store/integration_tests.rs b/crates/matrix-sdk-base/src/store/integration_tests.rs index a49f4d79492..131f919c2c4 100644 --- a/crates/matrix-sdk-base/src/store/integration_tests.rs +++ b/crates/matrix-sdk-base/src/store/integration_tests.rs @@ -1212,7 +1212,7 @@ impl StateStoreIntegrationTests for DynStateStore { let event0 = SerializableEventContent::new(&RoomMessageEventContent::text_plain("msg0").into()) .unwrap(); - self.save_send_queue_request(room_id, txn0.clone(), event0).await.unwrap(); + self.save_send_queue_request(room_id, txn0.clone(), event0.into()).await.unwrap(); // Reading it will work. let pending = self.load_send_queue_requests(room_id).await.unwrap(); @@ -1236,7 +1236,7 @@ impl StateStoreIntegrationTests for DynStateStore { ) .unwrap(); - self.save_send_queue_request(room_id, txn, event).await.unwrap(); + self.save_send_queue_request(room_id, txn, event.into()).await.unwrap(); } // Reading all the events should work. @@ -1286,7 +1286,7 @@ impl StateStoreIntegrationTests for DynStateStore { &RoomMessageEventContent::text_plain("wow that's a cool test").into(), ) .unwrap(); - self.update_send_queue_request(room_id, txn2, event0).await.unwrap(); + self.update_send_queue_request(room_id, txn2, event0.into()).await.unwrap(); // And it is reflected. let pending = self.load_send_queue_requests(room_id).await.unwrap(); @@ -1334,7 +1334,7 @@ impl StateStoreIntegrationTests for DynStateStore { let event = SerializableEventContent::new(&RoomMessageEventContent::text_plain("room2").into()) .unwrap(); - self.save_send_queue_request(room_id2, txn.clone(), event).await.unwrap(); + self.save_send_queue_request(room_id2, txn.clone(), event.into()).await.unwrap(); } // Add and remove one event for room3. @@ -1344,7 +1344,7 @@ impl StateStoreIntegrationTests for DynStateStore { let event = SerializableEventContent::new(&RoomMessageEventContent::text_plain("room3").into()) .unwrap(); - self.save_send_queue_request(room_id3, txn.clone(), event).await.unwrap(); + self.save_send_queue_request(room_id3, txn.clone(), event.into()).await.unwrap(); self.remove_send_queue_request(room_id3, &txn).await.unwrap(); } @@ -1365,7 +1365,7 @@ impl StateStoreIntegrationTests for DynStateStore { let event0 = SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey").into()) .unwrap(); - self.save_send_queue_request(room_id, txn0.clone(), event0).await.unwrap(); + self.save_send_queue_request(room_id, txn0.clone(), event0.into()).await.unwrap(); // No dependents, to start with. assert!(self.load_dependent_queued_requests(room_id).await.unwrap().is_empty()); @@ -1425,7 +1425,7 @@ impl StateStoreIntegrationTests for DynStateStore { let event1 = SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey2").into()) .unwrap(); - self.save_send_queue_request(room_id, txn1.clone(), event1).await.unwrap(); + self.save_send_queue_request(room_id, txn1.clone(), event1.into()).await.unwrap(); self.save_dependent_queued_request( room_id, diff --git a/crates/matrix-sdk-base/src/store/memory_store.rs b/crates/matrix-sdk-base/src/store/memory_store.rs index aaeda6e3d9d..2ba91d6f200 100644 --- a/crates/matrix-sdk-base/src/store/memory_store.rs +++ b/crates/matrix-sdk-base/src/store/memory_store.rs @@ -36,7 +36,7 @@ use ruma::{ use tracing::{debug, instrument, trace, warn}; use super::{ - send_queue::{ChildTransactionId, QueuedRequest, SentRequestKey, SerializableEventContent}, + send_queue::{ChildTransactionId, QueuedRequest, SentRequestKey}, traits::{ComposerDraft, ServerCapabilities}, DependentQueuedRequest, DependentQueuedRequestKind, QueuedRequestKind, Result, RoomInfo, StateChanges, StateStore, StoreError, @@ -806,15 +806,14 @@ impl StateStore for MemoryStore { &self, room_id: &RoomId, transaction_id: OwnedTransactionId, - content: SerializableEventContent, + kind: QueuedRequestKind, ) -> Result<(), Self::Error> { - self.send_queue_events.write().unwrap().entry(room_id.to_owned()).or_default().push( - QueuedRequest { - kind: QueuedRequestKind::Event { content }, - transaction_id, - error: None, - }, - ); + self.send_queue_events + .write() + .unwrap() + .entry(room_id.to_owned()) + .or_default() + .push(QueuedRequest { kind, transaction_id, error: None }); Ok(()) } @@ -822,7 +821,7 @@ impl StateStore for MemoryStore { &self, room_id: &RoomId, transaction_id: &TransactionId, - content: SerializableEventContent, + kind: QueuedRequestKind, ) -> Result { if let Some(entry) = self .send_queue_events @@ -833,7 +832,7 @@ impl StateStore for MemoryStore { .iter_mut() .find(|item| item.transaction_id == transaction_id) { - entry.kind = QueuedRequestKind::Event { content }; + entry.kind = kind; entry.error = None; Ok(true) } else { diff --git a/crates/matrix-sdk-base/src/store/send_queue.rs b/crates/matrix-sdk-base/src/store/send_queue.rs index 6a695c62959..7efe0f13633 100644 --- a/crates/matrix-sdk-base/src/store/send_queue.rs +++ b/crates/matrix-sdk-base/src/store/send_queue.rs @@ -78,6 +78,12 @@ pub enum QueuedRequestKind { }, } +impl From for QueuedRequestKind { + fn from(content: SerializableEventContent) -> Self { + Self::Event { content } + } +} + /// A request to be sent with a send queue. #[derive(Clone)] pub struct QueuedRequest { diff --git a/crates/matrix-sdk-base/src/store/traits.rs b/crates/matrix-sdk-base/src/store/traits.rs index e1bfd517432..4dabd9eefac 100644 --- a/crates/matrix-sdk-base/src/store/traits.rs +++ b/crates/matrix-sdk-base/src/store/traits.rs @@ -42,8 +42,8 @@ use serde::{Deserialize, Serialize}; use super::{ send_queue::SentRequestKey, ChildTransactionId, DependentQueuedRequest, - DependentQueuedRequestKind, QueueWedgeError, QueuedRequest, SerializableEventContent, - StateChanges, StoreError, + DependentQueuedRequestKind, QueueWedgeError, QueuedRequest, QueuedRequestKind, StateChanges, + StoreError, }; use crate::{ deserialized_responses::{RawAnySyncOrStrippedState, RawMemberEvent, RawSyncOrStrippedState}, @@ -357,7 +357,7 @@ pub trait StateStore: AsyncTraitDeps { &self, room_id: &RoomId, transaction_id: OwnedTransactionId, - content: SerializableEventContent, + request: QueuedRequestKind, ) -> Result<(), Self::Error>; /// Updates a send queue request with the given content, and resets its @@ -375,7 +375,7 @@ pub trait StateStore: AsyncTraitDeps { &self, room_id: &RoomId, transaction_id: &TransactionId, - content: SerializableEventContent, + content: QueuedRequestKind, ) -> Result; /// Remove a request previously inserted with @@ -640,7 +640,7 @@ impl StateStore for EraseStateStoreError { &self, room_id: &RoomId, transaction_id: OwnedTransactionId, - content: SerializableEventContent, + content: QueuedRequestKind, ) -> Result<(), Self::Error> { self.0.save_send_queue_request(room_id, transaction_id, content).await.map_err(Into::into) } @@ -649,7 +649,7 @@ impl StateStore for EraseStateStoreError { &self, room_id: &RoomId, transaction_id: &TransactionId, - content: SerializableEventContent, + content: QueuedRequestKind, ) -> Result { self.0.update_send_queue_request(room_id, transaction_id, content).await.map_err(Into::into) } diff --git a/crates/matrix-sdk-indexeddb/src/state_store/migrations.rs b/crates/matrix-sdk-indexeddb/src/state_store/migrations.rs index 8e559cfea9d..5d86dcf61a6 100644 --- a/crates/matrix-sdk-indexeddb/src/state_store/migrations.rs +++ b/crates/matrix-sdk-indexeddb/src/state_store/migrations.rs @@ -774,13 +774,16 @@ async fn migrate_to_v11(db: IdbDatabase) -> Result { apply_migration(db, 11, migration).await } -/// Drop entries from the [`keys::DEPENDENT_SEND_QUEUE`] table. +/// The format of data serialized into the send queue and dependent send queue +/// tables have changed, clear both. async fn migrate_to_v12(db: IdbDatabase) -> Result { - let tx = - db.transaction_on_one_with_mode(keys::DEPENDENT_SEND_QUEUE, IdbTransactionMode::Readwrite)?; + let store_keys = &[keys::DEPENDENT_SEND_QUEUE, keys::ROOM_SEND_QUEUE]; + let tx = db.transaction_on_multi_with_mode(store_keys, IdbTransactionMode::Readwrite)?; - let store = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?; - store.clear()?; + for store_name in store_keys { + let store = tx.object_store(store_name)?; + store.clear()?; + } tx.await.into_result()?; diff --git a/crates/matrix-sdk-indexeddb/src/state_store/mod.rs b/crates/matrix-sdk-indexeddb/src/state_store/mod.rs index f8ac92af8f0..39156c70a25 100644 --- a/crates/matrix-sdk-indexeddb/src/state_store/mod.rs +++ b/crates/matrix-sdk-indexeddb/src/state_store/mod.rs @@ -1328,7 +1328,7 @@ impl_state_store!({ &self, room_id: &RoomId, transaction_id: OwnedTransactionId, - content: SerializableEventContent, + kind: QueuedRequestKind, ) -> Result<()> { let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id); @@ -1352,7 +1352,7 @@ impl_state_store!({ // Push the new request. prev.push(PersistedQueuedRequest { room_id: room_id.to_owned(), - kind: Some(QueuedRequestKind::Event { content }), + kind: Some(kind), transaction_id, error: None, is_wedged: None, @@ -1371,7 +1371,7 @@ impl_state_store!({ &self, room_id: &RoomId, transaction_id: &TransactionId, - content: SerializableEventContent, + kind: QueuedRequestKind, ) -> Result { let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id); @@ -1394,7 +1394,7 @@ impl_state_store!({ // Modify the one request. if let Some(entry) = prev.iter_mut().find(|entry| entry.transaction_id == transaction_id) { - entry.kind = Some(QueuedRequestKind::Event { content }); + entry.kind = Some(kind); // Reset the error state. entry.error = None; // Remove migrated fields. diff --git a/crates/matrix-sdk-sqlite/migrations/state_store/008_send_queue.sql b/crates/matrix-sdk-sqlite/migrations/state_store/008_send_queue.sql index d1bec740d87..f6afcbe54af 100644 --- a/crates/matrix-sdk-sqlite/migrations/state_store/008_send_queue.sql +++ b/crates/matrix-sdk-sqlite/migrations/state_store/008_send_queue.sql @@ -5,3 +5,6 @@ DELETE FROM "dependent_send_queue_events"; ALTER TABLE "dependent_send_queue_events" RENAME COLUMN "event_id" TO "parent_key"; + +--- Delete all previous entries in the send queue, since the content's format has changed. +DELETE FROM "send_queue_events"; diff --git a/crates/matrix-sdk-sqlite/src/state_store.rs b/crates/matrix-sdk-sqlite/src/state_store.rs index 7c951d4ffdf..23570681a96 100644 --- a/crates/matrix-sdk-sqlite/src/state_store.rs +++ b/crates/matrix-sdk-sqlite/src/state_store.rs @@ -13,7 +13,7 @@ use matrix_sdk_base::{ store::{ migration_helpers::RoomInfoV1, ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, QueueWedgeError, QueuedRequest, QueuedRequestKind, - SentRequestKey, SerializableEventContent, + SentRequestKey, }, MinimalRoomMemberEvent, RoomInfo, RoomMemberships, RoomState, StateChanges, StateStore, StateStoreDataKey, StateStoreDataValue, @@ -1684,7 +1684,7 @@ impl StateStore for SqliteStateStore { &self, room_id: &RoomId, transaction_id: OwnedTransactionId, - content: SerializableEventContent, + content: QueuedRequestKind, ) -> Result<(), Self::Error> { let room_id_key = self.encode_key(keys::SEND_QUEUE, room_id); let room_id_value = self.serialize_value(&room_id.to_owned())?; @@ -1709,7 +1709,7 @@ impl StateStore for SqliteStateStore { &self, room_id: &RoomId, transaction_id: &TransactionId, - content: SerializableEventContent, + content: QueuedRequestKind, ) -> Result { let room_id = self.encode_key(keys::SEND_QUEUE, room_id); @@ -1778,7 +1778,7 @@ impl StateStore for SqliteStateStore { for entry in res { requests.push(QueuedRequest { transaction_id: entry.0.into(), - kind: QueuedRequestKind::Event { content: self.deserialize_json(&entry.1)? }, + kind: self.deserialize_json(&entry.1)?, error: entry.2.map(|v| self.deserialize_value(&v)).transpose()?, }); } diff --git a/crates/matrix-sdk/src/send_queue.rs b/crates/matrix-sdk/src/send_queue.rs index b34853d13d6..01c50eddd73 100644 --- a/crates/matrix-sdk/src/send_queue.rs +++ b/crates/matrix-sdk/src/send_queue.rs @@ -345,7 +345,7 @@ impl RoomSendQueue { let content = SerializableEventContent::from_raw(content, event_type); - let transaction_id = self.inner.queue.push(content.clone()).await?; + let transaction_id = self.inner.queue.push(content.clone().into()).await?; trace!(%transaction_id, "manager sends a raw event to the background task"); self.inner.notifier.notify_one(); @@ -698,13 +698,13 @@ impl QueueStorage { /// Returns the transaction id chosen to identify the request. async fn push( &self, - serializable: SerializableEventContent, + request: QueuedRequestKind, ) -> Result { let transaction_id = TransactionId::new(); self.client()? .store() - .save_send_queue_request(&self.room_id, transaction_id.clone(), serializable) + .save_send_queue_request(&self.room_id, transaction_id.clone(), request) .await?; Ok(transaction_id) @@ -861,7 +861,7 @@ impl QueueStorage { let edited = self .client()? .store() - .update_send_queue_request(&self.room_id, transaction_id, serializable) + .update_send_queue_request(&self.room_id, transaction_id, serializable.into()) .await?; Ok(edited) @@ -1027,7 +1027,7 @@ impl QueueStorage { .save_send_queue_request( &self.room_id, de.own_transaction_id.into(), - serializable, + serializable.into(), ) .await .map_err(RoomSendQueueStorageError::StorageError)?; @@ -1037,7 +1037,7 @@ impl QueueStorage { .update_send_queue_request( &self.room_id, &de.parent_transaction_id, - new_content, + new_content.into(), ) .await .map_err(RoomSendQueueStorageError::StorageError)?; @@ -1109,7 +1109,7 @@ impl QueueStorage { .save_send_queue_request( &self.room_id, de.own_transaction_id.into(), - serializable, + serializable.into(), ) .await .map_err(RoomSendQueueStorageError::StorageError)?; From 2ab23b0e5e270dbe52adf0e8b5cab0b7f186d9a4 Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Tue, 29 Oct 2024 19:57:41 +0100 Subject: [PATCH 4/6] chore(event cache store): remove failing test Because the latest migration would clear events to-be-sent from the send queue, the test now failed. It's been considered OK. --- crates/matrix-sdk-sqlite/src/state_store.rs | 86 +-------------------- 1 file changed, 3 insertions(+), 83 deletions(-) diff --git a/crates/matrix-sdk-sqlite/src/state_store.rs b/crates/matrix-sdk-sqlite/src/state_store.rs index 23570681a96..06082b0d1b3 100644 --- a/crates/matrix-sdk-sqlite/src/state_store.rs +++ b/crates/matrix-sdk-sqlite/src/state_store.rs @@ -2010,21 +2010,12 @@ mod migration_tests { }, }; - use assert_matches::assert_matches; - use matrix_sdk_base::{ - store::{QueueWedgeError, SerializableEventContent}, - sync::UnreadNotificationsCount, - RoomState, StateStore, - }; + use matrix_sdk_base::{sync::UnreadNotificationsCount, RoomState, StateStore}; use matrix_sdk_test::async_test; use once_cell::sync::Lazy; use ruma::{ - events::{ - room::{create::RoomCreateEventContent, message::RoomMessageEventContent}, - StateEventType, - }, - room_id, server_name, user_id, EventId, MilliSecondsSinceUnixEpoch, RoomId, TransactionId, - UserId, + events::{room::create::RoomCreateEventContent, StateEventType}, + room_id, server_name, user_id, EventId, MilliSecondsSinceUnixEpoch, RoomId, UserId, }; use rusqlite::Transaction; use serde_json::json; @@ -2271,75 +2262,4 @@ mod migration_tests { assert_eq!(room_c.name(), None); assert_eq!(room_c.creator(), Some(room_c_create_sender)); } - - #[async_test] - pub async fn test_migrating_v7_to_v8() { - let path = new_path(); - - let room_a_id = room_id!("!room_a:dummy.local"); - let wedged_event_transaction_id = TransactionId::new(); - let local_event_transaction_id = TransactionId::new(); - - // Create and populate db. - { - let db = create_fake_db(&path, 7).await.unwrap(); - let conn = db.pool.get().await.unwrap(); - - let wedge_tx = wedged_event_transaction_id.clone(); - let local_tx = local_event_transaction_id.clone(); - - conn.with_transaction(move |txn| { - add_send_queue_event_v7(&db, txn, &wedge_tx, room_a_id, true)?; - add_send_queue_event_v7(&db, txn, &local_tx, room_a_id, false)?; - - Result::<_, Error>::Ok(()) - }) - .await - .unwrap(); - } - - // This transparently migrates to the latest version. - let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap(); - let requests = store.load_send_queue_requests(room_a_id).await.unwrap(); - - assert_eq!(requests.len(), 2); - - let migrated_wedged = - requests.iter().find(|e| e.transaction_id == wedged_event_transaction_id).unwrap(); - - assert!(migrated_wedged.is_wedged()); - assert_matches!( - migrated_wedged.error.clone(), - Some(QueueWedgeError::GenericApiError { .. }) - ); - - let migrated_ok = requests - .iter() - .find(|e| e.transaction_id == local_event_transaction_id.clone()) - .unwrap(); - - assert!(!migrated_ok.is_wedged()); - assert!(migrated_ok.error.is_none()); - } - - fn add_send_queue_event_v7( - this: &SqliteStateStore, - txn: &Transaction<'_>, - transaction_id: &TransactionId, - room_id: &RoomId, - is_wedged: bool, - ) -> Result<(), Error> { - let content = - SerializableEventContent::new(&RoomMessageEventContent::text_plain("Hello").into())?; - - let room_id_key = this.encode_key(keys::SEND_QUEUE, room_id); - let room_id_value = this.serialize_value(&room_id.to_owned())?; - - let content = this.serialize_json(&content)?; - - txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, wedged) VALUES (?, ?, ?, ?, ?)")? - .execute((room_id_key, room_id_value, transaction_id.to_string(), content, is_wedged))?; - - Ok(()) - } } From 9b197e250483aa673f007e6600ff7c43cdc11cb4 Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Wed, 30 Oct 2024 15:34:02 +0100 Subject: [PATCH 5/6] chore(base): refactor internal helpers related to media Notably, make it super clear what parameters are required to create the attachment type, since the function doesn't consume the whole `AttachmentConfig` for realz. --- crates/matrix-sdk/src/encryption/mod.rs | 13 ++-- crates/matrix-sdk/src/media.rs | 13 ++-- crates/matrix-sdk/src/room/mod.rs | 79 +++++++++++++++---------- 3 files changed, 59 insertions(+), 46 deletions(-) diff --git a/crates/matrix-sdk/src/encryption/mod.rs b/crates/matrix-sdk/src/encryption/mod.rs index b59c3e96d0a..a6ecc1f7754 100644 --- a/crates/matrix-sdk/src/encryption/mod.rs +++ b/crates/matrix-sdk/src/encryption/mod.rs @@ -459,7 +459,7 @@ impl Client { data: &[u8], thumbnail: Option, send_progress: SharedObservable, - ) -> Result<(MediaSource, Option, Option>)> { + ) -> Result<(MediaSource, Option<(MediaSource, Box)>)> { let upload_thumbnail = self.upload_encrypted_thumbnail(thumbnail, content_type, send_progress.clone()); @@ -470,10 +470,9 @@ impl Client { .await }; - let ((thumbnail_source, thumbnail_info), file) = - try_join(upload_thumbnail, upload_attachment).await?; + let (thumbnail, file) = try_join(upload_thumbnail, upload_attachment).await?; - Ok((MediaSource::Encrypted(Box::new(file)), thumbnail_source, thumbnail_info)) + Ok((MediaSource::Encrypted(Box::new(file)), thumbnail)) } /// Uploads an encrypted thumbnail to the media repository, and returns @@ -483,9 +482,9 @@ impl Client { thumbnail: Option, content_type: &mime::Mime, send_progress: SharedObservable, - ) -> Result<(Option, Option>)> { + ) -> Result)>> { let Some(thumbnail) = thumbnail else { - return Ok((None, None)); + return Ok(None); }; let mut cursor = Cursor::new(thumbnail.data); @@ -501,7 +500,7 @@ impl Client { mimetype: Some(thumbnail.content_type.as_ref().to_owned()) }); - Ok((Some(MediaSource::Encrypted(Box::new(file))), Some(Box::new(thumbnail_info)))) + Ok(Some((MediaSource::Encrypted(Box::new(file)), Box::new(thumbnail_info)))) } /// Claim one-time keys creating new Olm sessions. diff --git a/crates/matrix-sdk/src/media.rs b/crates/matrix-sdk/src/media.rs index c4553dba731..a25466e0a98 100644 --- a/crates/matrix-sdk/src/media.rs +++ b/crates/matrix-sdk/src/media.rs @@ -613,7 +613,7 @@ impl Media { data: Vec, thumbnail: Option, send_progress: SharedObservable, - ) -> Result<(MediaSource, Option, Option>)> { + ) -> Result<(MediaSource, Option<(MediaSource, Box)>)> { let upload_thumbnail = self.upload_thumbnail(thumbnail, send_progress.clone()); let upload_attachment = async move { @@ -623,10 +623,9 @@ impl Media { .map_err(Error::from) }; - let ((thumbnail_source, thumbnail_info), response) = - try_join(upload_thumbnail, upload_attachment).await?; + let (thumbnail, response) = try_join(upload_thumbnail, upload_attachment).await?; - Ok((MediaSource::Plain(response.content_uri), thumbnail_source, thumbnail_info)) + Ok((MediaSource::Plain(response.content_uri), thumbnail)) } /// Uploads an unencrypted thumbnail to the media repository, and returns @@ -635,9 +634,9 @@ impl Media { &self, thumbnail: Option, send_progress: SharedObservable, - ) -> Result<(Option, Option>)> { + ) -> Result)>> { let Some(thumbnail) = thumbnail else { - return Ok((None, None)); + return Ok(None); }; let response = self @@ -654,6 +653,6 @@ impl Media { { mimetype: Some(thumbnail.content_type.as_ref().to_owned()) } ); - Ok((Some(MediaSource::Plain(url)), Some(Box::new(thumbnail_info)))) + Ok(Some((MediaSource::Plain(url), Box::new(thumbnail_info)))) } } diff --git a/crates/matrix-sdk/src/room/mod.rs b/crates/matrix-sdk/src/room/mod.rs index 420aaf8eb5d..0d0e13b1bd6 100644 --- a/crates/matrix-sdk/src/room/mod.rs +++ b/crates/matrix-sdk/src/room/mod.rs @@ -86,7 +86,7 @@ use ruma::{ history_visibility::HistoryVisibility, message::{ AudioInfo, AudioMessageEventContent, FileInfo, FileMessageEventContent, - ImageMessageEventContent, MessageType, RoomMessageEventContent, + FormattedBody, ImageMessageEventContent, MessageType, RoomMessageEventContent, UnstableAudioDetailsContentBlock, UnstableVoiceContentBlock, VideoInfo, VideoMessageEventContent, }, @@ -1960,7 +1960,7 @@ impl Room { }; #[cfg(feature = "e2e-encryption")] - let (media_source, thumbnail_source, thumbnail_info) = if self.is_encrypted().await? { + let (media_source, thumbnail) = if self.is_encrypted().await? { self.client .upload_encrypted_media_and_thumbnail(content_type, &data, thumbnail, send_progress) .await? @@ -1979,7 +1979,7 @@ impl Room { }; #[cfg(not(feature = "e2e-encryption"))] - let (media_source, thumbnail_source, thumbnail_info) = self + let (media_source, thumbnail) = self .client .media() .upload_plain_media_and_thumbnail(content_type, data.clone(), thumbnail, send_progress) @@ -1998,7 +1998,7 @@ impl Room { } if let Some(((data, height, width), source)) = - thumbnail_cache_info.zip(thumbnail_source.as_ref()) + thumbnail_cache_info.zip(thumbnail.as_ref().map(|tuple| &tuple.0)) { debug!("caching the thumbnail"); @@ -2022,21 +2022,19 @@ impl Room { } } - let msg_type = self.make_attachment_message( - content_type, - media_source, - thumbnail_source, - thumbnail_info, - filename, - config, + let content = Self::make_attachment_event( + self.make_attachment_type( + content_type, + filename, + media_source, + config.caption, + config.formatted_caption, + config.info, + thumbnail, + ), + mentions, ); - let mut content = RoomMessageEventContent::new(msg_type); - - if let Some(mentions) = mentions { - content = content.add_mentions(mentions); - } - let mut fut = self.send(content); if let Some(txn_id) = txn_id { fut = fut.with_transaction_id(txn_id); @@ -2046,33 +2044,37 @@ impl Room { /// Creates the inner [`MessageType`] for an already-uploaded media file /// provided by its source. - fn make_attachment_message( + #[allow(clippy::too_many_arguments)] + fn make_attachment_type( &self, content_type: &Mime, - source: MediaSource, - thumbnail_source: Option, - thumbnail_info: Option>, filename: &str, - config: AttachmentConfig, + source: MediaSource, + caption: Option, + formatted_caption: Option, + info: Option, + thumbnail: Option<(MediaSource, Box)>, ) -> MessageType { - // if config.caption is set, use it as body, and filename as the file name - // otherwise, body is the filename, and the filename is not set. + // If caption is set, use it as body, and filename as the file name; otherwise, + // body is the filename, and the filename is not set. // https://github.com/tulir/matrix-spec-proposals/blob/body-as-caption/proposals/2530-body-as-caption.md - let (body, filename) = match config.caption { + let (body, filename) = match caption { Some(caption) => (caption, Some(filename.to_owned())), None => (filename.to_owned(), None), }; + let (thumbnail_source, thumbnail_info) = thumbnail.unzip(); + match content_type.type_() { mime::IMAGE => { - let info = assign!(config.info.map(ImageInfo::from).unwrap_or_default(), { + let info = assign!(info.map(ImageInfo::from).unwrap_or_default(), { mimetype: Some(content_type.as_ref().to_owned()), thumbnail_source, thumbnail_info }); let content = assign!(ImageMessageEventContent::new(body, source), { info: Some(Box::new(info)), - formatted: config.formatted_caption, + formatted: formatted_caption, filename }); MessageType::Image(content) @@ -2082,7 +2084,7 @@ impl Room { let mut content = AudioMessageEventContent::new(body, source); if let Some(AttachmentInfo::Voice { audio_info, waveform: Some(waveform_vec) }) = - &config.info + &info { if let Some(duration) = audio_info.duration { let waveform = waveform_vec.iter().map(|v| (*v).into()).collect(); @@ -2092,7 +2094,7 @@ impl Room { content.voice = Some(UnstableVoiceContentBlock::new()); } - let mut audio_info = config.info.map(AudioInfo::from).unwrap_or_default(); + let mut audio_info = info.map(AudioInfo::from).unwrap_or_default(); audio_info.mimetype = Some(content_type.as_ref().to_owned()); let content = content.info(Box::new(audio_info)); @@ -2100,21 +2102,21 @@ impl Room { } mime::VIDEO => { - let info = assign!(config.info.map(VideoInfo::from).unwrap_or_default(), { + let info = assign!(info.map(VideoInfo::from).unwrap_or_default(), { mimetype: Some(content_type.as_ref().to_owned()), thumbnail_source, thumbnail_info }); let content = assign!(VideoMessageEventContent::new(body, source), { info: Some(Box::new(info)), - formatted: config.formatted_caption, + formatted: formatted_caption, filename }); MessageType::Video(content) } _ => { - let info = assign!(config.info.map(FileInfo::from).unwrap_or_default(), { + let info = assign!(info.map(FileInfo::from).unwrap_or_default(), { mimetype: Some(content_type.as_ref().to_owned()), thumbnail_source, thumbnail_info @@ -2127,6 +2129,19 @@ impl Room { } } + /// Creates the [`RoomMessageEventContent`] based on the message type and + /// mentions. + fn make_attachment_event( + msg_type: MessageType, + mentions: Option, + ) -> RoomMessageEventContent { + let mut content = RoomMessageEventContent::new(msg_type); + if let Some(mentions) = mentions { + content = content.add_mentions(mentions); + } + content + } + /// Update the power levels of a select set of users of this room. /// /// Issue a `power_levels` state event request to the server, changing the From bd6402b8b86d02dc8132b88049d780e7fde1ae1f Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Wed, 30 Oct 2024 16:48:16 +0100 Subject: [PATCH 6/6] feat(send queue): allow sending attachments with the send queue --- bindings/matrix-sdk-ffi/src/error.rs | 17 + crates/matrix-sdk-base/src/media.rs | 9 +- .../src/store/integration_tests.rs | 4 +- crates/matrix-sdk-base/src/store/mod.rs | 5 +- .../matrix-sdk-base/src/store/send_queue.rs | 102 ++- .../src/timeline/controller/mod.rs | 5 + crates/matrix-sdk/src/room/mod.rs | 4 +- crates/matrix-sdk/src/send_queue.rs | 619 ++++++++++++++++-- 8 files changed, 692 insertions(+), 73 deletions(-) diff --git a/bindings/matrix-sdk-ffi/src/error.rs b/bindings/matrix-sdk-ffi/src/error.rs index e644d7974d8..9d793b07020 100644 --- a/bindings/matrix-sdk-ffi/src/error.rs +++ b/bindings/matrix-sdk-ffi/src/error.rs @@ -183,6 +183,12 @@ pub enum QueueWedgeError { /// session before sending. CrossVerificationRequired, + /// Some media content to be sent has disappeared from the cache. + MissingMediaContent, + + /// Some mime type couldn't be parsed. + InvalidMimeType { mime_type: String }, + /// Other errors. GenericApiError { msg: String }, } @@ -201,10 +207,17 @@ impl Display for QueueWedgeError { QueueWedgeError::CrossVerificationRequired => { f.write_str("Own verification is required") } + QueueWedgeError::MissingMediaContent => { + f.write_str("Media to be sent disappeared from local storage") + } + QueueWedgeError::InvalidMimeType { mime_type } => { + write!(f, "Invalid mime type '{mime_type}' for media upload") + } QueueWedgeError::GenericApiError { msg } => f.write_str(msg), } } } + impl From for QueueWedgeError { fn from(value: SdkQueueWedgeError) -> Self { match value { @@ -223,6 +236,10 @@ impl From for QueueWedgeError { users: users.iter().map(ruma::OwnedUserId::to_string).collect(), }, SdkQueueWedgeError::CrossVerificationRequired => Self::CrossVerificationRequired, + SdkQueueWedgeError::MissingMediaContent => Self::MissingMediaContent, + SdkQueueWedgeError::InvalidMimeType { mime_type } => { + Self::InvalidMimeType { mime_type } + } SdkQueueWedgeError::GenericApiError { msg } => Self::GenericApiError { msg }, } } diff --git a/crates/matrix-sdk-base/src/media.rs b/crates/matrix-sdk-base/src/media.rs index 6d02683a5f8..15950be168c 100644 --- a/crates/matrix-sdk-base/src/media.rs +++ b/crates/matrix-sdk-base/src/media.rs @@ -14,6 +14,7 @@ use ruma::{ }, MxcUri, UInt, }; +use serde::{Deserialize, Serialize}; const UNIQUE_SEPARATOR: &str = "_"; @@ -25,7 +26,7 @@ pub trait UniqueKey { } /// The requested format of a media file. -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub enum MediaFormat { /// The file that was uploaded. File, @@ -44,7 +45,7 @@ impl UniqueKey for MediaFormat { } /// The requested size of a media thumbnail. -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct MediaThumbnailSize { /// The desired resizing method. pub method: Method, @@ -65,7 +66,7 @@ impl UniqueKey for MediaThumbnailSize { } /// The desired settings of a media thumbnail. -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct MediaThumbnailSettings { /// The desired size of the thumbnail. pub size: MediaThumbnailSize, @@ -110,7 +111,7 @@ impl UniqueKey for MediaSource { } /// A request for media data. -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct MediaRequest { /// The source of the media file. pub source: MediaSource, diff --git a/crates/matrix-sdk-base/src/store/integration_tests.rs b/crates/matrix-sdk-base/src/store/integration_tests.rs index 131f919c2c4..b8121bc197e 100644 --- a/crates/matrix-sdk-base/src/store/integration_tests.rs +++ b/crates/matrix-sdk-base/src/store/integration_tests.rs @@ -1406,7 +1406,9 @@ impl StateStoreIntegrationTests for DynStateStore { assert_eq!(dependents.len(), 1); assert_eq!(dependents[0].parent_transaction_id, txn0); assert_eq!(dependents[0].own_transaction_id, child_txn); - assert_eq!(dependents[0].parent_key.as_ref(), Some(&SentRequestKey::Event(event_id))); + assert_matches!(dependents[0].parent_key.as_ref(), Some(&SentRequestKey::Event(ref eid)) => { + assert_eq!(*eid, event_id); + }); assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent); // Now remove it. diff --git a/crates/matrix-sdk-base/src/store/mod.rs b/crates/matrix-sdk-base/src/store/mod.rs index cd4b7c67cd1..b219c664bbf 100644 --- a/crates/matrix-sdk-base/src/store/mod.rs +++ b/crates/matrix-sdk-base/src/store/mod.rs @@ -75,8 +75,9 @@ pub use self::integration_tests::StateStoreIntegrationTests; pub use self::{ memory_store::MemoryStore, send_queue::{ - ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, QueueWedgeError, - QueuedRequest, QueuedRequestKind, SentRequestKey, SerializableEventContent, + ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, + FinishUploadThumbnailInfo, QueueWedgeError, QueuedRequest, QueuedRequestKind, + SentRequestKey, SerializableEventContent, }, traits::{ ComposerDraft, ComposerDraftType, DynStateStore, IntoStateStore, ServerCapabilities, diff --git a/crates/matrix-sdk-base/src/store/send_queue.rs b/crates/matrix-sdk-base/src/store/send_queue.rs index 7efe0f13633..e5b62242c12 100644 --- a/crates/matrix-sdk-base/src/store/send_queue.rs +++ b/crates/matrix-sdk-base/src/store/send_queue.rs @@ -18,12 +18,17 @@ use std::{collections::BTreeMap, fmt, ops::Deref}; use as_variant::as_variant; use ruma::{ - events::{AnyMessageLikeEventContent, EventContent as _, RawExt as _}, + events::{ + room::{message::RoomMessageEventContent, MediaSource}, + AnyMessageLikeEventContent, EventContent as _, RawExt as _, + }, serde::Raw, - OwnedDeviceId, OwnedEventId, OwnedTransactionId, OwnedUserId, TransactionId, + OwnedDeviceId, OwnedEventId, OwnedTransactionId, OwnedUserId, TransactionId, UInt, }; use serde::{Deserialize, Serialize}; +use crate::media::MediaRequest; + /// A thin wrapper to serialize a `AnyMessageLikeEventContent`. #[derive(Clone, Serialize, Deserialize)] pub struct SerializableEventContent { @@ -76,6 +81,28 @@ pub enum QueuedRequestKind { /// The content of the message-like event we'd like to send. content: SerializableEventContent, }, + + /// Content to upload on the media server. + /// + /// The bytes must be stored in the media cache, and are identified by the + /// cache key. + Upload { + /// Content type of the media to be uploaded. + /// + /// Stored as a `String` because `Mime` which we'd really want to use + /// here, is not serializable. Oh well. + content_type: String, + + /// The cache key used to retrieve the media's bytes in the event cache + /// store. + cache_key: MediaRequest, + + /// An optional media source for a thumbnail already uploadd. + thumbnail_source: Option, + + /// To which media event transaction does this upload relate? + related_to: OwnedTransactionId, + }, } impl From for QueuedRequestKind { @@ -143,6 +170,18 @@ pub enum QueueWedgeError { #[error("Own verification is required")] CrossVerificationRequired, + /// Media content was cached in the media store, but has disappeared before + /// we could upload it. + #[error("Media content disappeared")] + MissingMediaContent, + + /// We tried to upload some media content with an unknown mime type. + #[error("Invalid mime type '{mime_type}' for media")] + InvalidMimeType { + /// The observed mime type that's expected to be invalid. + mime_type: String, + }, + /// Other errors. #[error("Other unrecoverable error: {msg}")] GenericApiError { @@ -169,6 +208,43 @@ pub enum DependentQueuedRequestKind { /// Key used for the reaction. key: String, }, + + /// Upload a file that had a thumbnail. + UploadFileWithThumbnail { + /// Content type for the file itself (not the thumbnail). + content_type: String, + + /// Media request necessary to retrieve the file itself (not the + /// thumbnail). + cache_key: MediaRequest, + + /// To which media transaction id does this upload relate to? + related_to: OwnedTransactionId, + }, + + /// Finish an upload by updating references to the media cache and sending + /// the final media event with the remote MXC URIs. + FinishUpload { + /// Local echo for the event (containing the local MXC URIs). + local_echo: RoomMessageEventContent, + + /// Transaction id for the file upload. + file_upload: OwnedTransactionId, + + /// Information about the thumbnail, if present. + thumbnail_info: Option, + }, +} + +/// Detailed record about a thumbnail used when finishing a media upload. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct FinishUploadThumbnailInfo { + /// Transaction id for the thumbnail upload. + pub txn: OwnedTransactionId, + /// Thumbnail's width. + pub width: UInt, + /// Thumbnail's height. + pub height: UInt, } /// A transaction id identifying a [`DependentQueuedRequest`] rather than its @@ -210,14 +286,34 @@ impl From for OwnedTransactionId { } } +impl From for ChildTransactionId { + fn from(val: OwnedTransactionId) -> Self { + Self(val) + } +} + /// A unique key (identifier) indicating that a transaction has been /// successfully sent to the server. /// /// The owning child transactions can now be resolved. -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub enum SentRequestKey { /// The parent transaction returned an event when it succeeded. Event(OwnedEventId), + + /// The parent transaction returned an uploaded resource URL. + Media { + /// File that was uploaded by this request. + /// + /// If the request related to a thumbnail upload, this contains the + /// thumbnail media source. + file: MediaSource, + + /// Optional thumbnail previously uploaded, when uploading a file. + /// + /// When uploading a thumbnail, this is set to `None`. + thumbnail: Option, + }, } impl SentRequestKey { diff --git a/crates/matrix-sdk-ui/src/timeline/controller/mod.rs b/crates/matrix-sdk-ui/src/timeline/controller/mod.rs index cd1c39b14d5..3dd614fa330 100644 --- a/crates/matrix-sdk-ui/src/timeline/controller/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/controller/mod.rs @@ -1359,6 +1359,11 @@ impl TimelineController

{ self.update_event_send_state(&transaction_id, EventSendState::Sent { event_id }) .await; } + + RoomSendQueueUpdate::UploadedMedia { related_to } => { + // TODO(bnjbvr): Do something else? + info!(txn_id = %related_to, "some media for a media event has been uploaded"); + } } } } diff --git a/crates/matrix-sdk/src/room/mod.rs b/crates/matrix-sdk/src/room/mod.rs index 0d0e13b1bd6..cc9a294d587 100644 --- a/crates/matrix-sdk/src/room/mod.rs +++ b/crates/matrix-sdk/src/room/mod.rs @@ -2045,7 +2045,7 @@ impl Room { /// Creates the inner [`MessageType`] for an already-uploaded media file /// provided by its source. #[allow(clippy::too_many_arguments)] - fn make_attachment_type( + pub(crate) fn make_attachment_type( &self, content_type: &Mime, filename: &str, @@ -2131,7 +2131,7 @@ impl Room { /// Creates the [`RoomMessageEventContent`] based on the message type and /// mentions. - fn make_attachment_event( + pub(crate) fn make_attachment_event( msg_type: MessageType, mentions: Option, ) -> RoomMessageEventContent { diff --git a/crates/matrix-sdk/src/send_queue.rs b/crates/matrix-sdk/src/send_queue.rs index 01c50eddd73..f3267600381 100644 --- a/crates/matrix-sdk/src/send_queue.rs +++ b/crates/matrix-sdk/src/send_queue.rs @@ -45,27 +45,38 @@ use std::{ collections::{BTreeMap, BTreeSet, HashMap}, + io::Cursor, + str::FromStr as _, sync::{ atomic::{AtomicBool, Ordering}, Arc, RwLock as SyncRwLock, }, }; +use as_variant::as_variant; use matrix_sdk_base::{ + event_cache_store::EventCacheStoreError, + media::{MediaFormat, MediaRequest, MediaThumbnailSettings, MediaThumbnailSize}, store::{ - ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, QueueWedgeError, - QueuedRequest, QueuedRequestKind, SentRequestKey, SerializableEventContent, + ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, + FinishUploadThumbnailInfo, QueueWedgeError, QueuedRequest, QueuedRequestKind, + SentRequestKey, SerializableEventContent, }, RoomState, StoreError, }; use matrix_sdk_common::executor::{spawn, JoinHandle}; +use mime::Mime; use ruma::{ + assign, events::{ - reaction::ReactionEventContent, relation::Annotation, AnyMessageLikeEventContent, - EventContent as _, + reaction::ReactionEventContent, + relation::Annotation, + room::{message::MessageType, MediaSource, ThumbnailInfo}, + AnyMessageLikeEventContent, EventContent as _, }, + media::Method, serde::Raw, - OwnedEventId, OwnedRoomId, OwnedTransactionId, TransactionId, + uint, OwnedEventId, OwnedMxcUri, OwnedRoomId, OwnedTransactionId, TransactionId, }; use tokio::sync::{broadcast, Notify, RwLock}; use tracing::{debug, error, info, instrument, trace, warn}; @@ -73,6 +84,7 @@ use tracing::{debug, error, info, instrument, trace, warn}; #[cfg(feature = "e2e-encryption")] use crate::crypto::{OlmError, SessionRecipientCollectionError}; use crate::{ + attachment::AttachmentConfig, client::WeakClient, config::RequestConfig, error::RetryKind, @@ -390,6 +402,225 @@ impl RoomSendQueue { .await } + /// Queues an attachment to be sent to the room, using the send queue. + /// + /// This returns quickly (without sending or uploading anything), and will + /// push the event to be sent into a queue, handled in the background. + /// + /// Callers are expected to consume [`RoomSendQueueUpdate`] via calling + /// the [`Self::subscribe()`] method to get updates about the sending of + /// that event. + /// + /// By default, if sending failed on the first attempt, it will be retried a + /// few times. If sending failed after those retries, the entire + /// client's sending queue will be disabled, and it will need to be + /// manually re-enabled by the caller (e.g. after network is back, or when + /// something has been done about the faulty requests). + pub async fn send_attachment( + &self, + filename: &str, + content_type: Mime, + data: Vec, + mut config: AttachmentConfig, + ) -> Result { + let Some(room) = self.inner.room.get() else { + return Err(RoomSendQueueError::RoomDisappeared); + }; + if room.state() != RoomState::Joined { + return Err(RoomSendQueueError::RoomNotJoined); + } + + let client = room.client(); + let store = client.store(); + + // Push the dependent requests first, to make sure we're not sending the parent + // (depended upon) while dependencies aren't known yet. + + let upload_file_txn = TransactionId::new(); + let send_event_txn = ChildTransactionId::new(); + + // Cache medias. + + // Prepare and cache the file. + let file_source = MediaSource::Plain(OwnedMxcUri::from(format!( + "mxc://send-queue.local/{upload_file_txn}" + ))); + + let file_media_request = + MediaRequest { source: file_source.clone(), format: MediaFormat::File }; + room.client() + .event_cache_store() + .add_media_content(&file_media_request, data.clone()) + .await + .map_err(|err| RoomSendQueueError::StorageError(err.into()))?; + + let (event_content, thumbnail_txn) = if let Some(thumbnail) = config.thumbnail.take() { + let info = thumbnail.info.as_ref(); + let height = info.and_then(|info| info.height).unwrap_or_else(|| uint!(0)); + let width = info.and_then(|info| info.width).unwrap_or_else(|| uint!(0)); + + let thumbnail_upload_txn = TransactionId::new(); + let thumbnail_source = MediaSource::Plain(OwnedMxcUri::from(format!( + "mxc://send-queue.local/{thumbnail_upload_txn}" + ))); + + let media_request = MediaRequest { + source: thumbnail_source.clone(), + format: MediaFormat::Thumbnail(MediaThumbnailSettings { + size: MediaThumbnailSize { method: Method::Scale, width, height }, + animated: false, + }), + }; + + room.client() + .event_cache_store() + .add_media_content(&media_request, thumbnail.data.clone()) + .await + .map_err(|err| RoomSendQueueError::StorageError(err.into()))?; + + let thumbnail_info = + Box::new(assign!(thumbnail.info.map(ThumbnailInfo::from).unwrap_or_default(), { + mimetype: Some(thumbnail.content_type.as_ref().to_owned()) + })); + + // Save the event sending request as a dependent request on the file upload. + let content = Room::make_attachment_event( + room.make_attachment_type( + &content_type, + filename, + file_source, + config.caption, + config.formatted_caption, + config.info, + Some((thumbnail_source, thumbnail_info)), + ), + config.mentions, + ); + + store + .save_dependent_queued_request( + room.room_id(), + &upload_file_txn, + send_event_txn.clone().into(), + DependentQueuedRequestKind::FinishUpload { + local_echo: content.clone(), + file_upload: upload_file_txn.clone(), + thumbnail_info: Some(FinishUploadThumbnailInfo { + txn: thumbnail_upload_txn.clone(), + height, + width, + }), + }, + ) + .await + .map_err(|err| RoomSendQueueError::StorageError(err.into()))?; + + // Save the file upload request as a dependent request of the thumbnail upload. + store + .save_dependent_queued_request( + room.room_id(), + &thumbnail_upload_txn, + upload_file_txn.clone().into(), + DependentQueuedRequestKind::UploadFileWithThumbnail { + content_type: content_type.to_string(), + cache_key: file_media_request, + related_to: send_event_txn.clone().into(), + }, + ) + .await + .map_err(|err| RoomSendQueueError::StorageError(err.into()))?; + + // Save the thumbnail upload request. + store + .save_send_queue_request( + room.room_id(), + thumbnail_upload_txn.clone(), + QueuedRequestKind::Upload { + content_type: thumbnail.content_type.to_string(), + cache_key: media_request, + thumbnail_source: None, + related_to: send_event_txn.clone().into(), + }, + ) + .await + .map_err(|err| RoomSendQueueError::StorageError(err.into()))?; + + (content, Some(thumbnail_upload_txn)) + } else { + // No thumbnail: only save the file upload request and send the event as a + // dependency. + let content = Room::make_attachment_event( + room.make_attachment_type( + &content_type, + filename, + file_source, + config.caption, + config.formatted_caption, + config.info, + None, + ), + config.mentions, + ); + + store + .save_dependent_queued_request( + room.room_id(), + &upload_file_txn, + send_event_txn.clone().into(), + DependentQueuedRequestKind::FinishUpload { + local_echo: content.clone(), + file_upload: upload_file_txn.clone(), + thumbnail_info: None, + }, + ) + .await + .map_err(|err| RoomSendQueueError::StorageError(err.into()))?; + + store + .save_send_queue_request( + room.room_id(), + upload_file_txn.clone(), + QueuedRequestKind::Upload { + content_type: content_type.to_string(), + cache_key: file_media_request, + thumbnail_source: None, + related_to: send_event_txn.clone().into(), + }, + ) + .await + .map_err(|err| RoomSendQueueError::StorageError(err.into()))?; + + // No thumbnail attachment. + (content, None) + }; + + let send_event_txn = OwnedTransactionId::from(send_event_txn); + trace!(event_txn = %send_event_txn, file_txn = %upload_file_txn, thumbnail_txn = ?thumbnail_txn, "manager sends a media to the background task"); + + self.inner.notifier.notify_one(); + + let _ = self.inner.updates.send(RoomSendQueueUpdate::NewLocalEvent(LocalEcho { + transaction_id: send_event_txn.clone(), + content: LocalEchoContent::Event { + serialized_event: SerializableEventContent::new(&event_content.into()) + .map_err(RoomSendQueueStorageError::JsonSerialization)?, + // TODO: this should be a `SendAttachmentHandle`! + send_handle: SendHandle { + room: self.clone(), + transaction_id: send_event_txn.clone(), + }, + send_error: None, + }, + })); + + Ok(SendAttachmentHandle { + _room: self.clone(), + _transaction_id: send_event_txn, + _file_upload: upload_file_txn, + _thumbnail_transaction_id: thumbnail_txn, + }) + } + /// Returns the current local requests as well as a receiver to listen to /// the send queue updates, as defined in [`RoomSendQueueUpdate`]. pub async fn subscribe( @@ -454,7 +685,10 @@ impl RoomSendQueue { } }; - trace!(txn_id = %queued_request.transaction_id, "received a request to send!"); + let txn_id = queued_request.transaction_id.clone(); + trace!(txn_id = %txn_id, "received a request to send!"); + + let related_txn_id = as_variant!(&queued_request.kind, QueuedRequestKind::Upload { related_to, .. } => related_to.clone()); let Some(room) = room.get() else { if is_dropping.load(Ordering::SeqCst) { @@ -464,18 +698,21 @@ impl RoomSendQueue { continue; }; - match Self::handle_request(&room, &queued_request).await { - Ok(parent_key) => match queue - .mark_as_sent(&queued_request.transaction_id, parent_key.clone()) - .await - { + match Self::handle_request(&room, queued_request).await { + Ok(parent_key) => match queue.mark_as_sent(&txn_id, parent_key.clone()).await { Ok(()) => match parent_key { SentRequestKey::Event(event_id) => { let _ = updates.send(RoomSendQueueUpdate::SentEvent { - transaction_id: queued_request.transaction_id, + transaction_id: txn_id, event_id, }); } + + SentRequestKey::Media { .. } => { + let _ = updates.send(RoomSendQueueUpdate::UploadedMedia { + related_to: related_txn_id.as_ref().unwrap_or(&txn_id).clone(), + }); + } }, Err(err) => { @@ -504,11 +741,11 @@ impl RoomSendQueue { }; if is_recoverable { - warn!(txn_id = %queued_request.transaction_id, error = ?err, "Recoverable error when sending request: {err}, disabling send queue"); + warn!(txn_id = %txn_id, error = ?err, "Recoverable error when sending request: {err}, disabling send queue"); // In this case, we intentionally keep the request in the queue, but mark it // as not being sent anymore. - queue.mark_as_not_being_sent(&queued_request.transaction_id).await; + queue.mark_as_not_being_sent(&txn_id).await; // Let observers know about a failure *after* we've marked the item as not // being sent anymore. Otherwise, there's a possible race where a caller @@ -520,16 +757,11 @@ impl RoomSendQueue { // disconnected, maybe the server had a hiccup). locally_enabled.store(false, Ordering::SeqCst); } else { - warn!(txn_id = %queued_request.transaction_id, error = ?err, "Unrecoverable error when sending request: {err}"); + warn!(txn_id = %txn_id, error = ?err, "Unrecoverable error when sending request: {err}"); // Mark the request as wedged, so it's not picked at any future point. - - if let Err(storage_error) = queue - .mark_as_wedged( - &queued_request.transaction_id, - QueueWedgeError::from(&err), - ) - .await + if let Err(storage_error) = + queue.mark_as_wedged(&txn_id, QueueWedgeError::from(&err)).await { warn!("unable to mark request as wedged: {storage_error}"); } @@ -544,7 +776,7 @@ impl RoomSendQueue { }); let _ = updates.send(RoomSendQueueUpdate::SendError { - transaction_id: queued_request.transaction_id, + transaction_id: related_txn_id.unwrap_or(txn_id), error, is_recoverable, }); @@ -558,9 +790,9 @@ impl RoomSendQueue { /// Handles a single request and returns the [`SentRequestKey`] on success. async fn handle_request( room: &Room, - request: &QueuedRequest, + request: QueuedRequest, ) -> Result { - match &request.kind { + match request.kind { QueuedRequestKind::Event { content } => { let (event, event_type) = content.raw(); @@ -573,6 +805,45 @@ impl RoomSendQueue { trace!(txn_id = %request.transaction_id, event_id = %res.event_id, "event successfully sent"); Ok(SentRequestKey::Event(res.event_id)) } + + QueuedRequestKind::Upload { + content_type, + cache_key, + thumbnail_source, + related_to: relates_to, + } => { + let mime = Mime::from_str(&content_type).map_err(|_| { + crate::Error::SendQueueWedgeError(QueueWedgeError::InvalidMimeType { + mime_type: content_type.clone(), + }) + })?; + + let Some(data) = + room.client().event_cache_store().get_media_content(&cache_key).await? + else { + return Err(crate::Error::SendQueueWedgeError( + QueueWedgeError::MissingMediaContent, + )); + }; + + let media_source = if room.is_encrypted().await? { + let mut cursor = Cursor::new(data); + let encrypted_file = + room.client().upload_encrypted_file(&mime, &mut cursor).await?; + MediaSource::Encrypted(Box::new(encrypted_file)) + } else { + let res = room.client().media().upload(&mime, data).await?; + MediaSource::Plain(res.content_uri) + }; + + let uri = match &media_source { + MediaSource::Plain(uri) => uri, + MediaSource::Encrypted(encrypted_file) => &encrypted_file.url, + }; + trace!(%relates_to, mxc_uri = %uri, "media successfully uploaded"); + + Ok(SentRequestKey::Media { file: media_source, thumbnail: thumbnail_source }) + } } } @@ -633,6 +904,9 @@ impl From<&crate::Error> for QueueWedgeError { } }, + // Flatten errors of `Self` type. + crate::Error::SendQueueWedgeError(error) => error.clone(), + _ => QueueWedgeError::GenericApiError { msg: value.to_string() }, } } @@ -908,8 +1182,8 @@ impl QueueStorage { let store = client.store(); let local_requests = - store.load_send_queue_requests(&self.room_id).await?.into_iter().map(|queued| { - LocalEcho { + store.load_send_queue_requests(&self.room_id).await?.into_iter().filter_map(|queued| { + Some(LocalEcho { transaction_id: queued.transaction_id.clone(), content: match queued.kind { QueuedRequestKind::Event { content } => LocalEchoContent::Event { @@ -920,33 +1194,67 @@ impl QueueStorage { }, send_error: queued.error, }, + + QueuedRequestKind::Upload { .. } => { + // Don't return uploaded medias as their own things; the accompanying + // event represented as a dependent request should be sufficient. + return None; + } }, - } + }) }); - let local_reactions = - store.load_dependent_queued_requests(&self.room_id).await?.into_iter().filter_map( - |dep| match dep.kind { - DependentQueuedRequestKind::EditEvent { .. } - | DependentQueuedRequestKind::RedactEvent => { - // TODO: reflect local edits/redacts too? - None - } - DependentQueuedRequestKind::ReactEvent { key } => Some(LocalEcho { + let reactions_and_medias = store + .load_dependent_queued_requests(&self.room_id) + .await? + .into_iter() + .filter_map(|dep| match dep.kind { + DependentQueuedRequestKind::EditEvent { .. } + | DependentQueuedRequestKind::RedactEvent => { + // TODO: reflect local edits/redacts too? + None + } + + DependentQueuedRequestKind::ReactEvent { key } => Some(LocalEcho { + transaction_id: dep.own_transaction_id.clone().into(), + content: LocalEchoContent::React { + key, + send_handle: SendReactionHandle { + room: room.clone(), + transaction_id: dep.own_transaction_id, + }, + applies_to: dep.parent_transaction_id, + }, + }), + + DependentQueuedRequestKind::UploadFileWithThumbnail { .. } => { + // Don't reflect these: only the associated event is interesting to observers. + None + } + + DependentQueuedRequestKind::FinishUpload { + local_echo, + file_upload: _, + thumbnail_info: _, + } => { + // Materialize as an event local echo. + Some(LocalEcho { transaction_id: dep.own_transaction_id.clone().into(), - content: LocalEchoContent::React { - key, - send_handle: SendReactionHandle { + content: LocalEchoContent::Event { + serialized_event: SerializableEventContent::new(&local_echo.into()) + .ok()?, + // TODO this should be a `SendAttachmentHandle`! + send_handle: SendHandle { room: room.clone(), - transaction_id: dep.own_transaction_id, + transaction_id: dep.own_transaction_id.into(), }, - applies_to: dep.parent_transaction_id, + send_error: None, }, - }), - }, - ); + }) + } + }); - Ok(local_requests.chain(local_reactions).collect()) + Ok(local_requests.chain(reactions_and_medias).collect()) } /// Try to apply a single dependent request, whether it's local or remote. @@ -1030,7 +1338,7 @@ impl QueueStorage { serializable.into(), ) .await - .map_err(RoomSendQueueStorageError::StorageError)?; + .map_err(RoomSendQueueStorageError::StateStoreError)?; } else { // The parent event is still local; update the local echo. let edited = store @@ -1040,7 +1348,7 @@ impl QueueStorage { new_content.into(), ) .await - .map_err(RoomSendQueueStorageError::StorageError)?; + .map_err(RoomSendQueueStorageError::StateStoreError)?; if !edited { warn!("missing local echo upon dependent edit"); @@ -1080,7 +1388,7 @@ impl QueueStorage { let removed = store .remove_send_queue_request(&self.room_id, &de.parent_transaction_id) .await - .map_err(RoomSendQueueStorageError::StorageError)?; + .map_err(RoomSendQueueStorageError::StateStoreError)?; if !removed { warn!("missing local echo upon dependent redact"); @@ -1112,12 +1420,173 @@ impl QueueStorage { serializable.into(), ) .await - .map_err(RoomSendQueueStorageError::StorageError)?; + .map_err(RoomSendQueueStorageError::StateStoreError)?; } else { // Not applied yet, we should retry later => false. return Ok(false); } } + + DependentQueuedRequestKind::UploadFileWithThumbnail { + content_type, + cache_key, + related_to, + } => { + let Some(parent_key) = parent_key else { + // Not finished yet. + return Ok(false); + }; + + let Some((file, thumbnail)) = as_variant!(parent_key, SentRequestKey::Media { file, thumbnail } => (file, thumbnail)) + else { + return Err(RoomSendQueueError::StorageError( + RoomSendQueueStorageError::InvalidParentKey, + )); + }; + + // The media we just uploaded was a thumbnail, so the thumbnail shouldn't have + // a thumbnail itself. + debug_assert!(thumbnail.is_none()); + if thumbnail.is_some() { + warn!("unexpected thumbnail for a thumbnail!"); + } + + let request = QueuedRequestKind::Upload { + content_type, + cache_key, + thumbnail_source: Some(file), + related_to, + }; + + store + .save_send_queue_request(&self.room_id, de.own_transaction_id.into(), request) + .await + .map_err(RoomSendQueueStorageError::StateStoreError)?; + } + + DependentQueuedRequestKind::FinishUpload { + mut local_echo, + file_upload, + thumbnail_info, + } => { + let Some(parent_key) = parent_key else { + // Not finished yet. + return Ok(false); + }; + + // Both uploads are ready: enqueue the event with its final data. + let Some((file_source, thumbnail_source)) = as_variant!(parent_key, SentRequestKey::Media { file, thumbnail } => (file, thumbnail)) + else { + return Err(RoomSendQueueError::StorageError( + RoomSendQueueStorageError::InvalidParentKey, + )); + }; + + { + // Update cache keys in the media stores, from the local ones to the remote + // ones. + + // Rename the original file. + let original_file_request = MediaRequest { + source: MediaSource::Plain(OwnedMxcUri::from(format!( + "mxc://send-queue.local/{file_upload}" + ))), + format: MediaFormat::File, + }; + + client + .event_cache_store() + .replace_media_key( + &original_file_request, + &MediaRequest { + source: file_source.clone(), + format: MediaFormat::File, + }, + ) + .await + .map_err(RoomSendQueueStorageError::EventCacheStoreError)?; + + // Rename the thumbnail too, if needs be. + if let Some(info) = thumbnail_info.as_ref() { + let original_thumbnail_source = MediaSource::Plain(OwnedMxcUri::from( + format!("mxc://send-queue.local/{}", info.txn), + )); + let format = MediaFormat::Thumbnail(MediaThumbnailSettings { + size: MediaThumbnailSize { + method: Method::Scale, + width: info.width, + height: info.height, + }, + animated: false, + }); + + client + .event_cache_store() + .replace_media_key( + &MediaRequest { + source: original_thumbnail_source, + format: format.clone(), + }, + &MediaRequest { source: file_source.clone(), format }, + ) + .await + .map_err(RoomSendQueueStorageError::EventCacheStoreError)?; + } + } + + // Replace the source by the final ones in all the medias handled by + // `Room::make_attachment_type()`. + // + // Some variants look eerily similar below, but the `event` and `info` are all + // different types… + + match &mut local_echo.msgtype { + MessageType::Audio(event) => { + event.source = file_source; + } + MessageType::File(event) => { + event.source = file_source; + if let Some(info) = event.info.as_mut() { + info.thumbnail_source = thumbnail_source; + } + } + MessageType::Image(event) => { + event.source = file_source; + if let Some(info) = event.info.as_mut() { + info.thumbnail_source = thumbnail_source; + } + } + MessageType::Video(event) => { + event.source = file_source; + if let Some(info) = event.info.as_mut() { + info.thumbnail_source = thumbnail_source; + } + } + + _ => { + // All `MessageType` created by `Room::make_attachment_type` should be + // handled here. The only way to end up here is that a message type has + // been tampered with in the database. + error!("Invalid message type in database: {}", local_echo.msgtype()); + // Only crash debug builds. + debug_assert!(false, "invalid message type in database"); + } + } + + let request = SerializableEventContent::new(&local_echo.into()) + .map_err(RoomSendQueueStorageError::JsonSerialization)?; + + // TODO: probably want to emit a room update here, of type "edit". + + store + .save_send_queue_request( + &self.room_id, + de.own_transaction_id.into(), + request.into(), + ) + .await + .map_err(RoomSendQueueStorageError::StateStoreError)?; + } } Ok(true) @@ -1134,7 +1603,7 @@ impl QueueStorage { let dependent_requests = store .load_dependent_queued_requests(&self.room_id) .await - .map_err(RoomSendQueueStorageError::StorageError)?; + .map_err(RoomSendQueueStorageError::StateStoreError)?; let num_initial_dependent_requests = dependent_requests.len(); if num_initial_dependent_requests == 0 { @@ -1153,7 +1622,7 @@ impl QueueStorage { store .remove_dependent_queued_request(&self.room_id, &original.own_transaction_id) .await - .map_err(RoomSendQueueStorageError::StorageError)?; + .map_err(RoomSendQueueStorageError::StateStoreError)?; } } @@ -1174,7 +1643,7 @@ impl QueueStorage { store .remove_dependent_queued_request(&self.room_id, &dependent_id) .await - .map_err(RoomSendQueueStorageError::StorageError)?; + .map_err(RoomSendQueueStorageError::StateStoreError)?; num_dependent_requests -= 1; } @@ -1303,6 +1772,12 @@ pub enum RoomSendQueueUpdate { /// Received event id from the send response. event_id: OwnedEventId, }, + + /// A media has been successfully uploaded. + UploadedMedia { + /// The media event this uploaded media relates to. + related_to: OwnedTransactionId, + }, } /// An error triggered by the send queue module. @@ -1328,7 +1803,11 @@ pub enum RoomSendQueueError { pub enum RoomSendQueueStorageError { /// Error caused by the state store. #[error(transparent)] - StorageError(#[from] StoreError), + StateStoreError(#[from] StoreError), + + /// Error caused by the event cache store. + #[error(transparent)] + EventCacheStoreError(#[from] EventCacheStoreError), /// Error caused when (de)serializing into/from json. #[error(transparent)] @@ -1510,16 +1989,34 @@ impl SendReactionHandle { } } +/// A handle to execute actions while sending an attachment. +/// +/// In the future, this may support cancellation, subscribing to progress, etc. +#[derive(Clone, Debug)] +pub struct SendAttachmentHandle { + /// Reference to the send queue for the room where this attachment was sent. + _room: RoomSendQueue, + + /// Transaction id for the sending of the event itself. + _transaction_id: OwnedTransactionId, + + /// Transaction id for the file upload. + _file_upload: OwnedTransactionId, + + /// Transaction id for the thumbnail upload. + _thumbnail_transaction_id: Option, +} + /// From a given source of [`DependentQueuedRequest`], return only the most /// meaningful, i.e. the ones that wouldn't be overridden after applying the /// others. fn canonicalize_dependent_requests( dependent: &[DependentQueuedRequest], ) -> Vec { - let mut by_event_id = HashMap::>::new(); + let mut by_txn = HashMap::>::new(); for d in dependent { - let prevs = by_event_id.entry(d.parent_transaction_id.clone()).or_default(); + let prevs = by_txn.entry(d.parent_transaction_id.clone()).or_default(); if prevs.iter().any(|prev| matches!(prev.kind, DependentQueuedRequestKind::RedactEvent)) { // The parent event has already been flagged for redaction, don't consider the @@ -1540,7 +2037,10 @@ fn canonicalize_dependent_requests( } } - DependentQueuedRequestKind::ReactEvent { .. } => { + DependentQueuedRequestKind::UploadFileWithThumbnail { .. } + | DependentQueuedRequestKind::FinishUpload { .. } + | DependentQueuedRequestKind::ReactEvent { .. } => { + // These requests can't be canonicalized, push them as is. prevs.push(d); } @@ -1552,10 +2052,7 @@ fn canonicalize_dependent_requests( } } - by_event_id - .into_iter() - .flat_map(|(_parent_txn_id, entries)| entries.into_iter().cloned()) - .collect() + by_txn.into_iter().flat_map(|(_parent_txn_id, entries)| entries.into_iter().cloned()).collect() } #[cfg(all(test, not(target_arch = "wasm32")))]