Merge pull request #57 from dcSpark/egostkin/fix-fraos-shrink
Fix missing shrink at the end of seqno
gostkin authored Nov 14, 2023
2 parents 33ad7b7 + df20fd7 commit 3e603b2
Showing 6 changed files with 387 additions and 20 deletions.
113 changes: 104 additions & 9 deletions fraos/README.md
@@ -2,7 +2,8 @@

Guarantees:
* thread-safe
* changes are flushed to disk at the moment of insertion, or the insertion fails
* records are stored and iterated in insertion order

This work was inspired by the original data-pile crate by Eugene Babichenko. It is currently maintained by Eugene Gostkin and the dcSpark team.

@@ -14,18 +15,112 @@
* The storage should have a minimal dependency footprint.
* Thread-safety

## Example

```rust
use data_pile::Database;
let db = Database::file("./pile").unwrap();
let value = b"some data";
db.put(&value).unwrap();
```

## How it works

### Field schema
```rust
pub struct Database {
flatfile: Arc<FlatFile>,
seqno_index: Arc<SeqNoIndex>,
write_lock: Arc<Mutex<()>>,
}
```

Field roles:
* `flatfile` - the **raw data file**, where record bytes are stored back to back
* `seqno_index` - sequentially stored `(offset, length)` pairs that point to records in the **raw data file**
  * the n-th pair can be addressed directly by its sequential index: it starts at byte offset `2 * size_of::<usize>() * n` (see the decoding sketch below)
* `write_lock` - serializes writers; reads stay lock-free
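
For illustration, a minimal decoding sketch assuming the little-endian `usize` layout exercised by the tests further down; `read_pair` is a hypothetical helper, not part of the crate's API:

```rust
use std::convert::TryInto;
use std::mem::size_of;

/// Decode the n-th `(offset, length)` pair from the raw seqno index bytes.
fn read_pair(index_bytes: &[u8], n: usize) -> Option<(usize, usize)> {
    let word = size_of::<usize>();
    let start = 2 * word * n; // each entry occupies two usizes
    let end = start + 2 * word;
    if end > index_bytes.len() {
        return None;
    }
    let offset = usize::from_le_bytes(index_bytes[start..start + word].try_into().ok()?);
    let length = usize::from_le_bytes(index_bytes[start + word..end].try_into().ok()?);
    Some((offset, length))
}
```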

### Memory allocation

Both `flatfile` and `seqno_index` are built on the crate-internal `Appender` abstraction (a usage sketch follows its definition below):
```rust
pub(crate) struct Appender {
// This is used to trick the compiler so that we have parallel reads and
// writes.
mmap: UnsafeCell<GrowableMmap>,
// Atomic is used to ensure that we can have lock-free and memory-safe
// reads. Since this value is updated only after the write has finished it
// is safe to use it as the upper boundary for reads.
actual_size: AtomicUsize,
}

impl Appender {
/// Open a flatfile.
///
/// # Arguments
///
/// * `path` - the path to the file. It will be created if it does not exist.
/// * `writable` - flag that indicates whether the storage is read-only
pub fn new(
path: Option<PathBuf>,
existing_length: Option<usize>,
writable: bool,
) -> Result<Self, FraosError> { ... }

/// Append data to the file. The mutable pointer to the new data location is
/// given to `f` which should write the data. This function will block if
/// another write is in progress.
pub fn append<F>(&self, size_inc: usize, f: F) -> Result<(), FraosError>
where
F: Fn(&mut [u8]) -> Result<(), FraosError>,
{ ... }

/// The whole data buffer is given to `f` which should return the data back
/// or return None if something went wrong.
pub fn get_data<F, U>(&self, offset: usize, f: F) -> Result<Option<U>, FraosError>
where
F: Fn(&[u8]) -> Result<Option<U>, FraosError>,
{ ... }

pub fn memory_size(&self) -> usize { ... }

pub fn shrink_to_size(&self) -> Result<(), FraosError> { ... }
}

```
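
A usage sketch of the closure-based `append` API above; `Appender` is `pub(crate)`, so this only illustrates how internal callers such as `flatfile` and `seqno_index` might drive it:

```rust
// Sketch only: writes one record through the closure-based API shown above.
fn write_record(appender: &Appender, record: &[u8]) -> Result<(), FraosError> {
    // Reserve `record.len()` bytes at the end of the mmap; the closure copies
    // the payload into the freshly mapped region. Readers stay lock-free
    // because `actual_size` is only advanced after the write has finished.
    appender.append(record.len(), |buf| {
        buf.copy_from_slice(record);
        Ok(())
    })
}
```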

```rust
pub(crate) struct GrowableMmap {
storage: RwLock<Storage>,
file: Option<File>,
}

struct Storage {
inactive_mmaps: InactiveMmaps,
active_map: Option<ActiveMmap>,
}
```

`GrowableMmap` keeps an active mutable mmap tail (`active_map`) and an inactive prefix (`inactive_mmaps`). Its growth policy (sketched in the code below):
* If the active mmap has enough space, records are appended to it
* If it does not:
  * the active mmap is sliced down to the actual end of writes
  * it is moved into the inactive mmaps
  * a new active mmap is created, sized to the record or `MIN_MMAP_BYTES`, whichever is larger
* If `inactive_mmaps` holds more than `MAX_MMAPS_COUNT` mmaps, the existing data is remapped into a single mmap
  * This is needed because UNIX-like systems limit how many mmaps a process can hold at a time; if the limit is exceeded, the storage stops working
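
A simplified sketch of that growth policy; the helper methods on `ActiveMmap` and `InactiveMmaps` (`remaining`, `sliced_to_written_end`, `with_capacity`, `push`, `remap_into_single`) are hypothetical stand-ins for the real internals in `growable_mmap.rs`:

```rust
// Illustrative pseudo-implementation of the growth policy described above.
fn reserve(storage: &mut Storage, bytes_needed: usize) -> Result<(), FraosError> {
    let has_room = storage
        .active_map
        .as_ref()
        .map(|mmap| mmap.remaining() >= bytes_needed)
        .unwrap_or(false);

    if !has_room {
        // Freeze the current tail at its last written byte and retire it.
        if let Some(active) = storage.active_map.take() {
            storage.inactive_mmaps.push(active.sliced_to_written_end()?);
        }
        // Map a new tail: at least MIN_MMAP_BYTES, or larger if one record needs it.
        storage.active_map = Some(ActiveMmap::with_capacity(bytes_needed.max(MIN_MMAP_BYTES))?);

        // UNIX-like systems cap the number of mappings per process, so collapse
        // the inactive prefix into a single mmap once it grows past MAX_MMAPS_COUNT.
        if storage.inactive_mmaps.len() > MAX_MMAPS_COUNT {
            storage.inactive_mmaps.remap_into_single()?;
        }
    }
    Ok(())
}
```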

When data is appended (condensed in the sketch below):
* We check whether the `GrowableMmap` of `flatfile` has an active section.
* If the active section has enough free space, the data is written into it and flushed to disk
* If not, the current active mmap is cut, added to the list of inactive mmaps, and a new chunk is allocated; the data is written into the newly allocated section and flushed to disk. If the record is too big, the new active mmap is sized to the record
* The same applies to the `GrowableMmap` of `seqno_index`:
  * we append the `(offset, length)` pair to its active section, so the index knows where to find the record in `flatfile`
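
A condensed sketch of that append path, modeled on the `Database` append logic visible in the `database.rs` diff below; the helper signature and direct field access are illustrative, not the crate's public API:

```rust
// Sketch of the append flow: raw bytes first, index entries second.
fn put_all(
    flatfile: &FlatFile,
    seqno_index: &SeqNoIndex,
    current_end: usize, // current end of the raw data file
    records: &[&[u8]],
) -> Result<Option<usize>, FraosError> {
    // Precompute the (offset, length) pair for every record.
    let mut seqno_index_update = Vec::with_capacity(records.len());
    let mut offset = current_end;
    for record in records {
        seqno_index_update.push((offset, record.len()));
        offset += record.len();
    }

    // The raw data is written (and flushed) before the index entries are
    // published — the order this commit establishes — so the index never
    // points at data that is not on disk.
    flatfile.append(records)?;
    seqno_index.append(&seqno_index_update)
}
```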

Values are accessible only by their sequential numbers. You will need an
external index if you want any other kind of keys.

Reload note:
* If the storage is reloaded without a proper `drop`, the end of the storage may be filled with zeros, i.e. less data is actually stored than memory was allocated. In that case:
  * for `flatfile` this is not a problem: we never read a region unless a `seqno_index` entry points to it
  * for `seqno_index` this is a problem: we need to identify where the data actually ends:
    * the `(offset, length)` pairs have a special structure: `offset` is a monotonically increasing sequence and `length` is always non-zero
    * when we reload `seqno_index` and find that it is non-empty but ends in zeros, we binary-search for the actual end and reload the storage with that size (see the sketch below)
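
A compact sketch of that recovery scan; the real implementation is `SeqNoIndex::find_actual_end` in the `seqno.rs` diff further down, and `len_at(i)` stands in for reading the length half of the i-th pair:

```rust
// Binary search for the first zero-length entry: real records never have
// length zero, so the first zero marks the unwritten tail of the index.
fn find_actual_end(len_at: impl Fn(usize) -> usize, len: usize) -> usize {
    if len == 0 || len_at(0) == 0 {
        return 0; // nothing was ever written
    }
    if len_at(len - 1) != 0 {
        return len; // no zero tail: the index was shrunk properly on drop
    }
    // Invariant: len_at(lo) != 0 and len_at(hi) == 0.
    let (mut lo, mut hi) = (0usize, len - 1);
    while lo + 1 < hi {
        let mid = lo + (hi - lo) / 2;
        if len_at(mid) == 0 {
            hi = mid;
        } else {
            lo = mid;
        }
    }
    lo + 1 // index of the first zero entry == number of valid pairs
}
```
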
2 changes: 1 addition & 1 deletion fraos/src/database.rs
@@ -95,8 +95,8 @@ impl Database {
offset += record.len();
}

let seqno = self.seqno_index.append(&seqno_index_update)?;
self.flatfile.append(records)?;
let seqno = self.seqno_index.append(&seqno_index_update)?;

Ok(seqno)
}
11 changes: 6 additions & 5 deletions fraos/src/growable_mmap.rs
@@ -29,11 +29,10 @@ struct Storage {
active_map: Option<ActiveMmap>,
}

/// the struct has an active mutable mmap and inactive tail
/// the struct has an active mutable mmap (tail) and inactive prefix
/// if we have enough space we add records to the active mmap
/// if not we slice the active mmap to the actual end of writes and put it to inactive mmaps
/// then we create a new mmap with 2x size from previous
/// if 2x is not enough we create an mmap with size of the data
/// then we create a new mmap either of size of the data or MIN_MMAP_BYTES
///
pub(crate) struct GrowableMmap {
storage: RwLock<Storage>,
Expand All @@ -57,8 +56,9 @@ impl GrowableMmap {
}
}

if file_length > 0 {
let upper_cap = existing_length.unwrap_or(file_length);
let upper_cap = existing_length.unwrap_or(file_length);

if upper_cap > 0 {
let mmap = SharedMmap::new(
unsafe { MmapOptions::new().offset(0).len(upper_cap).map(file) }
.map_err(|err| FraosError::MmapError(MmapError::Mmap(err)))?,
@@ -327,6 +327,7 @@ impl GrowableMmap {

fn create_mmap(&self, new_mmap_size: usize, offset: usize) -> Result<MmapMut, FraosError> {
if let Some(file) = &self.file {
// note: set_len fills the newly extended region with zeros
file.set_len((offset + new_mmap_size) as u64)
.map_err(|err| FraosError::FileError(FileError::Extend(err)))?;
unsafe {
183 changes: 181 additions & 2 deletions fraos/src/seqno.rs
@@ -17,7 +17,21 @@ impl SeqNoIndex {
///
/// * `path` - the path to the file. It will be created if not exists.
pub fn new(path: Option<PathBuf>, writable: bool) -> Result<Self, FraosError> {
Appender::new(path, None, writable).map(|inner| Self { inner })
let mut appender =
Appender::new(path.clone(), None, writable).map(|inner| Self { inner })?;
let (_, last_len) = match appender.last()? {
None => return Ok(appender),
Some(some) => some,
};

if last_len == 0 {
// the storage wasn't shrunk to fit, so we need to find where the index actually ends
let actual_len = appender.find_actual_end()?;
appender = Appender::new(path, Some(2 * Self::SIZE_OF_USIZE * actual_len), writable)
.map(|inner| Self { inner })?;
}

Ok(appender)
}

/// Add records to index. This function will block if another write is still
@@ -54,6 +68,21 @@ impl SeqNoIndex {
Ok(Some(current_seqno))
}

pub fn get_length_at(&self, at: usize) -> Result<usize, FraosError> {
Ok(self
.get_offset_and_length(at)?
.ok_or(FraosError::IndexFileDamaged)?
.1)
}

#[allow(unused)]
pub fn get_offset_at(&self, at: usize) -> Result<usize, FraosError> {
Ok(self
.get_offset_and_length(at)?
.ok_or(FraosError::IndexFileDamaged)?
.0)
}

/// Get the location of a record with the given number.
pub fn get_offset_and_length(
&self,
@@ -114,12 +143,53 @@ impl SeqNoIndex {
pub(crate) fn mmaps_count(&self) -> Result<usize, FraosError> {
self.inner.mmaps_count()
}

// The seqno index contains (offset, length) pairs: offsets grow monotonically and lengths are always non-zero.
// If the storage is still open, or it wasn't properly shrunk to fit when dropped, the file may have trailing zeros.
// In that case the allocated size overstates what is actually indexed, so we need to find the real end when the index is non-empty.
// We use binary search to find the last non-zero pair; that is the actual end.
pub(crate) fn find_actual_end(&self) -> Result<usize, FraosError> {
let mut start = 0;
let len = self.len();
let mut end = self.len();

// an empty index file was created (all zeros) or the index has zero length
if self.get_length_at(start)? == 0 || end == 0 {
return Ok(0);
}

// all elements are non-zero
if self.get_length_at(end.saturating_sub(1))? != 0 {
return Ok(end);
}

// the empty-index cases were already handled above
while start < len.saturating_sub(1) {
// we checked before that we have at least one zero and it is ok to access start + 1
if self.get_length_at(start + 1)? == 0 {
return Ok(start + 1);
}
let mid = (start + end) / 2;
if self.get_length_at(mid)? == 0 {
end = mid;
} else {
start = mid;
}
}

Err(FraosError::IndexFileDamaged)
}
}

#[cfg(test)]
mod tests {
use super::SeqNoIndex;
use crate::FraosError;

use crate::{FileError, FraosError, MmapError};
use memmap2::{MmapMut, MmapOptions};
use std::fs::{File, OpenOptions};
use std::mem::size_of;
use std::path::PathBuf;

#[quickcheck]
fn test_read_write(records: Vec<(usize, usize)>) {
@@ -186,4 +256,113 @@ mod tests {
FraosError::EmptyRecordAppended
));
}

fn get_file(path: PathBuf, writable: bool) -> Result<File, FraosError> {
let mut options = OpenOptions::new();
options.read(true);
if writable {
options.write(true).create(true);
};

options
.open(&path)
.map_err(|err| FraosError::FileError(FileError::FileOpen(path.clone(), err)))
}

fn allocate_mmap(file: &File, size: usize) -> Result<MmapMut, FraosError> {
// note: set_len fills the newly extended region with zeros
file.set_len(size as u64)
.map_err(|err| FraosError::FileError(FileError::Extend(err)))?;
unsafe { MmapOptions::new().len(size).offset(0u64).map_mut(file) }
.map_err(|err| FraosError::MmapError(MmapError::Mmap(err)))
}

#[test]
fn check_index_recovery_zero_length() {
for i in 0..20 {
let tmp = tempfile::NamedTempFile::new().unwrap();

let file = get_file(tmp.path().to_path_buf(), true).unwrap();

if i != 0 {
let mmap = allocate_mmap(&file, size_of::<usize>() * i).unwrap();
mmap.flush().unwrap();
}

let index = SeqNoIndex::new(Some(tmp.path().to_path_buf()), true);
assert!(index.is_ok(), "can't create seqno index with {} usizes", i);
let index = index.unwrap();
assert!(index.is_empty());

index.append(&[(5000, 5), (5005, 6)]).unwrap();
drop(index);

let index = SeqNoIndex::new(Some(tmp.path().to_path_buf()), true);
assert!(
index.is_ok(),
"can't create seqno index with {} usizes after append",
i,
);
let index = index.unwrap();
assert_eq!(
index.len(),
2,
"seqno index should have len 2 after append at {}",
i
);
}
}

#[test]
fn check_index_recovery_non_zero_length() {
for (non_zeros, zeros) in [(2, 0), (100, 0), (2, 1), (2, 5), (2, 10), (258, 400)] {
let tmp = tempfile::NamedTempFile::new().unwrap();

let file = get_file(tmp.path().to_path_buf(), true).unwrap();

let mut mmap = allocate_mmap(&file, size_of::<usize>() * (non_zeros + zeros)).unwrap();
for i in 0..non_zeros {
mmap.as_mut()[i * size_of::<usize>()..(i + 1) * size_of::<usize>()]
.copy_from_slice(&i.to_le_bytes()[..]);
}
mmap.flush().unwrap();

let index = SeqNoIndex::new(Some(tmp.path().to_path_buf()), true);
assert!(
index.is_ok(),
"can't create seqno index with {} non zeros and {} zeros",
non_zeros,
zeros
);
let index = index.unwrap();
assert_eq!(
index.len(),
non_zeros / 2,
"seqno index with {} non zeros and {} zeros should have len {}",
non_zeros,
zeros,
non_zeros / 2
);

index.append(&[(5000, 5), (5005, 6)]).unwrap();
drop(index);

let index = SeqNoIndex::new(Some(tmp.path().to_path_buf()), true);
assert!(
index.is_ok(),
"can't create seqno index with {} non zeros and {} zeros after append",
non_zeros,
zeros
);
let index = index.unwrap();
assert_eq!(
index.len(),
non_zeros / 2 + 2,
"seqno index with {} non zeros and {} zeros should have len {} after append",
non_zeros,
zeros,
non_zeros / 2 + 2
);
}
}
}
