Skip to content

Commit

Permalink
feat: add query iterator
Browse files Browse the repository at this point in the history
  • Loading branch information
arriqaaq committed Jan 6, 2025
1 parent 5996d0b commit e7890e0
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 101 deletions.
55 changes: 29 additions & 26 deletions src/art.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1713,16 +1713,20 @@ impl<P: KeyTrait, V: Clone> Tree<P, V> {
self.size == 0
}

pub fn scan_at_ts<R>(&self, range: R, ts: u64) -> Vec<(Vec<u8>, V)>
pub fn scan_at_ts<'a, R>(
&'a self,
range: R,
ts: u64,
) -> impl Iterator<Item = (Box<[u8]>, V)> + 'a
where
R: RangeBounds<P>,
R: RangeBounds<P> + 'a,
{
scan_node(self.root.as_ref(), range, QueryType::LatestByTs(ts))
}

pub fn keys_at_ts<R>(&self, range: R, ts: u64) -> Vec<Vec<u8>>
pub fn keys_at_ts<'a, R>(&'a self, range: R, ts: u64) -> impl Iterator<Item = Box<[u8]>> + 'a
where
R: RangeBounds<P>,
R: RangeBounds<P> + 'a,
{
query_keys_at_node(self.root.as_ref(), range, QueryType::LatestByTs(ts))
}
Expand Down Expand Up @@ -3333,7 +3337,7 @@ mod tests {
#[test]
fn scan_empty_range() {
let tree: Tree<VariableSizeKey, i32> = Tree::new();
let result = tree.scan_at_ts(RangeFull {}, 0);
let result: Vec<_> = tree.scan_at_ts(RangeFull {}, 0).collect();
assert!(result.is_empty());
}

Expand All @@ -3347,7 +3351,7 @@ mod tests {
}
let range = VariableSizeKey::from_slice("key_1".as_bytes())
..=VariableSizeKey::from_slice("key_2".as_bytes());
let values = tree.scan_at_ts(range, 0);
let values: Vec<_> = tree.scan_at_ts(range, 0).collect();
assert_eq!(values.len(), 2);
}

Expand All @@ -3359,7 +3363,7 @@ mod tests {
tree.insert(&VariableSizeKey::from_str(key).unwrap(), 1, 0, 0)
.unwrap();
}
let values = tree.scan_at_ts(RangeFull {}, 0);
let values: Vec<_> = tree.scan_at_ts(RangeFull {}, 0).collect();
assert_eq!(values.len(), keys.len());
}

Expand All @@ -3373,7 +3377,7 @@ mod tests {
}
let range = VariableSizeKey::from_slice("key_4".as_bytes())
..VariableSizeKey::from_slice("key_5".as_bytes());
let values = tree.scan_at_ts(range, 0);
let values: Vec<_> = tree.scan_at_ts(range, 0).collect();
assert!(values.is_empty());
}

Expand All @@ -3391,7 +3395,7 @@ mod tests {
.unwrap();
}
for (i, _) in keys.iter().enumerate() {
let values = tree.scan_at_ts(RangeFull {}, i as u64);
let values: Vec<_> = tree.scan_at_ts(RangeFull {}, i as u64).collect();
assert_eq!(values.len(), i + 1);
}
}
Expand All @@ -3407,7 +3411,7 @@ mod tests {
.unwrap();
}

let values = tree.scan_at_ts(RangeFull {}, num_keys);
let values: Vec<_> = tree.scan_at_ts(RangeFull {}, num_keys).collect();
assert_eq!(values.len(), num_keys as usize); // Expect all keys to be visible
}

Expand All @@ -3428,15 +3432,15 @@ mod tests {
}

// Scan at a timestamp before any insertions
let result_before = tree.scan_at_ts(RangeFull {}, 0);
let result_before: Vec<_> = tree.scan_at_ts(RangeFull {}, 0).collect();
assert!(result_before.is_empty());

// Scan between insertions
let result_mid = tree.scan_at_ts(RangeFull {}, 4);
let result_mid: Vec<_> = tree.scan_at_ts(RangeFull {}, 4).collect();
assert_eq!(result_mid.len(), 2); // Expect first two keys to be visible

// Scan after all insertions
let result_after = tree.scan_at_ts(RangeFull {}, 7);
let result_after: Vec<_> = tree.scan_at_ts(RangeFull {}, 7).collect();
assert_eq!(result_after.len(), keys.len()); // Expect all keys to be visible
}

Expand All @@ -3447,16 +3451,15 @@ mod tests {
tree.insert(&VariableSizeKey::from_str("key_1").unwrap(), 42, 0, 0)
.unwrap();

let values = tree.scan_at_ts(RangeFull {}, 0);

let values: Vec<_> = tree.scan_at_ts(RangeFull {}, 0).collect();
assert_eq!(values.len(), 1);
assert_eq!(values[0].1, 42);
}

