Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

0.10.0 #18

Merged
merged 22 commits into from
May 18, 2024
2 changes: 1 addition & 1 deletion .clippy.toml
Original file line number Diff line number Diff line change
@@ -1 +1 @@
msrv = "1.56.0"
msrv = "1.75.0"
11 changes: 10 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# CHANGELOG

## 0.10.0

- Remove `SkipSet`.
- Add `insert`, `get_or_insert` and `get_or_insert_with` methods.
- Add `compare_remove` and `get_or_remove` methods.

## 0.9.0

- Make file backed mmap `SkipMap` and `SkipSet` still can be reopened even last time the program was aborted.
Expand Down Expand Up @@ -57,4 +63,7 @@

## UNRELEASED

FEATURES
### 0.11.0

- Add `unaligned` feature, which does not apply pad for each allocation from ARENA.
- Add an ordered double end linked list to track holes.
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
[package]
name = "skl"
version = "0.9.0"
version = "0.10.0"
edition = "2021"
rust-version = "1.56.0"
rust-version = "1.75.0"
repository = "https://github.com/al8n/skl-rs"
description = "A lock-free thread-safe concurrent ARENA based (heap backend or memory map backend) skiplist implementation which helps develop MVCC memtable for LSM-Tree."
documentation = "https://docs.rs/skl"
Expand Down Expand Up @@ -36,6 +36,7 @@ alloc = []
memmap = ["memmap2", "fs4", "std"]
std = ["rand/default", "either/default"]


[target.'cfg(loom)'.dependencies]
loom = "0.7"

Expand Down
16 changes: 8 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
<img alt="wakatime" src="https://wakatime.com/badge/user/9203ce16-5227-4320-acdb-c8b6135e4730/project/9e07939d-5f12-4f23-bc46-5e1430137b9f.svg?style=for-the-badge&color=e5928d" height="22">
<img alt="license" src="https://img.shields.io/badge/License-Apache%202.0/MIT-blue.svg?style=for-the-badge&fontColor=white&logoColor=f5c076&logo=" height="22">

1. A lock-free thread-safe concurrent `SkipMap` and `SkipSet` implementation based on ARENA skiplist which helps develop MVCC memtable for LSM-Tree.
2. A lock-free thread-safe concurrent memory map based on-disk `SkipMap` and `SkipSet`.
1. A lock-free thread-safe concurrent `SkipMap` implementation based on ARENA skiplist which helps develop MVCC memtable for LSM-Tree.
2. A lock-free thread-safe concurrent memory map based on-disk `SkipMap`.

</div>

Expand All @@ -28,24 +28,24 @@

```toml
[dependencies]
skl = "0.9"
skl = "0.10"
```

- Enable memory map backend

```toml
[dependencies]
skl = { version = "0.9", features = ["memmap"] }
skl = { version = "0.10", features = ["memmap"] }
```

## Features

- **MVCC and 3D access**: Builtin MVCC (multiple versioning concurrency control) and key-value-version access support.
- **Lock-free and Concurrent-Safe:** SkipMap and SkipSet provide lock-free operations, ensuring efficient concurrent access without the need for explicit locking mechanisms.
- **Extensible for Key-Value Database Developers:** Designed as a low-level crate, SkipMap and SkipSet offer a flexible foundation for key-value database developers. You can easily build your own memtable or write-ahead-log (WAL) using these structures.
- **Lock-free and Concurrent-Safe:** SkipMap provide lock-free operations, ensuring efficient concurrent access without the need for explicit locking mechanisms.
- **Extensible for Key-Value Database Developers:** Designed as a low-level crate, SkipMap offer a flexible foundation for key-value database developers. You can easily build your own memtable or write-ahead-log (WAL) using these structures.
- **Memory Efficiency:** These data structures are optimized for minimal memory overhead. They operate around references, avoiding unnecessary allocations and deep copies, which can be crucial for efficient memory usage.
- **Efficient Iteration:** Enjoy fast forward and backward iteration through the elements in your SkipMap or SkipSet. Additionally, bounded iterators are supported, allowing you to traverse only a specified range of elements efficiently.
- **Snapshot Support:** Create snapshots of your SkipMap or SkipSet, offering a read-only view of the contents at a specific moment in time. Snapshots provide a consistent view of the data, enabling implementations of transactional semantics and other use cases where data consistency is crucial.
- **Efficient Iteration:** Enjoy fast forward and backward iteration through the elements in your SkipMap. Additionally, bounded iterators are supported, allowing you to traverse only a specified range of elements efficiently.
- **Snapshot Support:** Create snapshots of your SkipMap, offering a read-only view of the contents at a specific moment in time. Snapshots provide a consistent view of the data, enabling implementations of transactional semantics and other use cases where data consistency is crucial.
- **Memory Management Options:**
- **Heap Allocation:** Memory allocation is handled by Rust's allocator, ensuring all data resides in RAM.
- **Mmap:** Data can be mapped to a disk file by the operating system, making it suitable for write-ahead-logs (WAL) and durable storage.
Expand Down
65 changes: 65 additions & 0 deletions src/align8vp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use crate::sync::{AtomicU64, Ordering};

