diff --git a/foyer-bench/src/main.rs b/foyer-bench/src/main.rs index ab6814c9..dbfef1e2 100644 --- a/foyer-bench/src/main.rs +++ b/foyer-bench/src/main.rs @@ -180,6 +180,10 @@ pub struct Args { #[arg(long, default_value = "lfu")] eviction: String, + /// Thread local cache capacity per thread. (MB) + #[arg(long, default_value_t = 0)] + thread_local_cache_capacity: usize, + /// Record insert trace threshold. Only effective with "mtrace" feature. #[arg(long, default_value_t = 1000 * 1000)] trace_insert_us: usize, @@ -423,6 +427,7 @@ async fn main() { let mut builder = builder .with_weighter(|_: &u64, value: &Value| u64::BITS as usize / 8 + value.len()) + .with_thread_local_cache_capacity(args.thread_local_cache_capacity * MIB as usize) .storage() .with_device_config( DirectFsDeviceOptionsBuilder::new(&args.dir) diff --git a/foyer-memory/Cargo.toml b/foyer-memory/Cargo.toml index 96e0977c..b7475327 100644 --- a/foyer-memory/Cargo.toml +++ b/foyer-memory/Cargo.toml @@ -23,6 +23,7 @@ libc = "0.2" minitrace = { workspace = true } parking_lot = "0.12" serde = { workspace = true } +thread_local = "1.1" tokio = { workspace = true } tracing = "0.1" diff --git a/foyer-memory/src/cache.rs b/foyer-memory/src/cache.rs index 5e7b42c2..a8664dfa 100644 --- a/foyer-memory/src/cache.rs +++ b/foyer-memory/src/cache.rs @@ -299,6 +299,8 @@ where weighter: Arc>, event_listener: Option>>, + + thread_local_cache_capacity: usize, } impl CacheBuilder @@ -324,6 +326,7 @@ where hash_builder: RandomState::default(), weighter: Arc::new(|_, _| 1), event_listener: None, + thread_local_cache_capacity: 0, } } } @@ -383,6 +386,7 @@ where hash_builder, weighter: self.weighter, event_listener: self.event_listener, + thread_local_cache_capacity: self.thread_local_cache_capacity, } } @@ -398,6 +402,12 @@ where self } + /// Set thread local cache capacity. + pub fn with_thread_local_cache_capacity(mut self, thread_local_cache_capacity: usize) -> Self { + self.thread_local_cache_capacity = thread_local_cache_capacity; + self + } + /// Build in-memory cache with the given configuration. pub fn build(self) -> Cache { match self.eviction_config { @@ -410,6 +420,7 @@ where hash_builder: self.hash_builder, weighter: self.weighter, event_listener: self.event_listener, + thread_local_cache_capacity: self.thread_local_cache_capacity, }))), EvictionConfig::Lru(eviction_config) => Cache::Lru(Arc::new(GenericCache::new(GenericCacheConfig { name: self.name, @@ -420,6 +431,7 @@ where hash_builder: self.hash_builder, weighter: self.weighter, event_listener: self.event_listener, + thread_local_cache_capacity: self.thread_local_cache_capacity, }))), EvictionConfig::Lfu(eviction_config) => Cache::Lfu(Arc::new(GenericCache::new(GenericCacheConfig { name: self.name, @@ -430,6 +442,7 @@ where hash_builder: self.hash_builder, weighter: self.weighter, event_listener: self.event_listener, + thread_local_cache_capacity: self.thread_local_cache_capacity, }))), EvictionConfig::S3Fifo(eviction_config) => Cache::S3Fifo(Arc::new(GenericCache::new(GenericCacheConfig { name: self.name, @@ -440,6 +453,7 @@ where hash_builder: self.hash_builder, weighter: self.weighter, event_listener: self.event_listener, + thread_local_cache_capacity: self.thread_local_cache_capacity, }))), } } diff --git a/foyer-memory/src/generic.rs b/foyer-memory/src/generic.rs index 1cbfb6bf..d7b8deae 100644 --- a/foyer-memory/src/generic.rs +++ b/foyer-memory/src/generic.rs @@ -14,6 +14,7 @@ use std::{ borrow::Borrow, + cell::RefCell, fmt::Debug, future::Future, hash::Hash, @@ -37,12 +38,14 @@ use futures::FutureExt; use hashbrown::hash_map::{Entry as HashMapEntry, HashMap}; use itertools::Itertools; use parking_lot::{lock_api::MutexGuard, Mutex, RawMutex}; +use thread_local::ThreadLocal; use tokio::{sync::oneshot, task::JoinHandle}; use crate::{ eviction::Eviction, handle::{Handle, HandleExt, KeyedHandle}, indexer::Indexer, + local::LocalCache, CacheContext, }; @@ -390,6 +393,7 @@ where pub hash_builder: S, pub weighter: Arc>, pub event_listener: Option>>, + pub thread_local_cache_capacity: usize, } // TODO(MrCroxx): use `expect` after `lint_reasons` is stable. @@ -459,6 +463,9 @@ where I: Indexer, S: HashBuilder, { + thread_local_cache: Option>>>, + thread_local_cache_capacity: usize, + shards: Vec>>, capacity: usize, @@ -501,7 +508,15 @@ where .map(Mutex::new) .collect_vec(); + let thread_local_cache = if config.thread_local_cache_capacity > 0 { + Some(ThreadLocal::new()) + } else { + None + }; + Self { + thread_local_cache, + thread_local_cache_capacity: config.thread_local_cache_capacity, shards, capacity: config.capacity, usages, @@ -615,13 +630,34 @@ where { let hash = self.hash_builder.hash_one(key); - unsafe { + if let Some(local) = self.thread_local_cache.as_ref() { + if let Some(entry) = local + .get_or(|| RefCell::new(LocalCache::new(self.thread_local_cache_capacity))) + .borrow() + .get(hash, key) + { + return Some(entry); + } + } + + let ret = unsafe { let mut shard = self.shard(hash as usize % self.shards.len()); shard.get(hash, key).map(|ptr| GenericCacheEntry { cache: self.clone(), ptr, }) + }; + + if let Some(entry) = ret.as_ref() { + if let Some(local) = self.thread_local_cache.as_ref() { + local + .get_or(|| RefCell::new(LocalCache::new(self.thread_local_cache_capacity))) + .borrow_mut() + .insert(entry); + } } + + ret } pub fn contains(self: &Arc, key: &Q) -> bool @@ -1019,6 +1055,7 @@ mod tests { hash_builder: RandomState::default(), weighter: Arc::new(|_, _| 1), event_listener: None, + thread_local_cache_capacity: 0, }))) } @@ -1033,6 +1070,7 @@ mod tests { hash_builder: RandomState::default(), weighter: Arc::new(|_, _| 1), event_listener: None, + thread_local_cache_capacity: 0, }))) } @@ -1047,6 +1085,7 @@ mod tests { hash_builder: RandomState::default(), weighter: Arc::new(|_, _| 1), event_listener: None, + thread_local_cache_capacity: 0, }))) } @@ -1061,6 +1100,7 @@ mod tests { hash_builder: RandomState::default(), weighter: Arc::new(|_, _| 1), event_listener: None, + thread_local_cache_capacity: 0, }))) } @@ -1074,6 +1114,7 @@ mod tests { hash_builder: RandomState::default(), weighter: Arc::new(|_, v: &String| v.len()), event_listener: None, + thread_local_cache_capacity: 0, }; Arc::new(FifoCache::::new(config)) } @@ -1090,6 +1131,7 @@ mod tests { hash_builder: RandomState::default(), weighter: Arc::new(|_, v: &String| v.len()), event_listener: None, + thread_local_cache_capacity: 0, }; Arc::new(LruCache::::new(config)) } diff --git a/foyer-memory/src/lib.rs b/foyer-memory/src/lib.rs index 47df1c63..c185bdbc 100644 --- a/foyer-memory/src/lib.rs +++ b/foyer-memory/src/lib.rs @@ -70,6 +70,7 @@ mod eviction; mod generic; mod handle; mod indexer; +mod local; mod prelude; pub use prelude::*; diff --git a/foyer-memory/src/local.rs b/foyer-memory/src/local.rs new file mode 100644 index 00000000..1142c21a --- /dev/null +++ b/foyer-memory/src/local.rs @@ -0,0 +1,79 @@ +// Copyright 2024 Foyer Project Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{borrow::Borrow, collections::VecDeque, hash::Hash}; + +use foyer_common::code::{HashBuilder, Key, Value}; +use hashbrown::{hash_table::Entry, HashTable}; + +use crate::{eviction::Eviction, generic::GenericCacheEntry, handle::KeyedHandle, indexer::Indexer}; + +/// Thread-local cache. +pub struct LocalCache +where + K: Key, + V: Value, + E: Eviction, + E::Handle: KeyedHandle, + I: Indexer, + S: HashBuilder, +{ + map: HashTable>, + queue: VecDeque>, + + capacity: usize, + weight: usize, +} + +impl LocalCache +where + K: Key, + V: Value, + E: Eviction, + E::Handle: KeyedHandle, + I: Indexer, + S: HashBuilder, +{ + pub fn new(capacity: usize) -> Self { + Self { + map: HashTable::default(), + queue: VecDeque::default(), + capacity, + weight: 0, + } + } + + pub fn insert(&mut self, entry: &GenericCacheEntry) { + match self.map.entry(entry.hash(), |e| e.key() == entry.key(), |e| e.hash()) { + Entry::Occupied(_) => return, + Entry::Vacant(v) => { + v.insert(entry.clone()); + } + } + self.weight += entry.weight(); + self.queue.push_back(entry.clone()); + while self.weight > self.capacity { + let e = self.queue.pop_front().unwrap(); + self.weight -= e.weight(); + } + } + + pub fn get(&self, hash: u64, key: &Q) -> Option> + where + K: Borrow, + Q: Hash + Eq + ?Sized, + { + self.map.find(hash, |e| e.key().borrow() == key).cloned() + } +} diff --git a/foyer/src/hybrid/builder.rs b/foyer/src/hybrid/builder.rs index c905cffc..f5c5e475 100644 --- a/foyer/src/hybrid/builder.rs +++ b/foyer/src/hybrid/builder.rs @@ -173,6 +173,18 @@ where } } + /// Set thread local cache capacity. + pub fn with_thread_local_cache_capacity(self, thread_local_cache_capacity: usize) -> Self { + let builder = self + .builder + .with_thread_local_cache_capacity(thread_local_cache_capacity); + HybridCacheBuilderPhaseMemory { + name: self.name, + trace_config: self.trace_config, + builder, + } + } + /// Continue to modify the in-memory cache configurations. pub fn storage(self) -> HybridCacheBuilderPhaseStorage { let memory = self.builder.build();