diff --git a/lightning-persister/src/fs_store.rs b/lightning-persister/src/fs_store.rs index 73c24dc6fc0..72e917418d0 100644 --- a/lightning-persister/src/fs_store.rs +++ b/lightning-persister/src/fs_store.rs @@ -1,58 +1,39 @@ //! Objects related to [`FilesystemStore`] live here. +use crate::fs_store_common::{prepare_atomic_write, FilesystemStoreState, WriteOptions}; use crate::utils::{check_namespace_key_validity, is_valid_kvstore_str}; use lightning::types::string::PrintableString; use lightning::util::persist::{KVStoreSync, MigratableKVStore}; -use std::collections::HashMap; use std::fs; -use std::io::{Read, Write}; +use std::io::Read; use std::path::{Path, PathBuf}; -use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; -use std::sync::{Arc, Mutex, RwLock}; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{Arc, RwLock}; #[cfg(feature = "tokio")] use core::future::Future; #[cfg(feature = "tokio")] use lightning::util::persist::KVStore; +#[cfg(not(target_os = "windows"))] +use crate::fs_store_common::finalize_atomic_write_unix; #[cfg(target_os = "windows")] -use {std::ffi::OsStr, std::os::windows::ffi::OsStrExt}; - +use crate::fs_store_common::finalize_atomic_write_windows; +#[cfg(not(target_os = "windows"))] +use crate::fs_store_common::remove_file_unix; #[cfg(target_os = "windows")] -macro_rules! call { - ($e: expr) => { - if $e != 0 { - Ok(()) - } else { - Err(std::io::Error::last_os_error()) - } - }; -} +use crate::fs_store_common::remove_file_windows; -#[cfg(target_os = "windows")] -fn path_to_windows_str>(path: &T) -> Vec { - path.as_ref().encode_wide().chain(Some(0)).collect() -} - -// The number of times we retry listing keys in `FilesystemStore::list` before we give up reaching +// The number of times we retry listing keys in `list` before we give up reaching // a consistent view and error out. const LIST_DIR_CONSISTENCY_RETRIES: usize = 10; -struct FilesystemStoreInner { - data_dir: PathBuf, - tmp_file_counter: AtomicUsize, - - // Per path lock that ensures that we don't have concurrent writes to the same file. The lock also encapsulates the - // latest written version per key. - locks: Mutex>>>, -} - /// A [`KVStore`] and [`KVStoreSync`] implementation that writes to and reads from the file system. /// /// [`KVStore`]: lightning::util::persist::KVStore pub struct FilesystemStore { - inner: Arc, + inner: Arc, // Version counter to ensure that writes are applied in the correct order. It is assumed that read and list // operations aren't sensitive to the order of execution. @@ -62,10 +43,8 @@ pub struct FilesystemStore { impl FilesystemStore { /// Constructs a new [`FilesystemStore`]. 
pub fn new(data_dir: PathBuf) -> Self { - let locks = Mutex::new(HashMap::new()); - let tmp_file_counter = AtomicUsize::new(0); Self { - inner: Arc::new(FilesystemStoreInner { data_dir, tmp_file_counter, locks }), + inner: Arc::new(FilesystemStoreState::new(data_dir)), next_version: AtomicU64::new(1), } } @@ -94,81 +73,11 @@ impl FilesystemStore { let outer_lock = self.inner.locks.lock().unwrap(); outer_lock.len() } -} - -impl KVStoreSync for FilesystemStore { - fn read( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, - ) -> Result, lightning::io::Error> { - let path = self.inner.get_checked_dest_file_path( - primary_namespace, - secondary_namespace, - Some(key), - "read", - )?; - self.inner.read(path) - } - - fn write( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, - ) -> Result<(), lightning::io::Error> { - let path = self.inner.get_checked_dest_file_path( - primary_namespace, - secondary_namespace, - Some(key), - "write", - )?; - let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(path.clone()); - self.inner.write_version(inner_lock_ref, path, buf, version) - } - - fn remove( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, - ) -> Result<(), lightning::io::Error> { - let path = self.inner.get_checked_dest_file_path( - primary_namespace, - secondary_namespace, - Some(key), - "remove", - )?; - let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(path.clone()); - self.inner.remove_version(inner_lock_ref, path, lazy, version) - } - - fn list( - &self, primary_namespace: &str, secondary_namespace: &str, - ) -> Result, lightning::io::Error> { - let path = self.inner.get_checked_dest_file_path( - primary_namespace, - secondary_namespace, - None, - "list", - )?; - self.inner.list(path) - } -} - -impl FilesystemStoreInner { - fn get_inner_lock_ref(&self, path: PathBuf) -> Arc> { - let mut outer_lock = self.locks.lock().unwrap(); - Arc::clone(&outer_lock.entry(path).or_default()) - } fn get_dest_dir_path( &self, primary_namespace: &str, secondary_namespace: &str, ) -> std::io::Result { - let mut dest_dir_path = { - #[cfg(target_os = "windows")] - { - let data_dir = self.data_dir.clone(); - fs::create_dir_all(data_dir.clone())?; - fs::canonicalize(data_dir)? - } - #[cfg(not(target_os = "windows"))] - { - self.data_dir.clone() - } - }; + let mut dest_dir_path = self.inner.get_base_dir_path()?; dest_dir_path.push(primary_namespace); if !secondary_namespace.is_empty() { @@ -192,11 +101,11 @@ impl FilesystemStoreInner { Ok(dest_file_path) } - fn read(&self, dest_file_path: PathBuf) -> lightning::io::Result> { + fn read_impl(&self, dest_file_path: PathBuf) -> lightning::io::Result> { let mut buf = Vec::new(); - self.execute_locked_read(dest_file_path.clone(), || { - let mut f = fs::File::open(dest_file_path)?; + self.inner.execute_locked_read(dest_file_path.clone(), || { + let mut f = fs::File::open(&dest_file_path)?; f.read_to_end(&mut buf)?; Ok(()) })?; @@ -204,216 +113,62 @@ impl FilesystemStoreInner { Ok(buf) } - fn execute_locked_write Result<(), lightning::io::Error>>( - &self, inner_lock_ref: Arc>, dest_file_path: PathBuf, version: u64, callback: F, - ) -> Result<(), lightning::io::Error> { - let res = { - let mut last_written_version = inner_lock_ref.write().unwrap(); - - // Check if we already have a newer version written/removed. This is used in async contexts to realize eventual - // consistency. 
- let is_stale_version = version <= *last_written_version; - - // If the version is not stale, we execute the callback. Otherwise we can and must skip writing. - if is_stale_version { - Ok(()) - } else { - callback().map(|_| { - *last_written_version = version; - }) - } - }; - - self.clean_locks(&inner_lock_ref, dest_file_path); - - res - } - - fn execute_locked_read Result<(), lightning::io::Error>>( - &self, dest_file_path: PathBuf, callback: F, - ) -> Result<(), lightning::io::Error> { - let inner_lock_ref = self.get_inner_lock_ref(dest_file_path.clone()); - let res = { - let _guard = inner_lock_ref.read().unwrap(); - callback() - }; - self.clean_locks(&inner_lock_ref, dest_file_path); - res - } - - fn clean_locks(&self, inner_lock_ref: &Arc>, dest_file_path: PathBuf) { - // If there no arcs in use elsewhere, this means that there are no in-flight writes. We can remove the map entry - // to prevent leaking memory. The two arcs that are expected are the one in the map and the one held here in - // inner_lock_ref. The outer lock is obtained first, to avoid a new arc being cloned after we've already - // counted. - let mut outer_lock = self.locks.lock().unwrap(); - - let strong_count = Arc::strong_count(&inner_lock_ref); - debug_assert!(strong_count >= 2, "Unexpected FilesystemStore strong count"); - - if strong_count == 2 { - outer_lock.remove(&dest_file_path); - } - } - /// Writes a specific version of a key to the filesystem. If a newer version has been written already, this function /// returns early without writing. fn write_version( &self, inner_lock_ref: Arc>, dest_file_path: PathBuf, buf: Vec, version: u64, ) -> lightning::io::Result<()> { - let parent_directory = dest_file_path.parent().ok_or_else(|| { - let msg = - format!("Could not retrieve parent directory of {}.", dest_file_path.display()); - std::io::Error::new(std::io::ErrorKind::InvalidInput, msg) - })?; - fs::create_dir_all(&parent_directory)?; - - // Do a crazy dance with lots of fsync()s to be overly cautious here... - // We never want to end up in a state where we've lost the old data, or end up using the - // old data on power loss after we've returned. 
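To make the eventual-consistency check above concrete: every write is assigned a monotonically increasing version up front, and the per-path lock remembers the last version actually applied, so a completion that arrives late is simply skipped. A minimal standalone sketch of that gating, assuming hypothetical names and a plain mutex-guarded map in place of the store's per-path `RwLock`s:

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;

// Hypothetical, simplified sketch: the real store keys its map by PathBuf and
// guards each entry with an RwLock; here a single Mutex is enough to show the idea.
struct VersionedStore {
    next_version: AtomicU64,
    // Last version actually applied, per key.
    applied: Mutex<HashMap<String, u64>>,
    data: Mutex<HashMap<String, Vec<u8>>>,
}

impl VersionedStore {
    fn new() -> Self {
        Self {
            next_version: AtomicU64::new(1),
            applied: Mutex::new(HashMap::new()),
            data: Mutex::new(HashMap::new()),
        }
    }

    // Called up front, before the (possibly async) write is scheduled.
    fn assign_version(&self) -> u64 {
        self.next_version.fetch_add(1, Ordering::Relaxed)
    }

    // Called when the write actually executes; stale versions are skipped.
    fn apply_write(&self, key: &str, buf: Vec<u8>, version: u64) -> bool {
        let mut applied = self.applied.lock().unwrap();
        let last = applied.entry(key.to_string()).or_insert(0);
        if version <= *last {
            return false; // A newer write already landed; skip this one.
        }
        self.data.lock().unwrap().insert(key.to_string(), buf);
        *last = version;
        true
    }
}

fn main() {
    let store = VersionedStore::new();
    let v1 = store.assign_version();
    let v2 = store.assign_version();
    // Completions arrive out of order: the older write is ignored.
    assert!(store.apply_write("key", b"new".to_vec(), v2));
    assert!(!store.apply_write("key", b"old".to_vec(), v1));
    assert_eq!(store.data.lock().unwrap()["key"], b"new".to_vec());
}
```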
- // The way to atomically write a file on Unix platforms is: - // open(tmpname), write(tmpfile), fsync(tmpfile), close(tmpfile), rename(), fsync(dir) - let mut tmp_file_path = dest_file_path.clone(); - let tmp_file_ext = format!("{}.tmp", self.tmp_file_counter.fetch_add(1, Ordering::AcqRel)); - tmp_file_path.set_extension(tmp_file_ext); + let options = WriteOptions::default(); + let tmp_file_path = prepare_atomic_write(&self.inner, &dest_file_path, &buf, &options)?; - { - let mut tmp_file = fs::File::create(&tmp_file_path)?; - tmp_file.write_all(&buf)?; - tmp_file.sync_all()?; - } - - self.execute_locked_write(inner_lock_ref, dest_file_path.clone(), version, || { - #[cfg(not(target_os = "windows"))] - { - fs::rename(&tmp_file_path, &dest_file_path)?; - let dir_file = fs::OpenOptions::new().read(true).open(&parent_directory)?; - dir_file.sync_all()?; - Ok(()) - } - - #[cfg(target_os = "windows")] - { - let res = if dest_file_path.exists() { - call!(unsafe { - windows_sys::Win32::Storage::FileSystem::ReplaceFileW( - path_to_windows_str(&dest_file_path).as_ptr(), - path_to_windows_str(&tmp_file_path).as_ptr(), - std::ptr::null(), - windows_sys::Win32::Storage::FileSystem::REPLACEFILE_IGNORE_MERGE_ERRORS, - std::ptr::null_mut() as *const core::ffi::c_void, - std::ptr::null_mut() as *const core::ffi::c_void, - ) - }) - } else { - call!(unsafe { - windows_sys::Win32::Storage::FileSystem::MoveFileExW( - path_to_windows_str(&tmp_file_path).as_ptr(), - path_to_windows_str(&dest_file_path).as_ptr(), - windows_sys::Win32::Storage::FileSystem::MOVEFILE_WRITE_THROUGH - | windows_sys::Win32::Storage::FileSystem::MOVEFILE_REPLACE_EXISTING, - ) - }) - }; + self.inner + .execute_locked_write(inner_lock_ref, dest_file_path.clone(), version, || { + #[cfg(not(target_os = "windows"))] + { + finalize_atomic_write_unix(&tmp_file_path, &dest_file_path) + } - match res { - Ok(()) => { - // We fsync the dest file in hopes this will also flush the metadata to disk. - let dest_file = - fs::OpenOptions::new().read(true).write(true).open(&dest_file_path)?; - dest_file.sync_all()?; - Ok(()) - }, - Err(e) => Err(e.into()), + #[cfg(target_os = "windows")] + { + finalize_atomic_write_windows(&tmp_file_path, &dest_file_path, &options) } - } - }) + }) + .map(|_| ()) } fn remove_version( &self, inner_lock_ref: Arc>, dest_file_path: PathBuf, lazy: bool, version: u64, ) -> lightning::io::Result<()> { - self.execute_locked_write(inner_lock_ref, dest_file_path.clone(), version, || { - if !dest_file_path.is_file() { - return Ok(()); - } - - if lazy { - // If we're lazy we just call remove and be done with it. - fs::remove_file(&dest_file_path)?; - } else { - // If we're not lazy we try our best to persist the updated metadata to ensure - // atomicity of this call. - #[cfg(not(target_os = "windows"))] - { - fs::remove_file(&dest_file_path)?; - - let parent_directory = dest_file_path.parent().ok_or_else(|| { - let msg = format!( - "Could not retrieve parent directory of {}.", - dest_file_path.display() - ); - std::io::Error::new(std::io::ErrorKind::InvalidInput, msg) - })?; - let dir_file = fs::OpenOptions::new().read(true).open(parent_directory)?; - // The above call to `fs::remove_file` corresponds to POSIX `unlink`, whose changes - // to the inode might get cached (and hence possibly lost on crash), depending on - // the target platform and file system. - // - // In order to assert we permanently removed the file in question we therefore - // call `fsync` on the parent directory on platforms that support it. 
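The removed comment spells out the classic recipe for an atomic, durable replace on Unix: write a temp file, fsync it, rename it over the destination, then fsync the parent directory so the rename survives power loss. A minimal Unix-oriented sketch of just that sequence (not the crate's helper, which also covers Windows and version gating):

```rust
use std::fs;
use std::io::Write;
use std::path::Path;

// Write `buf` to `dest` atomically: readers see either the old contents or the
// new contents, never a partial file, and the result survives a crash.
fn atomic_write(dest: &Path, buf: &[u8]) -> std::io::Result<()> {
    let parent = dest.parent().ok_or_else(|| {
        std::io::Error::new(std::io::ErrorKind::InvalidInput, "destination has no parent")
    })?;
    fs::create_dir_all(parent)?;

    let tmp = dest.with_extension("tmp");
    {
        let mut f = fs::File::create(&tmp)?;
        f.write_all(buf)?;
        // Flush the file contents to disk before the rename makes them visible.
        f.sync_all()?;
    }

    // rename(2) atomically replaces the destination on POSIX filesystems.
    fs::rename(&tmp, dest)?;

    // fsync the directory so the rename itself is durable across power loss.
    // (Opening a directory for fsync works on Unix; Windows needs a different path.)
    let dir = fs::File::open(parent)?;
    dir.sync_all()?;
    Ok(())
}

fn main() -> std::io::Result<()> {
    let dir = std::env::temp_dir().join("atomic_write_demo");
    atomic_write(&dir.join("key"), b"hello")?;
    assert_eq!(fs::read(dir.join("key"))?, b"hello");
    Ok(())
}
```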
- dir_file.sync_all()?; + self.inner + .execute_locked_write(inner_lock_ref, dest_file_path.clone(), version, || { + if !dest_file_path.is_file() { + return Ok(()); } - #[cfg(target_os = "windows")] - { - // Since Windows `DeleteFile` API is not persisted until the last open file handle - // is dropped, and there seemingly is no reliable way to flush the directory - // metadata, we here fall back to use a 'recycling bin' model, i.e., first move the - // file to be deleted to a temporary trash file and remove the latter file - // afterwards. - // - // This should be marginally better, as, according to the documentation, - // `MoveFileExW` APIs should offer stronger persistence guarantees, - // at least if `MOVEFILE_WRITE_THROUGH`/`MOVEFILE_REPLACE_EXISTING` is set. - // However, all this is partially based on assumptions and local experiments, as - // Windows API is horribly underdocumented. - let mut trash_file_path = dest_file_path.clone(); - let trash_file_ext = - format!("{}.trash", self.tmp_file_counter.fetch_add(1, Ordering::AcqRel)); - trash_file_path.set_extension(trash_file_ext); - - call!(unsafe { - windows_sys::Win32::Storage::FileSystem::MoveFileExW( - path_to_windows_str(&dest_file_path).as_ptr(), - path_to_windows_str(&trash_file_path).as_ptr(), - windows_sys::Win32::Storage::FileSystem::MOVEFILE_WRITE_THROUGH - | windows_sys::Win32::Storage::FileSystem::MOVEFILE_REPLACE_EXISTING, - ) - })?; - + if lazy { + // If we're lazy we just call remove and be done with it. + fs::remove_file(&dest_file_path)?; + } else { + // If we're not lazy we try our best to persist the updated metadata to ensure + // atomicity of this call. + #[cfg(not(target_os = "windows"))] { - // We fsync the trash file in hopes this will also flush the original's file - // metadata to disk. - let trash_file = fs::OpenOptions::new() - .read(true) - .write(true) - .open(&trash_file_path.clone())?; - trash_file.sync_all()?; + remove_file_unix(&dest_file_path)?; } - // We're fine if this remove would fail as the trash file will be cleaned up in - // list eventually. - fs::remove_file(trash_file_path).ok(); + #[cfg(target_os = "windows")] + { + remove_file_windows(&self.inner, &dest_file_path)?; + } } - } - Ok(()) - }) + Ok(()) + }) + .map(|_| ()) } - fn list(&self, prefixed_dest: PathBuf) -> lightning::io::Result> { + fn list_impl(&self, prefixed_dest: PathBuf) -> lightning::io::Result> { if !Path::new(&prefixed_dest).exists() { return Ok(Vec::new()); } @@ -446,7 +201,7 @@ impl FilesystemStoreInner { continue 'retry_list; } else { // For all errors or if we exhausted retries, bubble up. 
- return Err(e.into()); + return Err(e); } }, } @@ -458,13 +213,61 @@ impl FilesystemStoreInner { } } +impl KVStoreSync for FilesystemStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> Result, lightning::io::Error> { + let path = self.get_checked_dest_file_path( + primary_namespace, + secondary_namespace, + Some(key), + "read", + )?; + self.read_impl(path) + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> Result<(), lightning::io::Error> { + let path = self.get_checked_dest_file_path( + primary_namespace, + secondary_namespace, + Some(key), + "write", + )?; + let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(path.clone()); + self.write_version(inner_lock_ref, path, buf, version) + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> Result<(), lightning::io::Error> { + let path = self.get_checked_dest_file_path( + primary_namespace, + secondary_namespace, + Some(key), + "remove", + )?; + let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(path.clone()); + self.remove_version(inner_lock_ref, path, lazy, version) + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> Result, lightning::io::Error> { + let path = + self.get_checked_dest_file_path(primary_namespace, secondary_namespace, None, "list")?; + self.list_impl(path) + } +} + #[cfg(feature = "tokio")] impl KVStore for FilesystemStore { fn read( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> impl Future, lightning::io::Error>> + 'static + Send { let this = Arc::clone(&self.inner); - let path = this.get_checked_dest_file_path( + let path = self.get_checked_dest_file_path( primary_namespace, secondary_namespace, Some(key), @@ -476,9 +279,18 @@ impl KVStore for FilesystemStore { Ok(path) => path, Err(e) => return Err(e), }; - tokio::task::spawn_blocking(move || this.read(path)).await.unwrap_or_else(|e| { - Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)) + + tokio::task::spawn_blocking(move || { + let mut buf = Vec::new(); + this.execute_locked_read(path.clone(), || { + let mut f = fs::File::open(&path)?; + f.read_to_end(&mut buf)?; + Ok(()) + })?; + Ok(buf) }) + .await + .unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))) } } @@ -486,17 +298,39 @@ impl KVStore for FilesystemStore { &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, ) -> impl Future> + 'static + Send { let this = Arc::clone(&self.inner); - let path = this - .get_checked_dest_file_path(primary_namespace, secondary_namespace, Some(key), "write") - .map(|path| (self.get_new_version_and_lock_ref(path.clone()), path)); + let path_result = self.get_checked_dest_file_path( + primary_namespace, + secondary_namespace, + Some(key), + "write", + ); + let version_and_lock = path_result + .as_ref() + .ok() + .map(|path| (self.get_new_version_and_lock_ref(path.clone()), path.clone())); async move { - let ((inner_lock_ref, version), path) = match path { - Ok(res) => res, - Err(e) => return Err(e), + let ((inner_lock_ref, version), dest_file_path) = match version_and_lock { + Some(v) => v, + None => return Err(path_result.unwrap_err()), }; + tokio::task::spawn_blocking(move || { - this.write_version(inner_lock_ref, path, buf, version) + let options = WriteOptions::default(); + let tmp_file_path = prepare_atomic_write(&this, &dest_file_path, &buf, &options)?; + + 
this.execute_locked_write(inner_lock_ref, dest_file_path.clone(), version, || { + #[cfg(not(target_os = "windows"))] + { + finalize_atomic_write_unix(&tmp_file_path, &dest_file_path) + } + + #[cfg(target_os = "windows")] + { + finalize_atomic_write_windows(&tmp_file_path, &dest_file_path, &options) + } + }) + .map(|_| ()) }) .await .unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))) @@ -507,17 +341,46 @@ impl KVStore for FilesystemStore { &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, ) -> impl Future> + 'static + Send { let this = Arc::clone(&self.inner); - let path = this - .get_checked_dest_file_path(primary_namespace, secondary_namespace, Some(key), "remove") - .map(|path| (self.get_new_version_and_lock_ref(path.clone()), path)); + let path_result = self.get_checked_dest_file_path( + primary_namespace, + secondary_namespace, + Some(key), + "remove", + ); + let version_and_lock = path_result + .as_ref() + .ok() + .map(|path| (self.get_new_version_and_lock_ref(path.clone()), path.clone())); async move { - let ((inner_lock_ref, version), path) = match path { - Ok(res) => res, - Err(e) => return Err(e), + let ((inner_lock_ref, version), dest_file_path) = match version_and_lock { + Some(v) => v, + None => return Err(path_result.unwrap_err()), }; + tokio::task::spawn_blocking(move || { - this.remove_version(inner_lock_ref, path, lazy, version) + this.execute_locked_write(inner_lock_ref, dest_file_path.clone(), version, || { + if !dest_file_path.is_file() { + return Ok(()); + } + + if lazy { + fs::remove_file(&dest_file_path)?; + } else { + #[cfg(not(target_os = "windows"))] + { + remove_file_unix(&dest_file_path)?; + } + + #[cfg(target_os = "windows")] + { + remove_file_windows(&this, &dest_file_path)?; + } + } + + Ok(()) + }) + .map(|_| ()) }) .await .unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))) @@ -527,19 +390,55 @@ impl KVStore for FilesystemStore { fn list( &self, primary_namespace: &str, secondary_namespace: &str, ) -> impl Future, lightning::io::Error>> + 'static + Send { - let this = Arc::clone(&self.inner); - let path = - this.get_checked_dest_file_path(primary_namespace, secondary_namespace, None, "list"); + self.get_checked_dest_file_path(primary_namespace, secondary_namespace, None, "list"); async move { let path = match path { Ok(path) => path, Err(e) => return Err(e), }; - tokio::task::spawn_blocking(move || this.list(path)).await.unwrap_or_else(|e| { - Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)) + + tokio::task::spawn_blocking(move || { + if !Path::new(&path).exists() { + return Ok(Vec::new()); + } + + let mut keys; + let mut retries = LIST_DIR_CONSISTENCY_RETRIES; + + 'retry_list: loop { + keys = Vec::new(); + 'skip_entry: for entry in fs::read_dir(&path)? 
{ + let entry = entry?; + let p = entry.path(); + + let res = dir_entry_is_key(&entry); + match res { + Ok(true) => { + let key = get_key_from_dir_entry_path(&p, &path)?; + keys.push(key); + }, + Ok(false) => { + continue 'skip_entry; + }, + Err(e) => { + if e.kind() == lightning::io::ErrorKind::NotFound && retries > 0 { + retries -= 1; + continue 'retry_list; + } else { + return Err(e); + } + }, + } + } + break 'retry_list; + } + + Ok(keys) }) + .await + .unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))) } } } @@ -585,7 +484,7 @@ fn dir_entry_is_key(dir_entry: &fs::DirEntry) -> Result Result { - match p.strip_prefix(&base_path) { + match p.strip_prefix(base_path) { Ok(stripped_path) => { if let Some(relative_path) = stripped_path.to_str() { if is_valid_kvstore_str(relative_path) { diff --git a/lightning-persister/src/fs_store_common.rs b/lightning-persister/src/fs_store_common.rs new file mode 100644 index 00000000000..c15f3aed0ab --- /dev/null +++ b/lightning-persister/src/fs_store_common.rs @@ -0,0 +1,322 @@ +//! Common utilities shared between [`FilesystemStore`] and [`FilesystemStoreV2`]. +//! +//! [`FilesystemStore`]: crate::fs_store::FilesystemStore +//! [`FilesystemStoreV2`]: crate::fs_store_v2::FilesystemStoreV2 + +use std::collections::HashMap; +use std::fs; +use std::io::Write; +use std::path::PathBuf; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex, RwLock}; + +#[cfg(target_os = "windows")] +use std::ffi::OsStr; +#[cfg(target_os = "windows")] +use std::os::windows::ffi::OsStrExt; + +/// Calls a Windows API function and returns Ok(()) on success or the last OS error on failure. +#[cfg(target_os = "windows")] +macro_rules! call { + ($e: expr) => { + if $e != 0 { + Ok(()) + } else { + Err(std::io::Error::last_os_error()) + } + }; +} + +#[cfg(target_os = "windows")] +pub(crate) use call; + +/// Converts a path to a null-terminated wide string for Windows API calls. +#[cfg(target_os = "windows")] +pub(crate) fn path_to_windows_str>(path: &T) -> Vec { + path.as_ref().encode_wide().chain(Some(0)).collect() +} + +/// Inner state shared between sync and async operations for filesystem stores. +/// +/// This struct manages the data directory, temporary file counter, and per-path locks +/// that ensure we don't have concurrent writes to the same file. +pub(crate) struct FilesystemStoreState { + pub(crate) data_dir: PathBuf, + pub(crate) tmp_file_counter: AtomicUsize, + /// Per path lock that ensures that we don't have concurrent writes to the same file. + /// The lock also encapsulates the latest written version per key. + pub(crate) locks: Mutex>>>, +} + +impl FilesystemStoreState { + /// Creates a new `FilesystemStoreState` with the given data directory. + pub(crate) fn new(data_dir: PathBuf) -> Self { + Self { data_dir, tmp_file_counter: AtomicUsize::new(0), locks: Mutex::new(HashMap::new()) } + } + + /// Gets or creates a lock reference for the given path. + pub(crate) fn get_inner_lock_ref(&self, path: PathBuf) -> Arc> { + let mut outer_lock = self.locks.lock().unwrap(); + Arc::clone(&outer_lock.entry(path).or_default()) + } + + /// Cleans up unused locks to prevent memory leaks. + /// + /// If there are no arcs in use elsewhere (besides the map entry and the provided reference), + /// we can remove the map entry to prevent leaking memory. 
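The cleanup described above hinges on `Arc::strong_count`: when no operation is in flight, exactly two references remain (the map entry and the caller's clone), so the entry can be dropped without racing other users. A small standalone sketch of that bookkeeping, with hypothetical names:

```rust
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::{Arc, Mutex, RwLock};

struct LockMap {
    locks: Mutex<HashMap<PathBuf, Arc<RwLock<u64>>>>,
}

impl LockMap {
    fn new() -> Self {
        Self { locks: Mutex::new(HashMap::new()) }
    }

    // Clone out (or create) the per-path lock before starting an operation.
    fn acquire(&self, path: PathBuf) -> Arc<RwLock<u64>> {
        let mut map = self.locks.lock().unwrap();
        Arc::clone(map.entry(path).or_default())
    }

    // After the operation, drop the map entry if nobody else holds a clone.
    // Taking the outer mutex first prevents a new clone from being handed out
    // between the count check and the removal.
    fn release(&self, lock: &Arc<RwLock<u64>>, path: &PathBuf) {
        let mut map = self.locks.lock().unwrap();
        // One reference lives in the map, one is `lock` itself.
        if Arc::strong_count(lock) == 2 {
            map.remove(path);
        }
    }
}

fn main() {
    let locks = LockMap::new();
    let path = PathBuf::from("some/key");
    let a = locks.acquire(path.clone());
    let b = locks.acquire(path.clone());
    locks.release(&a, &path);
    drop(a);
    assert_eq!(locks.locks.lock().unwrap().len(), 1); // `b` is still in flight.
    locks.release(&b, &path);
    assert!(locks.locks.lock().unwrap().is_empty());
}
```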
+ pub(crate) fn clean_locks(&self, inner_lock_ref: &Arc>, dest_file_path: PathBuf) { + let mut outer_lock = self.locks.lock().unwrap(); + + let strong_count = Arc::strong_count(inner_lock_ref); + debug_assert!(strong_count >= 2, "Unexpected FilesystemStore strong count"); + + if strong_count == 2 { + outer_lock.remove(&dest_file_path); + } + } + + /// Executes a read operation while holding the read lock for the given path. + pub(crate) fn execute_locked_read Result<(), lightning::io::Error>>( + &self, dest_file_path: PathBuf, callback: F, + ) -> Result<(), lightning::io::Error> { + let inner_lock_ref = self.get_inner_lock_ref(dest_file_path.clone()); + let res = { + let _guard = inner_lock_ref.read().unwrap(); + callback() + }; + self.clean_locks(&inner_lock_ref, dest_file_path); + res + } + + /// Executes a write operation with version tracking. + /// + /// Returns `Ok(true)` if the callback was executed, `Ok(false)` if skipped due to staleness. + pub(crate) fn execute_locked_write Result<(), lightning::io::Error>>( + &self, inner_lock_ref: Arc>, lock_key: PathBuf, version: u64, callback: F, + ) -> Result { + let res = { + let mut last_written_version = inner_lock_ref.write().unwrap(); + + // Check if we already have a newer version written/removed. This is used in async + // contexts to realize eventual consistency. + let is_stale_version = version <= *last_written_version; + + // If the version is not stale, we execute the callback. Otherwise we can and must skip. + if is_stale_version { + Ok(false) + } else { + callback().map(|_| { + *last_written_version = version; + true + }) + } + }; + + self.clean_locks(&inner_lock_ref, lock_key); + + res + } + + /// Returns the base directory path for a namespace combination. + /// + /// On Windows, this canonicalizes the path after creating the data directory. + pub(crate) fn get_base_dir_path(&self) -> std::io::Result { + #[cfg(target_os = "windows")] + { + let data_dir = self.data_dir.clone(); + fs::create_dir_all(data_dir.clone())?; + fs::canonicalize(data_dir) + } + #[cfg(not(target_os = "windows"))] + { + Ok(self.data_dir.clone()) + } + } + + /// Generates a unique temporary file path based on the destination path. + pub(crate) fn get_tmp_file_path(&self, dest_file_path: &PathBuf) -> PathBuf { + let mut tmp_file_path = dest_file_path.clone(); + let tmp_file_ext = format!("{}.tmp", self.tmp_file_counter.fetch_add(1, Ordering::AcqRel)); + tmp_file_path.set_extension(tmp_file_ext); + tmp_file_path + } + + /// Generates a unique trash file path for Windows deletion operations. + #[cfg(target_os = "windows")] + pub(crate) fn get_trash_file_path(&self, dest_file_path: &PathBuf) -> PathBuf { + let mut trash_file_path = dest_file_path.clone(); + let trash_file_ext = + format!("{}.trash", self.tmp_file_counter.fetch_add(1, Ordering::AcqRel)); + trash_file_path.set_extension(trash_file_ext); + trash_file_path + } +} + +/// Options for writing a file atomically. +#[derive(Default)] +pub(crate) struct WriteOptions { + /// If set, the file's modification time will be set to this value. + pub(crate) preserve_mtime: Option, +} + +/// Writes data to a temporary file and prepares it for atomic rename. +/// +/// This handles: +/// - Creating the parent directory +/// - Writing to a temporary file +/// - Setting mtime if requested (for FilesystemStoreV2) +/// - Syncing the temp file +/// +/// Returns the temporary file path that should be renamed to the destination. 
+pub(crate) fn prepare_atomic_write( + state: &FilesystemStoreState, dest_file_path: &PathBuf, buf: &[u8], options: &WriteOptions, +) -> lightning::io::Result { + let parent_directory = dest_file_path.parent().ok_or_else(|| { + let msg = format!("Could not retrieve parent directory of {}.", dest_file_path.display()); + std::io::Error::new(std::io::ErrorKind::InvalidInput, msg) + })?; + fs::create_dir_all(parent_directory)?; + + let tmp_file_path = state.get_tmp_file_path(dest_file_path); + + { + let tmp_file = fs::File::create(&tmp_file_path)?; + let mut writer = std::io::BufWriter::new(&tmp_file); + writer.write_all(buf)?; + writer.flush()?; + + // If we need to preserve the original mtime (for updates), set it before fsync. + if let Some(mtime) = options.preserve_mtime { + let times = std::fs::FileTimes::new().set_modified(mtime); + tmp_file.set_times(times)?; + } + + tmp_file.sync_all()?; + } + + Ok(tmp_file_path) +} + +/// Performs the atomic rename from temp file to destination on Unix. +#[cfg(not(target_os = "windows"))] +pub(crate) fn finalize_atomic_write_unix( + tmp_file_path: &PathBuf, dest_file_path: &PathBuf, +) -> lightning::io::Result<()> { + fs::rename(tmp_file_path, dest_file_path)?; + + let parent_directory = dest_file_path.parent().ok_or_else(|| { + let msg = format!("Could not retrieve parent directory of {}.", dest_file_path.display()); + std::io::Error::new(std::io::ErrorKind::InvalidInput, msg) + })?; + + let dir_file = fs::OpenOptions::new().read(true).open(parent_directory)?; + dir_file.sync_all()?; + Ok(()) +} + +/// Performs the atomic rename from temp file to destination on Windows. +#[cfg(target_os = "windows")] +pub(crate) fn finalize_atomic_write_windows( + tmp_file_path: &PathBuf, dest_file_path: &PathBuf, options: &WriteOptions, +) -> lightning::io::Result<()> { + let res = if dest_file_path.exists() { + call!(unsafe { + windows_sys::Win32::Storage::FileSystem::ReplaceFileW( + path_to_windows_str(&dest_file_path).as_ptr(), + path_to_windows_str(&tmp_file_path).as_ptr(), + std::ptr::null(), + windows_sys::Win32::Storage::FileSystem::REPLACEFILE_IGNORE_MERGE_ERRORS, + std::ptr::null_mut() as *const core::ffi::c_void, + std::ptr::null_mut() as *const core::ffi::c_void, + ) + }) + } else { + call!(unsafe { + windows_sys::Win32::Storage::FileSystem::MoveFileExW( + path_to_windows_str(&tmp_file_path).as_ptr(), + path_to_windows_str(&dest_file_path).as_ptr(), + windows_sys::Win32::Storage::FileSystem::MOVEFILE_WRITE_THROUGH + | windows_sys::Win32::Storage::FileSystem::MOVEFILE_REPLACE_EXISTING, + ) + }) + }; + + match res { + Ok(()) => { + // Open the destination file to fsync it and set mtime if needed. + let dest_file = fs::OpenOptions::new().read(true).write(true).open(dest_file_path)?; + + // On Windows, ReplaceFileW/MoveFileExW may not preserve the mtime we set + // on the tmp file, so we explicitly set it again here. + if let Some(mtime) = options.preserve_mtime { + let times = std::fs::FileTimes::new().set_modified(mtime); + dest_file.set_times(times)?; + } + + dest_file.sync_all()?; + Ok(()) + }, + Err(e) => Err(e.into()), + } +} + +/// Removes a file atomically on Unix with fsync on the parent directory. 
+#[cfg(not(target_os = "windows"))] +pub(crate) fn remove_file_unix(dest_file_path: &PathBuf) -> lightning::io::Result<()> { + fs::remove_file(dest_file_path)?; + + let parent_directory = dest_file_path.parent().ok_or_else(|| { + let msg = format!("Could not retrieve parent directory of {}.", dest_file_path.display()); + std::io::Error::new(std::io::ErrorKind::InvalidInput, msg) + })?; + let dir_file = fs::OpenOptions::new().read(true).open(parent_directory)?; + // The above call to `fs::remove_file` corresponds to POSIX `unlink`, whose changes + // to the inode might get cached (and hence possibly lost on crash), depending on + // the target platform and file system. + // + // In order to assert we permanently removed the file in question we therefore + // call `fsync` on the parent directory on platforms that support it. + dir_file.sync_all()?; + Ok(()) +} + +/// Removes a file on Windows using the trash file approach for durability. +#[cfg(target_os = "windows")] +pub(crate) fn remove_file_windows( + state: &FilesystemStoreState, dest_file_path: &PathBuf, +) -> lightning::io::Result<()> { + // Since Windows `DeleteFile` API is not persisted until the last open file handle + // is dropped, and there seemingly is no reliable way to flush the directory + // metadata, we here fall back to use a 'recycling bin' model, i.e., first move the + // file to be deleted to a temporary trash file and remove the latter file + // afterwards. + // + // This should be marginally better, as, according to the documentation, + // `MoveFileExW` APIs should offer stronger persistence guarantees, + // at least if `MOVEFILE_WRITE_THROUGH`/`MOVEFILE_REPLACE_EXISTING` is set. + // However, all this is partially based on assumptions and local experiments, as + // Windows API is horribly underdocumented. + let trash_file_path = state.get_trash_file_path(dest_file_path); + + call!(unsafe { + windows_sys::Win32::Storage::FileSystem::MoveFileExW( + path_to_windows_str(&dest_file_path).as_ptr(), + path_to_windows_str(&trash_file_path).as_ptr(), + windows_sys::Win32::Storage::FileSystem::MOVEFILE_WRITE_THROUGH + | windows_sys::Win32::Storage::FileSystem::MOVEFILE_REPLACE_EXISTING, + ) + })?; + + { + // We fsync the trash file in hopes this will also flush the original's file + // metadata to disk. + let trash_file = fs::OpenOptions::new().read(true).write(true).open(&trash_file_path)?; + trash_file.sync_all()?; + } + + // We're fine if this remove would fail as the trash file will be cleaned up in + // list eventually. + fs::remove_file(trash_file_path).ok(); + + Ok(()) +} diff --git a/lightning-persister/src/fs_store_v2.rs b/lightning-persister/src/fs_store_v2.rs new file mode 100644 index 00000000000..01f411214ed --- /dev/null +++ b/lightning-persister/src/fs_store_v2.rs @@ -0,0 +1,1345 @@ +//! Objects related to [`FilesystemStoreV2`] live here. 
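One detail worth previewing before the new store: `FilesystemStoreV2` sorts keys by file modification time, so an update has to stamp the freshly written temp file with the original file's mtime before the atomic rename, otherwise pagination order would follow last-update time instead of creation time. A minimal sketch of that step using `std::fs::FileTimes` (stable since Rust 1.75), with hypothetical paths and without the directory fsync the real helpers perform:

```rust
use std::fs;
use std::io::Write;
use std::path::Path;

// Rewrite `dest` while keeping its original mtime, so listings that sort by
// mtime keep reflecting creation order rather than last-update order.
fn update_preserving_mtime(dest: &Path, buf: &[u8]) -> std::io::Result<()> {
    // Capture the current mtime before touching anything (None on first write).
    let original_mtime = fs::metadata(dest).ok().and_then(|m| m.modified().ok());

    let tmp = dest.with_extension("tmp");
    let file = fs::File::create(&tmp)?;
    let mut writer = std::io::BufWriter::new(&file);
    writer.write_all(buf)?;
    writer.flush()?;
    drop(writer);

    if let Some(mtime) = original_mtime {
        // Stamp the temp file with the old mtime before it replaces `dest`.
        file.set_times(fs::FileTimes::new().set_modified(mtime))?;
    }
    file.sync_all()?;
    fs::rename(&tmp, dest)?;
    Ok(())
}

fn main() -> std::io::Result<()> {
    let path = std::env::temp_dir().join("mtime_demo_key");
    fs::write(&path, b"v1")?;
    let before = fs::metadata(&path)?.modified()?;
    update_preserving_mtime(&path, b"v2")?;
    assert_eq!(fs::metadata(&path)?.modified()?, before);
    Ok(())
}
```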
+use crate::fs_store_common::{prepare_atomic_write, FilesystemStoreState, WriteOptions}; +use crate::utils::{check_namespace_key_validity, is_valid_kvstore_str}; + +use lightning::util::persist::{ + KVStoreSync, MigratableKVStore, PageToken, PaginatedKVStoreSync, PaginatedListResponse, +}; + +use std::fs; +use std::io::Read; +use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{Arc, RwLock}; +use std::time::{SystemTime, UNIX_EPOCH}; + +#[cfg(feature = "tokio")] +use core::future::Future; +#[cfg(feature = "tokio")] +use lightning::util::persist::{KVStore, PaginatedKVStore}; + +#[cfg(not(target_os = "windows"))] +use crate::fs_store_common::finalize_atomic_write_unix; +#[cfg(target_os = "windows")] +use crate::fs_store_common::finalize_atomic_write_windows; +#[cfg(not(target_os = "windows"))] +use crate::fs_store_common::remove_file_unix; +#[cfg(target_os = "windows")] +use crate::fs_store_common::remove_file_windows; + +/// The fixed page size for paginated listing operations. +const PAGE_SIZE: usize = 50; + +/// The directory name used for empty namespaces. +/// Uses brackets which are not in KVSTORE_NAMESPACE_KEY_ALPHABET, preventing collisions +/// with valid namespace names. +const EMPTY_NAMESPACE_DIR: &str = "[empty]"; + +/// The length of the timestamp in a page token (milliseconds since epoch as 16-digit decimal). +const PAGE_TOKEN_TIMESTAMP_LEN: usize = 16; + +/// A [`KVStore`] and [`KVStoreSync`] implementation that writes to and reads from the file system. +/// +/// This is version 2 of the filesystem store which provides: +/// - Consistent directory structure using `[empty]` for empty namespaces +/// - File modification times for creation-order pagination +/// - Support for [`PaginatedKVStoreSync`] with newest-first ordering +/// +/// ## Directory Structure +/// +/// Files are stored with a consistent two-level namespace hierarchy: +/// ```text +/// data_dir/ +/// [empty]/ # empty primary namespace +/// [empty]/ # empty secondary namespace +/// {key} +/// primary_ns/ +/// [empty]/ # empty secondary namespace +/// {key} +/// secondary_ns/ +/// {key} +/// ``` +/// +/// ## File Ordering +/// +/// Files are ordered by their modification time (mtime). When a file is created, it gets +/// the current time. When updated, the original creation time is preserved by setting +/// the mtime of the new file to match the original before the atomic rename. +/// +/// [`KVStore`]: lightning::util::persist::KVStore +pub struct FilesystemStoreV2 { + inner: Arc, + + // Version counter to ensure that writes are applied in the correct order. It is assumed that read and list + // operations aren't sensitive to the order of execution. + next_version: AtomicU64, +} + +impl FilesystemStoreV2 { + /// Constructs a new [`FilesystemStoreV2`]. + pub fn new(data_dir: PathBuf) -> std::io::Result { + Ok(Self { + inner: Arc::new(FilesystemStoreState::new(data_dir)), + next_version: AtomicU64::new(1), + }) + } + + /// Returns the data directory. + pub fn get_data_dir(&self) -> PathBuf { + self.inner.data_dir.clone() + } + + fn get_new_version_and_lock_ref(&self, lock_key: PathBuf) -> (Arc>, u64) { + let version = self.next_version.fetch_add(1, Ordering::Relaxed); + if version == u64::MAX { + panic!("FilesystemStoreV2 version counter overflowed"); + } + + // Get a reference to the inner lock. We do this early so that the arc can double as an in-flight counter for + // cleaning up unused locks. 
+ let inner_lock_ref = self.inner.get_inner_lock_ref(lock_key); + + (inner_lock_ref, version) + } + + #[cfg(any(all(feature = "tokio", test), fuzzing))] + /// Returns the size of the async state. + pub fn state_size(&self) -> usize { + let outer_lock = self.inner.locks.lock().unwrap(); + outer_lock.len() + } + + fn get_dest_dir_path( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> std::io::Result { + let mut dest_dir_path = self.inner.get_base_dir_path()?; + + // Use [empty] for empty namespaces to ensure consistent directory depth + let primary_dir = + if primary_namespace.is_empty() { EMPTY_NAMESPACE_DIR } else { primary_namespace }; + let secondary_dir = + if secondary_namespace.is_empty() { EMPTY_NAMESPACE_DIR } else { secondary_namespace }; + + dest_dir_path.push(primary_dir); + dest_dir_path.push(secondary_dir); + + Ok(dest_dir_path) + } + + /// Returns the file path for a given namespace/key combination. + fn get_file_path( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> std::io::Result { + let dir = self.get_dest_dir_path(primary_namespace, secondary_namespace)?; + Ok(dir.join(key)) + } + + fn read_impl(&self, dest_file_path: PathBuf) -> lightning::io::Result> { + let mut buf = Vec::new(); + + self.inner.execute_locked_read(dest_file_path.clone(), || { + let mut f = fs::File::open(&dest_file_path)?; + f.read_to_end(&mut buf)?; + Ok(()) + })?; + + Ok(buf) + } + + /// Writes a specific version of a key to the filesystem. If a newer version has been written already, this function + /// returns early without writing. + /// If `preserve_mtime` is Some, the file's modification time will be set to that value to preserve creation order. + /// Returns `Ok(true)` if the write was performed, `Ok(false)` if skipped due to staleness. + fn write_version( + &self, inner_lock_ref: Arc>, dest_file_path: PathBuf, buf: Vec, + preserve_mtime: Option, version: u64, + ) -> lightning::io::Result { + let options = WriteOptions { preserve_mtime }; + let tmp_file_path = prepare_atomic_write(&self.inner, &dest_file_path, &buf, &options)?; + + self.inner.execute_locked_write(inner_lock_ref, dest_file_path.clone(), version, || { + #[cfg(not(target_os = "windows"))] + { + finalize_atomic_write_unix(&tmp_file_path, &dest_file_path) + } + + #[cfg(target_os = "windows")] + { + finalize_atomic_write_windows(&tmp_file_path, &dest_file_path, &options) + } + }) + } + + /// Removes a specific version of a key from the filesystem. If a newer version has been written already, this function + /// returns early without removing. + /// Returns `Ok(true)` if the remove was performed, `Ok(false)` if skipped due to staleness. + fn remove_version( + &self, inner_lock_ref: Arc>, lock_key: PathBuf, dest_file_path: PathBuf, + lazy: bool, version: u64, + ) -> lightning::io::Result { + self.inner.execute_locked_write(inner_lock_ref, lock_key, version, || { + if !dest_file_path.is_file() { + return Ok(()); + } + + if lazy { + // If we're lazy we just call remove and be done with it. + fs::remove_file(&dest_file_path)?; + } else { + // If we're not lazy we try our best to persist the updated metadata to ensure + // atomicity of this call. 
+ #[cfg(not(target_os = "windows"))] + { + remove_file_unix(&dest_file_path)?; + } + + #[cfg(target_os = "windows")] + { + remove_file_windows(&self.inner, &dest_file_path)?; + } + } + + Ok(()) + }) + } + + fn list_impl(&self, prefixed_dest: PathBuf) -> lightning::io::Result> { + if !Path::new(&prefixed_dest).exists() { + return Ok(Vec::new()); + } + + let mut keys = Vec::new(); + for entry in fs::read_dir(&prefixed_dest)? { + let entry = entry?; + let path = entry.path(); + + if let Some(key) = entry_to_key(&path) { + keys.push(key); + } + } + + Ok(keys) + } + + fn list_paginated_impl( + &self, prefixed_dest: PathBuf, page_token: Option, + ) -> lightning::io::Result { + if !Path::new(&prefixed_dest).exists() { + return Ok(PaginatedListResponse { keys: Vec::new(), next_page_token: None }); + } + + // Collect all entries with their modification times + let mut entries: Vec<(u64, String)> = Vec::new(); + for entry in fs::read_dir(&prefixed_dest)? { + let entry = entry?; + let path = entry.path(); + + if let Some(key) = entry_to_key(&path) { + // Get modification time as millis since epoch + let mtime_millis = entry + .metadata() + .ok() + .and_then(|m| m.modified().ok()) + .and_then(|t| t.duration_since(UNIX_EPOCH).ok()) + .map(|d| d.as_millis() as u64) + .unwrap_or(0); + + entries.push((mtime_millis, key)); + } + } + + // Sort by mtime descending (newest first), then by key descending for same mtime + entries.sort_by(|a, b| b.0.cmp(&a.0).then_with(|| b.1.cmp(&a.1))); + + // Find starting position based on page token + let start_idx = if let Some(token) = page_token { + let (token_mtime, token_key) = parse_page_token(&token.0)?; + + // Find entries that come after the token (older entries = lower mtime) + // or same mtime but lexicographically smaller key (since we sort descending) + entries + .iter() + .position(|(mtime, key)| { + *mtime < token_mtime + || (*mtime == token_mtime && key.as_str() < token_key.as_str()) + }) + .unwrap_or(entries.len()) + } else { + 0 + }; + + // Take PAGE_SIZE entries starting from start_idx + let page_entries: Vec<_> = + entries.iter().skip(start_idx).take(PAGE_SIZE).cloned().collect(); + + let keys: Vec = page_entries.iter().map(|(_, key)| key.clone()).collect(); + + // Determine next page token + let next_page_token = if start_idx + PAGE_SIZE < entries.len() { + page_entries.last().map(|(mtime, key)| PageToken(format_page_token(*mtime, key))) + } else { + None + }; + + Ok(PaginatedListResponse { keys, next_page_token }) + } +} + +impl KVStoreSync for FilesystemStoreV2 { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> Result, lightning::io::Error> { + check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "read")?; + + let file_path = self.get_file_path(primary_namespace, secondary_namespace, key)?; + self.read_impl(file_path) + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> Result<(), lightning::io::Error> { + check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?; + + let dest_file_path = self.get_file_path(primary_namespace, secondary_namespace, key)?; + + // Get the existing file's mtime if it exists (to preserve creation order on update) + let existing_mtime = fs::metadata(&dest_file_path).ok().and_then(|m| m.modified().ok()); + + let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(dest_file_path.clone()); + self.write_version(inner_lock_ref, dest_file_path, buf, existing_mtime, 
version).map(|_| ()) + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> Result<(), lightning::io::Error> { + check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "remove")?; + + let file_path = self.get_file_path(primary_namespace, secondary_namespace, key)?; + + if !file_path.exists() { + // File doesn't exist, nothing to remove + return Ok(()); + } + + let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(file_path.clone()); + self.remove_version(inner_lock_ref, file_path.clone(), file_path, lazy, version).map(|_| ()) + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> Result, lightning::io::Error> { + check_namespace_key_validity(primary_namespace, secondary_namespace, None, "list")?; + + let dest_dir_path = self.get_dest_dir_path(primary_namespace, secondary_namespace)?; + self.list_impl(dest_dir_path) + } +} + +impl PaginatedKVStoreSync for FilesystemStoreV2 { + fn list_paginated( + &self, primary_namespace: &str, secondary_namespace: &str, page_token: Option, + ) -> Result { + check_namespace_key_validity( + primary_namespace, + secondary_namespace, + None, + "list_paginated", + )?; + + let dest_dir_path = self.get_dest_dir_path(primary_namespace, secondary_namespace)?; + self.list_paginated_impl(dest_dir_path, page_token) + } +} + +/// Extracts key from a path if it's a valid key file. +fn entry_to_key(path: &Path) -> Option { + if let Some(ext) = path.extension() { + #[cfg(target_os = "windows")] + { + // Clean up any trash files lying around. + if ext == "trash" { + fs::remove_file(path).ok(); + return None; + } + } + if ext == "tmp" { + return None; + } + } + + if !path.is_file() { + return None; + } + + path.file_name().and_then(|n| n.to_str()).and_then(|key| { + if is_valid_kvstore_str(key) { + Some(key.to_string()) + } else { + None + } + }) +} + +/// Formats a page token from mtime (millis since epoch) and key. +fn format_page_token(mtime_millis: u64, key: &str) -> String { + format!("{:016}:{}", mtime_millis, key) +} + +/// Parses a page token into mtime (millis since epoch) and key. 
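For reference, the page token produced and consumed below is just a zero-padded 16-digit millisecond timestamp, a colon, and the last key of the previous page. A small standalone round-trip example (a re-implementation for illustration, not the crate's functions):

```rust
// Standalone re-implementation of the page-token round trip for illustration.
fn format_token(mtime_millis: u64, key: &str) -> String {
    format!("{:016}:{}", mtime_millis, key)
}

fn parse_token(token: &str) -> Option<(u64, String)> {
    let (ts, key) = token.split_once(':')?;
    if ts.len() != 16 {
        return None;
    }
    Some((ts.parse().ok()?, key.to_string()))
}

fn main() {
    // A key written at 2024-01-01T00:00:00Z (1_704_067_200_000 ms since epoch).
    let token = format_token(1_704_067_200_000, "channel_monitor_7");
    assert_eq!(token, "0001704067200000:channel_monitor_7");
    assert_eq!(
        parse_token(&token),
        Some((1_704_067_200_000, "channel_monitor_7".to_string()))
    );
    // Splitting on the first ':' is unambiguous because the timestamp prefix is
    // a fixed 16 digits and contains no ':'.
}
```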
+fn parse_page_token(token: &str) -> lightning::io::Result<(u64, String)> { + let colon_pos = token.find(':').ok_or_else(|| { + lightning::io::Error::new( + lightning::io::ErrorKind::InvalidInput, + "Invalid page token format", + ) + })?; + + if colon_pos != PAGE_TOKEN_TIMESTAMP_LEN { + return Err(lightning::io::Error::new( + lightning::io::ErrorKind::InvalidInput, + "Invalid page token format", + )); + } + + let mtime = token[..colon_pos].parse::().map_err(|_| { + lightning::io::Error::new( + lightning::io::ErrorKind::InvalidInput, + "Invalid page token timestamp", + ) + })?; + + let key = token[colon_pos + 1..].to_string(); + + Ok((mtime, key)) +} + +#[cfg(feature = "tokio")] +impl KVStore for FilesystemStoreV2 { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> impl Future, lightning::io::Error>> + 'static + Send { + let this = Arc::clone(&self.inner); + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + let validation = check_namespace_key_validity( + &primary_namespace, + &secondary_namespace, + Some(&key), + "read", + ); + let file_path = self.get_file_path(&primary_namespace, &secondary_namespace, &key); + + async move { + validation?; + let file_path = file_path + .map_err(|e| lightning::io::Error::new(lightning::io::ErrorKind::Other, e))?; + + tokio::task::spawn_blocking(move || { + let mut buf = Vec::new(); + this.execute_locked_read(file_path.clone(), || { + let mut f = fs::File::open(&file_path)?; + f.read_to_end(&mut buf)?; + Ok(()) + })?; + Ok(buf) + }) + .await + .unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))) + } + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> impl Future> + 'static + Send { + let this = Arc::clone(&self.inner); + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key_str = key.to_string(); + let validation = check_namespace_key_validity( + &primary_namespace, + &secondary_namespace, + Some(&key_str), + "write", + ); + + let dest_file_path = self.get_file_path(&primary_namespace, &secondary_namespace, &key_str); + let (inner_lock_ref, version) = match &dest_file_path { + Ok(path) => self.get_new_version_and_lock_ref(path.clone()), + Err(_) => { + // We'll error out below, but we need placeholder values + (Arc::new(RwLock::new(0)), 0) + }, + }; + + async move { + validation?; + let dest_file_path = dest_file_path + .map_err(|e| lightning::io::Error::new(lightning::io::ErrorKind::Other, e))?; + + tokio::task::spawn_blocking(move || { + // Get the existing file's mtime if it exists (to preserve creation order on update) + let existing_mtime = + fs::metadata(&dest_file_path).ok().and_then(|m| m.modified().ok()); + + let options = WriteOptions { preserve_mtime: existing_mtime }; + let tmp_file_path = prepare_atomic_write(&this, &dest_file_path, &buf, &options)?; + + this.execute_locked_write(inner_lock_ref, dest_file_path.clone(), version, || { + #[cfg(not(target_os = "windows"))] + { + finalize_atomic_write_unix(&tmp_file_path, &dest_file_path) + } + + #[cfg(target_os = "windows")] + { + finalize_atomic_write_windows(&tmp_file_path, &dest_file_path, &options) + } + }) + .map(|_| ()) + }) + .await + .unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))) + } + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: 
&str, key: &str, lazy: bool, + ) -> impl Future> + 'static + Send { + let this = Arc::clone(&self.inner); + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key_str = key.to_string(); + let validation = check_namespace_key_validity( + &primary_namespace, + &secondary_namespace, + Some(&key_str), + "remove", + ); + + let file_path = self.get_file_path(&primary_namespace, &secondary_namespace, &key_str); + let (inner_lock_ref, version) = match &file_path { + Ok(path) => self.get_new_version_and_lock_ref(path.clone()), + Err(_) => (Arc::new(RwLock::new(0)), 0), + }; + + async move { + validation?; + let file_path = file_path + .map_err(|e| lightning::io::Error::new(lightning::io::ErrorKind::Other, e))?; + + tokio::task::spawn_blocking(move || { + if !file_path.exists() { + // File doesn't exist, but we still need to clean up the lock + this.clean_locks(&inner_lock_ref, file_path); + return Ok(()); + } + + this.execute_locked_write(inner_lock_ref, file_path.clone(), version, || { + if !file_path.is_file() { + return Ok(()); + } + + if lazy { + fs::remove_file(&file_path)?; + } else { + #[cfg(not(target_os = "windows"))] + { + remove_file_unix(&file_path)?; + } + + #[cfg(target_os = "windows")] + { + remove_file_windows(&this, &file_path)?; + } + } + + Ok(()) + }) + .map(|_| ()) + }) + .await + .unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))) + } + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> impl Future, lightning::io::Error>> + 'static + Send { + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let validation = + check_namespace_key_validity(&primary_namespace, &secondary_namespace, None, "list"); + let dest_dir_path = self.get_dest_dir_path(&primary_namespace, &secondary_namespace); + + async move { + validation?; + let path = dest_dir_path + .map_err(|e| lightning::io::Error::new(lightning::io::ErrorKind::Other, e))?; + + tokio::task::spawn_blocking(move || { + if !Path::new(&path).exists() { + return Ok(Vec::new()); + } + + let mut keys = Vec::new(); + for entry in fs::read_dir(&path)? { + let entry = entry?; + let entry_path = entry.path(); + + if let Some(key) = entry_to_key(&entry_path) { + keys.push(key); + } + } + + Ok(keys) + }) + .await + .unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))) + } + } +} + +#[cfg(feature = "tokio")] +impl PaginatedKVStore for FilesystemStoreV2 { + fn list_paginated( + &self, primary_namespace: &str, secondary_namespace: &str, page_token: Option, + ) -> impl Future> + 'static + Send + { + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let validation = check_namespace_key_validity( + &primary_namespace, + &secondary_namespace, + None, + "list_paginated", + ); + let dest_dir_path = self.get_dest_dir_path(&primary_namespace, &secondary_namespace); + + async move { + validation?; + let path = dest_dir_path + .map_err(|e| lightning::io::Error::new(lightning::io::ErrorKind::Other, e))?; + + tokio::task::spawn_blocking(move || { + if !Path::new(&path).exists() { + return Ok(PaginatedListResponse { keys: Vec::new(), next_page_token: None }); + } + + // Collect all entries with their modification times + let mut entries: Vec<(u64, String)> = Vec::new(); + for entry in fs::read_dir(&path)? 
{ + let entry = entry?; + let entry_path = entry.path(); + + if let Some(key) = entry_to_key(&entry_path) { + let mtime_millis = entry + .metadata() + .ok() + .and_then(|m| m.modified().ok()) + .and_then(|t| t.duration_since(UNIX_EPOCH).ok()) + .map(|d| d.as_millis() as u64) + .unwrap_or(0); + + entries.push((mtime_millis, key)); + } + } + + // Sort by mtime descending (newest first), then by key descending for same mtime + entries.sort_by(|a, b| b.0.cmp(&a.0).then_with(|| b.1.cmp(&a.1))); + + // Find starting position based on page token + let start_idx = if let Some(token) = page_token { + let (token_mtime, token_key) = parse_page_token(&token.0)?; + + entries + .iter() + .position(|(mtime, key)| { + *mtime < token_mtime + || (*mtime == token_mtime && key.as_str() < token_key.as_str()) + }) + .unwrap_or(entries.len()) + } else { + 0 + }; + + // Take PAGE_SIZE entries starting from start_idx + let page_entries: Vec<_> = + entries.iter().skip(start_idx).take(PAGE_SIZE).cloned().collect(); + + let keys: Vec = page_entries.iter().map(|(_, key)| key.clone()).collect(); + + // Determine next page token + let next_page_token = if start_idx + PAGE_SIZE < entries.len() { + page_entries + .last() + .map(|(mtime, key)| PageToken(format_page_token(*mtime, key))) + } else { + None + }; + + Ok(PaginatedListResponse { keys, next_page_token }) + }) + .await + .unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))) + } + } +} + +impl MigratableKVStore for FilesystemStoreV2 { + fn list_all_keys(&self) -> Result, lightning::io::Error> { + let prefixed_dest = &self.inner.data_dir; + if !prefixed_dest.exists() { + return Ok(Vec::new()); + } + + let mut keys = Vec::new(); + + for primary_entry in fs::read_dir(prefixed_dest)? { + let primary_entry = primary_entry?; + let primary_path = primary_entry.path(); + + if !primary_path.is_dir() { + // Skip non-directory entries at the root level + continue; + } + + let primary_namespace = match primary_path.file_name().and_then(|n| n.to_str()) { + Some(EMPTY_NAMESPACE_DIR) => String::new(), + Some(name) if is_valid_kvstore_str(name) => name.to_string(), + _ => continue, + }; + + for secondary_entry in fs::read_dir(&primary_path)? { + let secondary_entry = secondary_entry?; + let secondary_path = secondary_entry.path(); + + if !secondary_path.is_dir() { + // Skip non-directory entries at the secondary level + continue; + } + + let secondary_namespace = match secondary_path.file_name().and_then(|n| n.to_str()) + { + Some(EMPTY_NAMESPACE_DIR) => String::new(), + Some(name) if is_valid_kvstore_str(name) => name.to_string(), + _ => continue, + }; + + // Read all key files in this namespace + for key_entry in fs::read_dir(&secondary_path)? { + let key_entry = key_entry?; + let key_path = key_entry.path(); + + if let Some(key) = entry_to_key(&key_path) { + keys.push((primary_namespace.clone(), secondary_namespace.clone(), key)); + } + } + } + } + + Ok(keys) + } +} + +/// Migrates all data from a [`FilesystemStore`] (v1) to a [`FilesystemStoreV2`]. +/// +/// This function reads all keys from the source v1 store and writes them to the target v2 store. +/// The v2 store will use the new directory structure with `[empty]` markers for empty namespaces. +/// +/// # Arguments +/// +/// * `source` - The source v1 filesystem store to migrate from +/// * `target` - The target v2 filesystem store to migrate to +/// +/// # Errors +/// +/// Returns an error if any read or write operation fails. 
Note that in case of an error, +/// the target store may be left in a partially migrated state. +/// +/// # Example +/// +/// ```no_run +/// use lightning_persister::fs_store::FilesystemStore; +/// use lightning_persister::fs_store_v2::{FilesystemStoreV2, migrate_v1_to_v2}; +/// use std::path::PathBuf; +/// +/// let v1_store = FilesystemStore::new(PathBuf::from("/path/to/v1/data")); +/// let v2_store = FilesystemStoreV2::new(PathBuf::from("/path/to/v2/data")) +/// .expect("Failed to open v2 store"); +/// +/// migrate_v1_to_v2(&v1_store, &v2_store).expect("Migration failed"); +/// ``` +/// +/// [`FilesystemStore`]: crate::fs_store::FilesystemStore +pub fn migrate_v1_to_v2( + source: &S, target: &FilesystemStoreV2, +) -> Result<(), lightning::io::Error> { + let keys_to_migrate = source.list_all_keys()?; + + for (primary_namespace, secondary_namespace, key) in &keys_to_migrate { + let data = source.read(primary_namespace, secondary_namespace, key)?; + KVStoreSync::write(target, primary_namespace, secondary_namespace, key, data)?; + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::test_utils::{ + do_read_write_remove_list_persist, do_test_data_migration, do_test_store, + }; + use std::fs::FileTimes; + + impl Drop for FilesystemStoreV2 { + fn drop(&mut self) { + // We test for invalid directory names, so it's OK if directory removal + // fails. + match fs::remove_dir_all(&self.inner.data_dir) { + Err(e) => println!("Failed to remove test persister directory: {}", e), + _ => {}, + } + } + } + + #[test] + fn read_write_remove_list_persist() { + let mut temp_path = std::env::temp_dir(); + temp_path.push("test_read_write_remove_list_persist_v2"); + let fs_store = FilesystemStoreV2::new(temp_path).unwrap(); + do_read_write_remove_list_persist(&fs_store); + } + + #[cfg(feature = "tokio")] + #[tokio::test] + async fn read_write_remove_list_persist_async() { + use lightning::util::persist::KVStore; + use std::sync::Arc; + + let mut temp_path = std::env::temp_dir(); + temp_path.push("test_read_write_remove_list_persist_async_v2"); + let fs_store = Arc::new(FilesystemStoreV2::new(temp_path).unwrap()); + assert_eq!(fs_store.state_size(), 0); + + let async_fs_store = Arc::clone(&fs_store); + + let data1 = vec![42u8; 32]; + let data2 = vec![43u8; 32]; + + let primary = "testspace"; + let secondary = "testsubspace"; + let key = "testkey"; + + // Test writing the same key twice with different data. Execute the asynchronous part out of order to ensure + // that eventual consistency works. + let fut1 = KVStore::write(&*async_fs_store, primary, secondary, key, data1); + assert_eq!(fs_store.state_size(), 1); + + let fut2 = KVStore::remove(&*async_fs_store, primary, secondary, key, false); + assert_eq!(fs_store.state_size(), 1); + + let fut3 = KVStore::write(&*async_fs_store, primary, secondary, key, data2.clone()); + assert_eq!(fs_store.state_size(), 1); + + fut3.await.unwrap(); + assert_eq!(fs_store.state_size(), 1); + + fut2.await.unwrap(); + assert_eq!(fs_store.state_size(), 1); + + fut1.await.unwrap(); + assert_eq!(fs_store.state_size(), 0); + + // Test list. + let listed_keys = KVStore::list(&*async_fs_store, primary, secondary).await.unwrap(); + assert_eq!(listed_keys.len(), 1); + assert_eq!(listed_keys[0], key); + + // Test read. We expect to read data2, as the write call was initiated later. + let read_data = KVStore::read(&*async_fs_store, primary, secondary, key).await.unwrap(); + assert_eq!(data2, &*read_data); + + // Test remove. 
+ KVStore::remove(&*async_fs_store, primary, secondary, key, false).await.unwrap(); + + let listed_keys = KVStore::list(&*async_fs_store, primary, secondary).await.unwrap(); + assert_eq!(listed_keys.len(), 0); + } + + #[test] + fn test_data_migration() { + let mut source_temp_path = std::env::temp_dir(); + source_temp_path.push("test_data_migration_source_v2"); + let mut source_store = FilesystemStoreV2::new(source_temp_path).unwrap(); + + let mut target_temp_path = std::env::temp_dir(); + target_temp_path.push("test_data_migration_target_v2"); + let mut target_store = FilesystemStoreV2::new(target_temp_path).unwrap(); + + do_test_data_migration(&mut source_store, &mut target_store); + } + + #[test] + fn test_v1_to_v2_migration() { + use crate::fs_store::FilesystemStore; + use lightning::util::persist::KVStoreSync; + + // Create v1 store and populate with test data + let mut v1_path = std::env::temp_dir(); + v1_path.push("test_v1_to_v2_migration_source"); + let v1_store = FilesystemStore::new(v1_path.clone()); + + let data = vec![42u8; 32]; + + // Write data with various namespace combinations + KVStoreSync::write(&v1_store, "", "", "root_key", data.clone()).unwrap(); + KVStoreSync::write(&v1_store, "primary", "", "primary_key", data.clone()).unwrap(); + KVStoreSync::write(&v1_store, "primary", "secondary", "nested_key", data.clone()).unwrap(); + + // Create v2 store + let mut v2_path = std::env::temp_dir(); + v2_path.push("test_v1_to_v2_migration_target"); + let v2_store = FilesystemStoreV2::new(v2_path.clone()).unwrap(); + + // Migrate + migrate_v1_to_v2(&v1_store, &v2_store).unwrap(); + + // Verify all data was migrated correctly + assert_eq!(KVStoreSync::read(&v2_store, "", "", "root_key").unwrap(), data); + assert_eq!(KVStoreSync::read(&v2_store, "primary", "", "primary_key").unwrap(), data); + assert_eq!( + KVStoreSync::read(&v2_store, "primary", "secondary", "nested_key").unwrap(), + data + ); + + // Verify v2 directory structure uses [empty] for empty namespaces + assert!(v2_path.join(EMPTY_NAMESPACE_DIR).join(EMPTY_NAMESPACE_DIR).exists()); + assert!(v2_path.join("primary").join(EMPTY_NAMESPACE_DIR).exists()); + assert!(v2_path.join("primary").join("secondary").exists()); + + // Verify list_all_keys works on the migrated data + let mut all_keys = v2_store.list_all_keys().unwrap(); + all_keys.sort(); + assert_eq!(all_keys.len(), 3); + assert!(all_keys.contains(&("".to_string(), "".to_string(), "root_key".to_string()))); + assert!(all_keys.contains(&( + "primary".to_string(), + "".to_string(), + "primary_key".to_string() + ))); + assert!(all_keys.contains(&( + "primary".to_string(), + "secondary".to_string(), + "nested_key".to_string() + ))); + } + + #[test] + fn test_v1_to_v2_migration_empty_store() { + use crate::fs_store::FilesystemStore; + + // Create empty v1 store + let mut v1_path = std::env::temp_dir(); + v1_path.push("test_v1_to_v2_migration_empty_source"); + let v1_store = FilesystemStore::new(v1_path); + + // Create v2 store + let mut v2_path = std::env::temp_dir(); + v2_path.push("test_v1_to_v2_migration_empty_target"); + let v2_store = FilesystemStoreV2::new(v2_path).unwrap(); + + // Migrate empty store should succeed + migrate_v1_to_v2(&v1_store, &v2_store).unwrap(); + + // Verify no keys exist + let all_keys = v2_store.list_all_keys().unwrap(); + assert_eq!(all_keys.len(), 0); + } + + #[test] + fn test_v1_to_v2_migration_data_integrity() { + use crate::fs_store::FilesystemStore; + use lightning::util::persist::KVStoreSync; + + // Create v1 store with different data 
for each key + let mut v1_path = std::env::temp_dir(); + v1_path.push("test_v1_to_v2_migration_integrity_source"); + let v1_store = FilesystemStore::new(v1_path); + + // Write unique data for each key + let data1 = vec![1u8; 100]; + let data2 = vec![2u8; 200]; + let data3 = vec![3u8; 50]; + let data4 = (0..255u8).collect::>(); // All byte values + + KVStoreSync::write(&v1_store, "", "", "key1", data1.clone()).unwrap(); + KVStoreSync::write(&v1_store, "ns1", "", "key2", data2.clone()).unwrap(); + KVStoreSync::write(&v1_store, "ns1", "ns2", "key3", data3.clone()).unwrap(); + KVStoreSync::write(&v1_store, "ns1", "ns2", "key4", data4.clone()).unwrap(); + + // Create v2 store and migrate + let mut v2_path = std::env::temp_dir(); + v2_path.push("test_v1_to_v2_migration_integrity_target"); + let v2_store = FilesystemStoreV2::new(v2_path).unwrap(); + + migrate_v1_to_v2(&v1_store, &v2_store).unwrap(); + + // Verify each key has exactly the right data + assert_eq!(KVStoreSync::read(&v2_store, "", "", "key1").unwrap(), data1); + assert_eq!(KVStoreSync::read(&v2_store, "ns1", "", "key2").unwrap(), data2); + assert_eq!(KVStoreSync::read(&v2_store, "ns1", "ns2", "key3").unwrap(), data3); + assert_eq!(KVStoreSync::read(&v2_store, "ns1", "ns2", "key4").unwrap(), data4); + } + + #[test] + fn test_v1_to_v2_migration_many_keys() { + use crate::fs_store::FilesystemStore; + use lightning::util::persist::{KVStoreSync, PaginatedKVStoreSync}; + + // Create v1 store with many keys + let mut v1_path = std::env::temp_dir(); + v1_path.push("test_v1_to_v2_migration_many_source"); + let v1_store = FilesystemStore::new(v1_path); + + let num_keys = 75; // More than one page (PAGE_SIZE = 50) + for i in 0..num_keys { + let key = format!("key_{:04}", i); + let data = vec![i as u8; 32]; + KVStoreSync::write(&v1_store, "bulk", "test", &key, data).unwrap(); + } + + // Create v2 store and migrate + let mut v2_path = std::env::temp_dir(); + v2_path.push("test_v1_to_v2_migration_many_target"); + let v2_store = FilesystemStoreV2::new(v2_path).unwrap(); + + migrate_v1_to_v2(&v1_store, &v2_store).unwrap(); + + // Verify all keys migrated + let keys = KVStoreSync::list(&v2_store, "bulk", "test").unwrap(); + assert_eq!(keys.len(), num_keys); + + // Verify pagination works on migrated data + let page1 = PaginatedKVStoreSync::list_paginated(&v2_store, "bulk", "test", None).unwrap(); + assert_eq!(page1.keys.len(), PAGE_SIZE); + assert!(page1.next_page_token.is_some()); + + let page2 = + PaginatedKVStoreSync::list_paginated(&v2_store, "bulk", "test", page1.next_page_token) + .unwrap(); + assert_eq!(page2.keys.len(), num_keys - PAGE_SIZE); + assert!(page2.next_page_token.is_none()); + + // Verify data integrity for a few random keys + for i in [0, 25, 50, 74] { + let key = format!("key_{:04}", i); + let expected_data = vec![i as u8; 32]; + assert_eq!(KVStoreSync::read(&v2_store, "bulk", "test", &key).unwrap(), expected_data); + } + } + + #[test] + fn test_v1_to_v2_migration_post_migration_operations() { + use crate::fs_store::FilesystemStore; + use lightning::util::persist::KVStoreSync; + + // Create v1 store with some data + let mut v1_path = std::env::temp_dir(); + v1_path.push("test_v1_to_v2_migration_post_ops_source"); + let v1_store = FilesystemStore::new(v1_path); + + let original_data = vec![42u8; 32]; + KVStoreSync::write(&v1_store, "ns", "sub", "existing_key", original_data.clone()).unwrap(); + + // Create v2 store and migrate + let mut v2_path = std::env::temp_dir(); + v2_path.push("test_v1_to_v2_migration_post_ops_target"); + 
let v2_store = FilesystemStoreV2::new(v2_path).unwrap(); + + migrate_v1_to_v2(&v1_store, &v2_store).unwrap(); + + // Test that we can write new keys after migration + let new_data = vec![43u8; 32]; + KVStoreSync::write(&v2_store, "ns", "sub", "new_key", new_data.clone()).unwrap(); + + // Test that we can update migrated keys + let updated_data = vec![44u8; 32]; + KVStoreSync::write(&v2_store, "ns", "sub", "existing_key", updated_data.clone()).unwrap(); + + // Verify reads work correctly + assert_eq!( + KVStoreSync::read(&v2_store, "ns", "sub", "existing_key").unwrap(), + updated_data + ); + assert_eq!(KVStoreSync::read(&v2_store, "ns", "sub", "new_key").unwrap(), new_data); + + // Verify list includes both old and new keys + let mut keys = KVStoreSync::list(&v2_store, "ns", "sub").unwrap(); + keys.sort(); + assert_eq!(keys, vec!["existing_key", "new_key"]); + + // Test removal works + KVStoreSync::remove(&v2_store, "ns", "sub", "existing_key", false).unwrap(); + let keys = KVStoreSync::list(&v2_store, "ns", "sub").unwrap(); + assert_eq!(keys, vec!["new_key"]); + } + + #[test] + fn test_v1_to_v2_migration_max_length_names() { + use crate::fs_store::FilesystemStore; + use lightning::util::persist::{KVStoreSync, KVSTORE_NAMESPACE_KEY_MAX_LEN}; + + // Create v1 store with maximum length names + let mut v1_path = std::env::temp_dir(); + v1_path.push("test_v1_to_v2_migration_max_len_source"); + let v1_store = FilesystemStore::new(v1_path); + + let max_name = "A".repeat(KVSTORE_NAMESPACE_KEY_MAX_LEN); + let data = vec![42u8; 32]; + + KVStoreSync::write(&v1_store, &max_name, &max_name, &max_name, data.clone()).unwrap(); + + // Create v2 store and migrate + let mut v2_path = std::env::temp_dir(); + v2_path.push("test_v1_to_v2_migration_max_len_target"); + let v2_store = FilesystemStoreV2::new(v2_path).unwrap(); + + migrate_v1_to_v2(&v1_store, &v2_store).unwrap(); + + // Verify the key was migrated correctly + assert_eq!(KVStoreSync::read(&v2_store, &max_name, &max_name, &max_name).unwrap(), data); + + // Verify list works + let keys = KVStoreSync::list(&v2_store, &max_name, &max_name).unwrap(); + assert_eq!(keys, vec![max_name.clone()]); + } + + #[test] + fn test_filesystem_store_v2() { + // Create the nodes, giving them FilesystemStoreV2s for data stores. 
+ let store_0 = FilesystemStoreV2::new("test_filesystem_store_v2_0".into()).unwrap(); + let store_1 = FilesystemStoreV2::new("test_filesystem_store_v2_1".into()).unwrap(); + do_test_store(&store_0, &store_1) + } + + #[test] + fn test_page_token_format() { + let mtime: u64 = 1706500000000; + let key = "test_key"; + let token = format_page_token(mtime, key); + assert_eq!(token, "0001706500000000:test_key"); + + let parsed = parse_page_token(&token).unwrap(); + assert_eq!(parsed, (mtime, key.to_string())); + + // Test invalid tokens + assert!(parse_page_token("invalid").is_err()); + assert!(parse_page_token("0001706500000000_key").is_err()); // wrong separator + } + + #[test] + fn test_directory_structure() { + use lightning::util::persist::KVStoreSync; + + let mut temp_path = std::env::temp_dir(); + temp_path.push("test_directory_structure_v2"); + let fs_store = FilesystemStoreV2::new(temp_path.clone()).unwrap(); + + let data = vec![42u8; 32]; + + // Write with empty namespaces + KVStoreSync::write(&fs_store, "", "", "key1", data.clone()).unwrap(); + assert!(temp_path.join(EMPTY_NAMESPACE_DIR).join(EMPTY_NAMESPACE_DIR).exists()); + + // Write with non-empty primary, empty secondary + KVStoreSync::write(&fs_store, "primary", "", "key2", data.clone()).unwrap(); + assert!(temp_path.join("primary").join(EMPTY_NAMESPACE_DIR).exists()); + + // Write with both non-empty + KVStoreSync::write(&fs_store, "primary", "secondary", "key3", data.clone()).unwrap(); + assert!(temp_path.join("primary").join("secondary").exists()); + + // Verify we can read them back + assert_eq!(KVStoreSync::read(&fs_store, "", "", "key1").unwrap(), data); + assert_eq!(KVStoreSync::read(&fs_store, "primary", "", "key2").unwrap(), data); + assert_eq!(KVStoreSync::read(&fs_store, "primary", "secondary", "key3").unwrap(), data); + + // Verify files are named just by key (no timestamp prefix) + assert!(temp_path + .join(EMPTY_NAMESPACE_DIR) + .join(EMPTY_NAMESPACE_DIR) + .join("key1") + .exists()); + assert!(temp_path.join("primary").join(EMPTY_NAMESPACE_DIR).join("key2").exists()); + assert!(temp_path.join("primary").join("secondary").join("key3").exists()); + } + + #[test] + fn test_update_preserves_mtime() { + use lightning::util::persist::KVStoreSync; + + let mut temp_path = std::env::temp_dir(); + temp_path.push("test_update_preserves_mtime_v2"); + let fs_store = FilesystemStoreV2::new(temp_path.clone()).unwrap(); + + let data1 = vec![42u8; 32]; + let data2 = vec![43u8; 32]; + + // Write initial data + KVStoreSync::write(&fs_store, "ns", "sub", "key", data1).unwrap(); + + // Get the original mtime + let file_path = temp_path.join("ns").join("sub").join("key"); + let original_mtime = fs::metadata(&file_path).unwrap().modified().unwrap(); + + // Sleep briefly to ensure different timestamp if not preserved + std::thread::sleep(std::time::Duration::from_millis(50)); + + // Update with new data + KVStoreSync::write(&fs_store, "ns", "sub", "key", data2.clone()).unwrap(); + + // Verify mtime is preserved + let updated_mtime = fs::metadata(&file_path).unwrap().modified().unwrap(); + assert_eq!(original_mtime, updated_mtime); + + // Verify data was updated + assert_eq!(KVStoreSync::read(&fs_store, "ns", "sub", "key").unwrap(), data2); + } + + #[test] + fn test_paginated_listing() { + use lightning::util::persist::{KVStoreSync, PaginatedKVStoreSync}; + + let mut temp_path = std::env::temp_dir(); + temp_path.push("test_paginated_listing_v2"); + let fs_store = FilesystemStoreV2::new(temp_path).unwrap(); + + let data = vec![42u8; 32]; + 
+ // Write several keys with small delays to ensure different mtimes + let keys: Vec = (0..5).map(|i| format!("key{}", i)).collect(); + for key in &keys { + KVStoreSync::write(&fs_store, "ns", "sub", key, data.clone()).unwrap(); + std::thread::sleep(std::time::Duration::from_millis(10)); + } + + // List paginated - should return newest first + let response = PaginatedKVStoreSync::list_paginated(&fs_store, "ns", "sub", None).unwrap(); + assert_eq!(response.keys.len(), 5); + // Newest key (key4) should be first + assert_eq!(response.keys[0], "key4"); + assert_eq!(response.keys[4], "key0"); + assert!(response.next_page_token.is_none()); // Less than PAGE_SIZE items + } + + #[test] + fn test_paginated_listing_with_pagination() { + use lightning::util::persist::{KVStoreSync, PaginatedKVStoreSync}; + + let mut temp_path = std::env::temp_dir(); + temp_path.push("test_paginated_listing_with_pagination_v2"); + let fs_store = FilesystemStoreV2::new(temp_path).unwrap(); + + let data = vec![42u8; 32]; + + // Write more than PAGE_SIZE keys + let num_keys = PAGE_SIZE + 50; + for i in 0..num_keys { + let key = format!("key{:04}", i); + KVStoreSync::write(&fs_store, "ns", "sub", &key, data.clone()).unwrap(); + // Small delay to ensure ordering + if i % 10 == 0 { + std::thread::sleep(std::time::Duration::from_millis(1)); + } + } + + // First page + let response1 = PaginatedKVStoreSync::list_paginated(&fs_store, "ns", "sub", None).unwrap(); + assert_eq!(response1.keys.len(), PAGE_SIZE); + assert!(response1.next_page_token.is_some()); + + // Second page + let response2 = + PaginatedKVStoreSync::list_paginated(&fs_store, "ns", "sub", response1.next_page_token) + .unwrap(); + assert_eq!(response2.keys.len(), 50); + assert!(response2.next_page_token.is_none()); + + // Verify no duplicates between pages + let all_keys: std::collections::HashSet<_> = + response1.keys.iter().chain(response2.keys.iter()).collect(); + assert_eq!(all_keys.len(), num_keys); + } + + #[test] + fn test_page_token_after_deletion() { + use lightning::util::persist::{KVStoreSync, PaginatedKVStoreSync}; + + let mut temp_path = std::env::temp_dir(); + temp_path.push("test_page_token_after_deletion_v2"); + let fs_store = FilesystemStoreV2::new(temp_path).unwrap(); + + let data = vec![42u8; 32]; + + // Write keys + for i in 0..10 { + let key = format!("key{}", i); + KVStoreSync::write(&fs_store, "ns", "sub", &key, data.clone()).unwrap(); + std::thread::sleep(std::time::Duration::from_millis(10)); + } + + // Verify initial listing + let response1 = PaginatedKVStoreSync::list_paginated(&fs_store, "ns", "sub", None).unwrap(); + assert_eq!(response1.keys.len(), 10); + + // Delete some keys + KVStoreSync::remove(&fs_store, "ns", "sub", "key5", false).unwrap(); + KVStoreSync::remove(&fs_store, "ns", "sub", "key3", false).unwrap(); + + // List again - should work fine with deleted keys + let response2 = PaginatedKVStoreSync::list_paginated(&fs_store, "ns", "sub", None).unwrap(); + assert_eq!(response2.keys.len(), 8); // 10 - 2 deleted + } + + #[test] + fn test_same_mtime_sorted_by_key() { + use lightning::util::persist::PaginatedKVStoreSync; + use std::time::Duration; + + // Create files directly on disk first with the same mtime + let mut temp_path = std::env::temp_dir(); + temp_path.push("test_same_mtime_sorted_by_key_v2"); + let _ = fs::remove_dir_all(&temp_path); + + let data = vec![42u8; 32]; + let dir = temp_path.join("ns").join("sub"); + fs::create_dir_all(&dir).unwrap(); + + // Write files with the same mtime but different keys + let keys = 
vec!["zebra", "apple", "mango", "banana"]; + let fixed_time = UNIX_EPOCH + Duration::from_secs(1706500000); + + for key in &keys { + let file_path = dir.join(key); + let file = fs::File::create(&file_path).unwrap(); + std::io::Write::write_all(&mut &file, &data).unwrap(); + file.set_times(FileTimes::new().set_modified(fixed_time)).unwrap(); + } + + // Open the store + let fs_store = FilesystemStoreV2::new(temp_path.clone()).unwrap(); + + // List paginated - should return keys sorted by key in reverse order + // (for same mtime, keys are sorted reverse alphabetically) + let response = PaginatedKVStoreSync::list_paginated(&fs_store, "ns", "sub", None).unwrap(); + assert_eq!(response.keys.len(), 4); + + // Same mtime means sorted by key in reverse order (z > m > b > a) + assert_eq!(response.keys[0], "zebra"); + assert_eq!(response.keys[1], "mango"); + assert_eq!(response.keys[2], "banana"); + assert_eq!(response.keys[3], "apple"); + } +} diff --git a/lightning-persister/src/lib.rs b/lightning-persister/src/lib.rs index 0e3541e1b27..ba64e916335 100644 --- a/lightning-persister/src/lib.rs +++ b/lightning-persister/src/lib.rs @@ -9,7 +9,9 @@ extern crate criterion; pub mod fs_store; +pub mod fs_store_v2; +mod fs_store_common; mod utils; #[cfg(test)] diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index 440d1d31331..12a0e4859c2 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -374,6 +374,195 @@ where } } +/// An opaque token used for paginated listing operations. +/// +/// This token should be treated as an opaque value by callers. Pass the token returned from +/// one `list_paginated` call to the next call to continue pagination. The internal format +/// is implementation-defined and may change between versions. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct PageToken(pub String); + +/// Represents the response from a paginated `list` operation. +/// +/// Contains the list of keys and a token for retrieving the next page of results. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct PaginatedListResponse { + /// A vector of keys, ordered from most recently created to least recently created. + pub keys: Vec, + + /// A token that can be passed to the next call to continue pagination. + /// + /// Is `None` if there are no more pages to retrieve. + pub next_page_token: Option, +} + +/// Provides an interface that allows storage and retrieval of persisted values that are associated +/// with given keys, with support for pagination. +/// +/// In order to avoid collisions, the key space is segmented based on the given `primary_namespace`s +/// and `secondary_namespace`s. Implementations of this trait are free to handle them in different +/// ways, as long as per-namespace key uniqueness is asserted. +/// +/// Keys and namespaces are required to be valid ASCII strings in the range of +/// [`KVSTORE_NAMESPACE_KEY_ALPHABET`] and no longer than [`KVSTORE_NAMESPACE_KEY_MAX_LEN`]. Empty +/// primary namespaces and secondary namespaces (`""`) are considered valid; however, if +/// `primary_namespace` is empty, `secondary_namespace` must also be empty. This means that concerns +/// should always be separated by primary namespace first, before secondary namespaces are used. +/// While the number of primary namespaces will be relatively small and determined at compile time, +/// there may be many secondary namespaces per primary namespace. 
Note that per-namespace uniqueness +/// needs to also hold for keys *and* namespaces in any given namespace, i.e., conflicts between keys +/// and equally named primary or secondary namespaces must be avoided. +/// +/// **Note:** This trait extends the functionality of [`KVStoreSync`] by adding support for +/// paginated listing of keys in creation order. This is useful when dealing with a large number +/// of keys that cannot be efficiently retrieved all at once. +/// +/// For an asynchronous version of this trait, see [`PaginatedKVStore`]. +pub trait PaginatedKVStoreSync: KVStoreSync { + /// Returns a paginated list of keys that are stored under the given `secondary_namespace` in + /// `primary_namespace`, ordered from most recently created to least recently created. + /// + /// Implementations must return keys in reverse creation order (newest first). How creation + /// order is tracked is implementation-defined (e.g., storing creation timestamps, using an + /// incrementing ID, or another mechanism). Creation order (not last-updated order) is used + /// to prevent race conditions during pagination: if keys were ordered by update time, a key + /// updated mid-pagination could shift position, causing it to be skipped or returned twice + /// across pages. + /// + /// If `page_token` is provided, listing continues from where the previous page left off. + /// If `None`, listing starts from the most recently created entry. The `next_page_token` + /// in the returned [`PaginatedListResponse`] can be passed to subsequent calls to fetch + /// the next page. + /// + /// Implementations must generate a [`PageToken`] that encodes enough information to resume + /// listing from the correct position. The token should encode the creation timestamp (or + /// sequence number) and key name of the last returned entry. Tokens must remain valid across + /// multiple calls within a reasonable timeframe. If the entry referenced by a token has been + /// deleted, implementations should resume from the next valid position rather than failing. + /// Tokens are scoped to a specific `(primary_namespace, secondary_namespace)` pair and should + /// not be used across different namespace pairs. + /// + /// Returns an empty list if `primary_namespace` or `secondary_namespace` is unknown or if + /// there are no more keys to return. + fn list_paginated( + &self, primary_namespace: &str, secondary_namespace: &str, page_token: Option, + ) -> Result; +} + +/// A wrapper around a [`PaginatedKVStoreSync`] that implements the [`PaginatedKVStore`] trait. +/// It is not necessary to use this type directly. +#[derive(Clone)] +pub struct PaginatedKVStoreSyncWrapper(pub K) +where + K::Target: PaginatedKVStoreSync; + +/// This is not exported to bindings users as async is only supported in Rust. 
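The pagination contract above is easiest to see from the caller's side. As a usage sketch (not part of this diff), the loop below drains every page from an arbitrary `PaginatedKVStoreSync` implementation; the helper name `collect_all_keys` is illustrative, and the error type assumes the store surfaces `lightning::io::Error` as elsewhere in this change:

```rust
use lightning::io;
use lightning::util::persist::{PageToken, PaginatedKVStoreSync, PaginatedListResponse};

/// Illustrative helper: keeps calling `list_paginated` until no `next_page_token` is
/// returned, accumulating the keys of every page (newest keys first).
fn collect_all_keys<S: PaginatedKVStoreSync>(
	store: &S, primary_namespace: &str, secondary_namespace: &str,
) -> Result<Vec<String>, io::Error> {
	let mut all_keys = Vec::new();
	let mut page_token: Option<PageToken> = None;
	loop {
		let page: PaginatedListResponse =
			store.list_paginated(primary_namespace, secondary_namespace, page_token)?;
		all_keys.extend(page.keys);
		match page.next_page_token {
			// Resume the next call from where this page ended.
			Some(token) => page_token = Some(token),
			// No token means the last page has been reached.
			None => break,
		}
	}
	Ok(all_keys)
}
```

This mirrors how `TestPaginatedStore::list` is built on top of `list_paginated` later in this diff; the async `PaginatedKVStore` counterpart follows the same token-passing pattern, just with an `.await` on each call.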
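And a hedged sketch of `PaginatedKVStoreSyncWrapper` in use, exposing a synchronous store through the asynchronous trait. It assumes `FilesystemStoreV2` implements `PaginatedKVStoreSync` (as the tests earlier in this diff suggest) and uses a placeholder path:

```rust
use lightning::util::persist::{PaginatedKVStore, PaginatedKVStoreSyncWrapper};
use lightning_persister::fs_store_v2::FilesystemStoreV2;
use std::path::PathBuf;
use std::sync::Arc;

/// Illustrative only: fetch the newest page of keys via the async trait.
async fn first_page() -> Result<Vec<String>, lightning::io::Error> {
	// Placeholder path; in practice this is the node's data directory.
	let store = FilesystemStoreV2::new(PathBuf::from("/path/to/data")).expect("open v2 store");
	// The wrapper adds no state of its own; it runs the sync call and returns the
	// result as an immediately-ready future.
	let async_store = PaginatedKVStoreSyncWrapper(Arc::new(store));
	let page = PaginatedKVStore::list_paginated(&async_store, "ns", "sub", None).await?;
	Ok(page.keys)
}
```

+/// This is not exported to bindings users as async is only supported in Rust.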
+impl KVStore for PaginatedKVStoreSyncWrapper +where + K::Target: PaginatedKVStoreSync, +{ + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> impl Future, io::Error>> + 'static + MaybeSend { + let res = self.0.read(primary_namespace, secondary_namespace, key); + + async move { res } + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> impl Future> + 'static + MaybeSend { + let res = self.0.write(primary_namespace, secondary_namespace, key, buf); + + async move { res } + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> impl Future> + 'static + MaybeSend { + let res = self.0.remove(primary_namespace, secondary_namespace, key, lazy); + + async move { res } + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> impl Future, io::Error>> + 'static + MaybeSend { + let res = self.0.list(primary_namespace, secondary_namespace); + + async move { res } + } +} + +/// This is not exported to bindings users as async is only supported in Rust. +impl PaginatedKVStore for PaginatedKVStoreSyncWrapper +where + K::Target: PaginatedKVStoreSync, +{ + fn list_paginated( + &self, primary_namespace: &str, secondary_namespace: &str, page_token: Option, + ) -> impl Future> + 'static + MaybeSend { + let res = self.0.list_paginated(primary_namespace, secondary_namespace, page_token); + + async move { res } + } +} + +/// Provides an interface that allows storage and retrieval of persisted values that are associated +/// with given keys, with support for pagination. +/// +/// In order to avoid collisions, the key space is segmented based on the given `primary_namespace`s +/// and `secondary_namespace`s. Implementations of this trait are free to handle them in different +/// ways, as long as per-namespace key uniqueness is asserted. +/// +/// Keys and namespaces are required to be valid ASCII strings in the range of +/// [`KVSTORE_NAMESPACE_KEY_ALPHABET`] and no longer than [`KVSTORE_NAMESPACE_KEY_MAX_LEN`]. Empty +/// primary namespaces and secondary namespaces (`""`) are considered valid; however, if +/// `primary_namespace` is empty, `secondary_namespace` must also be empty. This means that concerns +/// should always be separated by primary namespace first, before secondary namespaces are used. +/// While the number of primary namespaces will be relatively small and determined at compile time, +/// there may be many secondary namespaces per primary namespace. Note that per-namespace uniqueness +/// needs to also hold for keys *and* namespaces in any given namespace, i.e., conflicts between keys +/// and equally named primary or secondary namespaces must be avoided. +/// +/// **Note:** This trait extends the functionality of [`KVStore`] by adding support for +/// paginated listing of keys in creation order. This is useful when dealing with a large number +/// of keys that cannot be efficiently retrieved all at once. +/// +/// For a synchronous version of this trait, see [`PaginatedKVStoreSync`]. +/// +/// This is not exported to bindings users as async is only supported in Rust. +pub trait PaginatedKVStore: KVStore { + /// Returns a paginated list of keys that are stored under the given `secondary_namespace` in + /// `primary_namespace`, ordered from most recently created to least recently created. + /// + /// Implementations must return keys in reverse creation order (newest first). 
How creation + /// order is tracked is implementation-defined (e.g., storing creation timestamps, using an + /// incrementing ID, or another mechanism). Creation order (not last-updated order) is used + /// to prevent race conditions during pagination: if keys were ordered by update time, a key + /// updated mid-pagination could shift position, causing it to be skipped or returned twice + /// across pages. + /// + /// If `page_token` is provided, listing continues from where the previous page left off. + /// If `None`, listing starts from the most recently created entry. The `next_page_token` + /// in the returned [`PaginatedListResponse`] can be passed to subsequent calls to fetch + /// the next page. + /// + /// Implementations must generate a [`PageToken`] that encodes enough information to resume + /// listing from the correct position. The token should encode the creation timestamp (or + /// sequence number) and key name of the last returned entry. Tokens must remain valid across + /// multiple calls within a reasonable timeframe. If the entry referenced by a token has been + /// deleted, implementations should resume from the next valid position rather than failing. + /// Tokens are scoped to a specific `(primary_namespace, secondary_namespace)` pair and should + /// not be used across different namespace pairs. + /// + /// Returns an empty list if `primary_namespace` or `secondary_namespace` is unknown or if + /// there are no more keys to return. + fn list_paginated( + &self, primary_namespace: &str, secondary_namespace: &str, page_token: Option, + ) -> impl Future> + 'static + MaybeSend; +} + /// Provides additional interface methods that are required for [`KVStore`]-to-[`KVStore`] /// data migration. pub trait MigratableKVStore: KVStoreSync { @@ -1546,7 +1735,7 @@ mod tests { use crate::ln::msgs::BaseMessageHandler; use crate::sync::Arc; use crate::util::test_channel_signer::TestChannelSigner; - use crate::util::test_utils::{self, TestStore}; + use crate::util::test_utils::{self, TestPaginatedStore, TestStore}; use bitcoin::hashes::hex::FromHex; use core::cmp; @@ -1956,4 +2145,78 @@ mod tests { let store: Arc = Arc::new(TestStore::new(false)); assert!(persist_fn::<_, TestChannelSigner>(Arc::clone(&store))); } + + #[test] + fn paginated_store_basic_operations() { + let store = TestPaginatedStore::new(10); + + // Write some data + store.write("ns1", "ns2", "key1", vec![1, 2, 3]).unwrap(); + store.write("ns1", "ns2", "key2", vec![4, 5, 6]).unwrap(); + + // Read it back + assert_eq!(KVStoreSync::read(&store, "ns1", "ns2", "key1").unwrap(), vec![1, 2, 3]); + assert_eq!(KVStoreSync::read(&store, "ns1", "ns2", "key2").unwrap(), vec![4, 5, 6]); + + // List should return keys in descending order + let response = store.list_paginated("ns1", "ns2", None).unwrap(); + assert_eq!(response.keys, vec!["key2", "key1"]); + assert!(response.next_page_token.is_none()); + + // Remove a key + KVStoreSync::remove(&store, "ns1", "ns2", "key1", false).unwrap(); + assert!(KVStoreSync::read(&store, "ns1", "ns2", "key1").is_err()); + } + + #[test] + fn paginated_store_pagination() { + let store = TestPaginatedStore::new(2); + + // Write 5 items with different order values + for i in 0..5i64 { + store.write("ns", "", &format!("key{i}"), vec![i as u8]).unwrap(); + } + + // First page should have 2 items (most recently created first: key4, key3) + let page1 = store.list_paginated("ns", "", None).unwrap(); + assert_eq!(page1.keys.len(), 2); + assert_eq!(page1.keys, vec!["key4", "key3"]); + 
assert!(page1.next_page_token.is_some()); + + // Second page + let page2 = store.list_paginated("ns", "", page1.next_page_token).unwrap(); + assert_eq!(page2.keys.len(), 2); + assert_eq!(page2.keys, vec!["key2", "key1"]); + assert!(page2.next_page_token.is_some()); + + // Third page (last item) + let page3 = store.list_paginated("ns", "", page2.next_page_token).unwrap(); + assert_eq!(page3.keys.len(), 1); + assert_eq!(page3.keys, vec!["key0"]); + assert!(page3.next_page_token.is_none()); + } + + #[test] + fn paginated_store_update_preserves_order() { + let store = TestPaginatedStore::new(10); + + // Write items with specific order values + store.write("ns", "", "key1", vec![1]).unwrap(); + store.write("ns", "", "key2", vec![2]).unwrap(); + store.write("ns", "", "key3", vec![3]).unwrap(); + + // Verify initial order (newest first) + let response = store.list_paginated("ns", "", None).unwrap(); + assert_eq!(response.keys, vec!["key3", "key2", "key1"]); + + // Update key1 with a new order value that would put it first if used + store.write("ns", "", "key1", vec![1, 1]).unwrap(); + + // Verify data was updated + assert_eq!(KVStoreSync::read(&store, "ns", "", "key1").unwrap(), vec![1, 1]); + + // Verify order is unchanged - creation order should have been preserved + let response = store.list_paginated("ns", "", None).unwrap(); + assert_eq!(response.keys, vec!["key3", "key2", "key1"]); + } } diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index 34f5d5fe36e..de04054861f 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -51,6 +51,7 @@ use crate::sign::{ChannelSigner, PeerStorageKey}; use crate::sync::RwLock; use crate::types::features::{ChannelFeatures, InitFeatures, NodeFeatures}; use crate::util::async_poll::MaybeSend; +use crate::util::atomic_counter::AtomicCounter; use crate::util::config::UserConfig; use crate::util::dyn_signer::{ DynKeysInterface, DynKeysInterfaceTrait, DynPhantomKeysInterface, DynSigner, @@ -58,7 +59,7 @@ use crate::util::dyn_signer::{ use crate::util::logger::{Logger, Record}; #[cfg(feature = "std")] use crate::util::mut_global::MutGlobal; -use crate::util::persist::{KVStore, KVStoreSync, MonitorName}; +use crate::util::persist::{KVStore, KVStoreSync, MonitorName, PageToken, PaginatedListResponse}; use crate::util::ser::{Readable, ReadableArgs, Writeable, Writer}; use crate::util::test_channel_signer::{EnforcementState, TestChannelSigner}; use crate::util::wakers::Notifier; @@ -1124,6 +1125,122 @@ impl KVStoreSync for TestStore { unsafe impl Sync for TestStore {} unsafe impl Send for TestStore {} +/// A simple in-memory implementation of [`PaginatedKVStoreSync`] for testing. +/// +/// [`PaginatedKVStoreSync`]: crate::util::persist::PaginatedKVStoreSync +pub struct TestPaginatedStore { + data: Mutex)>>, + page_size: usize, + time_counter: AtomicCounter, +} + +impl TestPaginatedStore { + /// Creates a new `TestPaginatedStore` with the given page size. 
+ pub fn new(page_size: usize) -> Self { + Self { data: Mutex::new(new_hash_map()), page_size, time_counter: AtomicCounter::new() } + } +} + +impl KVStoreSync for TestPaginatedStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> Result, io::Error> { + let data = self.data.lock().unwrap(); + data.get(&(primary_namespace.to_string(), secondary_namespace.to_string(), key.to_string())) + .map(|(_, v)| v.clone()) + .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "key not found")) + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> Result<(), io::Error> { + let mut data = self.data.lock().unwrap(); + let order = self.time_counter.next() as i64; + let key_tuple = + (primary_namespace.to_string(), secondary_namespace.to_string(), key.to_string()); + // Only use order for new entries; preserve existing order on updates + let order_to_use = + data.get(&key_tuple).map(|(existing_order, _)| *existing_order).unwrap_or(order); + data.insert(key_tuple, (order_to_use, buf)); + Ok(()) + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool, + ) -> Result<(), io::Error> { + let mut data = self.data.lock().unwrap(); + data.remove(&( + primary_namespace.to_string(), + secondary_namespace.to_string(), + key.to_string(), + )); + Ok(()) + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> Result, io::Error> { + let mut all_keys = Vec::new(); + let mut page_token = None; + loop { + let response = crate::util::persist::PaginatedKVStoreSync::list_paginated( + self, + primary_namespace, + secondary_namespace, + page_token, + )?; + all_keys.extend(response.keys); + match response.next_page_token { + Some(token) => page_token = Some(token), + None => break, + } + } + Ok(all_keys) + } +} + +impl crate::util::persist::PaginatedKVStoreSync for TestPaginatedStore { + fn list_paginated( + &self, primary_namespace: &str, secondary_namespace: &str, page_token: Option, + ) -> Result { + let data = self.data.lock().unwrap(); + let mut entries: Vec<_> = data + .iter() + .filter(|((pn, sn, _), _)| pn == primary_namespace && sn == secondary_namespace) + .map(|((_, _, k), (t, _))| (k.clone(), *t)) + .collect(); + + // Sort by time descending, then by key + entries.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0))); + + // Apply pagination: find the first entry AFTER the given key in sort order. + // This implementation uses the last key as the page token. + let start_idx = if let Some(PageToken(ref last_key)) = page_token { + // Find the position of this key and start after it + entries.iter().position(|(k, _)| k == last_key).map(|pos| pos + 1).unwrap_or(0) + } else { + 0 + }; + + let page_entries: Vec<_> = + entries.into_iter().skip(start_idx).take(self.page_size).collect(); + + let next_page_token = if page_entries.len() == self.page_size { + page_entries.last().map(|(k, _)| PageToken(k.clone())) + } else { + None + }; + + Ok(PaginatedListResponse { + keys: page_entries.into_iter().map(|(k, _)| k).collect(), + next_page_token, + }) + } +} + +unsafe impl Sync for TestPaginatedStore {} +unsafe impl Send for TestPaginatedStore {} + pub struct TestBroadcaster { pub txn_broadcasted: Mutex>, pub blocks: Arc>>,