Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce tls in-memory cache to reduce lock compete #580

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions foyer-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@
#[arg(long, default_value = "lfu")]
eviction: String,

/// Thread local cache capacity per thread. (MB)
#[arg(long, default_value_t = 0)]
thread_local_cache_capacity: usize,

Check warning on line 185 in foyer-bench/src/main.rs

View check run for this annotation

Codecov / codecov/patch

foyer-bench/src/main.rs#L185

Added line #L185 was not covered by tests

/// Record insert trace threshold. Only effective with "mtrace" feature.
#[arg(long, default_value_t = 1000 * 1000)]
trace_insert_us: usize,
Expand Down Expand Up @@ -423,6 +427,7 @@

let mut builder = builder
.with_weighter(|_: &u64, value: &Value| u64::BITS as usize / 8 + value.len())
.with_thread_local_cache_capacity(args.thread_local_cache_capacity * MIB as usize)
.storage()
.with_device_config(
DirectFsDeviceOptionsBuilder::new(&args.dir)
Expand Down
1 change: 1 addition & 0 deletions foyer-memory/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ libc = "0.2"
minitrace = { workspace = true }
parking_lot = "0.12"
serde = { workspace = true }
thread_local = "1.1"
tokio = { workspace = true }
tracing = "0.1"

Expand Down
14 changes: 14 additions & 0 deletions foyer-memory/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,8 @@ where
weighter: Arc<dyn Weighter<K, V>>,

event_listener: Option<Arc<dyn EventListener<Key = K, Value = V>>>,

thread_local_cache_capacity: usize,
}

impl<K, V> CacheBuilder<K, V, RandomState>
Expand All @@ -324,6 +326,7 @@ where
hash_builder: RandomState::default(),
weighter: Arc::new(|_, _| 1),
event_listener: None,
thread_local_cache_capacity: 0,
}
}
}
Expand Down Expand Up @@ -383,6 +386,7 @@ where
hash_builder,
weighter: self.weighter,
event_listener: self.event_listener,
thread_local_cache_capacity: self.thread_local_cache_capacity,
}
}

Expand All @@ -398,6 +402,12 @@ where
self
}

/// Set thread local cache capacity.
pub fn with_thread_local_cache_capacity(mut self, thread_local_cache_capacity: usize) -> Self {
self.thread_local_cache_capacity = thread_local_cache_capacity;
self
}

/// Build in-memory cache with the given configuration.
pub fn build(self) -> Cache<K, V, S> {
match self.eviction_config {
Expand All @@ -410,6 +420,7 @@ where
hash_builder: self.hash_builder,
weighter: self.weighter,
event_listener: self.event_listener,
thread_local_cache_capacity: self.thread_local_cache_capacity,
}))),
EvictionConfig::Lru(eviction_config) => Cache::Lru(Arc::new(GenericCache::new(GenericCacheConfig {
name: self.name,
Expand All @@ -420,6 +431,7 @@ where
hash_builder: self.hash_builder,
weighter: self.weighter,
event_listener: self.event_listener,
thread_local_cache_capacity: self.thread_local_cache_capacity,
}))),
EvictionConfig::Lfu(eviction_config) => Cache::Lfu(Arc::new(GenericCache::new(GenericCacheConfig {
name: self.name,
Expand All @@ -430,6 +442,7 @@ where
hash_builder: self.hash_builder,
weighter: self.weighter,
event_listener: self.event_listener,
thread_local_cache_capacity: self.thread_local_cache_capacity,
}))),
EvictionConfig::S3Fifo(eviction_config) => Cache::S3Fifo(Arc::new(GenericCache::new(GenericCacheConfig {
name: self.name,
Expand All @@ -440,6 +453,7 @@ where
hash_builder: self.hash_builder,
weighter: self.weighter,
event_listener: self.event_listener,
thread_local_cache_capacity: self.thread_local_cache_capacity,
}))),
}
}
Expand Down
44 changes: 43 additions & 1 deletion foyer-memory/src/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::{
borrow::Borrow,
cell::RefCell,
fmt::Debug,
future::Future,
hash::Hash,
Expand All @@ -37,12 +38,14 @@
use hashbrown::hash_map::{Entry as HashMapEntry, HashMap};
use itertools::Itertools;
use parking_lot::{lock_api::MutexGuard, Mutex, RawMutex};
use thread_local::ThreadLocal;
use tokio::{sync::oneshot, task::JoinHandle};

use crate::{
eviction::Eviction,
handle::{Handle, HandleExt, KeyedHandle},
indexer::Indexer,
local::LocalCache,
CacheContext,
};

Expand Down Expand Up @@ -390,6 +393,7 @@
pub hash_builder: S,
pub weighter: Arc<dyn Weighter<K, V>>,
pub event_listener: Option<Arc<dyn EventListener<Key = K, Value = V>>>,
pub thread_local_cache_capacity: usize,
}

