From a522c424ab0fe9ba02697367080805f5db450a01 Mon Sep 17 00:00:00 2001 From: Art <4998038+Alorel@users.noreply.github.com> Date: Thu, 24 Oct 2024 20:16:21 +0100 Subject: [PATCH] feat: Accept async `upgradeneeded` closures Closes #30 --- .github/workflows/test.yml | 3 + Cargo.lock | 77 ++++++++--- Cargo.toml | 7 +- internal_macros/src/generic_bounds.rs | 85 +++++++++--- src/database/version_change_listener.rs | 105 +++++++-------- src/error.rs | 8 ++ src/error/unexpected_data.rs | 9 ++ src/factory/req_builder.rs | 64 +++++---- src/future/mod.rs | 2 +- src/future/open_db.rs | 82 +++++++++--- src/future/open_db/listener.rs | 166 +++++++++++++++++++----- src/future/open_db/listeners.rs | 63 ++++++--- src/lib.rs | 2 + src/transaction.rs | 6 + src/transaction/base.rs | 10 ++ src/transaction/on_done.rs | 120 +++++++++++++++++ tests/tests/transaction/mod.rs | 3 + tests/tests/transaction/on_done.rs | 120 +++++++++++++++++ 18 files changed, 737 insertions(+), 195 deletions(-) create mode 100644 src/transaction/on_done.rs create mode 100644 tests/tests/transaction/on_done.rs diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 852393e..ee49872 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -138,6 +138,9 @@ jobs: - --features "indices cursors streams serde" - --features "dates serde" - --features "cursors serde" + - --features "async-upgrade" + - --features "tx-done" + - --features "async-upgrade tx-done" done: name: All tests diff --git a/Cargo.lock b/Cargo.lock index 8316ce6..feb8a59 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11,7 +11,7 @@ dependencies = [ "macroific", "proc-macro2", "quote", - "syn", + "syn 2.0.81", ] [[package]] @@ -108,7 +108,18 @@ dependencies = [ "macroific", "proc-macro2", "quote", - "syn", + "syn 2.0.81", +] + +[[package]] +name = "derivative" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcc3dd5e9e9c0b295d6e1e4d811fb6f157d5ffd784b8d202fc62eac8035a770b" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", ] [[package]] @@ -128,7 +139,7 @@ checksum = "cb7330aeadfbe296029522e6c40f315320aba36fc43a5b3632f3795348f3bd22" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.81", "unicode-xid", ] @@ -141,7 +152,7 @@ dependencies = [ "macroific", "proc-macro2", "quote", - "syn", + "syn 2.0.81", ] [[package]] @@ -200,7 +211,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.81", ] [[package]] @@ -260,7 +271,7 @@ checksum = "0ab604ee7085efba6efc65e4ebca0e9533e3aff6cb501d7d77b211e3a781c6d5" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.81", ] [[package]] @@ -290,6 +301,7 @@ dependencies = [ "wasm-bindgen", "wasm-bindgen-futures", "wasm-bindgen-test", + "wasm_evt_listener", "web-sys", "web-time", ] @@ -301,7 +313,7 @@ dependencies = [ "macroific", "proc-macro2", "quote", - "syn", + "syn 2.0.81", ] [[package]] @@ -351,7 +363,7 @@ dependencies = [ "proc-macro2", "quote", "sealed", - "syn", + "syn 2.0.81", ] [[package]] @@ -363,7 +375,7 @@ dependencies = [ "proc-macro2", "quote", "sealed", - "syn", + "syn 2.0.81", ] [[package]] @@ -376,7 +388,7 @@ dependencies = [ "macroific_core", "proc-macro2", "quote", - "syn", + "syn 2.0.81", ] [[package]] @@ -517,7 +529,7 @@ checksum = "22f968c5ea23d555e670b449c1c5e7b2fc399fdaec1d304a17cd48e288abc107" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.81", ] [[package]] @@ -548,7 +560,7 @@ checksum = "243902eda00fad750862fc144cea25caca5e20d615af0a81bee94ca738f1df1f" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.81", ] [[package]] @@ -575,6 +587,17 @@ dependencies = [ "serde", ] +[[package]] +name = "syn" +version = "1.0.109" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + [[package]] name = "syn" version = "2.0.81" @@ -603,7 +626,7 @@ checksum = "08904e7672f5eb876eaaf87e0ce17857500934f4981c4a0ab2b4aa98baac7fc3" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.81", ] [[package]] @@ -625,7 +648,7 @@ checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.81", ] [[package]] @@ -688,7 +711,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn", + "syn 2.0.81", "wasm-bindgen-shared", ] @@ -722,7 +745,7 @@ checksum = "26c6ab57572f7a24a4985830b120de1594465e5d500f24afe89e16b4e833ef68" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.81", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -756,7 +779,25 @@ checksum = "c97b2ef2c8d627381e51c071c2ab328eac606d3f69dd82bcbca20a9e389d95f0" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.81", +] + +[[package]] +name = "wasm_evt_listener" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc92d6378b411ed94839112a36d9dbc77143451d85b05dfb0cce93a78dab1963" +dependencies = [ + "accessory", + "derivative", + "derive_more", + "fancy_constructor", + "futures-core", + "js-sys", + "smallvec", + "tokio", + "wasm-bindgen", + "web-sys", ] [[package]] @@ -880,5 +921,5 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.81", ] diff --git a/Cargo.toml b/Cargo.toml index 4e4fa2a..a7f21cd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ include = [ ] [features] +async-upgrade = [] cursors = [ "web-sys/IdbCursor", "web-sys/IdbCursorWithValue", @@ -47,10 +48,12 @@ serde = [ ] streams = [ "dep:futures-core", + "wasm_evt_listener/streams", ] switch = [] +tx-done = ["dep:wasm_evt_listener"] typed-arrays = [] -version-change = ["tokio/macros"] +version-change = ["tokio/macros", "dep:wasm_evt_listener"] _serialise-deserialise-dyn = [] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -81,6 +84,7 @@ uuid = "1.8" wasm-bindgen = "0.2.95" wasm-bindgen-futures = "0.4.45" wasm-bindgen-test = "0.3.45" +wasm_evt_listener = "0.1" web-time = "1.1" web-sys = "0.3.72" @@ -105,6 +109,7 @@ thiserror = { workspace = true } tokio = { workspace = true, features = ["sync"], default-features = false } wasm-bindgen = { workspace = true } wasm-bindgen-futures = { workspace = true } +wasm_evt_listener = { workspace = true, optional = true } web-time = { workspace = true, optional = true } [dependencies.internal_macros] diff --git a/internal_macros/src/generic_bounds.rs b/internal_macros/src/generic_bounds.rs index d24a1da..cf2d0e3 100644 --- a/internal_macros/src/generic_bounds.rs +++ b/internal_macros/src/generic_bounds.rs @@ -1,33 +1,32 @@ use crate::commons::FnTarget; use crate::TokenStream1; +use macroific::prelude::*; +use proc_macro2::Ident; use quote::ToTokens; -use syn::punctuated::Punctuated; use syn::{parse_quote, Error, WherePredicate}; -#[inline] -pub(super) fn exec(spec: TokenStream1, target: TokenStream1) -> TokenStream1 { - match syn::parse::(spec) { - Ok(opts) => match syn::parse::(target.clone()) { - Ok(mut target) => { - opts.extend_target(&mut target); - target.into_token_stream().into() - } - Err(e) => on_err(target, e), - }, - Err(e) => on_err(target, e), - } -} - macro_rules! make_opts { - ($struct_name: ident => { $($($opt: ident)|+ => $predicate: ty),+ $(,)? }) => { - /// Options list + ($struct_name: ident => { + $($($opt: ident)|+ => $predicate: ty),+ + $(,[custom] => { + $($extra_opt: ident => $extra_ty: ty),+ $(,)? + })? $(,)? + }) => { + /// # Options list /// /// | Option | Predicate | /// |--------|-----------| $($(#[doc = concat!(" | `", stringify!($opt), "` | `", stringify!($predicate), "` |")])+)+ + /// + /// # Extras + /// + /// | Option | Type | + /// |--------|-----------| + $($(#[doc = concat!(" | `", stringify!($extra_opt), "` | `", stringify!($extra_ty), "` |")])+)+ #[derive(::macroific::attr_parse::AttributeOptions)] pub(super) struct $struct_name { - $($($opt: ::syn::punctuated::Punctuated),+),+ + $($($opt: ::syn::punctuated::Punctuated,)+)+ + $($($extra_opt: Option<$extra_ty>,)+)? } impl ::syn::parse::Parse for $struct_name { @@ -42,6 +41,9 @@ macro_rules! make_opts { $($(if !self.$opt.is_empty() { extend_generics(self.$opt, target, ::quote::quote!($predicate)); })+)+ + $($(if let Some($extra_opt) = self.$extra_opt { + $extra_opt.extend_target(target); + })+)? } } }; @@ -52,18 +54,61 @@ make_opts!(Opts => { db_version => crate::factory::DBVersion, blocked_cb => ::core::ops::FnOnce(crate::database::VersionChangeEvent) -> crate::Result<()> + 'static, upgrade_cb => ::core::ops::FnOnce(crate::database::VersionChangeEvent, crate::database::Database) -> crate::Result<()> + 'static, + [custom] => { + upgrade_async_cb => UpgradeAsyncCb, + }, }); +#[inline] +pub(super) fn exec(spec: TokenStream1, target: TokenStream1) -> TokenStream1 { + match syn::parse::(spec) { + Ok(opts) => match syn::parse::(target.clone()) { + Ok(mut target) => { + opts.extend_target(&mut target); + target.into_token_stream().into() + } + Err(e) => on_err(target, e), + }, + Err(e) => on_err(target, e), + } +} + +#[derive(ParseOption)] +struct UpgradeAsyncCb { + #[attr_opts(default = false)] + fun: Ident, + + #[attr_opts(default = false)] + fut: Ident, +} + +impl UpgradeAsyncCb { + fn extend_target(self, target: &mut FnTarget) { + let Self { fun, fut } = self; + let wheres = [ + parse_quote!(#fun: ::core::ops::FnOnce(crate::database::VersionChangeEvent, crate::database::Database) -> #fut + 'static), + parse_quote!(#fut: ::core::future::Future> + 'static), + ]; + + target + .generics_mut() + .make_where_clause() + .predicates + .extend::<[WherePredicate; 2]>(wheres); + } +} + fn on_err(mut target: TokenStream1, e: Error) -> TokenStream1 { let e: TokenStream1 = e.into_compile_error().into(); target.extend(e); target } -fn extend_generics(idents: Punctuated, target: &mut FnTarget, ext_with: T) +fn extend_generics(idents: Iter, target: &mut FnTarget, ext_with: T) where T: ToTokens, - I: ToTokens, + Item: ToTokens, + Iter: IntoIterator, { let iter = idents .into_iter() diff --git a/src/database/version_change_listener.rs b/src/database/version_change_listener.rs index c85d646..564d354 100644 --- a/src/database/version_change_listener.rs +++ b/src/database/version_change_listener.rs @@ -1,14 +1,8 @@ use super::{Database, VersionChangeEvent}; use crate::internal_utils::SystemRepr; use accessory::Accessors; -use tokio::sync::mpsc::{ - channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender, -}; -use wasm_bindgen::prelude::*; -use wasm_bindgen_futures::spawn_local; - -type TOnClose = Closure; -type TOnVersionChange = Closure; +use std::task::{Context, Poll}; +use wasm_evt_listener::Listener as EvtListener; const EVT_CLOSE: &str = "close"; const EVT_CHANGE: &str = "versionchange"; @@ -22,93 +16,90 @@ pub struct VersionChangeListener { /// The database associated with this listener #[access(get)] db: Database, - rx_close: Receiver<()>, - rx_change: UnboundedReceiver, - cb_close: TOnClose, - cb_version_change: TOnVersionChange, + on_close: EvtListener, + on_change: EvtListener, } impl VersionChangeListener { pub(super) fn new(db: Database) -> crate::Result { - let (tx_close, rx_close) = channel(1); - let (tx_change, rx_change) = unbounded_channel(); - - let cb_close = Self::create_on_close(tx_close); - let cb_version_change = Self::create_on_change(tx_change); + let on_close = EvtListener::builder().build()?; + let on_change = EvtListener::builder().build()?; - db.as_sys() - .add_event_listener_with_callback(EVT_CLOSE, cb_close.as_ref().unchecked_ref())?; - db.as_sys().add_event_listener_with_callback( - EVT_CHANGE, - cb_version_change.as_ref().unchecked_ref(), - )?; + on_close.add_to(EVT_CLOSE, db.as_sys())?; + on_change.add_to(EVT_CHANGE, db.as_sys())?; Ok(Self { db, - rx_close, - rx_change, - cb_close, - cb_version_change, + on_close, + on_change, }) } + /// Poll for the next event. + /// + /// Returns `None` if the database gets closed. + pub fn poll_recv(&mut self, cx: &mut Context) -> Poll> { + match self.on_change.poll_recv(cx) { + Poll::Pending => match self.on_close.poll_recv(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(_) => { + self.close(); + Poll::Ready(None) + } + }, + Poll::Ready(evt) => Poll::Ready(Some(VersionChangeEvent::new(evt))), + } + } + /// Receive the next event. /// /// Returns `None` if the database gets closed. pub async fn recv(&mut self) -> Option { tokio::select! { - _ = self.rx_close.recv() => None, - opt = self.rx_change.recv() => opt.map(VersionChangeEvent::new), + _ = self.on_close.recv() => { + self.close(); + None + }, + evt = self.on_change.recv() => Some(VersionChangeEvent::new(evt)), } } - fn create_on_change(tx: UnboundedSender) -> TOnVersionChange { - TOnVersionChange::wrap(Box::new(move |evt| { - let _ = tx.send(evt); - })) + /// Check if a `versionchange` event got emitted, return it if so. + pub fn try_recv(&mut self) -> Option { + Some(VersionChangeEvent::new(self.on_change.try_recv()?)) } - fn create_on_close(tx: Sender<()>) -> TOnClose { - TOnClose::wrap(Box::new(move || { - let tx = tx.clone(); - spawn_local(async move { - let _ = tx.send(()).await; - }); - })) + fn close(&mut self) { + self.on_close.close(); + self.on_change.close(); } } impl Drop for VersionChangeListener { fn drop(&mut self) { - let _ = self - .db - .as_sys() - .remove_event_listener_with_callback(EVT_CLOSE, self.cb_close.as_ref().unchecked_ref()); - - let _ = self.db.as_sys().remove_event_listener_with_callback( - EVT_CHANGE, - self.cb_version_change.as_ref().unchecked_ref(), - ); + let _ = self.on_close.rm_from(EVT_CLOSE, self.db.as_sys()); + let _ = self.on_change.rm_from(EVT_CHANGE, self.db.as_sys()); } } #[cfg(feature = "streams")] const _: () = { - use futures_core::Stream; + use futures_core::{FusedStream, Stream}; use std::pin::Pin; use std::task::{Context, Poll}; impl Stream for VersionChangeListener { type Item = VersionChangeEvent; + #[inline] fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self.rx_change.poll_recv(cx) { - Poll::Ready(opt) => Poll::Ready(opt.map(VersionChangeEvent::new)), - Poll::Pending => match self.rx_close.poll_recv(cx) { - Poll::Ready(_) => Poll::Ready(None), - Poll::Pending => Poll::Pending, - }, - } + self.poll_recv(cx) + } + } + + impl FusedStream for VersionChangeListener { + fn is_terminated(&self) -> bool { + self.on_close.is_terminated() && self.on_change.is_terminated() } } }; diff --git a/src/error.rs b/src/error.rs index 51e98a5..9adcfee 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,5 +1,6 @@ //! Crate errors. +use std::sync::PoisonError; use wasm_bindgen::prelude::*; pub use dom_exception::DomException; @@ -69,3 +70,10 @@ impl From for Error { } } } + +impl From> for Error { + #[inline] + fn from(value: PoisonError) -> Self { + UnexpectedDataError::from(value).into() + } +} diff --git a/src/error/unexpected_data.rs b/src/error/unexpected_data.rs index 84485f9..49da5e3 100644 --- a/src/error/unexpected_data.rs +++ b/src/error/unexpected_data.rs @@ -1,3 +1,5 @@ +use std::sync::PoisonError; + /// We reached a state that shouldn't have been allowed by the API. #[derive(Debug, PartialEq, Eq, Copy, Clone, thiserror::Error)] #[non_exhaustive] @@ -32,3 +34,10 @@ pub enum UnexpectedDataError { #[error("Mutex poisoned.")] PoisonedLock, } + +impl From> for UnexpectedDataError { + #[inline] + fn from(_: PoisonError) -> Self { + Self::PoisonedLock + } +} diff --git a/src/factory/req_builder.rs b/src/factory/req_builder.rs index 28f72e4..57f5343 100644 --- a/src/factory/req_builder.rs +++ b/src/factory/req_builder.rs @@ -1,22 +1,12 @@ use super::DBFactory; use crate::error::OpenDbError; -use crate::future::OpenDbRequest; +use crate::future::{OpenDbListener, OpenDbRequest}; use internal_macros::{generic_bounds, BuildIntoFut, StructName}; use sealed::sealed; type WithFactory = OpenDbRequestBuilder; /// Database open request builder. -/// -/// # Generics -/// -/// | Generic param | Description | -/// |---|---| -/// | `N` | Database name | -/// | `V` | Database version | -/// | `B` | `blocked` event handler. | -/// | `U` | `upgradeneeded` event handler. | -/// | `Fa` | Either [`DBFactory`] or `()` if a factory should be generated on the fly | #[derive(Clone, StructName, BuildIntoFut)] #[must_use] pub struct OpenDbRequestBuilder { @@ -81,11 +71,14 @@ impl OpenDbRequestBuilder { /// Set the [blocked](https://developer.mozilla.org/en-US/docs/Web/API/IDBOpenDBRequest/blocked_event) /// event handler. #[generic_bounds(blocked_cb(B2))] - pub fn with_on_blocked(self, on_blocked: B2) -> OpenDbRequestBuilder { + pub fn with_on_blocked( + self, + on_blocked: B2, + ) -> OpenDbRequestBuilder { OpenDbRequestBuilder { name: self.name, version: self.version, - on_blocked, + on_blocked: OpenDbListener::new_blocked(on_blocked), on_upgrade_needed: self.on_upgrade_needed, factory: self.factory, } @@ -97,12 +90,29 @@ impl OpenDbRequestBuilder { pub fn with_on_upgrade_needed( self, on_upgrade_needed: U2, - ) -> OpenDbRequestBuilder { + ) -> OpenDbRequestBuilder { + OpenDbRequestBuilder { + name: self.name, + version: self.version, + on_blocked: self.on_blocked, + on_upgrade_needed: OpenDbListener::new_upgrade(on_upgrade_needed), + factory: self.factory, + } + } + + /// Set the [upgradeneeded](https://developer.mozilla.org/en-US/docs/Web/API/IDBOpenDBRequest/upgradeneeded_event) + /// event handler that returns a `Future`. + #[generic_bounds(upgrade_async_cb(fun(U2), fut(U2Fut)))] + #[cfg(feature = "async-upgrade")] + pub fn with_on_upgrade_needed_fut( + self, + on_upgrade_needed: U2, + ) -> OpenDbRequestBuilder { OpenDbRequestBuilder { name: self.name, version: self.version, on_blocked: self.on_blocked, - on_upgrade_needed, + on_upgrade_needed: OpenDbListener::new_upgrade_fut(on_upgrade_needed), factory: self.factory, } } @@ -136,9 +146,9 @@ impl crate::Build for WithFactory { } } -#[generic_bounds(db_name(N), db_version(V), upgrade_cb(U), blocked_cb(B))] +#[generic_bounds(db_name(N), db_version(V))] #[sealed] -impl crate::Build for WithFactory { +impl crate::Build for WithFactory { type Ok = OpenDbRequest; type Err = OpenDbError; @@ -154,9 +164,9 @@ impl crate::Build for WithFactory { } } -#[generic_bounds(db_name(N), upgrade_cb(U), blocked_cb(B))] +#[generic_bounds(db_name(N))] #[sealed] -impl crate::Build for WithFactory { +impl crate::Build for WithFactory { type Ok = OpenDbRequest; type Err = OpenDbError; @@ -170,9 +180,9 @@ impl crate::Build for WithFactory { } } -#[generic_bounds(db_name(N), db_version(V), upgrade_cb(U))] +#[generic_bounds(db_name(N), db_version(V))] #[sealed] -impl crate::Build for WithFactory { +impl crate::Build for WithFactory { type Ok = OpenDbRequest; type Err = OpenDbError; @@ -184,9 +194,9 @@ impl crate::Build for WithFactory { } } -#[generic_bounds(db_name(N), upgrade_cb(U))] +#[generic_bounds(db_name(N))] #[sealed] -impl crate::Build for WithFactory { +impl crate::Build for WithFactory { type Ok = OpenDbRequest; type Err = OpenDbError; @@ -196,9 +206,9 @@ impl crate::Build for WithFactory { } } -#[generic_bounds(db_name(N), db_version(V), blocked_cb(B))] +#[generic_bounds(db_name(N), db_version(V))] #[sealed] -impl crate::Build for WithFactory { +impl crate::Build for WithFactory { type Ok = OpenDbRequest; type Err = OpenDbError; @@ -210,9 +220,9 @@ impl crate::Build for WithFactory { } } -#[generic_bounds(db_name(N), blocked_cb(B))] +#[generic_bounds(db_name(N))] #[sealed] -impl crate::Build for WithFactory { +impl crate::Build for WithFactory { type Ok = OpenDbRequest; type Err = OpenDbError; diff --git a/src/future/mod.rs b/src/future/mod.rs index 9cd09ef..0ffc7cc 100644 --- a/src/future/mod.rs +++ b/src/future/mod.rs @@ -4,7 +4,7 @@ pub use array_map::ArrayMapFuture; pub use basic::BasicRequest; pub use get_all::*; pub use maybe_errored::MaybeErrored; -pub use open_db::OpenDbRequest; +pub use open_db::{OpenDbListener, OpenDbRequest}; pub use request::{Request, VoidRequest}; pub use traits::*; diff --git a/src/future/open_db.rs b/src/future/open_db.rs index ebacef8..0f4ab99 100644 --- a/src/future/open_db.rs +++ b/src/future/open_db.rs @@ -1,18 +1,19 @@ mod listener; mod listeners; +use cfg_if::cfg_if; use fancy_constructor::new; use std::task::{Context, Poll}; use wasm_bindgen::prelude::*; -use internal_macros::{generic_bounds, FutureFromPollUnpinned}; -use listener::Listener; +use internal_macros::FutureFromPollUnpinned; +pub use listener::OpenDbListener; use listeners::Listeners; use super::traits::*; use super::VoidRequest; use crate::database::Database; -use crate::error::Error; +use crate::error::{Error, UnexpectedDataError}; use crate::internal_utils::SystemRepr; /// Future for opening a database. @@ -21,31 +22,31 @@ use crate::internal_utils::SystemRepr; pub struct OpenDbRequest { req: VoidRequest, listeners: Listeners, + + #[new(val(Self::phase_poll_req))] + polling_fn: fn(&mut Self, &mut Context) -> Poll>, } impl OpenDbRequest { - #[generic_bounds(upgrade_cb(F))] - pub(crate) fn with_upgrade(req: VoidRequest, on_upgrade_needed: F) -> Self { - let listener = Listener::new_upgrade(on_upgrade_needed); + pub(crate) fn with_upgrade(req: VoidRequest, on_upgrade_needed: OpenDbListener) -> Self { let idb_req = req.as_sys().unchecked_ref(); - let listeners = Listeners::with_on_upgrade_needed(idb_req, listener); + let listeners = Listeners::with_on_upgrade_needed(idb_req, on_upgrade_needed); Self::new(req, listeners) } - #[generic_bounds(blocked_cb(F))] - pub(crate) fn with_block(req: VoidRequest, on_blocked: F) -> Self { - let listener = Listener::new_blocked(on_blocked); + pub(crate) fn with_block(req: VoidRequest, on_blocked: OpenDbListener) -> Self { let idb_req = req.as_sys().unchecked_ref(); - let listeners = Listeners::with_block(idb_req, listener); + let listeners = Listeners::with_block(idb_req, on_blocked); Self::new(req, listeners) } - #[generic_bounds(upgrade_cb(U), blocked_cb(B))] - pub(crate) fn with_both(req: VoidRequest, on_blocked: B, on_upgrade_needed: U) -> Self { - let on_blocked = Listener::new_blocked(on_blocked); - let on_upgrade_needed = Listener::new_upgrade(on_upgrade_needed); + pub(crate) fn with_both( + req: VoidRequest, + on_blocked: OpenDbListener, + on_upgrade_needed: OpenDbListener, + ) -> Self { let idb_req = req.as_sys().unchecked_ref(); let listeners = Listeners::with_both(idb_req, on_blocked, on_upgrade_needed); @@ -67,16 +68,57 @@ impl OpenDbRequest { } } +impl OpenDbRequest { + fn phase_poll_req(&mut self, cx: &mut Context) -> Poll> { + match self.req.poll_unpinned(cx) { + Poll::Ready(Ok(())) => { + cfg_if! { + if #[cfg(feature = "async-upgrade")] { + self.polling_fn = Self::phase_poll_listeners; + self.phase_poll_listeners(cx) + } else { + self.polling_fn = Self::phase_done; + Poll::Ready(self.take_ok()) + } + } + } + Poll::Pending => Poll::Pending, + Poll::Ready(Err(e)) => { + self.polling_fn = Self::phase_done; + Poll::Ready(self.take_err(e)) + } + } + } + + #[cfg(feature = "async-upgrade")] + fn phase_poll_listeners(&mut self, cx: &mut Context) -> Poll> { + match self.listeners.poll_unpinned(cx) { + Poll::Ready(Ok(())) => { + self.polling_fn = Self::phase_done; + Poll::Ready(self.take_ok()) + } + Poll::Pending => Poll::Pending, + Poll::Ready(Err(e)) => { + self.polling_fn = Self::phase_done; + Poll::Ready(self.take_err(e)) + } + } + } + + #[inline] + #[allow(clippy::unused_self)] + fn phase_done(&mut self, _: &mut Context) -> Poll> { + Poll::Ready(Err(UnexpectedDataError::PollState.into())) + } +} + #[::sealed::sealed] impl PollUnpinned for OpenDbRequest { type Output = crate::OpenDbResult; + #[inline] fn poll_unpinned(&mut self, cx: &mut Context) -> Poll { - match self.req.poll_unpinned(cx) { - Poll::Ready(Ok(())) => Poll::Ready(self.take_ok()), - Poll::Pending => Poll::Pending, - Poll::Ready(Err(e)) => Poll::Ready(self.take_err(e)), - } + (self.polling_fn)(self, cx) } } diff --git a/src/future/open_db/listener.rs b/src/future/open_db/listener.rs index 624585b..a5ea5cb 100644 --- a/src/future/open_db/listener.rs +++ b/src/future/open_db/listener.rs @@ -8,58 +8,72 @@ use wasm_bindgen::prelude::*; type TClosure = Closure Result<(), JsValue> + 'static>; +const LBL_UPGRADE: &str = "onupgradeneeded"; +const LBL_BLOCKED: &str = "onblocked"; + #[derive(Debug, Default)] pub(super) enum Status { #[default] Ok, + #[cfg(feature = "async-upgrade")] + Pending, Err(Error), Taken, } -pub(super) struct Listener { +/// A Open DB request event listener. +pub struct OpenDbListener { status: Arc>, + #[cfg(feature = "async-upgrade")] + async_notify: tokio::sync::mpsc::UnboundedReceiver<()>, listener: TClosure, } +impl Status { + fn new() -> Arc> { + Arc::new(Mutex::new(Self::default())) + } +} + impl From for crate::Result<()> { fn from(status: Status) -> Self { match status { Status::Ok => Ok(()), Status::Err(e) => Err(e), Status::Taken => Err(UnexpectedDataError::PollState.into()), + #[cfg(feature = "async-upgrade")] + Status::Pending => Err(UnexpectedDataError::PollState.into()), } } } -impl Status { - fn new() -> Arc> { - Arc::new(Mutex::new(Self::default())) - } -} - -impl Listener { +impl OpenDbListener { #[generic_bounds(upgrade_cb(F))] - pub fn new_upgrade(callback: F) -> Self { + pub(crate) fn new_upgrade(callback: F) -> Self { let status = Status::new(); Self { status: status.clone(), + #[cfg(feature = "async-upgrade")] + async_notify: Self::fake_rx(), listener: Closure::once(move |evt: web_sys::IdbVersionChangeEvent| { let res = Database::from_event(&evt) .and_then(move |db| callback(VersionChangeEvent::new(evt), db)); - Self::handle_result("onupgradeneeded", &status, res) + Self::handle_result(LBL_UPGRADE, &status, res) }), } } #[generic_bounds(blocked_cb(F))] - pub fn new_blocked(callback: F) -> Self { + pub(crate) fn new_blocked(callback: F) -> Self { let status = Status::new(); Self { status: status.clone(), + #[cfg(feature = "async-upgrade")] + async_notify: Self::fake_rx(), listener: Closure::once(move |evt: web_sys::IdbVersionChangeEvent| { let res = callback(VersionChangeEvent::new(evt)); - Self::handle_result("onblocked", &status, res) + Self::handle_result(LBL_BLOCKED, &status, res) }), } } @@ -74,16 +88,18 @@ impl Listener { { match res { Ok(()) => Ok(()), - Err(e) => { - let js_err = Self::create_error(label, &e); + Err(e) => Self::handle_error_result(label, status, e), + } + } - if let Ok(mut status) = status.lock() { - *status = Status::Err(e); - } + fn handle_error_result(label: L, status: &Mutex, e: Error) -> Result<(), JsValue> + where + L: Display, + { + let js_err = Self::create_error(&label, &e); + let _ = Self::set_status(status, Status::Err(e), label); - Err(js_err.unchecked_into()) - } - } + Err(js_err.unchecked_into()) } fn create_error(label: L, error: E) -> js_sys::Error { @@ -93,25 +109,33 @@ impl Listener { js_sys::Error::new(&msg) } - pub fn take_error(&self) -> crate::Result<()> { - match self.status.lock() { - Ok(mut lock) => { - let status = mem::replace(&mut *lock, Status::Taken); - drop(lock); - status.into() + fn set_status(mutex: &Mutex, status: Status, label: L) -> Result<(), JsValue> + where + L: Display, + { + match mutex.lock() { + Ok(mut guard) => { + *guard = status; + Ok(()) } - Err(_) => Err(UnexpectedDataError::PoisonedLock.into()), + Err(e) => Err(Self::create_error(label, &e).unchecked_into()), } } -} -impl AsRef for Listener { - fn as_ref(&self) -> &js_sys::Function { + pub(super) fn take_error(&self) -> crate::Result<()> { + self.take_status()?.into() + } + + fn take_status(&self) -> crate::Result { + Ok(mem::replace(&mut *self.status.lock()?, Status::Taken)) + } + + pub(super) fn as_fn(&self) -> &js_sys::Function { self.listener.as_ref().unchecked_ref() } } -impl Debug for Listener { +impl Debug for OpenDbListener { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self.status.lock() { Ok(lock) => Debug::fmt(&*lock, f), @@ -119,3 +143,83 @@ impl Debug for Listener { } } } + +#[cfg(feature = "async-upgrade")] +const _: () = { + use crate::future::PollUnpinned; + use std::task::{Context, Poll}; + + impl OpenDbListener { + fn fake_rx() -> tokio::sync::mpsc::UnboundedReceiver<()> { + tokio::sync::mpsc::unbounded_channel().1 + } + + #[generic_bounds(upgrade_async_cb(fun(Fn), fut(Fut)))] + pub(crate) fn new_upgrade_fut(callback: Fn) -> Self { + let status = Status::new(); + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + Self { + status: status.clone(), + async_notify: rx, + listener: Closure::once(move |evt: web_sys::IdbVersionChangeEvent| { + let db = match Database::from_event(&evt) { + Ok(db) => db, + Err(e) => return Self::handle_error_result(LBL_UPGRADE, &status, e), + }; + + Self::set_status(&status, Status::Pending, LBL_UPGRADE)?; + let fut = callback(VersionChangeEvent::new(evt), db); + + wasm_bindgen_futures::spawn_local(async move { + let result = match fut.await { + Ok(()) => Status::Ok, + Err(e) => Status::Err(e), + }; + let _ = Self::set_status(&status, result, LBL_UPGRADE); + let _ = tx.send(()); + }); + + Ok(()) + }), + } + } + + fn poll_rx(&mut self, cx: &mut Context<'_>) -> Poll> { + match self.async_notify.poll_recv(cx) { + Poll::Ready(Some(())) => { + self.async_notify.close(); + self.poll_unpinned(cx) + } + Poll::Pending => Poll::Pending, + Poll::Ready(None) => Poll::Ready(Err(UnexpectedDataError::ChannelDropped.into())), + } + } + } + + #[sealed::sealed] + #[allow(unused_qualifications)] + impl crate::future::PollUnpinned for OpenDbListener { + type Output = crate::Result<()>; + + fn poll_unpinned(&mut self, cx: &mut Context) -> Poll { + match self.status.lock() { + Ok(mut lock) => match *lock { + Status::Ok => return Poll::Ready(Ok(())), + Status::Err(_) => { + match mem::replace(&mut *lock, Status::Taken) { + Status::Err(e) => return Poll::Ready(Err(e)), + _ => unreachable!(), + }; + } + Status::Pending => {} + Status::Taken => { + return Poll::Ready(Err(UnexpectedDataError::PollState.into())) + } + }, + Err(e) => return Poll::Ready(Err(e.into())), + } + + self.poll_rx(cx) + } + } +}; diff --git a/src/future/open_db/listeners.rs b/src/future/open_db/listeners.rs index ef781d0..236ba35 100644 --- a/src/future/open_db/listeners.rs +++ b/src/future/open_db/listeners.rs @@ -1,49 +1,67 @@ -use super::Listener; +use super::OpenDbListener; use derive_more::Debug; use smallvec::SmallVec; +#[cfg(feature = "async-upgrade")] +use std::task::{Context, Poll}; + type Req = web_sys::IdbOpenDbRequest; type CleanupFn = fn(&Req); #[derive(Debug)] -pub(super) struct Listeners { - listeners: SmallVec<[Listener; 2]>, +pub(crate) struct Listeners { + listeners: SmallVec<[OpenDbListener; 2]>, #[debug(skip)] cleanup_fn: CleanupFn, } impl Listeners { - pub fn with_block(req: &Req, listener: Listener) -> Self { + pub(super) fn take_error(&self) -> crate::Result<()> { + for listener in &self.listeners { + listener.take_error()?; + } + + Ok(()) + } + + #[inline] + pub(super) fn drop_listeners(&self, req: &Req) { + (self.cleanup_fn)(req); + } +} + +impl Listeners { + pub fn with_block(req: &Req, listener: OpenDbListener) -> Self { #[inline] fn cleanup_fn(req: &Req) { req.set_onblocked(None); } - req.set_onblocked(Some(listener.as_ref())); + req.set_onblocked(Some(listener.as_fn())); Self::with_one(listener, cleanup_fn) } - pub fn with_on_upgrade_needed(req: &Req, listener: Listener) -> Self { + pub fn with_on_upgrade_needed(req: &Req, listener: OpenDbListener) -> Self { #[inline] fn cleanup_fn(req: &Req) { req.set_onupgradeneeded(None); } - req.set_onupgradeneeded(Some(listener.as_ref())); + req.set_onupgradeneeded(Some(listener.as_fn())); Self::with_one(listener, cleanup_fn) } - pub fn with_both(req: &Req, blocked: Listener, upgrade: Listener) -> Self { + pub fn with_both(req: &Req, blocked: OpenDbListener, upgrade: OpenDbListener) -> Self { fn cleanup_fn(req: &Req) { req.set_onblocked(None); req.set_onupgradeneeded(None); } - req.set_onblocked(Some(blocked.as_ref())); - req.set_onupgradeneeded(Some(upgrade.as_ref())); + req.set_onblocked(Some(blocked.as_fn())); + req.set_onupgradeneeded(Some(upgrade.as_fn())); Self { listeners: SmallVec::from_buf([blocked, upgrade]), @@ -61,7 +79,7 @@ impl Listeners { } } - fn with_one(listener: Listener, cleanup_fn: CleanupFn) -> Self { + fn with_one(listener: OpenDbListener, cleanup_fn: CleanupFn) -> Self { Self { listeners: { let mut out = SmallVec::new(); @@ -71,17 +89,22 @@ impl Listeners { cleanup_fn, } } +} - pub fn take_error(&self) -> crate::Result<()> { - for listener in &self.listeners { - listener.take_error()?; +#[cfg(feature = "async-upgrade")] +#[sealed::sealed] +impl crate::future::PollUnpinned for Listeners { + type Output = crate::Result<()>; + + fn poll_unpinned(&mut self, cx: &mut Context) -> Poll { + for listener in &mut self.listeners { + match listener.poll_unpinned(cx) { + Poll::Ready(Ok(())) => {} + Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), + Poll::Pending => return Poll::Pending, + } } - Ok(()) - } - - #[inline] - pub fn drop_listeners(&self, req: &Req) { - (self.cleanup_fn)(req); + Poll::Ready(Ok(())) } } diff --git a/src/lib.rs b/src/lib.rs index 3416b3f..0ec688d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -205,6 +205,7 @@ //! //! | Feature | Description | //! |---------|-------------| +//! | `async-upgrade` | Enable async closures in [`upgradeneeded`](https://developer.mozilla.org/en-US/docs/Web/API/IDBOpenDBRequest/upgradeneeded_event) event listeners. | //! | `cursors` | Enable opening IndexedDB [cursors](https://developer.mozilla.org/en-US/docs/Web/API/IDBCursor). | //! | `dates` | Enable [`SystemTime`](std::time::SystemTime) & [`Date`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Date) handling. | //! | `indices` | Enable IndexedDB [indices](https://developer.mozilla.org/en-US/docs/Web/API/IDBIndex). | @@ -212,6 +213,7 @@ //! | `serde` | Enable [`serde`](::serde) integration. | //! | `streams` | Implement [`Stream`](::futures_core::Stream) where applicable. | //! | `switch` | Enable [switches](primitive::Switch2). | +//! | `tx-done` | Enable waiting for transactions to complete without consuming them. | //! | `typed-arrays` | Enable [typed array](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/TypedArray) handling. | //! | `version-change` | Enable listening for [`versionchange`](https://developer.mozilla.org/en-US/docs/Web/API/IDBDatabase/versionchange_event) events. | diff --git a/src/transaction.rs b/src/transaction.rs index c68421d..3ce7ae8 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -17,6 +17,12 @@ mod listeners; mod options; mod tx_sys; +iffeat! { + #[cfg(feature = "tx-done")] + mod on_done; + pub use on_done::{TransactionDone, TransactionFinishKind}; +} + /// An [`IDBTransaction`](https://developer.mozilla.org/en-US/docs/Web/API/IDBTransaction) implementation. /// /// Unlike JS transactions, **this defaults to aborting the transaction instead of committing it** - diff --git a/src/transaction/base.rs b/src/transaction/base.rs index 1f9fd34..b6f0038 100644 --- a/src/transaction/base.rs +++ b/src/transaction/base.rs @@ -59,6 +59,16 @@ impl<'a> TransactionRef<'a> { pub fn error(&self) -> Option { self.as_sys().error().map(Into::into) } + + /// Return a Future that resolves when the transaction finishes, successfully or not. + /// + /// The primary use case is awaiting a `versionchange` transaction. You must make sure that the transaction hasn't + /// finished already before you call this fn otherwise the future will never complete. + #[cfg(feature = "tx-done")] + #[allow(clippy::missing_errors_doc)] + pub fn on_done(&self) -> crate::Result { + super::TransactionDone::new(self.as_sys().clone()) + } } impl Debug for TransactionRef<'_> { diff --git a/src/transaction/on_done.rs b/src/transaction/on_done.rs new file mode 100644 index 0000000..b4403a4 --- /dev/null +++ b/src/transaction/on_done.rs @@ -0,0 +1,120 @@ +use super::TransactionSys; +use internal_macros::FutureFromPollUnpinned; +use sealed::sealed; +use std::task::{Context, Poll}; +use wasm_evt_listener::Listener; + +const EVT_ABORT: &str = "abort"; +const EVT_ERROR: &str = "error"; +const EVT_COMPLETE: &str = "complete"; + +/// How a transaction finished. +/// +/// Note that this enum does not include the actual error in its [`Err`](TransactionFinishKind::Err) variant - the +/// error will be propagated to and should be handled by the piece of code that spawned the transaction. +#[derive(Copy, Clone, Eq, PartialEq, Debug)] +#[repr(u8)] +pub enum TransactionFinishKind { + /// Finished with a `complete` event + Ok, + + /// Finished with an `error` event + Err, + + /// Finished with an `abort` event + Abort, +} + +/// A Future that resolves when a transaction finishes, successfully or not. +#[derive(FutureFromPollUnpinned)] +pub struct TransactionDone { + tx: TransactionSys, + on_abort: Listener, + on_error: Listener, + on_complete: Listener, +} + +impl TransactionDone { + pub(super) fn new(tx: TransactionSys) -> crate::Result { + let on_abort = Listener::builder().build()?; + let on_error = Listener::builder().build()?; + let on_complete = Listener::builder().build()?; + + on_abort.add_to(EVT_ABORT, &tx)?; + on_error.add_to(EVT_ERROR, &tx)?; + on_complete.add_to(EVT_COMPLETE, &tx)?; + + Ok(Self { + tx, + on_abort, + on_error, + on_complete, + }) + } + + fn finish_with(&mut self, kind: TransactionFinishKind) -> TransactionFinishKind { + self.on_complete.close(); + self.on_abort.close(); + self.on_error.close(); + kind + } +} + +#[sealed] +impl crate::future::PollUnpinned for TransactionDone { + type Output = TransactionFinishKind; + + fn poll_unpinned(&mut self, cx: &mut Context) -> Poll { + match self.on_complete.poll_recv(cx) { + Poll::Ready(_) => Poll::Ready(self.finish_with(TransactionFinishKind::Ok)), + Poll::Pending => match self.on_abort.poll_recv(cx) { + Poll::Ready(_) => Poll::Ready(self.finish_with(TransactionFinishKind::Abort)), + Poll::Pending => match self.on_error.poll_recv(cx) { + Poll::Ready(_) => Poll::Ready(self.finish_with(TransactionFinishKind::Err)), + Poll::Pending => Poll::Pending, + }, + }, + } + } +} + +impl Drop for TransactionDone { + fn drop(&mut self) { + let _ = self.on_abort.rm_from(EVT_ABORT, &self.tx); + let _ = self.on_error.rm_from(EVT_ERROR, &self.tx); + let _ = self.on_complete.rm_from(EVT_COMPLETE, &self.tx); + } +} + +impl TransactionFinishKind { + /// Returns `true` if the transaction finished successfully. + #[must_use] + pub const fn is_ok(self) -> bool { + matches!(self, Self::Ok) + } + + /// Convert the enum into a [`Result<()>`](crate::Result). + #[allow(clippy::missing_errors_doc)] + pub fn into_result(self) -> crate::Result<()> { + let msg = match self { + Self::Ok => return Ok(()), + Self::Err => "The transaction errored", + Self::Abort => "The transaction was aborted", + }; + Err(js_sys::Error::new(msg).into()) + } +} + +impl From for crate::Result<()> { + #[inline] + fn from(finish: TransactionFinishKind) -> Self { + finish.into_result() + } +} + +#[cfg(feature = "streams")] +impl futures_core::FusedFuture for TransactionDone { + fn is_terminated(&self) -> bool { + futures_core::FusedStream::is_terminated(&self.on_complete) + } +} diff --git a/tests/tests/transaction/mod.rs b/tests/tests/transaction/mod.rs index 2046458..54afa41 100644 --- a/tests/tests/transaction/mod.rs +++ b/tests/tests/transaction/mod.rs @@ -2,6 +2,9 @@ use crate::prelude::*; pub mod commit_rollback; +#[cfg(feature = "tx-done")] +pub mod on_done; + #[wasm_bindgen_test] pub async fn multi_store() { let db = random_db_with_init(move |_, db| { diff --git a/tests/tests/transaction/on_done.rs b/tests/tests/transaction/on_done.rs new file mode 100644 index 0000000..febb2e8 --- /dev/null +++ b/tests/tests/transaction/on_done.rs @@ -0,0 +1,120 @@ +use crate::prelude::*; +use indexed_db_futures::transaction::TransactionFinishKind; + +#[wasm_bindgen_test] +pub async fn complete() { + let db = random_db_with_store().await; + open_tx!(db, Readonly > (tx, store)); + + let on_done = tx.on_done().unwrap(); + drop(store); + tx.commit().await.unwrap(); + + assert_eq!(on_done.await, TransactionFinishKind::Ok); +} + +#[wasm_bindgen_test] +pub async fn abort() { + let db = random_db_with_store().await; + open_tx!(db, Readonly > (tx, store)); + + let on_done = tx.on_done().unwrap(); + drop(store); + tx.abort().await.unwrap(); + + assert_eq!(on_done.await, TransactionFinishKind::Abort); +} + +#[cfg(feature = "async-upgrade")] +pub mod async_upgrade { + use crate::prelude::*; + use indexed_db_futures::database::Database; + use indexed_db_futures::error::{Error, JSError, OpenDbError}; + use indexed_db_futures::transaction::TransactionMode; + use std::sync::{Arc, Mutex}; + + const STORE_NAME: &str = "foostore"; + + pub async fn upgrade_needed_err_thrown() { + const ERR_MSG: &str = "upgrade_needed_err_thrown"; + + let err = Database::open(random_str()) + .with_version(2u8) + .with_on_upgrade_needed_fut(move |_, db| async move { + // Create an object store and await its transaction + db.create_object_store(STORE_NAME) + .with_auto_increment(true) + .build()? + .transaction() + .on_done()? + .await + .into_result()?; + + Err(js_sys::Error::new(ERR_MSG).into()) + }) + .await + .unwrap_err(); + + let expect = OpenDbError::Base(Error::Unknown(JSError::Error(js_sys::Error::new(ERR_MSG)))); + assert_eq!(err, expect); + } + + #[wasm_bindgen_test] + pub async fn upgrade_needed_ok() { + #[derive(Copy, Clone, Eq, PartialEq, Debug)] + enum Event { + CallbackStart, + InitialTransaction, + InnerTransaction, + } + + let events = Arc::new(Mutex::new(Vec::with_capacity(3))); + + Database::open(random_str()) + .with_version(2u8) + .with_on_upgrade_needed_fut({ + let events = Arc::clone(&events); + move |_, db| async move { + events.lock().unwrap().push(Event::CallbackStart); + + // Create an object store and await its transaction + db.create_object_store(STORE_NAME) + .with_auto_increment(true) + .build()? + .transaction() + .on_done()? + .await + .into_result()?; + + events.lock().unwrap().push(Event::InitialTransaction); + + let tx = db + .transaction(STORE_NAME) + .with_mode(TransactionMode::Readwrite) + .build()?; + let store = tx.object_store(STORE_NAME)?; + + dyn_await!(store.add("foo"))?; + dyn_await!(store.add("bar"))?; + tx.commit().await?; + + events.lock().unwrap().push(Event::InnerTransaction); + + Ok(()) + } + }) + .await + .unwrap(); + + let events = events.lock().unwrap(); + + assert_eq!( + &*events, + &[ + Event::CallbackStart, + Event::InitialTransaction, + Event::InnerTransaction + ] + ); + } +}