Commit 2a11bd9: WIP

al8n committed May 18, 2024
1 parent: f9a2519
Showing 7 changed files with 287 additions and 27 deletions.
.clippy.toml (2 changes: 1 addition & 1 deletion)
@@ -1 +1 @@
-msrv = "1.56.0"
+msrv = "1.75.0"
CHANGELOG.md (4 changes: 2 additions & 2 deletions)
@@ -3,9 +3,8 @@
## 0.10.0

- Remove `SkipSet`.
-- Add `compare_insert`, `insert`, `get_or_insert` and `get_or_insert_with` methods.
+- Add `insert`, `get_or_insert` and `get_or_insert_with` methods.
- Add `compare_remove` and `get_or_remove` methods.
-- Add an ordered double end linked list to track holes.

## 0.9.0

@@ -67,3 +66,4 @@
### 0.11.0

- Add `unaligned` feature, which does not apply pad for each allocation from ARENA.
+- Add an ordered double end linked list to track holes.
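The 0.11.0 entry above is the one this commit starts preparing for: tracking "holes" means remembering freed (offset, len) segments inside the ARENA so later allocations can reuse them. The list itself is not implemented here (the Header change in src/arena.rs below only reserves segmented_head_ptr and segmented_tail_ptr for it), so the following is a minimal single-threaded sketch of the idea, with a Vec kept sorted by offset standing in for the ordered doubly linked list:

```rust
/// A freed segment inside the arena.
#[derive(Debug)]
struct Hole {
  offset: u32,
  len: u32,
}

/// Sketch only: holes kept ordered by offset, so adjacent holes are
/// neighbors in the list, which is what makes coalescing them cheap.
#[derive(Default)]
struct HoleList {
  holes: Vec<Hole>,
}

impl HoleList {
  /// Record a freed segment, preserving the ordering by offset.
  fn free(&mut self, offset: u32, len: u32) {
    let idx = self
      .holes
      .binary_search_by_key(&offset, |h| h.offset)
      .unwrap_or_else(|i| i);
    self.holes.insert(idx, Hole { offset, len });
  }

  /// First-fit reuse: carve `len` bytes out of the lowest-offset hole
  /// that is large enough, dropping the hole once fully consumed.
  fn allocate(&mut self, len: u32) -> Option<u32> {
    let idx = self.holes.iter().position(|h| h.len >= len)?;
    let hole = &mut self.holes[idx];
    let offset = hole.offset;
    hole.offset += len;
    hole.len -= len;
    if hole.len == 0 {
      self.holes.remove(idx);
    }
    Some(offset)
  }
}

fn main() {
  let mut holes = HoleList::default();
  holes.free(128, 64); // a removed node left a 64-byte gap at offset 128
  assert_eq!(holes.allocate(32), Some(128)); // reuse the front of the hole
  assert_eq!(holes.allocate(64), None); // the remaining 32 bytes are too small
}
```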
Cargo.toml (4 changes: 3 additions & 1 deletion)
@@ -2,7 +2,7 @@
name = "skl"
version = "0.10.0"
edition = "2021"
-rust-version = "1.56.0"
+rust-version = "1.75.0"
repository = "https://github.com/al8n/skl-rs"
description = "A lock-free thread-safe concurrent ARENA based (heap backend or memory map backend) skiplist implementation which helps develop MVCC memtable for LSM-Tree."
documentation = "https://docs.rs/skl"
@@ -35,6 +35,8 @@ default = ["std"]
alloc = []
memmap = ["memmap2", "fs4", "std"]
std = ["rand/default", "either/default"]
+unaligned = []


[target.'cfg(loom)'.dependencies]
loom = "0.7"
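For a downstream crate, opting in is the usual Cargo feature syntax. The feature is empty (unaligned = []); it exists only to gate the #[cfg]'d code paths shown in the source diffs below:

```toml
# A dependent crate's Cargo.toml (illustrative):
[dependencies]
skl = { version = "0.10.0", features = ["unaligned"] }
```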
src/align8vp.rs (12 changes: 12 additions & 0 deletions)
@@ -24,11 +24,23 @@ impl Pointer {
Self(AtomicU64::new(encode_value(offset, u32::MAX)))
}

+  #[cfg(not(feature = "unaligned"))]
#[inline]
pub(crate) fn load(&self, ordering: Ordering) -> (u32, u32) {
decode_value(self.0.load(ordering))
}

+  #[cfg(feature = "unaligned")]
+  #[inline]
+  pub(crate) fn load(&self, ordering: Ordering) -> (u32, u32) {
+    use core::ptr;
+
+    let ptr = &self.0 as *const AtomicU64;
+    unsafe {
+      decode_value(ptr::read_unaligned(ptr).load(ordering))
+    }
+  }
+
#[inline]
pub(crate) fn store(&self, offset: u32, len: u32, ordering: Ordering) {
self.0.store(encode_value(offset, len), ordering);
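Both load paths funnel through decode_value, and the constructor above packs with encode_value(offset, u32::MAX); the unaligned path merely copies the AtomicU64 out with ptr::read_unaligned before loading from the copy. The two helpers sit outside this diff's context, so here is a plausible reconstruction of the layout (an assumption, not the crate's verbatim code):

```rust
/// Hypothetical reconstruction: pack a u32 offset into the high half
/// and a u32 length into the low half of one u64 word.
#[inline]
const fn encode_value(offset: u32, len: u32) -> u64 {
  ((offset as u64) << 32) | len as u64
}

/// Inverse of `encode_value`: split the word back into (offset, len).
#[inline]
const fn decode_value(v: u64) -> (u32, u32) {
  ((v >> 32) as u32, v as u32)
}

fn main() {
  assert_eq!(decode_value(encode_value(1024, 16)), (1024, 16));
  // The sentinel written by `Pointer::new` above round-trips the same way.
  assert_eq!(decode_value(encode_value(8, u32::MAX)), (8, u32::MAX));
}
```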
src/arena.rs (89 changes: 87 additions & 2 deletions)
@@ -35,6 +35,10 @@ pub(super) struct Header {
/// Current height. 1 <= height <= kMaxHeight. CAS.
height: AtomicU32,
  len: AtomicU32,
+
+  // Reserved for segmented list.
+  segmented_head_ptr: AtomicU32,
+  segmented_tail_ptr: AtomicU32,
}

impl Header {
@@ -47,6 +51,8 @@ impl Header {
// Don't store data at position 0 in order to reserve offset=0 as a kind
// of nil pointer.
allocated: AtomicU64::new(size + 1),
+      segmented_head_ptr: AtomicU32::new(0),
+      segmented_tail_ptr: AtomicU32::new(0),
}
}
}
@@ -369,7 +375,7 @@ impl Arena {
}
}

-  #[inline]
+  #[cfg(not(feature = "unaligned"))]
pub(super) fn alloc<T>(
&self,
size: u32,
@@ -421,7 +427,53 @@
}
}

-  #[inline]
+  #[cfg(feature = "unaligned")]
+  pub(super) fn alloc<T>(
+    &self,
+    size: u32,
+    value_size: u32,
+    _align: u32,
+    overflow: u32,
+  ) -> Result<(u32, u32), ArenaError> {
+    let trailer_size = mem::size_of::<T>();
+    let header = self.header();
+    let mut current_allocated = header.allocated.load(Ordering::Acquire);
+    if current_allocated + size as u64 + overflow as u64 + trailer_size as u64 + value_size as u64
+      > self.cap as u64
+    {
+      return Err(ArenaError);
+    }
+
+    loop {
+      let want = current_allocated + size as u64 + trailer_size as u64 + value_size as u64;
+      match header.allocated.compare_exchange_weak(
+        current_allocated,
+        want,
+        Ordering::SeqCst,
+        Ordering::Acquire,
+      ) {
+        Ok(current) => {
+          // Return the unaligned offset.
+          let node_offset = current as u32;
+          let allocated_for_trailer = current + size as u64;
+          let value_offset = allocated_for_trailer as u32;
+          std::println!("node_offset: {}, value_offset: {}", node_offset, value_offset);
+          return Ok((node_offset, value_offset));
+        }
+        Err(x) => {
+          if x + size as u64 + overflow as u64 + trailer_size as u64 + value_size as u64
+            > self.cap as u64
+          {
+            return Err(ArenaError);
+          }
+
+          current_allocated = x;
+        }
+      }
+    }
+  }
+
+  #[cfg(not(feature = "unaligned"))]
pub(super) fn alloc_value<T>(&self, size: u32) -> Result<u32, ArenaError> {
let trailer_size = mem::size_of::<T>();
let align = mem::align_of::<T>();
@@ -459,6 +511,39 @@
}
}

+  #[cfg(feature = "unaligned")]
+  pub(super) fn alloc_value<T>(&self, size: u32) -> Result<u32, ArenaError> {
+    let trailer_size = mem::size_of::<T>();
+    let size = size as u64 + trailer_size as u64;
+
+    let header = self.header();
+    let mut current_allocated = header.allocated.load(Ordering::Acquire);
+    if current_allocated + size > self.cap as u64 {
+      return Err(ArenaError);
+    }
+
+    loop {
+      let want = current_allocated + size;
+      match header.allocated.compare_exchange_weak(
+        current_allocated,
+        want,
+        Ordering::SeqCst,
+        Ordering::Acquire,
+      ) {
+        Ok(current) => {
+          return Ok((current + size) as u32);
+        }
+        Err(x) => {
+          if x + size > self.cap as u64 {
+            return Err(ArenaError);
+          }
+
+          current_allocated = x;
+        }
+      }
+    }
+  }
+
/// ## Safety:
/// - The caller must make sure that `offset` must be less than the capacity of the arena.
/// - The caller must make sure that `size` must be less than the capacity of the arena.
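Both alloc variants, and both alloc_value variants, share one lock-free shape: read the cursor, bounds-check, try to publish the advanced cursor with compare_exchange_weak, and on failure retry from the value the CAS observed. A stripped-down, self-contained sketch of that pattern follows; Bump, its fields, and the simplified bounds check are illustrative rather than the crate's code:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Minimal bump allocator over a fixed capacity, mirroring the CAS loop above.
struct Bump {
  allocated: AtomicU64, // next free offset
  cap: u64,             // total arena capacity
}

impl Bump {
  /// Reserve `size` bytes; returns the start offset, or None when full.
  fn alloc(&self, size: u64) -> Option<u64> {
    let mut current = self.allocated.load(Ordering::Acquire);
    loop {
      let want = current + size;
      if want > self.cap {
        return None; // arena exhausted
      }
      match self.allocated.compare_exchange_weak(
        current,
        want,
        Ordering::SeqCst,
        Ordering::Acquire,
      ) {
        // CAS succeeded: `old` is the start of the region we reserved.
        Ok(old) => return Some(old),
        // Lost the race (or failed spuriously): retry from the observed value.
        Err(actual) => current = actual,
      }
    }
  }
}

fn main() {
  // Offset 0 stays reserved as a nil pointer, as in Header::new above.
  let arena = Bump { allocated: AtomicU64::new(1), cap: 1024 };
  assert_eq!(arena.alloc(64), Some(1));
  assert_eq!(arena.alloc(64), Some(65));
}
```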
src/map.rs (23 changes: 11 additions & 12 deletions)
@@ -718,8 +718,8 @@ impl<T: Trailer, C: Comparator> SkipMap<T, C> {
if prev_next_offset == next_offset {
// Ok, case #1 is true, so help the other thread along by
// updating the next node's prev link.
-          let link = next.tower(&self.arena, i);
-          let _ = link.prev_offset.compare_exchange(
+          let _ = next.cas_prev_offset(
+            &self.arena, i,
next_prev_offset,
prev_offset,
Ordering::SeqCst,
@@ -728,8 +728,7 @@
}
}

-        let prev_link = prev.tower(&self.arena, i);
-        match prev_link.next_offset.compare_exchange_weak(
+        match prev.cas_next_offset_weak(&self.arena, i,
next.offset,
nd.offset,
Ordering::SeqCst,
@@ -746,8 +745,8 @@
std::thread::yield_now();
}

-        let next_link = next.tower(&self.arena, i);
-        let _ = next_link.prev_offset.compare_exchange(
+        let _ = next.cas_prev_offset(
+          &self.arena, i,
prev_offset,
nd.offset,
Ordering::SeqCst,
@@ -931,8 +930,8 @@
if prev_next_offset == next_offset {
// Ok, case #1 is true, so help the other thread along by
// updating the next node's prev link.
-          let link = next.tower(&self.arena, i);
-          let _ = link.prev_offset.compare_exchange(
+          let _ = next.cas_prev_offset(
+            &self.arena, i,
next_prev_offset,
prev_offset,
Ordering::SeqCst,
@@ -941,8 +940,8 @@
}
}

-        let prev_link = prev.tower(&self.arena, i);
-        match prev_link.next_offset.compare_exchange_weak(
+        match prev.cas_next_offset_weak(
+          &self.arena, i,
next.offset,
nd.offset,
Ordering::SeqCst,
@@ -959,8 +958,8 @@
std::thread::yield_now();
}

-        let next_link = next.tower(&self.arena, i);
-        let _ = next_link.prev_offset.compare_exchange(
+        let _ = next.cas_prev_offset(
+          &self.arena, i,
prev_offset,
nd.offset,
Ordering::SeqCst,
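Every hunk in this file is the same mechanical refactor: the two-step pattern of fetching a tower link with node.tower(&self.arena, i) and then CASing one of its atomic fields collapses into a single cas_prev_offset or cas_next_offset_weak call on the node. The helpers themselves are defined outside this diff, so the following sketch of such wrappers is an assumption (the tower is inlined as a Vec here instead of living in the ARENA, and the arena parameter is dropped):

```rust
use std::sync::atomic::{AtomicU32, Ordering};

/// One level of a node's tower: forward and backward offsets into the arena.
struct Link {
  next_offset: AtomicU32,
  prev_offset: AtomicU32,
}

struct Node {
  tower: Vec<Link>, // one link per skiplist level
}

impl Node {
  /// CAS the prev link at `level` from `current` to `new`.
  fn cas_prev_offset(
    &self,
    level: usize,
    current: u32,
    new: u32,
    success: Ordering,
    failure: Ordering,
  ) -> Result<u32, u32> {
    self.tower[level]
      .prev_offset
      .compare_exchange(current, new, success, failure)
  }

  /// Weak CAS of the next link at `level`; may fail spuriously, so callers loop.
  fn cas_next_offset_weak(
    &self,
    level: usize,
    current: u32,
    new: u32,
    success: Ordering,
    failure: Ordering,
  ) -> Result<u32, u32> {
    self.tower[level]
      .next_offset
      .compare_exchange_weak(current, new, success, failure)
  }
}

fn main() {
  let node = Node {
    tower: vec![Link {
      next_offset: AtomicU32::new(0),
      prev_offset: AtomicU32::new(0),
    }],
  };
  // Swing level 0's prev link from 0 to 42, as the insert path above does.
  assert!(node
    .cas_prev_offset(0, 0, 42, Ordering::SeqCst, Ordering::Acquire)
    .is_ok());
}
```

Beyond trimming call sites, a plausible motivation, given the unaligned feature introduced in the same commit, is that routing every link CAS through one place leaves a single spot to special-case how tower words are read and written.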
(diff for the seventh changed file not loaded in this view)