Commit
Finish
al8n committed May 15, 2024
1 parent 8b19ca5 commit 42d817f
Showing 13 changed files with 1,277 additions and 274 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
@@ -2,8 +2,10 @@

## 0.9.0

- Make file backed mmap `Skip*` still work when aborting.
- Make file-backed mmap `SkipMap` and `SkipSet` reopenable even if the program was aborted the last time they were open.
- Remove checksum validation; users should take care of data integrity themselves.
- Support `Clone` directly; the `Arc` wrapper is no longer needed.
- Add `OpenOptions` and `MmapOptions` for finer control over file-mmap-backed `SkipMap` and `SkipSet` (a usage sketch follows this file's diff).

## 0.8.6

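To illustrate the new `OpenOptions`/`MmapOptions` entries in the 0.9.0 list above, here is a minimal sketch of creating a file-backed `SkipMap`, adapted from `examples/mmap.rs` in this commit; the `test.wal` path and the 1 MiB capacity are illustrative:

```rust
use std::sync::Arc;
use skl::SkipMap;

fn main() {
    // Capacity now travels through OpenOptions::create_new instead of a bare
    // `size` argument; MmapOptions carries the mmap-specific settings.
    let open_options = skl::OpenOptions::default()
        .create_new(Some(1 << 20)) // illustrative 1 MiB capacity
        .read(true)
        .write(true);
    let mmap_options = skl::MmapOptions::default();

    let l = Arc::new(SkipMap::mmap_mut("test.wal", open_options, mmap_options).unwrap());
    println!("created map with {} entries", l.len());
}
```

Compared with the old `SkipMap::mmap_mut("test.wal", 1 << 20, true)` call, the capacity moves into `OpenOptions::create_new` and the remaining settings into `MmapOptions`. The `Arc` wrapper is kept here only to mirror the example; per the changelog, `SkipMap` now implements `Clone` directly.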
8 changes: 7 additions & 1 deletion examples/mmap.rs
@@ -11,7 +11,13 @@ pub fn new_value(i: usize) -> Vec<u8> {
fn main() {
const N: usize = 1000;

let l = Arc::new(SkipMap::mmap_mut("test.wal", 1 << 20, true).unwrap());
let mmap_options = skl::MmapOptions::default();
let open_options = skl::OpenOptions::default()
.create_new(Some(1 << 20))
.read(true)
.write(true);

let l = Arc::new(SkipMap::mmap_mut("test.wal", open_options, mmap_options).unwrap());
let wg = Arc::new(());
for i in 0..N {
let w = wg.clone();
4 changes: 3 additions & 1 deletion examples/mmap_anon.rs
@@ -11,7 +11,9 @@ pub fn new_value(i: usize) -> Vec<u8> {

fn main() {
const N: usize = 1000;
let l = Arc::new(SkipMap::mmap_anon(1 << 20).unwrap());

let mmap_options = skl::MmapOptions::default().len(1 << 20);
let l = Arc::new(SkipMap::mmap_anon(mmap_options).unwrap());
let wg = Arc::new(());
for i in 0..N {
let w = wg.clone();
8 changes: 6 additions & 2 deletions integration/src/bin/test-mmap-anon.rs
@@ -5,7 +5,9 @@ use std::sync::Arc;
fn main() {
{
const N: usize = 10;
let l = Arc::new(SkipMap::mmap_anon(1 << 20).unwrap());

let mmap_options = MmapOptions::default().len(1 << 20);
let l = Arc::new(SkipMap::mmap_anon(mmap_options).unwrap());
for i in 0..N {
let l = l.clone();
std::thread::spawn(move || {
@@ -27,7 +29,9 @@

{
const N2: usize = 100;
let l = Arc::new(SkipMap::mmap_anon(120 << 20).unwrap());

let mmap_options = MmapOptions::default().len(120 << 20);
let l = Arc::new(SkipMap::mmap_anon(mmap_options).unwrap());
for i in 0..N2 {
let l = l.clone();
std::thread::spawn(move || {
10 changes: 8 additions & 2 deletions integration/src/bin/test-mmap.rs
@@ -7,7 +7,10 @@ fn main() {
let p = dir.path().join("test_mmap");
{
const N: usize = 10;
let l = Arc::new(SkipMap::mmap_mut(&p, 1 << 20, true).unwrap());

let open_options = OpenOptions::default().create_new(Some(1 << 20)).read(true);
let mmap_options = MmapOptions::default();
let l = Arc::new(SkipMap::mmap_mut(&p, open_options, mmap_options).unwrap());
for i in 0..N {
let l = l.clone();
std::thread::spawn(move || {
@@ -29,7 +32,10 @@

{
const N2: usize = 10;
let l = Arc::new(SkipMap::<u64>::mmap(&p, false).unwrap());

let open_options = OpenOptions::default().read(true);
let mmap_options = MmapOptions::default();
let l = Arc::new(SkipMap::<u64>::mmap(&p, open_options, mmap_options).unwrap());
assert_eq!(N2, l.len());
for i in 0..N2 {
let l = l.clone();
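The integration tests above also cover the other two entry points: anonymous maps, whose length comes from `MmapOptions`, and read-only reopening of an existing file. A condensed sketch following `examples/mmap_anon.rs` and `integration/src/bin/test-mmap.rs`; the `existing.wal` path is illustrative, and the `u64` type parameter mirrors the test code:

```rust
use skl::{MmapOptions, OpenOptions, SkipMap};

fn main() -> std::io::Result<()> {
    // Anonymous mmap: no file on disk, the length comes from MmapOptions.
    let mmap_options = MmapOptions::default().len(1 << 20);
    let anon = SkipMap::mmap_anon(mmap_options)?;
    assert_eq!(anon.len(), 0); // a freshly created map is empty

    // Reopen an existing file-backed map read-only: no create_new, only read.
    // Assumes the file was created earlier (e.g. by the sketch after the
    // CHANGELOG section above).
    let open_options = OpenOptions::default().read(true);
    let mmap_options = MmapOptions::default();
    let reopened = SkipMap::<u64>::mmap("existing.wal", open_options, mmap_options)?;
    println!("reopened map holds {} entries", reopened.len());
    Ok(())
}
```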
121 changes: 83 additions & 38 deletions src/arena.rs
@@ -7,17 +7,18 @@ use core::{
#[allow(unused_imports)]
use std::boxed::Box;

use crossbeam_utils::CachePadded;
#[cfg(all(feature = "memmap", not(target_family = "wasm")))]
use crate::{MmapOptions, OpenOptions};

mod shared;
use shared::Shared;

/// The overhead of the memory-mapped file.
///
/// ```text
/// +---------------+------------+--------------------+--------------------+
/// | 32-bit height | 32-bit len | 64-bit max version | 64-bit min version |
/// +---------------+------------+--------------------+--------------------+
/// +----------------+------------+--------------------+--------------------+---------------------+
/// | 32-bit height | 32-bit len | 64-bit max version | 64-bit min version | 64-bit allocated |
/// +----------------+------------+--------------------+--------------------+---------------------+
/// ```
const MMAP_OVERHEAD: usize = mem::size_of::<Header>();

@@ -29,15 +30,19 @@ pub(super) struct Header {
len: AtomicU32,
max_version: AtomicU64,
min_version: AtomicU64,
allocated: AtomicU64,
}

impl Default for Header {
fn default() -> Self {
impl Header {
fn new(size: u64) -> Self {
Self {
height: AtomicU32::new(1),
len: AtomicU32::new(0),
max_version: AtomicU64::new(0),
min_version: AtomicU64::new(0),
// Don't store data at position 0 in order to reserve offset=0 as a kind
// of nil pointer.
allocated: AtomicU64::new(size + 1),
}
}
}
@@ -61,7 +66,6 @@ pub struct Arena {
read_data_ptr: *const u8,
header_ptr: *const Header,
data_offset: u64,
n: CachePadded<AtomicU64>,
inner: AtomicPtr<()>,
cap: usize,
}
@@ -76,18 +80,63 @@ impl core::fmt::Debug for Arena {
let header = self.header();
f.debug_struct("Arena")
.field("cap", &self.cap)
.field("allocated", &allocated)
.field("header", header)
.field("data", &data)
.finish()
}
}

impl Clone for Arena {
fn clone(&self) -> Self {
unsafe {
let shared: *mut Shared = self.inner.load(Ordering::Relaxed).cast();

let old_size = (*shared).refs.fetch_add(1, Ordering::Release);
if old_size > usize::MAX >> 1 {
abort();
}

// Safety:
// The ptr is always non-null, and the data is only deallocated when the
// last Arena is dropped.
Self {
write_data_ptr: self.write_data_ptr,
read_data_ptr: self.read_data_ptr,
header_ptr: self.header_ptr,
data_offset: self.data_offset,
inner: AtomicPtr::new(shared as _),
cap: self.cap,
}
}
}
}

#[inline(never)]
#[cold]
fn abort() -> ! {
#[cfg(feature = "std")]
{
std::process::abort()
}

#[cfg(not(feature = "std"))]
{
struct Abort;
impl Drop for Abort {
fn drop(&mut self) {
panic!();
}
}
let _a = Abort;
panic!("abort");
}
}

impl Arena {
/// Returns the number of bytes allocated by the arena.
#[inline]
pub fn size(&self) -> usize {
self.n.load(Ordering::Acquire) as usize
self.header().allocated.load(Ordering::Acquire) as usize
}

/// Returns the capacity of the arena.
@@ -196,52 +245,51 @@ impl Arena {
impl Arena {
#[inline]
pub(super) fn new_vec(n: usize, min_cap: usize, alignment: usize) -> Self {
Self::new(
Shared::new_vec(n.max(min_cap.saturating_add(MMAP_OVERHEAD)), alignment),
None,
)
Self::new(Shared::new_vec(
n.max(min_cap.saturating_add(MMAP_OVERHEAD)),
alignment,
))
}

#[cfg(all(feature = "memmap", not(target_family = "wasm")))]
#[inline]
pub(super) fn mmap_mut<P: AsRef<std::path::Path>>(
path: P,
n: usize,
open_options: OpenOptions,
mmap_options: MmapOptions,
min_cap: usize,
alignment: usize,
lock: bool,
) -> std::io::Result<Self> {
let n = n.saturating_add(MMAP_OVERHEAD);
Shared::mmap_mut(
n.max(min_cap.saturating_add(MMAP_OVERHEAD)),
path,
alignment,
lock,
)
.map(|shared| Self::new(shared, None))
let min_cap = min_cap.saturating_add(MMAP_OVERHEAD);
Shared::mmap_mut(path, open_options, mmap_options, min_cap, alignment).map(Self::new)
}

#[cfg(all(feature = "memmap", not(target_family = "wasm")))]
#[inline]
pub(super) fn mmap<P: AsRef<std::path::Path>>(
path: P,
open_options: OpenOptions,
mmap_options: MmapOptions,
min_cap: usize,
alignment: usize,
lock: bool,
) -> std::io::Result<Self> {
Shared::mmap(min_cap.saturating_add(MMAP_OVERHEAD), alignment, path, lock)
.map(|(allocated, shared)| Self::new(shared, Some(allocated)))
let min_cap = min_cap.saturating_add(MMAP_OVERHEAD);
Shared::mmap(path, open_options, mmap_options, min_cap, alignment).map(Self::new)
}

#[cfg(all(feature = "memmap", not(target_family = "wasm")))]
#[inline]
pub(super) fn new_anonymous_mmap(
n: usize,
mmap_options: MmapOptions,
min_cap: usize,
alignment: usize,
) -> std::io::Result<Self> {
Shared::new_mmaped_anon(n.max(min_cap.saturating_add(MMAP_OVERHEAD)), alignment)
.map(|shared| Self::new(shared, None))
Shared::new_mmaped_anon(
mmap_options,
min_cap.saturating_add(MMAP_OVERHEAD),
alignment,
)
.map(Self::new)
}

#[inline]
@@ -275,7 +323,7 @@ impl Arena {
}

#[inline]
fn new(mut shared: Shared, allocated: Option<u64>) -> Self {
fn new(mut shared: Shared) -> Self {
// Safety:
// The ptr is always non-null, we just initialized it.
// And this ptr is only deallocated when the arena is dropped.
@@ -286,17 +334,13 @@
.map(|p| unsafe { NonNull::new_unchecked(p) })
.unwrap_or_else(NonNull::dangling);
let data_offset = shared.data_offset as u64;
let allocated = allocated.unwrap_or(1u64 + data_offset);

Self {
cap: shared.cap(),
inner: AtomicPtr::new(Box::into_raw(Box::new(shared)) as _),
write_data_ptr,
read_data_ptr,
header_ptr,
// Don't store data at position 0 in order to reserve offset=0 as a kind
// of nil pointer.
n: CachePadded::new(AtomicU64::new(allocated)),
data_offset,
}
}
@@ -329,14 +373,15 @@ impl Arena {
// Pad the allocation with enough bytes to ensure the requested alignment.
let padded = size as u64 + align as u64 - 1;

let mut current_allocated = self.n.load(Ordering::Acquire);
let header = self.header();
let mut current_allocated = header.allocated.load(Ordering::Acquire);
if current_allocated + padded + overflow as u64 > self.cap as u64 {
return Err(ArenaError);
}

loop {
let want = current_allocated + padded;
match self.n.compare_exchange_weak(
match header.allocated.compare_exchange_weak(
current_allocated,
want,
Ordering::SeqCst,
@@ -445,7 +490,7 @@ impl Drop for Arena {

// Relaxed is enough here as we're in a drop, no one else can
// access this memory anymore.
shared.unmount(self.n.load(Ordering::Acquire));
shared.unmount();
});
}
}
Expand All @@ -457,7 +502,7 @@ fn test_debug() {
let arena = Arena::new_vec(1024, 1024, 8);
assert_eq!(
std::format!("{:?}", arena),
"Arena { cap: 1048, allocated: 25, header: Header { height: 1, len: 0, max_version: 0, min_version: 0 }, data: [0] }"
"Arena { cap: 1056, header: Header { height: 1, len: 0, max_version: 0, min_version: 0, allocated: 33 }, data: [0] }"
);

let err = ArenaError;
(Diffs for the remaining 7 changed files are not shown.)
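The core of the `src/arena.rs` change: the allocation cursor, previously a process-local `CachePadded<AtomicU64>` field (`n`), now lives inside the memory-mapped `Header` as `allocated`, so the cursor is persisted in the file itself and is still correct when the file is reopened after an abort. Allocation remains a lock-free bump of that cursor through a CAS loop, and `Arena::size` simply reads it back. A simplified, self-contained sketch of the pattern (not the crate's actual code; the mmap backing, alignment padding, and the reserved header region are reduced to a plain struct and a capacity check):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Simplified stand-in for the persisted header: only the allocation cursor.
struct Header {
    allocated: AtomicU64,
}

struct Arena {
    header: Header,
    cap: u64,
}

#[derive(Debug)]
struct ArenaError;

impl Arena {
    fn new(cap: u64) -> Self {
        // Offset 0 is reserved as a kind of nil pointer, so the cursor starts at 1.
        Self { header: Header { allocated: AtomicU64::new(1) }, cap }
    }

    // Bump-allocate `size` bytes; returns the start offset of the block.
    fn alloc(&self, size: u64) -> Result<u64, ArenaError> {
        let mut current = self.header.allocated.load(Ordering::Acquire);
        loop {
            let want = current + size;
            if want > self.cap {
                return Err(ArenaError);
            }
            match self.header.allocated.compare_exchange_weak(
                current,
                want,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                // Success: the block [current, want) now belongs to this caller.
                Ok(_) => return Ok(current),
                // Another thread moved the cursor (or the weak CAS failed
                // spuriously); retry from the freshly observed value.
                Err(actual) => current = actual,
            }
        }
    }
}

fn main() {
    let arena = Arena::new(64);
    let a = arena.alloc(16).unwrap();
    let b = arena.alloc(16).unwrap();
    assert_eq!((a, b), (1, 17));
    assert!(arena.alloc(64).is_err());
}
```

Because the cursor is part of the persisted header, `Shared::mmap` no longer needs to return the previously allocated size alongside the mapping, which is why `Arena::new` lost its `allocated` parameter in this commit.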
