Skip to content
This repository has been archived by the owner on Aug 4, 2024. It is now read-only.

Commit

Permalink
perf: remove unnecessary nesting of merging iter and reduce clone dur…
Browse files Browse the repository at this point in the history
…ing iteration
  • Loading branch information
KKould committed Apr 23, 2024
1 parent a0699b2 commit 330aff7
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 58 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "kip_db"
version = "0.1.2-alpha.25.fix2"
version = "0.1.2-alpha.26"
edition = "2021"
authors = ["Kould <[email protected]>"]
description = "轻量级、异步 基于LSM Leveled Compaction K-V数据库"
Expand Down
100 changes: 47 additions & 53 deletions src/kernel/lsm/iterator/merging_iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl Ord for IterKey {

struct InnerIter {
map_buf: BTreeMap<IterKey, Option<Bytes>>,
pre_key: Option<Bytes>,
next_buf: Option<KeyValue>,
}

pub(crate) struct MergingIter<'a> {
Expand All @@ -42,52 +42,39 @@ pub(crate) struct SeekMergingIter<'a> {
inner: InnerIter,
}

impl<'a> MergingIter<'a> {
#[allow(clippy::mutable_key_type)]
pub(crate) fn new(
mut vec_iter: Vec<Box<dyn Iter<'a, Item = KeyValue> + 'a + Send + Sync>>,
) -> KernelResult<Self> {
let mut map_buf = BTreeMap::new();

for (num, iter) in vec_iter.iter_mut().enumerate() {
if let Some(item) = iter.try_next()? {
InnerIter::buf_map_insert(&mut map_buf, num, item);
}
}
macro_rules! impl_new {
($struct_name:ident, $vec_iter_type:ty) => {
impl<'a> $struct_name<'a> {
#[allow(clippy::mutable_key_type)]
pub(crate) fn new(mut vec_iter: $vec_iter_type) -> KernelResult<Self> {
let mut map_buf = BTreeMap::new();

Ok(MergingIter {
vec_iter,
inner: InnerIter {
map_buf,
pre_key: None,
},
})
}
}
for (num, iter) in vec_iter.iter_mut().enumerate() {
if let Some(item) = iter.try_next()? {
InnerIter::buf_map_insert(&mut map_buf, num, item);
}
}
let mut inner = InnerIter {
map_buf,
next_buf: None,
};
inner.init_next_buf();

impl<'a> SeekMergingIter<'a> {
#[allow(clippy::mutable_key_type)]
pub(crate) fn new(
mut vec_iter: Vec<Box<dyn SeekIter<'a, Item = KeyValue> + 'a + Send + Sync>>,
) -> KernelResult<Self> {
let mut map_buf = BTreeMap::new();

for (num, iter) in vec_iter.iter_mut().enumerate() {
if let Some(item) = iter.try_next()? {
InnerIter::buf_map_insert(&mut map_buf, num, item);
Ok($struct_name { vec_iter, inner })
}
}

Ok(SeekMergingIter {
vec_iter,
inner: InnerIter {
map_buf,
pre_key: None,
},
})
}
};
}

impl_new!(
MergingIter,
Vec<Box<dyn Iter<'a, Item = KeyValue> + 'a + Send + Sync>>
);
impl_new!(
SeekMergingIter,
Vec<Box<dyn SeekIter<'a, Item = KeyValue> + 'a + Send + Sync>>
);

macro_rules! is_valid {
($vec_iter:expr) => {
$vec_iter
Expand Down Expand Up @@ -121,26 +108,32 @@ impl<'a> Iter<'a> for SeekMergingIter<'a> {
}
}

impl InnerIter {
fn init_next_buf(&mut self) {
self.next_buf = self
.map_buf
.first_key_value()
.map(|(key, value)| (key.key.clone(), value.clone()));
}
}

macro_rules! impl_try_next {
($func:ident, $vec_iter:ty) => {
impl InnerIter {
fn $func(&mut self, vec_iter: &mut [$vec_iter]) -> KernelResult<Option<KeyValue>> {
while let Some((IterKey { num, key }, value)) = self.map_buf.pop_first() {
if let Some(item) = vec_iter[num].try_next()? {
Self::buf_map_insert(&mut self.map_buf, num, item);
}

// 跳过重复元素
if let Some(pre_key) = &self.pre_key {
if pre_key == &key {
if let Some(item_buf) = self.next_buf.take() {
while let Some((IterKey { num, key }, value)) = self.map_buf.pop_first() {
if let Some(item) = vec_iter[num].try_next()? {
Self::buf_map_insert(&mut self.map_buf, num, item);
}
if item_buf.0 == key {
continue;
}
self.next_buf = Some((key, value));
break;
}
self.pre_key = Some(key.clone());

return Ok(Some((key, value)));
return Ok(Some(item_buf));
}

Ok(None)
}
}
Expand Down Expand Up @@ -185,6 +178,7 @@ impl<'a> SeekIter<'a> for SeekMergingIter<'a> {

self.inner.map_buf = seek_map;
}
self.inner.init_next_buf();

Ok(())
}
Expand Down
11 changes: 7 additions & 4 deletions src/kernel/lsm/mvcc.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::kernel::lsm::compactor::CompactTask;
use crate::kernel::lsm::iterator::merging_iter::MergingIter;
use crate::kernel::lsm::iterator::{Iter, Seek, SeekIter};
use crate::kernel::lsm::iterator::{Iter, Seek};
use crate::kernel::lsm::mem_table::{KeyValue, MemTable};
use crate::kernel::lsm::query_and_compaction;
use crate::kernel::lsm::storage::{KipStorage, Sequence, StoreInner};
Expand Down Expand Up @@ -161,15 +161,18 @@ impl Transaction {
pos: 0,
}
}));
let mut ver_iter = VersionIter::new(&self.version)?;
let mut vec_seek_iter = Vec::new();
VersionIter::merging_with_version(&self.version, &mut vec_seek_iter)?;

match &min {
Bound::Included(key) | Bound::Excluded(key) => {
ver_iter.seek(Seek::Backward(key.as_slice()))?;
for mut seek_iter in vec_seek_iter {
seek_iter.seek(Seek::Backward(key.as_slice()))?;
vec_iter.push(seek_iter as Box<dyn Iter<Item = KeyValue> + Send + Sync>)
}
}
Bound::Unbounded => (),
};
vec_iter.push(Box::new(ver_iter));

Ok(TransactionIter {
inner: MergingIter::new(vec_iter)?,
Expand Down

0 comments on commit 330aff7

Please sign in to comment.