#[test]
fn keys_at_empty_range() {
let tree: Tree<VariableSizeKey, i32> = Tree::new();
let keys = tree.keys_at_ts(RangeFull {}, 0);
let keys: Vec<_> = tree.keys_at_ts(RangeFull {}, 0).collect();
assert!(keys.is_empty());
}

Expand All @@ -3470,7 +3473,7 @@ mod tests {
}
let range = VariableSizeKey::from_slice("key_1".as_bytes())
..=VariableSizeKey::from_slice("key_2".as_bytes());
let keys = tree.keys_at_ts(range, 0);
let keys: Vec<_> = tree.keys_at_ts(range, 0).collect();
assert_eq!(keys.len(), 2);
}

Expand All @@ -3482,7 +3485,7 @@ mod tests {
tree.insert(&VariableSizeKey::from_str(key).unwrap(), 1, 0, 0)
.unwrap();
}
let keys = tree.keys_at_ts(RangeFull {}, 0);
let keys: Vec<_> = tree.keys_at_ts(RangeFull {}, 0).collect();
assert_eq!(keys.len(), keys_to_insert.len());
}

Expand All @@ -3496,7 +3499,7 @@ mod tests {
}
let range = VariableSizeKey::from("key_4".as_bytes().to_vec())
..VariableSizeKey::from("key_5".as_bytes().to_vec());
let keys = tree.keys_at_ts(range, 0);
let keys: Vec<_> = tree.keys_at_ts(range, 0).collect();
assert!(keys.is_empty());
}

Expand All @@ -3514,7 +3517,7 @@ mod tests {
.unwrap();
}
for (i, _) in keys_to_insert.iter().enumerate() {
let keys = tree.keys_at_ts(RangeFull {}, i as u64);
let keys: Vec<_> = tree.keys_at_ts(RangeFull {}, i as u64).collect();
assert_eq!(keys.len(), i + 1);
}
}
Expand All @@ -3531,15 +3534,15 @@ mod tests {
.unwrap();
}

let keys = tree.keys_at_ts(RangeFull {}, num_keys);
let keys: Vec<_> = tree.keys_at_ts(RangeFull {}, num_keys).collect();
assert_eq!(keys.len(), num_keys as usize); // Expect all keys to be visible

// Sort the expected keys lexicographically
expected_keys.sort();

// Verify each key is proper
for (expected_key, key) in expected_keys.iter().zip(keys.iter()) {
assert_eq!(key, expected_key.to_slice());
assert_eq!(key.as_ref(), expected_key.to_slice());
}
}

Expand All @@ -3558,13 +3561,13 @@ mod tests {
.unwrap();
}

let keys_before = tree.keys_at_ts(RangeFull {}, 0);
let keys_before: Vec<_> = tree.keys_at_ts(RangeFull {}, 0).collect();
assert!(keys_before.is_empty());

let keys_mid = tree.keys_at_ts(RangeFull {}, 4);
let keys_mid: Vec<_> = tree.keys_at_ts(RangeFull {}, 4).collect();
assert_eq!(keys_mid.len(), 2); // Expect first two keys to be visible

let keys_after = tree.keys_at_ts(RangeFull {}, 7);
let keys_after: Vec<_> = tree.keys_at_ts(RangeFull {}, 7).collect();
assert_eq!(keys_after.len(), keys_to_insert.len()); // Expect all keys to be visible
}

Expand All @@ -3574,7 +3577,7 @@ mod tests {
tree.insert(&VariableSizeKey::from_str("key_1").unwrap(), 42, 0, 0)
.unwrap();

let keys = tree.keys_at_ts(RangeFull {}, 0);
let keys: Vec<_> = tree.keys_at_ts(RangeFull {}, 0).collect();
assert_eq!(keys.len(), 1);
}

Expand Down
163 changes: 88 additions & 75 deletions src/iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -534,109 +534,122 @@ where
within_start_bound && within_end_bound
}

pub(crate) fn scan_node<K, V, R>(
node: Option<&Arc<Node<K, V>>>,
pub(crate) fn scan_node<'a, K, V, R>(
node: Option<&'a Arc<Node<K, V>>>,
range: R,
query_type: QueryType,
) -> Vec<(Vec<u8>, V)>
) -> impl Iterator<Item = (Box<[u8]>, V)> + 'a
where
K: KeyTrait,
K: KeyTrait + 'a,
V: Clone,
R: RangeBounds<K>,
R: RangeBounds<K> + 'a,
{
iterate(node, range, query_type, true)
.into_iter()
.filter_map(|(k, v_opt)| v_opt.map(|v| (k, v)))
.collect()
QueryIterator::new(node, range, query_type, true).filter_map(|(k, v_opt)| v_opt.map(|v| (k, v)))
}

