Skip to content

Commit 3a60588

Browse files
committed
Implement PaginatedKVStore for FilesystemStore
Test created with claude code
1 parent cab0b9b commit 3a60588

File tree

1 file changed

+176
-2
lines changed

1 file changed

+176
-2
lines changed

lightning-persister/src/fs_store.rs

Lines changed: 176 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,22 @@
22
use crate::utils::{check_namespace_key_validity, is_valid_kvstore_str};
33

44
use lightning::types::string::PrintableString;
5-
use lightning::util::persist::{KVStoreSync, MigratableKVStore};
5+
use lightning::util::persist::{
6+
KVStoreSync, MigratableKVStore, PageToken, PaginatedKVStoreSync, PaginatedListResponse,
7+
};
68

79
use std::collections::HashMap;
810
use std::fs;
911
use std::io::{Read, Write};
1012
use std::path::{Path, PathBuf};
1113
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
1214
use std::sync::{Arc, Mutex, RwLock};
15+
use std::time::SystemTime;
1316

1417
#[cfg(feature = "tokio")]
1518
use core::future::Future;
1619
#[cfg(feature = "tokio")]
17-
use lightning::util::persist::KVStore;
20+
use lightning::util::persist::{KVStore, PaginatedKVStore};
1821

1922
#[cfg(target_os = "windows")]
2023
use {std::ffi::OsStr, std::os::windows::ffi::OsStrExt};
@@ -39,6 +42,9 @@ fn path_to_windows_str<T: AsRef<OsStr>>(path: &T) -> Vec<u16> {
3942
// a consistent view and error out.
4043
const LIST_DIR_CONSISTENCY_RETRIES: usize = 10;
4144

45+
// The default page size for paginated list operations.
46+
const PAGINATED_LIST_DEFAULT_PAGE_SIZE: usize = 50;
47+
4248
struct FilesystemStoreInner {
4349
data_dir: PathBuf,
4450
tmp_file_counter: AtomicUsize,
@@ -148,6 +154,22 @@ impl KVStoreSync for FilesystemStore {
148154
}
149155
}
150156

157+
impl PaginatedKVStoreSync for FilesystemStore {
158+
fn list_paginated(
159+
&self, primary_namespace: &str, secondary_namespace: &str, page_token: Option<PageToken>,
160+
) -> Result<PaginatedListResponse, lightning::io::Error> {
161+
let path = self.inner.get_checked_dest_file_path(
162+
primary_namespace,
163+
secondary_namespace,
164+
None,
165+
"list_paginated",
166+
)?;
167+
// Extract the last key from the page token for internal use
168+
let last_key = page_token.map(|t| t.0);
169+
self.inner.list_paginated(path, last_key, PAGINATED_LIST_DEFAULT_PAGE_SIZE)
170+
}
171+
}
172+
151173
impl FilesystemStoreInner {
152174
fn get_inner_lock_ref(&self, path: PathBuf) -> Arc<RwLock<u64>> {
153175
let mut outer_lock = self.locks.lock().unwrap();
@@ -456,6 +478,77 @@ impl FilesystemStoreInner {
456478

457479
Ok(keys)
458480
}
481+
482+
fn list_paginated(
483+
&self, prefixed_dest: PathBuf, last_key: Option<String>, page_size: usize,
484+
) -> lightning::io::Result<PaginatedListResponse> {
485+
if !Path::new(&prefixed_dest).exists() {
486+
return Ok(PaginatedListResponse { keys: Vec::new(), next_page_token: None });
487+
}
488+
489+
let mut entries: Vec<(String, SystemTime)> = Vec::with_capacity(page_size);
490+
let mut retries = LIST_DIR_CONSISTENCY_RETRIES;
491+
492+
'retry_list: loop {
493+
entries.clear();
494+
'skip_entry: for entry in fs::read_dir(&prefixed_dest)? {
495+
let entry = entry?;
496+
let p = entry.path();
497+
498+
let res = dir_entry_is_key(&entry);
499+
match res {
500+
Ok(true) => {
501+
let key = get_key_from_dir_entry_path(&p, &prefixed_dest)?;
502+
// Get file creation time, falling back to modified time if unavailable.
503+
let metadata = entry.metadata()?;
504+
let created_time = metadata
505+
.created()
506+
.or_else(|_| metadata.modified())
507+
.unwrap_or(SystemTime::UNIX_EPOCH);
508+
entries.push((key, created_time));
509+
},
510+
Ok(false) => {
511+
continue 'skip_entry;
512+
},
513+
Err(e) => {
514+
if e.kind() == lightning::io::ErrorKind::NotFound && retries > 0 {
515+
retries -= 1;
516+
continue 'retry_list;
517+
} else {
518+
return Err(e.into());
519+
}
520+
},
521+
}
522+
}
523+
break 'retry_list;
524+
}
525+
526+
if entries.is_empty() {
527+
return Ok(PaginatedListResponse { keys: Vec::new(), next_page_token: None });
528+
}
529+
530+
// Sort by creation time descending (newest first), then by key name for stability.
531+
entries.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
532+
533+
// Apply pagination: find the first entry AFTER the given key in sort order.
534+
let start_idx = if let Some(ref key) = last_key {
535+
// Find the position of this key and start after it
536+
entries.iter().position(|(k, _)| k == key).map(|pos| pos + 1).unwrap_or(0)
537+
} else {
538+
0
539+
};
540+
541+
let page_entries: Vec<String> =
542+
entries.into_iter().skip(start_idx).take(page_size).map(|(k, _)| k).collect();
543+
544+
let next_page_token = if page_entries.len() == page_size {
545+
page_entries.last().cloned().map(PageToken)
546+
} else {
547+
None
548+
};
549+
550+
Ok(PaginatedListResponse { keys: page_entries, next_page_token })
551+
}
459552
}
460553