#[repr(C, align(8))]
pub(crate) struct Pointer(AtomicU64);

impl core::fmt::Debug for Pointer {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
let (offset, len) = decode_value(self.0.load(Ordering::Relaxed));
f.debug_struct("Pointer")

Check warning on line 9 in src/align8vp.rs

View check run for this annotation

Codecov / codecov/patch

src/align8vp.rs#L7-L9

Added lines #L7 - L9 were not covered by tests
.field("offset", &offset)
.field("len", &len)
.finish()
}
}

impl Pointer {
#[inline]
pub(crate) fn new(offset: u32, len: u32) -> Self {
Self(AtomicU64::new(encode_value(offset, len)))
}

#[inline]
pub(crate) fn remove(offset: u32) -> Self {
Self(AtomicU64::new(encode_value(offset, u32::MAX)))
}

#[cfg(not(feature = "unaligned"))]
#[inline]
pub(crate) fn load(&self, ordering: Ordering) -> (u32, u32) {
decode_value(self.0.load(ordering))
}

#[inline]
pub(crate) fn store(&self, offset: u32, len: u32, ordering: Ordering) {
self.0.store(encode_value(offset, len), ordering);
}

#[inline]
pub(crate) fn compare_remove(
&self,
success: Ordering,
failure: Ordering,
) -> Result<(u32, u32), (u32, u32)> {
let old = self.0.load(Ordering::Acquire);
let (offset, _) = decode_value(old);
let new = encode_value(offset, u32::MAX);
self
.0
.compare_exchange(old, new, success, failure)
.map(decode_value)
.map_err(decode_value)

Check warning on line 51 in src/align8vp.rs

View check run for this annotation

Codecov / codecov/patch

src/align8vp.rs#L48-L51

Added lines #L48 - L51 were not covered by tests
}
}

#[inline]
const fn encode_value(offset: u32, val_size: u32) -> u64 {
(val_size as u64) << 32 | offset as u64
}

#[inline]
const fn decode_value(value: u64) -> (u32, u32) {
let offset = value as u32;
let val_size = (value >> 32) as u32;
(offset, val_size)

Check warning on line 64 in src/align8vp.rs

View check run for this annotation

Codecov / codecov/patch

src/align8vp.rs#L64

Added line #L64 was not covered by tests
}
104 changes: 84 additions & 20 deletions src/arena.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use crate::sync::{AtomicPtr, AtomicU32, AtomicU64, Ordering};
#[allow(unused_imports)]
use crate::sync::Box;
use crate::sync::{AtomicMut, AtomicPtr, AtomicU32, AtomicU64, Ordering};
use std::boxed::Box;

#[cfg(not(loom))]
use crate::sync::AtomicMut;

use core::{
mem,
Expand Down Expand Up @@ -32,6 +35,10 @@
/// Current height. 1 <= height <= kMaxHeight. CAS.
height: AtomicU32,
len: AtomicU32,

// Reserved for segmented list.
segmented_head_ptr: AtomicU32,
segmented_tail_ptr: AtomicU32,
}

impl Header {
Expand All @@ -44,6 +51,8 @@
// Don't store data at position 0 in order to reserve offset=0 as a kind
// of nil pointer.
allocated: AtomicU64::new(size + 1),
segmented_head_ptr: AtomicU32::new(0),
segmented_tail_ptr: AtomicU32::new(0),
}
}
}
Expand Down Expand Up @@ -290,29 +299,35 @@
}