pub(crate) fn query_keys_at_node<K, V, R>(
node: Option<&Arc<Node<K, V>>>,
pub(crate) fn query_keys_at_node<'a, K, V, R>(
node: Option<&'a Arc<Node<K, V>>>,
range: R,
query_type: QueryType,
) -> Vec<Vec<u8>>
) -> impl Iterator<Item = Box<[u8]>> + 'a
where
K: KeyTrait,
K: KeyTrait + 'a,
V: Clone,
R: RangeBounds<K>,
R: RangeBounds<K> + 'a,
{
iterate(node, range, query_type, false)
.into_iter()
.map(|(k, _)| k)
.collect()
QueryIterator::new(node, range, query_type, false).map(|(k, _)| k)
}

fn iterate<K, V, R>(
node: Option<&Arc<Node<K, V>>>,
pub(crate) struct QueryIterator<'a, K: KeyTrait, V: Clone, R: RangeBounds<K>> {
forward: ForwardIterState<'a, K, V>,
prefix: Vec<u8>,
prefix_lengths: Vec<usize>,
range: R,
query_type: QueryType,
include_values: bool,
) -> Vec<(Vec<u8>, Option<V>)>
where
K: KeyTrait,
V: Clone,
R: RangeBounds<K>,
{
let mut results = Vec::new();
let mut forward = node.map_or_else(ForwardIterState::empty, |n| {
ForwardIterState::scan_at(n, &range, query_type)
});

let mut prefix = forward.prefix.clone();
let mut prefix_lengths = Vec::new();

while let Some(node) = forward.iters.last_mut() {
let e = node.next();
match e {
Some(other) => {
if let NodeType::Twig(twig) = &other.node_type {
if range.contains(&twig.key) {
// Iterate through leaves of the twig
if let Some(leaf) = twig.get_leaf_by_query(query_type) {
let key = twig.key.as_slice().to_vec();
if include_values {
results.push((key, Some(leaf.value.clone())));
} else {
results.push((key, None));
}

impl<'a, K: KeyTrait, V: Clone, R: RangeBounds<K>> QueryIterator<'a, K, V, R> {
pub(crate) fn new(
node: Option<&'a Arc<Node<K, V>>>,
range: R,
query_type: QueryType,
include_values: bool,
) -> Self {
let forward = node.map_or_else(ForwardIterState::empty, |n| {
ForwardIterState::scan_at(n, &range, query_type)
});
let prefix = forward.prefix.clone();

Self {
forward,
prefix,
prefix_lengths: Vec::new(),
range,
query_type,
include_values,
}
}
}

impl<'a, K: KeyTrait, V: Clone, R: RangeBounds<K>> Iterator for QueryIterator<'a, K, V, R> {
type Item = (Box<[u8]>, Option<V>);

fn next(&mut self) -> Option<Self::Item> {
// First try to get item from the current node iteration
while let Some(node) = self.forward.iters.last_mut() {
match node.next() {
Some(other) => {
if let NodeType::Twig(twig) = &other.node_type {
if self.range.contains(&twig.key) {
if let Some(leaf) = twig.get_leaf_by_query(self.query_type) {
let key = twig.key.as_slice();
let value = if self.include_values {
Some(leaf.value.clone())
} else {
None
};
return Some((Box::from(key), value));
}
} else if is_key_out_of_range(&self.range, &twig.key) {
// stop iteration if the range end is exceeded
self.forward.iters.clear();
return None;
}
} else if is_key_out_of_range(&range, &twig.key) {
// stop iteration if the range end is exceeded
forward.iters.clear()
} else {
handle_non_twig_node(
&mut self.prefix,
&mut self.prefix_lengths,
&self.range,
other,
&mut self.forward.iters,
);
}
} else {
handle_non_twig_node(
&mut prefix,
&mut prefix_lengths,
&range,
other,
&mut forward.iters,
);
}
}
None => {
// Pop the iterator if no more elements
forward.iters.pop();
// Restore the prefix to its previous state
if let Some(prefix_len_before) = prefix_lengths.pop() {
prefix.truncate(prefix_len_before);
None => {
// Pop the iterator if no more elements
self.forward.iters.pop();
// Restore the prefix to its previous state
if let Some(prefix_len_before) = self.prefix_lengths.pop() {
self.prefix.truncate(prefix_len_before);
}
}
}
}
}

// Iterate over all leafs in forward.leafs and append them to results
while let Some(leaf) = forward.leafs.pop_front() {
let key = leaf.0.as_slice().to_vec();
let value = if include_values {
Some(leaf.1.value.clone())
// If no more nodes to iterate, try the leaf queue
if let Some(leaf) = self.forward.leafs.pop_front() {
let key = leaf.0.as_slice();
let value = if self.include_values {
Some(leaf.1.value.clone())
} else {
None
};
Some((Box::from(key), value))
} else {
None
};
results.push((key, value));
}
}

results
}

#[cfg(test)]
Expand Down

0 comments on commit e7890e0

Please sign in to comment.