// TODO(MrCroxx): use `expect` after `lint_reasons` is stable.
Expand Down Expand Up @@ -459,6 +463,9 @@
I: Indexer<Key = K, Handle = E::Handle>,
S: HashBuilder,
{
thread_local_cache: Option<ThreadLocal<RefCell<LocalCache<K, V, E, I, S>>>>,
thread_local_cache_capacity: usize,

shards: Vec<Mutex<GenericCacheShard<K, V, E, I, S>>>,

capacity: usize,
Expand Down Expand Up @@ -501,7 +508,15 @@
.map(Mutex::new)
.collect_vec();

let thread_local_cache = if config.thread_local_cache_capacity > 0 {
Some(ThreadLocal::new())

Check warning on line 512 in foyer-memory/src/generic.rs

View check run for this annotation

Codecov / codecov/patch

foyer-memory/src/generic.rs#L512

Added line #L512 was not covered by tests
} else {
None
};

Self {
thread_local_cache,
thread_local_cache_capacity: config.thread_local_cache_capacity,
shards,
capacity: config.capacity,
usages,
Expand Down Expand Up @@ -615,13 +630,34 @@
{
let hash = self.hash_builder.hash_one(key);

unsafe {
if let Some(local) = self.thread_local_cache.as_ref() {
if let Some(entry) = local
.get_or(|| RefCell::new(LocalCache::new(self.thread_local_cache_capacity)))

Check warning on line 635 in foyer-memory/src/generic.rs

View check run for this annotation

Codecov / codecov/patch

foyer-memory/src/generic.rs#L635

Added line #L635 was not covered by tests
.borrow()
.get(hash, key)
{
return Some(entry);
}
}

let ret = unsafe {
let mut shard = self.shard(hash as usize % self.shards.len());
shard.get(hash, key).map(|ptr| GenericCacheEntry {
cache: self.clone(),
ptr,
})
};

if let Some(entry) = ret.as_ref() {
if let Some(local) = self.thread_local_cache.as_ref() {
local
.get_or(|| RefCell::new(LocalCache::new(self.thread_local_cache_capacity)))

Check warning on line 654 in foyer-memory/src/generic.rs

View check run for this annotation

Codecov / codecov/patch

foyer-memory/src/generic.rs#L654

Added line #L654 was not covered by tests
.borrow_mut()
.insert(entry);
}
}

ret
}

pub fn contains<Q>(self: &Arc<Self>, key: &Q) -> bool
Expand Down Expand Up @@ -1019,6 +1055,7 @@
hash_builder: RandomState::default(),
weighter: Arc::new(|_, _| 1),
event_listener: None,
thread_local_cache_capacity: 0,
})))
}

Expand All @@ -1033,6 +1070,7 @@
hash_builder: RandomState::default(),
weighter: Arc::new(|_, _| 1),
event_listener: None,
thread_local_cache_capacity: 0,
})))
}

Expand All @@ -1047,6 +1085,7 @@
hash_builder: RandomState::default(),
weighter: Arc::new(|_, _| 1),
event_listener: None,
thread_local_cache_capacity: 0,
})))
}

Expand All @@ -1061,6 +1100,7 @@
hash_builder: RandomState::default(),
weighter: Arc::new(|_, _| 1),
event_listener: None,
thread_local_cache_capacity: 0,
})))
}

