Skip to content

Commit

Permalink
chore: remove snapshot counters
Browse files Browse the repository at this point in the history
  • Loading branch information
arriqaaq committed Mar 16, 2024
1 parent 07fa2f5 commit 2eba6cd
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 155 deletions.
86 changes: 4 additions & 82 deletions src/art.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
use core::panic;
use std::cmp::min;
use std::ops::RangeBounds;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

use hashbrown::HashSet;

use crate::iter::{Iter, Range};
use crate::node::{FlatNode, Node256, Node48, NodeTrait, TwigNode, Version};
use crate::snapshot::Snapshot;
Expand All @@ -26,9 +23,6 @@ const NODE48MAX: usize = 48;
// Minimum and maximum number of children for Node256
const NODE256MIN: usize = NODE48MAX + 1;

// Maximum number of active snapshots
pub const DEFAULT_MAX_ACTIVE_SNAPSHOTS: u64 = 10000;

/// A struct representing a node in an Adaptive Radix Trie.
///
/// The `Node` struct encapsulates a single node within the adaptive radix trie structure.
Expand Down Expand Up @@ -877,8 +871,7 @@ impl<P: KeyTrait + Clone, V: Clone> Node<P, V> {
/// A struct representing an Adaptive Radix Trie.
///
/// The `Tree` struct encompasses the entire adaptive radix trie data structure.
/// It manages the root node of the tree, maintains snapshots of the tree's state,
/// and keeps track of various properties related to snapshot management.
/// It manages the root node of the tree.
///
/// # Type Parameters
///
Expand All @@ -888,19 +881,10 @@ impl<P: KeyTrait + Clone, V: Clone> Node<P, V> {
/// # Fields
///
/// - `root`: An optional shared reference (using `Rc`) to the root node of the tree.
/// - `snapshots`: A `HashSet` storing snapshots of the tree's state, mapped by snapshot IDs.
/// - `max_snapshot_id`: An `AtomicU64` representing the maximum snapshot ID assigned.
/// - `max_active_snapshots`: The maximum number of active snapshots allowed.
///
pub struct Tree<P: KeyTrait, V: Clone> {
/// An optional shared reference to the root node of the tree.
pub(crate) root: Option<Arc<Node<P, V>>>,
/// A mapping of snapshot IDs to their corresponding snapshots.
pub(crate) snapshots: HashSet<u64>,
/// An atomic value indicating the maximum snapshot ID assigned.
pub(crate) max_snapshot_id: AtomicU64,
/// The maximum number of active snapshots allowed.
pub(crate) max_active_snapshots: u64,
/// A flag indicating whether the tree is closed.
pub(crate) closed: bool,
}
Expand Down Expand Up @@ -951,17 +935,10 @@ impl<P: KeyTrait, V: Clone> Tree<P, V> {
pub fn new() -> Self {
Tree {
root: None,
max_snapshot_id: AtomicU64::new(0),
snapshots: HashSet::new(),
max_active_snapshots: DEFAULT_MAX_ACTIVE_SNAPSHOTS,
closed: false,
}
}

pub fn set_max_active_snapshots(&mut self, max_active_snapshots: u64) {
self.max_active_snapshots = max_active_snapshots;
}

/// Inserts a new key-value pair with the specified version into the Trie.
///
/// This function inserts a new key-value pair into the Trie. If the key already exists,
Expand Down Expand Up @@ -1171,66 +1148,17 @@ impl<P: KeyTrait, V: Clone> Tree<P, V> {
/// Returns a `Result` containing the `Snapshot` if the snapshot is created successfully,
/// or an `Err` with an appropriate error message if creation fails.
///
pub fn create_snapshot(&mut self) -> Result<Snapshot<P, V>, TrieError> {
pub fn create_snapshot(&self) -> Result<Snapshot<P, V>, TrieError> {
// Check if the tree is already closed
self.is_closed()?;

if self.snapshots.len() >= self.max_active_snapshots as usize {
return Err(TrieError::Other(
"max number of snapshots reached".to_string(),
));
}

// Increment the snapshot ID atomically
let new_snapshot_id = self.max_snapshot_id.fetch_add(1, Ordering::SeqCst);
self.snapshots.insert(new_snapshot_id);

let root = self.root.as_ref().cloned();
let version = self.root.as_ref().map_or(1, |root| root.version() + 1);
let new_snapshot = Snapshot::new(new_snapshot_id, root, version);
let new_snapshot = Snapshot::new(root, version);

Ok(new_snapshot)
}

/// Closes a snapshot and removes it from the list of active snapshots.
///
/// This function takes a `snapshot_id` as an argument and closes the corresponding snapshot.
/// If the snapshot exists, it is removed from the active snapshots list. If the snapshot is not
/// found, an `Err` is returned with a `TrieError::SnapshotNotFound` variant.
///
/// # Arguments
///
/// * `snapshot_id` - The ID of the snapshot to be closed and removed.
///
/// # Returns
///
/// Returns `Ok(())` if the snapshot is successfully closed and removed. Returns an `Err`
/// with `TrieError::SnapshotNotFound` if the snapshot with the given ID is not found.
///
pub fn close_snapshot(&mut self, snapshot_id: u64) -> Result<(), TrieError> {
// Check if the tree is already closed
self.is_closed()?;

if self.snapshots.remove(&snapshot_id) {
Ok(())
} else {
Err(TrieError::SnapshotNotFound)
}
}

/// Returns the count of active snapshots.
///
/// This function returns the number of currently active snapshots in the Trie.
///
/// # Returns
///
/// Returns a `Result` containing the count of active snapshots if successful, or an `Err`
/// if there is an issue retrieving the snapshot count.
///
pub fn snapshot_count(&self) -> usize {
self.snapshots.len()
}

/// Creates an iterator over the Trie's key-value pairs.
///
/// This function creates and returns an iterator that can be used to traverse the key-value pairs
Expand Down Expand Up @@ -1277,7 +1205,7 @@ impl<P: KeyTrait, V: Clone> Tree<P, V> {

fn is_closed(&self) -> Result<(), TrieError> {
if self.closed {
return Err(TrieError::SnapshotAlreadyClosed);
return Err(TrieError::TreeAlreadyClosed);
}
Ok(())
}
Expand All @@ -1287,12 +1215,6 @@ impl<P: KeyTrait, V: Clone> Tree<P, V> {
// Check if the tree is already closed
self.is_closed()?;

// Check if there are any active readers for the snapshot
if self.snapshot_count() > 0 {
return Err(TrieError::SnapshotNotClosed);
}

// Mark the snapshot as closed
self.closed = true;

Ok(())
Expand Down
7 changes: 2 additions & 5 deletions src/iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ use crate::KeyTrait;
// TODO: need to add more tests for snapshot readers
/// A structure representing a pointer for iterating over the Trie's key-value pairs.
pub struct IterationPointer<P: KeyTrait, V: Clone> {
#[allow(dead_code)]
pub(crate) id: u64,
root: Arc<Node<P, V>>,
}

Expand All @@ -19,10 +17,9 @@ impl<P: KeyTrait, V: Clone> IterationPointer<P, V> {
/// # Arguments
///
/// * `root` - The root node of the Trie.
/// * `id` - The ID of the snapshot.
///
pub fn new(root: Arc<Node<P, V>>, id: u64) -> IterationPointer<P, V> {
IterationPointer { id, root }
pub fn new(root: Arc<Node<P, V>>) -> IterationPointer<P, V> {
IterationPointer { root }
}

/// Returns an iterator over the key-value pairs within the Trie.
Expand Down
6 changes: 0 additions & 6 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,11 +387,9 @@ pub enum TrieError {
IllegalArguments,
NotFound,
KeyNotFound,
SnapshotNotFound,
SnapshotEmpty,
SnapshotNotClosed,
SnapshotAlreadyClosed,
SnapshotReadersNotClosed,
TreeAlreadyClosed,
FixedSizeKeyLengthExceeded,
Other(String),
Expand All @@ -406,12 +404,8 @@ impl fmt::Display for TrieError {
TrieError::IllegalArguments => write!(f, "Illegal arguments"),
TrieError::NotFound => write!(f, "Not found"),
TrieError::KeyNotFound => write!(f, "Key not found"),
TrieError::SnapshotNotFound => write!(f, "Snapshot not found"),
TrieError::SnapshotNotClosed => write!(f, "Snapshot not closed"),
TrieError::SnapshotAlreadyClosed => write!(f, "Snapshot already closed"),
TrieError::SnapshotReadersNotClosed => {
write!(f, "Readers in the snapshot are not closed")
}
TrieError::TreeAlreadyClosed => write!(f, "Tree already closed"),
TrieError::Other(ref message) => write!(f, "Other error: {}", message),
TrieError::SnapshotEmpty => write!(f, "Snapshot is empty"),
Expand Down
65 changes: 3 additions & 62 deletions src/snapshot.rs
Original file line number Diff line number Diff line change
@@ -1,34 +1,24 @@
//! This module defines the Snapshot struct for managing snapshots within a Trie structure.
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;

use hashbrown::HashSet;

use crate::art::Node;
use crate::iter::IterationPointer;
use crate::node::Version;
use crate::{KeyTrait, TrieError};

/// Represents a snapshot of the data within the Trie.
pub struct Snapshot<P: KeyTrait, V: Clone> {
pub(crate) id: u64,
pub(crate) ts: u64,
pub(crate) root: Option<Arc<Node<P, V>>>,
pub(crate) readers: HashSet<u64>,
pub(crate) max_active_readers: AtomicU64,
pub(crate) closed: bool,
}

impl<P: KeyTrait, V: Clone> Snapshot<P, V> {
/// Creates a new Snapshot instance with the provided snapshot_id and root node.
pub(crate) fn new(id: u64, root: Option<Arc<Node<P, V>>>, ts: u64) -> Self {
pub(crate) fn new(root: Option<Arc<Node<P, V>>>, ts: u64) -> Self {
Snapshot {
id,
ts,
root,
readers: HashSet::new(),
max_active_readers: AtomicU64::new(0),
closed: false,
}
}
Expand Down Expand Up @@ -95,10 +85,6 @@ impl<P: KeyTrait, V: Clone> Snapshot<P, V> {
// Check if the snapshot is already closed
self.is_closed()?;

// Check if there are any active readers for the snapshot
if self.max_active_readers.load(Ordering::SeqCst) > 0 {
return Err(TrieError::SnapshotReadersNotClosed);
}
// Mark the snapshot as closed
self.closed = true;

Expand All @@ -113,28 +99,7 @@ impl<P: KeyTrait, V: Clone> Snapshot<P, V> {
return Err(TrieError::SnapshotEmpty);
}

let reader_id = self.max_active_readers.fetch_add(1, Ordering::SeqCst) + 1;
self.readers.insert(reader_id);
Ok(IterationPointer::new(
self.root.as_ref().unwrap().clone(),
reader_id,
))
}

pub fn active_readers(&self) -> Result<u64, TrieError> {
// Check if the snapshot is already closed
self.is_closed()?;

Ok(self.max_active_readers.load(Ordering::SeqCst))
}

pub fn close_reader(&mut self, reader_id: u64) -> Result<(), TrieError> {
// Check if the snapshot is already closed
self.is_closed()?;

self.readers.remove(&reader_id);
let _readers = self.max_active_readers.fetch_sub(1, Ordering::SeqCst);
Ok(())
Ok(IterationPointer::new(self.root.as_ref().unwrap().clone()))
}

pub fn remove(&mut self, key: &P) -> Result<bool, TrieError> {
Expand Down Expand Up @@ -164,10 +129,6 @@ impl<P: KeyTrait, V: Clone> Snapshot<P, V> {
pub fn ts(&self) -> u64 {
self.ts
}

pub fn id(&self) -> u64 {
self.id
}
}

#[cfg(test)]
Expand Down Expand Up @@ -196,7 +157,6 @@ mod tests {

let expected_snap_ts = keys.len() as u64 + 1;
assert_eq!(snap1.version(), expected_snap_ts);
assert_eq!(tree.snapshot_count(), 1);

let expected_tree_ts = keys.len() as u64;
assert_eq!(tree.version(), expected_tree_ts);
Expand All @@ -214,15 +174,11 @@ mod tests {

// Keys inserted before snapshot creation should be visible
let mut snap1 = tree.create_snapshot().unwrap();
assert_eq!(snap1.id, 0);
assert_eq!(snap1.get(&key_1).unwrap(), (1, 1, 0));

let mut snap2 = tree.create_snapshot().unwrap();
assert_eq!(snap2.id, 1);
assert_eq!(snap2.get(&key_1).unwrap(), (1, 1, 0));

assert_eq!(tree.snapshot_count(), 2);

// Keys inserted after snapshot creation should not be visible to other snapshots
assert!(tree.insert(&key_2, 1, 0, 0).is_ok());
assert!(snap1.get(&key_2).is_err());
Expand All @@ -241,11 +197,6 @@ mod tests {

assert!(snap1.close().is_ok());
assert!(snap2.close().is_ok());

assert!(tree.close_snapshot(snap1.id).is_ok());
assert!(tree.close_snapshot(snap2.id).is_ok());

assert_eq!(tree.snapshot_count(), 0);
}

#[test]
Expand All @@ -265,23 +216,13 @@ mod tests {

// Reader 1
let reader1 = snap.new_reader().unwrap();
let reader1_id = reader1.id;
assert_eq!(count_items(&reader1), 4);
assert_eq!(reader1_id, 1);

// Reader 2
let reader2 = snap.new_reader().unwrap();
let reader2_id = reader2.id;
assert_eq!(count_items(&reader2), 4);
assert_eq!(reader2_id, 2);

// Active readers
assert_eq!(snap.active_readers().unwrap(), 2);
assert!(snap.close().is_err());

// Close readers
assert!(snap.close_reader(reader1_id).is_ok());
assert!(snap.close_reader(reader2_id).is_ok());
// Close snapshot
assert!(snap.close().is_ok());
}

Expand Down

0 comments on commit 2eba6cd

Please sign in to comment.