#[inline]
fn head_offset(&self, max_node_size: u32, align: u32) -> u32 {
fn head_offset<T>(&self, max_node_size: u32, align: u32) -> (u32, u32) {
let trailer_size = mem::size_of::<T>();
let trailer_align = mem::align_of::<T>();

// Pad the allocation with enough bytes to ensure the requested alignment.
let padded = max_node_size as u64 + align as u64 - 1;
let new_size = 1 + self.data_offset + padded;
let trailer_padded = trailer_size as u64 + trailer_align as u64 - 1;
let allocated = 1 + self.data_offset + padded;

// Return the aligned offset.
(new_size as u32 - max_node_size) & !(align - 1)
let node_offset = (allocated as u32 - max_node_size) & !(align - 1);
let total_allocated = allocated + trailer_padded;
(node_offset, total_allocated as u32)
}

pub(super) fn head_ptr(&self, max_node_size: u32, align: u32) -> (*const u8, u32) {
pub(super) fn head_ptr<T>(&self, max_node_size: u32, align: u32) -> (*const u8, u32) {
// Safety: this method is only invoked when we want a readonly,
// in readonly mode, we must have the head_ptr valid.
let offset = self.head_offset(max_node_size, align);
let (offset, _) = self.head_offset::<T>(max_node_size, align);
(unsafe { self.get_pointer(offset as usize) }, offset)
}

pub(super) fn tail_ptr(&self, max_node_size: u32, align: u32) -> (*const u8, u32) {
pub(super) fn tail_ptr<T>(&self, max_node_size: u32, align: u32) -> (*const u8, u32) {
// Pad the allocation with enough bytes to ensure the requested alignment.
let padded = max_node_size as u64 + align as u64 - 1;
let new_size = self.head_offset(max_node_size, align) as u64 + padded + max_node_size as u64;
let (_, current_allocated) = self.head_offset::<T>(max_node_size, align);

// Return the aligned offset.
let offset = (new_size as u32 - max_node_size) & !(align - 1);
let allocated = current_allocated as u64 + padded;
let offset = (allocated as u32 - max_node_size) & !(align - 1);

// Safety: this method is only invoked when we want a readonly,
// in readonly mode, we must have the head_ptr valid.
Expand Down Expand Up @@ -360,19 +375,68 @@
}
}

#[inline]
pub(super) fn alloc(
#[cfg(not(feature = "unaligned"))]
pub(super) fn alloc<T>(
&self,
size: u32,
value_size: u32,
align: u32,
overflow: u32,
) -> Result<(u32, u32), ArenaError> {
let trailer_size = mem::size_of::<T>();
let trailer_align = mem::align_of::<T>();

// Pad the allocation with enough bytes to ensure the requested alignment.
let padded = size as u64 + align as u64 - 1;
let trailer_padded = trailer_size as u64 + trailer_align as u64 - 1;
let header = self.header();
let mut current_allocated = header.allocated.load(Ordering::Acquire);
if current_allocated + padded + overflow as u64 + trailer_padded + value_size as u64
> self.cap as u64

Check warning on line 395 in src/arena.rs

View check run for this annotation

Codecov / codecov/patch

src/arena.rs#L395

Added line #L395 was not covered by tests
{
return Err(ArenaError);
}

loop {

Check warning on line 400 in src/arena.rs

View check run for this annotation

Codecov / codecov/patch

src/arena.rs#L400

Added line #L400 was not covered by tests
let want = current_allocated + padded + trailer_padded + value_size as u64;
match header.allocated.compare_exchange_weak(
current_allocated,
want,

Check warning on line 404 in src/arena.rs

View check run for this annotation

Codecov / codecov/patch

src/arena.rs#L404

Added line #L404 was not covered by tests
Ordering::SeqCst,
Ordering::Acquire,
) {
Ok(current) => {
// Return the aligned offset.
let allocated = current + padded;
let node_offset = (allocated as u32 - size) & !(align - 1);

let allocated_for_trailer = allocated + trailer_padded;
let value_offset =
(allocated_for_trailer as u32 - trailer_size as u32) & !(trailer_align as u32 - 1);

Check warning on line 415 in src/arena.rs

View check run for this annotation

Codecov / codecov/patch

src/arena.rs#L415

Added line #L415 was not covered by tests

return Ok((node_offset, value_offset));
}
Err(x) => {
if x + padded + overflow as u64 + trailer_padded + value_size as u64 > self.cap as u64 {
return Err(ArenaError);

Check warning on line 421 in src/arena.rs

View check run for this annotation

Codecov / codecov/patch

src/arena.rs#L419-L421

Added lines #L419 - L421 were not covered by tests
}

current_allocated = x;

Check warning on line 424 in src/arena.rs

View check run for this annotation

Codecov / codecov/patch

src/arena.rs#L424

Added line #L424 was not covered by tests
}
}
}
}

#[cfg(not(feature = "unaligned"))]
pub(super) fn alloc_value<T>(&self, size: u32) -> Result<u32, ArenaError> {
let trailer_size = mem::size_of::<T>();
let align = mem::align_of::<T>();
let size = size + trailer_size as u32;
let padded = size as u64 + align as u64 - 1;

let header = self.header();
let mut current_allocated = header.allocated.load(Ordering::Acquire);
if current_allocated + padded + overflow as u64 > self.cap as u64 {
if current_allocated + padded > self.cap as u64 {
return Err(ArenaError);
}

Expand All @@ -386,12 +450,12 @@
) {
Ok(current) => {
// Return the aligned offset.
let new_size = current + padded;
let offset = (new_size as u32 - size) & !(align - 1);
return Ok((offset, padded as u32));
let allocated = current + padded;
let value_offset = (allocated as u32 - size) & !(align as u32 - 1);
return Ok(value_offset);
}
Err(x) => {
if x + padded + overflow as u64 > self.cap as u64 {
if x + padded > self.cap as u64 {

Check warning on line 458 in src/arena.rs

View check run for this annotation

Codecov / codecov/patch

src/arena.rs#L458

Added line #L458 was not covered by tests
return Err(ArenaError);
}

Expand Down Expand Up @@ -494,12 +558,12 @@
}

#[test]
#[cfg(test)]
#[cfg(all(not(loom), test))]
fn test_debug() {
let arena = Arena::new_vec(1024, 1024, 8);
assert_eq!(
std::format!("{:?}", arena),
"Arena { cap: 1056, header: Header { max_version: 0, min_version: 0, allocated: 33, height: 1, len: 0 }, data: [0] }"
"Arena { cap: 1064, header: Header { max_version: 0, min_version: 0, allocated: 41, height: 1, len: 0, segmented_head_ptr: 0, segmented_tail_ptr: 0 }, data: [0] }"
);

let err = ArenaError;
Expand Down
13 changes: 5 additions & 8 deletions src/arena/shared.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::*;

use crate::sync::AtomicUsize;
use crate::{alloc::*, sync::AtomicUsize};

#[derive(Debug)]
struct AlignedVec {
Expand All @@ -14,7 +14,7 @@ impl Drop for AlignedVec {
fn drop(&mut self) {
if self.cap != 0 {
unsafe {
std::alloc::dealloc(self.ptr.as_ptr(), self.layout());
dealloc(self.ptr.as_ptr(), self.layout());
}
}
}
Expand All @@ -29,17 +29,14 @@ impl AlignedVec {
align - 1
);
let ptr = unsafe {
let layout = std::alloc::Layout::from_size_align_unchecked(capacity, align);
let ptr = std::alloc::alloc(layout);
let layout = Layout::from_size_align_unchecked(capacity, align);
let ptr = alloc_zeroed(layout);
if ptr.is_null() {
std::alloc::handle_alloc_error(layout);
}
ptr::NonNull::new_unchecked(ptr)
};

unsafe {
core::ptr::write_bytes(ptr.as_ptr(), 0, capacity);
}
Self {
ptr,
cap: capacity,
Expand Down Expand Up @@ -402,7 +399,7 @@ impl Shared {
}
}

#[cfg(all(test, feature = "std"))]
#[cfg(all(test, feature = "std", not(loom)))]
mod tests {
use super::*;

Expand Down
Loading
Loading