461554
#[cfg(feature = "tokio")]
@@ -544,6 +637,38 @@ impl KVStore for FilesystemStore {
544637
}
545638
}
546639

640+
#[cfg(feature = "tokio")]
641+
impl PaginatedKVStore for FilesystemStore {
642+
fn list_paginated(
643+
&self, primary_namespace: &str, secondary_namespace: &str, page_token: Option<PageToken>,
644+
) -> impl Future<Output = Result<PaginatedListResponse, lightning::io::Error>> + 'static + Send
645+
{
646+
let this = Arc::clone(&self.inner);
647+
648+
let path = this.get_checked_dest_file_path(
649+
primary_namespace,
650+
secondary_namespace,
651+
None,
652+
"list_paginated",
653+
);
654+
655+
// Extract the last key from the page token for internal use
656+
let last_key = page_token.map(|t| t.0);
657+
658+
async move {
659+
let path = match path {
660+
Ok(path) => path,
661+
Err(e) => return Err(e),
662+
};
663+
tokio::task::spawn_blocking(move || {
664+
this.list_paginated(path, last_key, PAGINATED_LIST_DEFAULT_PAGE_SIZE)
665+
})
666+
.await
667+
.unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)))
668+
}
669+
}
670+
}
671+
547672
fn dir_entry_is_key(dir_entry: &fs::DirEntry) -> Result<bool, lightning::io::Error> {
548673
let p = dir_entry.path();
549674
if let Some(ext) = p.extension() {
@@ -792,6 +917,55 @@ mod tests {
792917
assert_eq!(listed_keys.len(), 0);
793918
}
794919

920+
#[test]
921+
fn test_list_paginated() {
922+
let mut temp_path = std::env::temp_dir();
923+
temp_path.push("test_list_paginated");
924+
let fs_store = FilesystemStore::new(temp_path);
925+
926+
let primary = "testspace";
927+
let secondary = "testsubspace";
928+
929+
// Write multiple keys with small delays to ensure different creation times
930+
let keys = ["key_a", "key_b", "key_c", "key_d", "key_e"];
931+
for key in &keys {
932+
KVStoreSync::write(&fs_store, primary, secondary, key, vec![42u8]).unwrap();
933+
// Small delay to ensure different creation times
934+
std::thread::sleep(std::time::Duration::from_millis(10));
935+
}
936+
937+
// Test that all keys are returned (no pagination cursor)
938+
let response =
939+
PaginatedKVStoreSync::list_paginated(&fs_store, primary, secondary, None).unwrap();
940+
assert_eq!(response.keys.len(), 5);
941+
// Keys should be ordered by creation time descending (newest first)
942+
// The last written key should be first
943+
assert_eq!(response.keys[0], "key_e");
944+
assert_eq!(response.keys[4], "key_a");
945+
// No more pages since we have less than page_size (50)
946+
assert!(response.next_page_token.is_none());
947+
948+
// Test pagination with a cursor
949+
// First, get the first page starting from the beginning
950+
let response =
951+
PaginatedKVStoreSync::list_paginated(&fs_store, primary, secondary, None).unwrap();
952+
// Use one of the middle keys as a cursor to get remaining keys
953+
let cursor = PageToken(response.keys[2].clone()); // Should be "key_c"
954+
let response2 =
955+
PaginatedKVStoreSync::list_paginated(&fs_store, primary, secondary, Some(cursor))
956+
.unwrap();
957+
// Should return the keys after "key_c" in the sorted order
958+
assert_eq!(response2.keys.len(), 2);
959+
assert_eq!(response2.keys[0], "key_b");
960+
assert_eq!(response2.keys[1], "key_a");
961+
962+
// Test with non-existent namespace returns empty
963+
let response =
964+
PaginatedKVStoreSync::list_paginated(&fs_store, "nonexistent", "", None).unwrap();
965+
assert!(response.keys.is_empty());
966+
assert!(response.next_page_token.is_none());
967+
}
968+
795969
#[test]
796970
fn test_data_migration() {
797971
let mut source_temp_path = std::env::temp_dir();

0 commit comments

Comments
 (0)