Expand All @@ -1074,6 +1114,7 @@
hash_builder: RandomState::default(),
weighter: Arc::new(|_, v: &String| v.len()),
event_listener: None,
thread_local_cache_capacity: 0,
};
Arc::new(FifoCache::<u64, String>::new(config))
}
Expand All @@ -1090,6 +1131,7 @@
hash_builder: RandomState::default(),
weighter: Arc::new(|_, v: &String| v.len()),
event_listener: None,
thread_local_cache_capacity: 0,
};
Arc::new(LruCache::<u64, String>::new(config))
}
Expand Down
1 change: 1 addition & 0 deletions foyer-memory/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ mod eviction;
mod generic;
mod handle;
mod indexer;
mod local;
mod prelude;

pub use prelude::*;
79 changes: 79 additions & 0 deletions foyer-memory/src/local.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright 2024 Foyer Project Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{borrow::Borrow, collections::VecDeque, hash::Hash};

use foyer_common::code::{HashBuilder, Key, Value};
use hashbrown::{hash_table::Entry, HashTable};

use crate::{eviction::Eviction, generic::GenericCacheEntry, handle::KeyedHandle, indexer::Indexer};

/// Thread-local cache.
pub struct LocalCache<K, V, E, I, S>
where
K: Key,
V: Value,
E: Eviction,
E::Handle: KeyedHandle<Key = K, Data = (K, V)>,
I: Indexer<Key = K, Handle = E::Handle>,
S: HashBuilder,
{
map: HashTable<GenericCacheEntry<K, V, E, I, S>>,
queue: VecDeque<GenericCacheEntry<K, V, E, I, S>>,

capacity: usize,
weight: usize,
}

impl<K, V, E, I, S> LocalCache<K, V, E, I, S>
where
K: Key,
V: Value,
E: Eviction,
E::Handle: KeyedHandle<Key = K, Data = (K, V)>,
I: Indexer<Key = K, Handle = E::Handle>,
S: HashBuilder,
{
pub fn new(capacity: usize) -> Self {
Self {
map: HashTable::default(),
queue: VecDeque::default(),
capacity,
weight: 0,
}
}

Check warning on line 55 in foyer-memory/src/local.rs

View check run for this annotation

Codecov / codecov/patch

foyer-memory/src/local.rs#L48-L55

Added lines #L48 - L55 were not covered by tests

pub fn insert(&mut self, entry: &GenericCacheEntry<K, V, E, I, S>) {
match self.map.entry(entry.hash(), |e| e.key() == entry.key(), |e| e.hash()) {
Entry::Occupied(_) => return,
Entry::Vacant(v) => {
v.insert(entry.clone());
}
}
self.weight += entry.weight();
self.queue.push_back(entry.clone());
while self.weight > self.capacity {
let e = self.queue.pop_front().unwrap();
self.weight -= e.weight();
}
}

Check warning on line 70 in foyer-memory/src/local.rs

View check run for this annotation

Codecov / codecov/patch

foyer-memory/src/local.rs#L57-L70

Added lines #L57 - L70 were not covered by tests

pub fn get<Q>(&self, hash: u64, key: &Q) -> Option<GenericCacheEntry<K, V, E, I, S>>
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
self.map.find(hash, |e| e.key().borrow() == key).cloned()
}

Check warning on line 78 in foyer-memory/src/local.rs

View check run for this annotation

Codecov / codecov/patch

foyer-memory/src/local.rs#L72-L78

Added lines #L72 - L78 were not covered by tests
}
12 changes: 12 additions & 0 deletions foyer/src/hybrid/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,18 @@ where
}
}

/// Set thread local cache capacity.
pub fn with_thread_local_cache_capacity(self, thread_local_cache_capacity: usize) -> Self {
let builder = self
.builder
.with_thread_local_cache_capacity(thread_local_cache_capacity);
HybridCacheBuilderPhaseMemory {
name: self.name,
trace_config: self.trace_config,
builder,
}
}

/// Continue to modify the in-memory cache configurations.
pub fn storage(self) -> HybridCacheBuilderPhaseStorage<K, V, S> {
let memory = self.builder.build();
Expand Down
Loading