
Commit

cleanup
al8n committed May 16, 2024
1 parent e7a3e66 commit 45d725b
Showing 5 changed files with 483 additions and 233 deletions.
289 changes: 240 additions & 49 deletions src/map.rs
@@ -23,9 +23,6 @@ pub use entry::*;
mod iterator;
pub use iterator::*;

mod insert;
mod upsert;

#[cfg(all(test, not(loom)))]
mod tests;

@@ -114,36 +111,6 @@ impl<T, C> SkipMap<T, C> {
}
}

impl<T: Trailer, C: Comparator> SkipMap<T, C> {
/// ## Safety
///
/// - The caller must ensure that the node is allocated by the arena.
#[inline]
unsafe fn get_prev(&self, nd: NodePtr<T>, height: usize) -> NodePtr<T> {
if nd.is_null() {
return NodePtr::NULL;
}

let offset = nd.prev_offset(&self.arena, height);
let ptr = self.arena.get_pointer(offset as usize);
NodePtr::new(ptr, offset)
}

/// ## Safety
///
/// - The caller must ensure that the node is allocated by the arena.
#[inline]
unsafe fn get_next(&self, nptr: NodePtr<T>, height: usize) -> NodePtr<T> {
if nptr.is_null() {
return NodePtr::NULL;
}
let offset = nptr.next_offset(&self.arena, height);
let ptr = self.arena.get_pointer(offset as usize);
NodePtr::new(ptr, offset)
}
}

// --------------------------------Private Methods--------------------------------
impl<T: Trailer, C> SkipMap<T, C> {
#[allow(clippy::type_complexity)]
fn new_node<'a, 'b: 'a, E>(
@@ -172,9 +139,62 @@ impl<T: Trailer, C> SkipMap<T, C> {
}
Ok((nd, height))
}

#[allow(clippy::type_complexity)]
fn new_remove_node<'a, 'b: 'a>(
&'a self,
key: &'b [u8],
trailer: T,
) -> Result<(NodePtr<T>, u32), Error> {
let height = super::random_height();
let nd = Node::new_remove_node_ptr(&self.arena, height, key, trailer)?;

// Try to increase self.height via CAS.
let mut list_height = self.height();
while height > list_height {
match self.arena.atomic_height().compare_exchange_weak(
list_height,
height,
Ordering::SeqCst,
Ordering::Acquire,
) {
// Successfully increased skiplist.height.
Ok(_) => break,
Err(h) => list_height = h,
}
}
Ok((nd, height))
}
}
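
For readers tracing the height bump in new_node/new_remove_node above: the following is a standalone sketch (illustrative names, not the crate's code) of the same pattern — raise a shared maximum height via compare_exchange_weak while tolerating concurrent raisers and spurious CAS failures.

use std::sync::atomic::{AtomicU32, Ordering};

// Raise `shared` to at least `height`. If another thread raises it first,
// the loop observes the new value and stops once it is already high enough.
fn raise_height(shared: &AtomicU32, height: u32) {
    let mut current = shared.load(Ordering::Acquire);
    while height > current {
        match shared.compare_exchange_weak(current, height, Ordering::SeqCst, Ordering::Acquire) {
            // Successfully raised the shared height.
            Ok(_) => break,
            // Lost the race (or failed spuriously): retry against the observed value.
            Err(observed) => current = observed,
        }
    }
}

fn main() {
    let h = AtomicU32::new(1);
    raise_height(&h, 4);
    raise_height(&h, 2); // no-op: already at least 2
    assert_eq!(h.load(Ordering::Acquire), 4);
}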

impl<T: Trailer, C: Comparator> SkipMap<T, C> {
/// ## Safety
///
/// - The caller must ensure that the node is allocated by the arena.
#[inline]
unsafe fn get_prev(&self, nd: NodePtr<T>, height: usize) -> NodePtr<T> {
if nd.is_null() {
return NodePtr::NULL;
}

let offset = nd.prev_offset(&self.arena, height);
let ptr = self.arena.get_pointer(offset as usize);
NodePtr::new(ptr, offset)
}

/// ## Safety
///
/// - The caller must ensure that the node is allocated by the arena.
#[inline]
unsafe fn get_next(&self, nptr: NodePtr<T>, height: usize) -> NodePtr<T> {
if nptr.is_null() {
return NodePtr::NULL;
}
let offset = nptr.next_offset(&self.arena, height);
let ptr = self.arena.get_pointer(offset as usize);
NodePtr::new(ptr, offset)
}

/// Returns the first entry in the map.
fn first_in(&self, version: u64) -> Option<NodePtr<T>> {
// Safety: head node was definitely allocated by self.arena
@@ -625,14 +645,11 @@ impl<T: Trailer, C: Comparator> SkipMap<T, C> {
let node_ptr = ptr.expect("the NodePtr cannot be `None` when we found");
let old = EntryRef::from_node(node_ptr, self);

return if !upsert {
Ok(Some(old))
} else {
node_ptr
.as_ptr()
.set_value(&self.arena, value_size, f)
.map(|_| Some(old))
};
if upsert {
node_ptr.as_ptr().set_value(&self.arena, value_size, f)?;
}

return Ok(if old.is_removed() { None } else { Some(old) });
}
}
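
This hunk changes what the caller observes on an upsert over an existing key: previously the old entry was returned unconditionally, whereas now a previous entry that is a remove marker is reported as `None`, as if the key were absent. Below is a minimal, self-contained sketch of that visibility rule (illustrative types, not the crate's API).

// `None` in `value` models a removed (tombstone) entry.
#[derive(Debug, Clone)]
struct OldEntry {
    value: Option<Vec<u8>>,
}

impl OldEntry {
    fn is_removed(&self) -> bool {
        self.value.is_none()
    }
}

// Mirrors `Ok(if old.is_removed() { None } else { Some(old) })`:
// a tombstone is reported as if no previous entry existed.
fn visible_old(old: OldEntry) -> Option<OldEntry> {
    if old.is_removed() {
        None
    } else {
        Some(old)
    }
}

fn main() {
    let live = OldEntry { value: Some(b"v1".to_vec()) };
    let removed = OldEntry { value: None };
    assert!(visible_old(live).is_some());
    assert!(visible_old(removed).is_none());
}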

@@ -753,14 +770,188 @@ impl<T: Trailer, C: Comparator> SkipMap<T, C> {
.curr
.expect("the current should not be `None` when we found");
let old = EntryRef::from_node(node_ptr, self);
return if !upsert {
Ok(Some(old))
} else {
node_ptr
.as_ptr()
.set_value(&self.arena, value_size, f)
.map(|_| Some(old))
};

if upsert {
node_ptr.as_ptr().set_value(&self.arena, value_size, f)?;
}

return Ok(if old.is_removed() { None } else { Some(old) });
}

invalid_data_splice = true;
prev = fr.splice.prev;
next = fr.splice.next;
}
}
}
}
}

// If we had to recompute the splice for a level, invalidate the entire
// cached splice.
if invalid_data_splice {
ins.height = 0;
} else {
// The splice was valid. We inserted a node between spl[i].prev and
// spl[i].next. Optimistically update spl[i].prev for use in a subsequent
// call to add.
for i in 0..(height as usize) {
ins.spl[i].prev = nd;
}
}
self.arena.incr_len();
self.arena.update_max_version(version);
self.arena.update_min_version(version);
Ok(None)
}
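
Relating to the splice bookkeeping above: a standalone sketch (illustrative types and offsets, not the crate's Inserter) of the cache rule — if any level had to be recomputed, the whole cached splice is dropped; otherwise the cached splice is advanced so the freshly inserted node becomes `prev` for a subsequent insert.

// Node offsets stand in for `NodePtr<T>` in this sketch.
#[derive(Clone, Copy, Default)]
struct LevelSplice {
    prev: u32,
    next: u32,
}

struct Inserter {
    spl: [LevelSplice; 8],
    height: u32, // number of valid cached levels; 0 means "no cached splice"
}

fn finish_insert(ins: &mut Inserter, new_node: u32, height: u32, invalid_splice: bool) {
    if invalid_splice {
        // Some level was recomputed during linking; drop the entire cache.
        ins.height = 0;
    } else {
        // The splice stayed valid; remember the inserted node as `prev`
        // for each level the new node was linked into.
        for level in ins.spl.iter_mut().take(height as usize) {
            level.prev = new_node;
        }
    }
}

fn main() {
    let mut ins = Inserter { spl: [LevelSplice::default(); 8], height: 3 };
    finish_insert(&mut ins, 42, 3, false);
    assert_eq!(ins.spl[0].prev, 42);
    finish_insert(&mut ins, 7, 3, true);
    assert_eq!(ins.height, 0);
}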

fn remove_in<'a, 'b: 'a>(
&'a self,
trailer: T,
key: &'b [u8],
ins: &mut Inserter<T>,
upsert: bool,
) -> Result<Option<EntryRef<'a, T, C>>, Error> {
let version = trailer.version();
// Safety: a fresh new Inserter, so safe here
unsafe {
let (found, ptr) = self.find_splice(version, key, ins, true);
if found {
let node_ptr = ptr.expect("the NodePtr cannot be `None` when we found");
let old = EntryRef::from_node(node_ptr, self);

if upsert {
node_ptr.as_ptr().clear_value();
}

return Ok(if old.is_removed() { None } else { Some(old) });
}
}

#[cfg(all(test, feature = "std"))]
if self.yield_now {
// Add delay to make it easier to test race between this thread
// and another thread that sees the intermediate state between
// finding the splice and using it.
std::thread::yield_now();
}

let (nd, height) = self.new_remove_node(key, trailer)?;

// We always insert from the base level and up. After you add a node in base
// level, we cannot create a node in the level above because it would have
// discovered the node in the base level.
let mut invalid_data_splice = false;

for i in 0..(height as usize) {
let mut prev = ins.spl[i].prev;
let mut next = ins.spl[i].next;

if prev.is_null() {
// New node increased the height of the skiplist, so assume that the
// new level has not yet been populated.
if !next.is_null() {
panic!("next is expected to be nil, since prev is nil");
}

prev = self.head;
next = self.tail;
}

// +----------------+ +------------+ +----------------+
// | prev | | nd | | next |
// | prevNextOffset |---->| | | |
// | |<----| prevOffset | | |
// | | | nextOffset |---->| |
// | | | |<----| nextPrevOffset |
// +----------------+ +------------+ +----------------+
//
// 1. Initialize prevOffset and nextOffset to point to prev and next.
// 2. CAS prevNextOffset to repoint from next to nd.
// 3. CAS nextPrevOffset to repoint from prev to nd.
unsafe {
loop {
let prev_offset = prev.offset;
let next_offset = next.offset;
nd.write_tower(&self.arena, i, prev_offset, next_offset);

// Check whether next has an updated link to prev. If it does not,
// that can mean one of two things:
// 1. The thread that added the next node hasn't yet had a chance
// to add the prev link (but will shortly).
// 2. Another thread has added a new node between prev and next.
//
// Safety: we already check next is not null
let next_prev_offset = next.prev_offset(&self.arena, i);
if next_prev_offset != prev_offset {
// Determine whether #1 or #2 is true by checking whether prev
// is still pointing to next. As long as the atomic operations
// have at least acquire/release semantics (no need for
// sequential consistency), this works, as it is equivalent to
// the "publication safety" pattern.
let prev_next_offset = prev.next_offset(&self.arena, i);
if prev_next_offset == next_offset {
// Ok, case #1 is true, so help the other thread along by
// updating the next node's prev link.
let link = next.tower(&self.arena, i);
let _ = link.prev_offset.compare_exchange(
next_prev_offset,
prev_offset,
Ordering::SeqCst,
Ordering::Acquire,
);
}
}

let prev_link = prev.tower(&self.arena, i);
match prev_link.next_offset.compare_exchange_weak(
next.offset,
nd.offset,
Ordering::SeqCst,
Ordering::Acquire,
) {
Ok(_) => {
// Managed to insert nd between prev and next, so update the next
// node's prev link and go to the next level.
#[cfg(all(test, feature = "std"))]
if self.yield_now {
// Add delay to make it easier to test race between this thread
// and another thread that sees the intermediate state between
// setting next and setting prev.
std::thread::yield_now();
}

let next_link = next.tower(&self.arena, i);
let _ = next_link.prev_offset.compare_exchange(
prev_offset,
nd.offset,
Ordering::SeqCst,
Ordering::Acquire,
);

break;
}
Err(_) => {
// CAS failed. We need to recompute prev and next. It is unlikely to
// be helpful to try to use a different level as we redo the search,
// because it is unlikely that lots of nodes are inserted between prev
// and next.
let fr = self.find_splice_for_level(trailer.version(), key, i, prev);
if fr.found {
if i != 0 {
panic!("how can another thread have inserted a node at a non-base level?");
}

let node_ptr = fr
.curr
.expect("the current should not be `None` when we found");
let old = EntryRef::from_node(node_ptr, self);

if upsert {
node_ptr.as_ptr().clear_value();
}

return Ok(if old.is_removed() { None } else { Some(old) });
}

invalid_data_splice = true;
(remaining diff in src/map.rs and the other changed files not shown)
