From 72a31b80fd834429cf771dfe8f143bd70aabdd14 Mon Sep 17 00:00:00 2001
From: caijieming
Date: Fri, 19 May 2017 02:23:28 +0800
Subject: [PATCH] issue=1258, Tcache support block-level cache evict

---
 src/io/tablet_io.cc                       |   17 +-
 src/io/utils_leveldb.cc                   |   17 +
 src/io/utils_leveldb.h                    |    2 +
 src/leveldb/include/leveldb/block_cache.h |   99 ++
 src/leveldb/include/leveldb/cache.h       |    6 +-
 src/leveldb/util/block_cache.cc           | 1035 +++++++++++++++++++++
 src/leveldb/util/cache.cc                 |  166 +++-
 src/leveldb/util/coding_test.cc           |   11 +
 src/tabletnode/tabletnode_impl.cc         |   54 +-
 src/tera_flags.cc                         |    3 +-
 10 files changed, 1372 insertions(+), 38 deletions(-)
 create mode 100644 src/leveldb/include/leveldb/block_cache.h
 create mode 100644 src/leveldb/util/block_cache.cc

diff --git a/src/io/tablet_io.cc b/src/io/tablet_io.cc
index 9e92c121b..92adb0deb 100644
--- a/src/io/tablet_io.cc
+++ b/src/io/tablet_io.cc
@@ -1668,18 +1668,25 @@ void TabletIO::SetupOptionsForLG() {
             lg_info->env = LeveldbMockEnv();
         } else if (store == MemoryStore) {
             if (FLAGS_tera_use_flash_for_memenv) {
-                lg_info->env = LeveldbFlashEnv();
+                if (FLAGS_tera_tabletnode_block_cache_enabled) {
+                    LOG(INFO) << "MemLG[" << lg_i << "] activate TCache";
+                    lg_info->env = io::DefaultBlockCacheEnv();
+                } else {
+                    lg_info->env = LeveldbFlashEnv();
+                }
             } else {
                 lg_info->env = LeveldbMemEnv();
             }
             lg_info->seek_latency = 0;
             lg_info->block_cache = m_memory_cache;
         } else if (store == FlashStore) {
-            if (!FLAGS_tera_tabletnode_cache_enabled) {
-                lg_info->env = LeveldbFlashEnv();
+            if (FLAGS_tera_tabletnode_block_cache_enabled) {
+                // TCache (block cache env) takes over from the three-level
+                // cache env that used to back flash locality groups here.
+                LOG(INFO) << "FlashLG[" << lg_i << "] activate TCache";
+                lg_info->env = io::DefaultBlockCacheEnv();
             } else {
-                LOG(INFO) << "activate block-level Cache store";
-                lg_info->env = leveldb::EnvThreeLevelCache();
+                lg_info->env = LeveldbFlashEnv();
             }
             lg_info->seek_latency = FLAGS_tera_leveldb_env_local_seek_latency;
         } else {
diff --git a/src/io/utils_leveldb.cc b/src/io/utils_leveldb.cc
index 253e23f56..967d42065 100644
--- a/src/io/utils_leveldb.cc
+++ b/src/io/utils_leveldb.cc
@@ -18,6 +18,7 @@
 #include "leveldb/comparator.h"
 #include "leveldb/env_dfs.h"
 #include "leveldb/env_flash.h"
+#include "leveldb/block_cache.h"
 #include "leveldb/env_inmem.h"
 #include "leveldb/env_mock.h"
 #include "leveldb/table_utils.h"
@@ -31,6 +32,7 @@ DECLARE_string(tera_leveldb_env_hdfs2_nameservice_list);
 DECLARE_string(tera_tabletnode_path_prefix);
 DECLARE_string(tera_dfs_so_path);
 DECLARE_string(tera_dfs_conf);
+DECLARE_int32(tera_leveldb_block_cache_env_num_thread);
 
 namespace tera {
 namespace io {
@@ -66,6 +68,20 @@ leveldb::Env* LeveldbBaseEnv() {
     }
 }
 
+// Tcache: process-wide block cache env layered over the base env, built once
+static pthread_once_t block_cache_once = PTHREAD_ONCE_INIT;
+static leveldb::Env* default_block_cache_env = NULL;
+static void InitDefaultBlockCacheEnv() {
+    default_block_cache_env = new leveldb::BlockCacheEnv(LeveldbBaseEnv());
+    default_block_cache_env->SetBackgroundThreads(FLAGS_tera_leveldb_block_cache_env_num_thread);
+}
+
+leveldb::Env* DefaultBlockCacheEnv() {
+    pthread_once(&block_cache_once, InitDefaultBlockCacheEnv);
+    return default_block_cache_env;
+}
+
+// mem env
 leveldb::Env* LeveldbMemEnv() {
     static Mutex mutex;
     static leveldb::Env* mem_env = NULL;
@@ -78,6 +94,7 @@ leveldb::Env* LeveldbMemEnv() {
     return mem_env;
 }
 
+// flash env
 leveldb::Env* LeveldbFlashEnv() {
     static Mutex mutex;
     static leveldb::Env* flash_env = NULL;
diff --git a/src/io/utils_leveldb.h b/src/io/utils_leveldb.h
index f77847db9..39e5d73c1 100644
--- a/src/io/utils_leveldb.h
+++ b/src/io/utils_leveldb.h
@@ -18,6 +18,8 @@ void InitDfsEnv();
 // return the base env leveldb used (dfs/local), singleton
 leveldb::Env* LeveldbBaseEnv();
 
+leveldb::Env* DefaultBlockCacheEnv(); // return the block cache env (ssd + base), singleton
+
 // return the mem env leveldb used, singleton
 leveldb::Env* LeveldbMemEnv();
 
diff --git a/src/leveldb/include/leveldb/block_cache.h b/src/leveldb/include/leveldb/block_cache.h
new file mode
100644
index 000000000..200f2cb01
--- /dev/null
+++ b/src/leveldb/include/leveldb/block_cache.h
@@ -0,0 +1,112 @@
+// Copyright (c) 2015, Baidu.com, Inc. All Rights Reserved
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef STOREAGE_LEVELDB_UTIL_BLOCK_CACHE_H
+#define STOREAGE_LEVELDB_UTIL_BLOCK_CACHE_H
+
+#include <stdint.h>
+#include <string>
+#include <vector>
+
+#include "leveldb/env.h"
+#include "leveldb/options.h"
+#include "leveldb/status.h"
+
+namespace leveldb {
+/////////////////////////////////////////////
+// Tcache
+/////////////////////////////////////////////
+// Defaults used by BlockCacheOptions; defined once in util/block_cache.cc.
+extern uint64_t kBlockSize;        // bytes per cache block
+extern uint64_t kDataSetSize;      // bytes per data set file
+extern uint64_t kFidBatchNum;      // file ids handed out per persisted batch
+extern uint64_t kCacheSize;        // bytes of cache per cache dir
+extern uint64_t kMetaBlockSize;    // MB, block cache of the meta db
+extern uint64_t kMetaTableSize;    // MB, table cache of the meta db
+extern uint64_t kWriteBufferSize;  // bytes, write buffer of the meta db
+
+// Tunables of one block cache instance (one per cache dir).
+struct BlockCacheOptions {
+  Options opts;
+  std::string cache_dir;
+  uint64_t block_size;
+  uint64_t dataset_size;
+  uint64_t fid_batch_num;
+  uint64_t cache_size;
+  uint64_t dataset_num;  // derived in the ctor: cache_size / dataset_size + 1
+  uint64_t meta_block_cache_size;
+  uint64_t meta_table_cache_size;
+  uint64_t write_buffer_size;
+  Env* env;
+
+  BlockCacheOptions()
+    : block_size(kBlockSize),
+      dataset_size(kDataSetSize),
+      fid_batch_num(kFidBatchNum),
+      cache_size(kCacheSize),
+      meta_block_cache_size(kMetaBlockSize),
+      meta_table_cache_size(kMetaTableSize),
+      write_buffer_size(kWriteBufferSize),
+      env(NULL) {
+    dataset_num = cache_size / dataset_size + 1;
+  }
+};
+
+class BlockCacheWritableFile;
+class BlockCacheRandomAccessFile;
+class BlockCacheImpl;
+
+// Env that forwards namespace operations to the base (dfs) env and routes
+// file reads/writes through the block caches registered with LoadCache().
+class BlockCacheEnv : public EnvWrapper {
+public:
+  BlockCacheEnv(Env* base);
+
+  ~BlockCacheEnv();
+
+  virtual Status FileExists(const std::string& fname);
+
+  virtual Status GetChildren(const std::string& path,
+                             std::vector<std::string>* result);
+
+  virtual Status DeleteFile(const std::string& fname);
+
+  virtual Status CreateDir(const std::string& name);
+
+  virtual Status DeleteDir(const std::string& name);
+
+  virtual Status CopyFile(const std::string& from,
+                          const std::string& to);
+
+  virtual Status GetFileSize(const std::string& fname, uint64_t* size);
+
+  virtual Status RenameFile(const std::string& src, const std::string& target);
+
+  virtual Status LockFile(const std::string& fname, FileLock** lock);
+
+  virtual Status UnlockFile(FileLock* lock);
+
+  virtual Status NewSequentialFile(const std::string& fname,
+                                   SequentialFile** result); // never cache log
+
+  // cache relatively
+  virtual Status NewRandomAccessFile(const std::string& fname,
+                                     RandomAccessFile** result); // cache Pread
+
+  virtual Status NewWritableFile(const std::string& fname,
+                                 WritableFile** result); // cache Append
+
+  // Opens the cache stored under cache_dir and registers it with this env.
+  Status LoadCache(const BlockCacheOptions& opts, const std::string& cache_dir);
+
+private:
+  std::vector<BlockCacheImpl*> cache_vec_;
+  Env* dfs_env_;
+};
+
+Env* NewBlockCacheEnv(Env* base);
+
+} // leveldb
+
+#endif  // STOREAGE_LEVELDB_UTIL_BLOCK_CACHE_H
diff --git a/src/leveldb/include/leveldb/cache.h b/src/leveldb/include/leveldb/cache.h
index 636811b65..40fbaf154 100644
--- a/src/leveldb/include/leveldb/cache.h
+++ b/src/leveldb/include/leveldb/cache.h
@@ -31,7 +31,11 @@ class Cache;
 
 // Create a new cache with a fixed size capacity. This implementation
 // of Cache uses a least-recently-used eviction policy.
-extern Cache* NewLRUCache(size_t capacity);
+enum LRUCacheType {
+  kLRUCacheDefault = 0,
+  kLRUCache2Q = 1,
+};
+extern Cache* NewLRUCache(size_t capacity, LRUCacheType type = kLRUCacheDefault);
 
 class Cache {
  public:
diff --git a/src/leveldb/util/block_cache.cc b/src/leveldb/util/block_cache.cc
new file mode 100644
index 000000000..7e3ea2828
--- /dev/null
+++ b/src/leveldb/util/block_cache.cc
@@ -0,0 +1,1062 @@
+// Copyright (c) 2015, Baidu.com, Inc. All Rights Reserved
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "leveldb/block_cache.h"
+
+#include <assert.h>
+#include <fcntl.h>
+#include <string.h>
+#include <unistd.h>
+#include <list>
+#include <map>
+#include <string>
+#include <vector>
+
+#include "leveldb/cache.h"
+#include "leveldb/db.h"
+#include "leveldb/env.h"
+#include "leveldb/env_dfs.h"
+#include "leveldb/filter_policy.h"
+#include "leveldb/slog.h"
+#include "leveldb/status.h"
+#include "leveldb/table_utils.h"
+#include "leveldb/write_batch.h"
+#include "util/coding.h"
+#include "util/crc32c.h"
+#include "util/hash.h"
+// #include "util/md5.h" // just for debug, so not svn-ci
+#include "util/mutexlock.h"
+#include "util/string_ext.h"
+
+namespace leveldb {
+
+/////////////////////////////////////////////
+// Tcache
+/////////////////////////////////////////////
+// Defaults for BlockCacheOptions (declared extern in leveldb/block_cache.h)
+uint64_t kBlockSize = 8192UL;
+uint64_t kDataSetSize = 134217728UL;
+uint64_t kFidBatchNum = 200000UL;
+uint64_t kCacheSize = 350000000000UL;
+uint64_t kMetaBlockSize = 2000UL;
+uint64_t kMetaTableSize = 500UL;
+uint64_t kWriteBufferSize = 1048576UL;
+
+class BlockCacheWritableFile;
+class BlockCacheRandomAccessFile;
+class BlockCacheImpl;
+
+// Must insure not init more than twice
+Env* NewBlockCacheEnv(Env* base) {
+  return new BlockCacheEnv(base);
+}
+
+BlockCacheEnv::BlockCacheEnv(Env* base)
+  : EnvWrapper(NewPosixEnv()), dfs_env_(base) {
+  target()->SetBackgroundThreads(30);
+}
+
+BlockCacheEnv::~BlockCacheEnv() {}
+
+Status BlockCacheEnv::FileExists(const std::string& fname) {
+  return dfs_env_->FileExists(fname);
+}
+
+Status BlockCacheEnv::GetChildren(const std::string& path,
+                                  std::vector<std::string>* result) {
+  return dfs_env_->GetChildren(path, result);
+}
+
+Status BlockCacheEnv::DeleteFile(const std::string& fname) {
+  return dfs_env_->DeleteFile(fname);
+}
+
+Status BlockCacheEnv::CreateDir(const std::string& name) {
+  return dfs_env_->CreateDir(name);
+}
+
+Status BlockCacheEnv::DeleteDir(const std::string& name) {
+  return dfs_env_->DeleteDir(name);
+}
+
+Status BlockCacheEnv::CopyFile(const std::string& from,
+                               const std::string& to) {
+  return dfs_env_->CopyFile(from, to);
+}
+
+Status BlockCacheEnv::GetFileSize(const std::string& fname, uint64_t* size) {
+  return dfs_env_->GetFileSize(fname, size);
+}
+
+Status BlockCacheEnv::RenameFile(const std::string& src, const std::string& target) {
+  return dfs_env_->RenameFile(src, target);
+}
+
+Status BlockCacheEnv::LockFile(const std::string& fname, FileLock** lock) {
+  return dfs_env_->LockFile(fname, lock);
+}
+
+Status BlockCacheEnv::UnlockFile(FileLock* lock) {
+  return dfs_env_->UnlockFile(lock);
+}
+
+Status BlockCacheEnv::LoadCache(const BlockCacheOptions& opts, const std::string& cache_dir) {
+  BlockCacheOptions options = opts;
+  options.cache_dir = cache_dir;
+  options.env = dfs_env_;
+  BlockCacheImpl* cache = new BlockCacheImpl(options);
+  Status s = cache->LoadCache();
+  assert(s.ok());
+  cache_vec_.push_back(cache); // no need lock
+  return s;
+}
+
+Status BlockCacheEnv::NewSequentialFile(const std::string& fname,
+                                        SequentialFile** result) {
+  return dfs_env_->NewSequentialFile(fname, result);
+}
+
+Status BlockCacheEnv::NewWritableFile(const std::string& fname,
+                                      WritableFile** result) {
+  if (fname.size() < 4 || fname.compare(fname.size() - 4, 4, ".sst") != 0) {
+    return dfs_env_->NewWritableFile(fname, result);
+  }
+
+  // cache sst file
+  assert(!cache_vec_.empty()); // LoadCache() must have registered a cache
+  uint32_t hash = (Hash(fname.c_str(), fname.size(), 13)) % cache_vec_.size();
+  BlockCacheImpl* cache = cache_vec_[hash];
+  Status s = cache->NewWritableFile(fname, result);
+  return s;
+}
+
+Status BlockCacheEnv::NewRandomAccessFile(const std::string& fname,
+                                          RandomAccessFile** result) {
+  assert(!cache_vec_.empty()); // LoadCache() must have registered a cache
+  uint32_t hash = (Hash(fname.c_str(), fname.size(), 13)) % cache_vec_.size();
+  BlockCacheImpl* cache = cache_vec_[hash];
+  Status s = cache->NewRandomAccessFile(fname, result);
+  return s;
+}
+
+// Each SSD will New a BlockCache
+// block state
+static const uint64_t kCacheBlockValid = 1;
+// One cached block: identity (fid, block_idx), slot (sid, cache_block_idx), io state.
+struct CacheBlock {
+  uint64_t fid;
+  uint64_t block_idx;
+  uint64_t sid;
+  uint64_t cache_block_idx;
+  uint64_t state;
+  port::CondVar cv;
+  Slice data_block;
+  bool data_block_alloc; // true when data_block was new[]-ed for this block
+  Cache::Handle* handle;
+  Status s;
+
+  CacheBlock(BlockCacheImpl* c)
+    : fid(0),
+      block_idx(0),
+      sid(0xffffffffffffffff),
+      cache_block_idx(0xffffffffffffffff),
+      state(!kCacheBlockValid),
+      cv(&c->mu_),
+      data_block_alloc(false),
+      handle(NULL) {
+  }
+
+  void DecodeFrom(Slice record) {
+    fid = DecodeFixed64(record.data());
+    record.remove_prefix(sizeof(uint64_t));
+    block_idx = DecodeFixed64(record.data());
+    record.remove_prefix(sizeof(uint64_t));
+    state = DecodeFixed64(record.data());
+    return;
+  }
+
+  std::string Encode() { // by value: the buffer is a local
+    std::string r;
+    PutFixed64(&r, fid);
+    PutFixed64(&r, block_idx);
+    PutFixed64(&r, state);
+    return r;
+  }
+};
+
+// Block cache of one cache dir: maps (fid, block_idx) to a slot of a local
+// data set file and persists that mapping in a local meta db.
+class BlockCacheImpl {
+public:
+  BlockCacheImpl(const BlockCacheOptions& options);
+
+  ~BlockCacheImpl();
+
+  Status LoadCache(); // init cache
+
+  Status NewWritableFile(const std::string& fname,
+                         WritableFile** result);
+
+  Status NewRandomAccessFile(const std::string& fname,
+                             RandomAccessFile** result); // cache Pread
+  static void BlockDeleter(const Slice& key, void* v);
+
+private:
+  struct DataSet;
+  struct LockContent;
+  Status LockAndPut(LockContent& lc);
+
+  Status FillCache(CacheBlock* block);
+
+  Status ReadCache(CacheBlock* block);
+
+  uint64_t AllocFileId(); // no more than fid_batch_num
+
+  uint64_t FileId(const std::string& fname);
+
+  DataSet* GetDataSet(uint64_t sid);
+
+  CacheBlock* GetAndAllocBlock(uint64_t fid, uint64_t block_idx);
+
+  Status LogRecord(CacheBlock* block);
+
+  Status ReleaseBlock(CacheBlock* block);
+
+private:
+  friend class BlockCacheWritableFile;
+  friend class BlockCacheRandomAccessFile;
+  friend struct CacheBlock;
+  BlockCacheOptions options_;
+  Env* dfs_env_;
+  //Env* posix_env_;
+
+  port::Mutex mu_;
+  // key lock list
+  struct Waiter {
+    port::CondVar cv;
+    int wait_num;
+    bool done;
+    Waiter(port::Mutex* mu):cv(mu), wait_num(0), done(false) {}
+  };
+  typedef std::map<std::string, Waiter*> LockKeyMap;
+  LockKeyMap lock_key_;
+
+  uint64_t new_fid_;
+  uint64_t prev_fid_;
+
+  enum LockKeyType {
+    kDBKey = 0,
+    kDataSetKey = 1,
+  };
+  struct LockContent {
+    int type;
+
+    // DB key
+    Slice db_lock_key;
+    Slice db_lock_val;
+    std::string* db_val;
+
+    // data set id
+    uint64_t sid;
+    DataSet* data_set;
+  };
+  struct DataSet {
+    Cache* cache;
+    int fd;
+  };
+  typedef std::map<uint64_t, DataSet*> DataSetMap;
+  DataSetMap data_set_map_;
+
+  //WritableFile* logfile_;
+  //log::Writer* log_;
+  DB* db_; // store meta
+};
+
+BlockCacheImpl::BlockCacheImpl(const BlockCacheOptions& options)
+  : options_(options),
+    dfs_env_(options.env),
+    new_fid_(0),
+    prev_fid_(0),
+    db_(NULL) {
+}
+
+BlockCacheImpl::~BlockCacheImpl() {}
+
+Status BlockCacheImpl::NewWritableFile(const std::string& fname,
+                                       WritableFile** result) {
+  BlockCacheWritableFile* file = new BlockCacheWritableFile(this, fname);
+  *result = file;
+  return Status::OK();
+}
+
+Status BlockCacheImpl::NewRandomAccessFile(const std::string& fname,
+                                           RandomAccessFile** result) {
+  BlockCacheRandomAccessFile* file = new BlockCacheRandomAccessFile(this, fname);
+  *result = file;
+  return Status::OK();
+}
+
+void BlockCacheImpl::BlockDeleter(const Slice& key, void* v) {
+  CacheBlock* block = (CacheBlock*)v;
+  delete block;
+  return;
+}
+
+// if lock succ, put lock_val, else get newer value
+Status BlockCacheImpl::LockAndPut(LockContent& lc) {
+  mu_.AssertHeld();
+  Status s;
+  std::string key;
+  if (lc.type == kDBKey) {
+    key = lc.db_lock_key.ToString();
+  } else if (lc.type == kDataSetKey) {
+    key = "DS#";
+    PutFixed64(&key, lc.sid);
+  } else {
+    return Status::NotSupported("unknown lock key type");
+  }
+
+  Waiter* w = NULL;
+  LockKeyMap::iterator it = lock_key_.find(key);
+  if (it != lock_key_.end()){
+    w = it->second;
+    w->wait_num ++;
+    while (!w->done) {
+      w->cv.Wait();
+    }
+    mu_.Unlock();
+
+    if (lc.type == kDBKey) {
+      ReadOptions r_opts;
+      s = db_->Get(r_opts, key, lc.db_val);
+    } else if (lc.type == kDataSetKey) {
+      lc.data_set = data_set_map_[lc.sid];
+    }
+
+    mu_.Lock();
+    if (--w->wait_num == 0) {
+      // last thread wait for open
+      lock_key_.erase(key);
+      //printf("wait done %s, delete cv\n", fname.c_str());
+      delete w;
+    } else {
+      //printf("wait done %s, not last\n", fname.c_str());
+    }
+  } else {
+    w = new Waiter(&mu_);
+    w->wait_num = 1;
+    lock_key_[key] = w;
+    mu_.Unlock();
+
+    if (lc.type == kDBKey) {
+      WriteOptions w_opts;
+      s = db_->Put(w_opts, key, lc.db_lock_val);
+      if (s.ok()) {
+        *(lc.db_val) = lc.db_lock_val.ToString();
+      }
+    } else if (lc.type == kDataSetKey) {
+      lc.data_set = new DataSet;
+      lc.data_set->cache = NewLRUCache((options_.dataset_size / options_.block_size) + 1, leveldb::kLRUCache2Q);// number of blocks in DS
+      std::string file = options_.cache_dir + "/" + Uint64ToString(lc.sid);
+      lc.data_set->fd = open(file.c_str(), O_RDWR | O_CREAT, 0644);
+      assert(lc.data_set->fd >= 0);
+
+      // reload hash lru
+      ReadOptions s_opts;
+      Iterator* db_it = db_->NewIterator(s_opts);
+      for (db_it->Seek(key);
+           db_it->Valid() && db_it->key().starts_with(key); // fixed64 is little-endian: match the DS#sid prefix, not a range
+           db_it->Next()) {
+        Slice lkey = db_it->key();
+        lkey.remove_prefix(3 + sizeof(uint64_t));// remove DS#sid
+        //Slice lval = it->value();
+
+        CacheBlock* block = new CacheBlock(this);
+        block->DecodeFrom(db_it->value()); // get fid and block_idx
+        std::string hkey;
+        PutFixed64(&hkey, block->fid);
+        PutFixed64(&hkey, block->block_idx);
+        block->sid = lc.sid;
+        block->cache_block_idx = DecodeFixed64(lkey.data());
+        Cache::Handle* handle = lc.data_set->cache->Insert(hkey, block, 1, &BlockCacheImpl::BlockDeleter);
+        assert(handle != NULL);
+        handle->cache_id = block->cache_block_idx;
+        lc.data_set->cache->Release(handle);
+      }
+      delete db_it;
+
+      mu_.Lock();
+      data_set_map_[lc.sid] = lc.data_set;
+      mu_.Unlock();
+    }
+
+    mu_.Lock();
+    if (--w->wait_num == 0) {
+      lock_key_.erase(key);
+      //printf("open done %s, no wait thread\n", fname.c_str());
+      delete w;
+    } else {
+      //printf("open done %s, signal all wait thread\n", fname.c_str());
+      w->done = true;
+      w->cv.SignalAll();
+    }
+  }
+  return s;
+}
+
+Status BlockCacheImpl::LoadCache() {
+  // open meta file
+  std::string dbname = options_.cache_dir + "/meta/";
+  options_.opts.env = Env::Default(); // local write
+  options_.opts.filter_policy = NewBloomFilterPolicy(10);
+  options_.opts.block_cache = leveldb::NewLRUCache(options_.meta_block_cache_size * 1024UL * 1024);
+  options_.opts.table_cache = new leveldb::TableCache(options_.meta_table_cache_size * 1024UL * 1024);
+  options_.opts.write_buffer_size = options_.write_buffer_size;
+  Status s = DB::Open(options_.opts, dbname, &db_);
+  if (!s.ok()) { return s; } // db_ is unusable, do not touch it below
+
+  // recover fid
+  std::string key = "FID#";
+  std::string val;
+  ReadOptions r_opts;
+  s = db_->Get(r_opts, key, &val);
+  if (!s.ok()) {
+    prev_fid_ = 0;
+    new_fid_ = 0;
+  } else {
+    prev_fid_ = DecodeFixed64(val.c_str());
+    new_fid_ = prev_fid_;
+  }
+  s = Status::OK();
+  return s;
+}
+
+Status BlockCacheImpl::FillCache(CacheBlock* block) {
+  MutexLock l(&mu_);
+  uint64_t sid = block->sid;
+  uint64_t cache_block_idx = block->cache_block_idx;
+  int fd = (data_set_map_[sid])->fd;
+  mu_.Unlock();
+
+  // do io without lock
+  ssize_t res = pwrite(fd, block->data_block.data(), block->data_block.size(),
+                       cache_block_idx * options_.block_size);
+  mu_.Lock();
+  if (res < 0) {
+    return Status::Corruption("pwrite cache block failed");
+  }
+  return Status::OK();
+}
+
+Status BlockCacheImpl::ReadCache(CacheBlock* block) {
+  MutexLock l(&mu_);
+  uint64_t sid = block->sid;
+  uint64_t cache_block_idx = block->cache_block_idx;
+  int fd = (data_set_map_[sid])->fd;
+  mu_.Unlock();
+
+  // do io without lock
+  ssize_t res = pread(fd, (char*)block->data_block.data(), block->data_block.size(),
+                      cache_block_idx * options_.block_size);
+  mu_.Lock();
+  if (res < 0) {
+    return Status::Corruption("pread cache block failed");
+  }
+  return Status::OK();
+}
+
+uint64_t BlockCacheImpl::AllocFileId() { // no more than fid_batch_num
+  mu_.AssertHeld();
+  uint64_t fid = ++new_fid_;
+  while (new_fid_ - prev_fid_ >= options_.fid_batch_num) {
+    std::string key = "FID#";
+    std::string lock_val;
+    PutFixed64(&lock_val, new_fid_);
+    std::string val;
+
+    LockContent lc;
+    lc.type = kDBKey;
+    lc.db_lock_key = key;
+    lc.db_lock_val = lock_val;
+    lc.db_val = &val;
+    Status s = LockAndPut(lc);
+    if (s.ok()) {
+      prev_fid_ = DecodeFixed64(val.c_str());
+    }
+  }
+  return fid;
+}
+
+uint64_t BlockCacheImpl::FileId(const std::string& fname) {
+  mu_.AssertHeld();
+  uint64_t fid = 0;
+  std::string key = "FNAME#" + fname;
+  mu_.Unlock();
+
+  ReadOptions r_opts;
+  std::string val;
+  Status s = db_->Get(r_opts, key, &val);
+  if (!s.ok()) { // not exist
+    MutexLock l(&mu_);
+    fid = AllocFileId();
+    std::string v;
+    PutFixed64(&val, fid);
+
+    LockContent lc;
+    lc.type = kDBKey;
+    lc.db_lock_key = key;
+    lc.db_lock_val = val;
+    lc.db_val = &v;
+    s = LockAndPut(lc);
+    assert(s.ok());
+    fid = DecodeFixed64(v.c_str());
+  } else { // fid in cache
+    fid = DecodeFixed64(val.c_str());
+  }
+
+  mu_.Lock();
+  return fid;
+}
+
+BlockCacheImpl::DataSet* BlockCacheImpl::GetDataSet(uint64_t sid) {
+  mu_.AssertHeld();
+  DataSet* set = NULL;
+
+  DataSetMap::iterator it = data_set_map_.find(sid);
+  if (it == data_set_map_.end()) {
+    LockContent lc;
+    lc.type = kDataSetKey;
+    lc.sid = sid;
+    lc.data_set = NULL;
+    Status s = LockAndPut(lc);
+    set = lc.data_set;
+  } else {
+    set = it->second;
+  }
+  return set;
+}
+
+CacheBlock* BlockCacheImpl::GetAndAllocBlock(uint64_t fid, uint64_t block_idx) {
+  mu_.AssertHeld();
+  std::string key;
+  PutFixed64(&key, fid);
+  PutFixed64(&key, block_idx);
+  uint32_t hash = Hash(key.c_str(), key.size(), 7);
+  uint64_t sid = hash % options_.dataset_num;
+
+  CacheBlock* block = NULL;
+  DataSet* ds = GetDataSet(sid); // get and alloc ds
+  Cache* cache = ds->cache;
+  Cache::Handle* h = cache->Lookup(key);
+  if (h == NULL) {
+    block = new CacheBlock(this);
+    h = cache->Insert(key, block, 1, &BlockCacheImpl::BlockDeleter);
+    assert(h != NULL);
+    block->fid = fid;
+    block->block_idx = block_idx;
+    block->sid = sid;
+    block->cache_block_idx = h->cache_id;
+    assert(block->state != kCacheBlockValid);
+  } else { // cache hit: the block is the value stored with the handle
+    block = reinterpret_cast<CacheBlock*>(cache->Value(h));
+  }
+  block->handle = h;
+  return block;
+}
+
+Status BlockCacheImpl::LogRecord(CacheBlock* block) {
+  std::string key = "DS#";
+  PutFixed64(&key, block->sid);
+  PutFixed64(&key, block->cache_block_idx);
+  leveldb::WriteBatch batch;
+  batch.Put(key, block->Encode());
+  return db_->Write(leveldb::WriteOptions(), &batch);
+}
+
+Status BlockCacheImpl::ReleaseBlock(CacheBlock* block) {
+  mu_.AssertHeld();
+  Status s;
+  std::string key = "DS#";
+  PutFixed64(&key, block->sid);
+  PutFixed64(&key, block->cache_block_idx);
+  leveldb::WriteBatch batch;
+  batch.Put(key, block->Encode());
+
+  if (block->data_block_alloc) {
+    char* data = (char*)block->data_block.data();
+    delete[] data;
+    block->data_block = Slice();
+  }
+  Cache::Handle* h = block->handle;
+  DataSet* ds = GetDataSet(block->sid); // get and alloc ds
+  block->handle = NULL;
+  ds->cache->Release(h);
+  mu_.Unlock();
+
+  // TODO: dump meta into memtable
+  s = db_->Write(leveldb::WriteOptions(), &batch);
+  mu_.Lock();
+  return s;
+}
+
+// Splits the appended byte stream into block_size chunks, oldest first.
+class BlockCacheWriteBuffer {
+public:
+  BlockCacheWriteBuffer(int block_size)
+    : offset_(0),
+      block_size_(block_size),
+      block_idx_(0),
+      tmp_storage_(NULL) {
+  }
+  ~BlockCacheWriteBuffer() {
+    assert(block_list_.size() == 0);
+  }
+
+  uint32_t NumFullBlock() { // use for BGFlush
+    MutexLock l(&mu_);
+    if (block_list_.size() > 1) {
+      return block_list_.size() - 1;
+    }
+    return 0;
+  }
+
+  Status Append(const Slice& data) {
+    MutexLock l(&mu_);
+    if (tmp_storage_ == NULL) {
+      tmp_storage_ = new std::string();
+      tmp_storage_->resize(0);
+      block_list_.push_back(tmp_storage_);
+    }
+    uint32_t begin = offset_ / block_size_;
+    uint32_t end = (offset_ + data.size()) / block_size_;
+    if (begin == end) { // in the same block
+      tmp_storage_->append(data.data(), data.size());
+    } else {
+      uint32_t tmp_size = block_size_ - (offset_ % block_size_);
+      tmp_storage_->append(data.data(), tmp_size);
+      Slice buf(data.data() + tmp_size, data.size() - tmp_size);
+      for (uint32_t i = begin + 1; i <= end; ++i) {
+        tmp_storage_ = new std::string();
+        tmp_storage_->resize(0);
+        block_list_.push_back(tmp_storage_);
+        if (i < end) { // full block in the middle
+          tmp_storage_->append(buf.data(), block_size_);
+          buf.remove_prefix(block_size_);
+        } else { // last block
+          tmp_storage_->append(buf.data(), buf.size());
+        }
+      }
+    }
+    offset_ += data.size();
+    return Status::OK();
+  }
+
+  std::string* PopFrontBlock(uint64_t* block_idx) {
+    MutexLock l(&mu_);
+    std::string* block = block_list_.front();
+    block_list_.pop_front();
+    *block_idx = block_idx_;
+    block_idx_++;
+    return block;
+  }
+
+  std::string* PopBackBlock(uint64_t* block_idx) {
+    MutexLock l(&mu_);
+    if (block_list_.empty()) { // nothing buffered
+      return NULL;
+    }
+    std::string* block = block_list_.back();
+    block_list_.pop_back();
+    tmp_storage_ = NULL; // the popped block was the append target
+    *block_idx = offset_ / block_size_;
+    return block;
+  }
+
+  void ReleaseBlock(std::string* block) {
+    delete block;
+  }
+
+private:
+  port::Mutex mu_;
+  uint64_t offset_;
+  uint32_t block_size_;
+  uint64_t block_idx_;
+  std::string* tmp_storage_;
+  std::list<std::string*> block_list_; // kBlockSize
+};
+
+// WritableFile that appends to the dfs file and mirrors the data into the
+// block cache: full blocks are flushed in background, the tail on Close().
+class BlockCacheWritableFile : public WritableFile {
+public:
+  BlockCacheWritableFile(BlockCacheImpl* c, const std::string& fname)
+    : cache_(c),
+      bg_cv_(&c->mu_),
+      dfs_file_(NULL),
+      bg_block_flush_(0),
+      write_buffer_(cache_->options_.block_size),
+      fname_(fname) { // file open
+    Status s = cache_->dfs_env_->NewWritableFile(fname_, &dfs_file_);
+    if (!s.ok()) {
+      dfs_file_ = NULL;
+    }
+  }
+
+  ~BlockCacheWritableFile() {
+    if (dfs_file_ != NULL) {
+      Close(); // flush the buffered tail exactly once
+    }
+  }
+
+  Status Append(const Slice& data) {
+    Status s = dfs_file_->Append(data);
+    if (!s.ok()) {
+      return s;
+    }
+    write_buffer_.Append(data);
+
+    MutexLock lockgard(&cache_->mu_);
+    MaybeScheduleBGFlush();
+    return Status::OK();
+  }
+
+  Status Close() {
+    if (dfs_file_ == NULL) { // already closed
+      return Status::OK();
+    }
+    {
+      MutexLock lockgard(&cache_->mu_);
+      // background tasks drain every full block first ...
+      while (bg_block_flush_ > 0) {
+        bg_cv_.Wait();
+      }
+      // ... then the last (possibly partial) block is flushed here
+      uint64_t block_idx = 0;
+      std::string* block_data = write_buffer_.PopBackBlock(&block_idx);
+      if (block_data != NULL) {
+        FlushBlock(block_data, block_idx);
+      }
+    }
+
+    Status s = dfs_file_->Close();
+    delete dfs_file_;
+    dfs_file_ = NULL;
+    return s;
+  }
+
+  Status Flush() {
+    return dfs_file_->Flush();
+  }
+
+  Status Sync() {
+    return dfs_file_->Sync();
+  }
+
+private:
+  void MaybeScheduleBGFlush() {
+    cache_->mu_.AssertHeld();
+    while (bg_block_flush_ < static_cast<int>(write_buffer_.NumFullBlock())) {
+      bg_block_flush_++;
+      cache_->bg_flush_.Schedule(&BlockCacheWritableFile::BGFlushFunc, this, 10);
+    }
+  }
+
+  static void BGFlushFunc(void* arg) {
+    reinterpret_cast<BlockCacheWritableFile*>(arg)->BGFlush();
+  }
+  void BGFlush() {
+    MutexLock lockgard(&cache_->mu_);
+    if (write_buffer_.NumFullBlock() > 0) {
+      uint64_t block_idx = 0;
+      std::string* block_data = write_buffer_.PopFrontBlock(&block_idx);
+      FlushBlock(block_data, block_idx);
+    }
+
+    // always account for this task, otherwise Close() waits forever
+    bg_block_flush_--;
+    MaybeScheduleBGFlush();
+    bg_cv_.Signal();
+  }
+
+  // Copies one buffered block into the cache and frees block_data.
+  // REQUIRES: cache_->mu_ held; it is released around the io.
+  void FlushBlock(std::string* block_data, uint64_t block_idx) {
+    cache_->mu_.AssertHeld();
+    uint64_t fid = cache_->FileId(fname_);
+    CacheBlock* block = cache_->GetAndAllocBlock(fid, block_idx);
+    assert(block->state != kCacheBlockValid);
+    cache_->mu_.Unlock();
+
+    // Do io without lock
+    cache_->LogRecord(block);
+    block->data_block = Slice(*block_data);
+    block->data_block_alloc = false;
+    cache_->FillCache(block);
+    block->data_block = Slice(); // block_data dies below
+    write_buffer_.ReleaseBlock(block_data);
+
+    cache_->mu_.Lock();
+    block->state = kCacheBlockValid;
+    cache_->ReleaseBlock(block);
+  }
+
+private:
+  BlockCacheImpl* cache_;
+  //port::AtomicPointer shutting_down_;
+  port::CondVar bg_cv_; // Signalled when background work finishes
+  WritableFile* dfs_file_;
+  // protected by cache_.mu_
+  int bg_block_flush_;
+  BlockCacheWriteBuffer write_buffer_;
+  std::string fname_;
+};
+
+// RandomAccessFile that serves reads block by block from the cache, falling back to dfs.
+class BlockCacheRandomAccessFile : public RandomAccessFile {
+public:
+  BlockCacheRandomAccessFile(BlockCacheImpl* c, const std::string& fname)
+    : cache_(c),
+      dfs_file_(NULL),
+      fname_(fname) {
+    Status s = cache_->dfs_env_->NewRandomAccessFile(fname_, &dfs_file_);
+    return;
+  }
+
+  ~BlockCacheRandomAccessFile() {
+    delete dfs_file_;
+    return;
+  }
+
+  Status Read(uint64_t offset, size_t n, Slice* result,
+              char* scratch) {
+    MutexLock lockgard(&cache_->mu_);
+    uint64_t fid = cache_->FileId(fname_);
+    uint64_t begin = offset / cache_->options_.block_size;
+    uint64_t end = (offset + n) / cache_->options_.block_size;
+    assert(begin <= end);
+    std::vector<CacheBlock*> c_miss;
+    std::vector<CacheBlock*> c_locked;
+    std::vector<CacheBlock*> c_valid;
+    CacheBlock* first_block = NULL; // block holding offset
+    CacheBlock* last_block = NULL;  // block holding offset + n, when different
+
+    uint32_t first_block_size = cache_->options_.block_size - (offset % cache_->options_.block_size);
+    //uint32_t last_block_size = (offset + n) % kBlockSize;
+    for (uint64_t block_idx = begin; block_idx <= end; ++block_idx) {
+      CacheBlock* block = cache_->GetAndAllocBlock(fid, block_idx);
+      assert(block->fid == fid && block->block_idx == block_idx);
+      if (begin < block->block_idx && block->block_idx < end) { // reuse read buf
+        block->data_block = Slice(scratch + first_block_size + (block->block_idx - begin - 1) * cache_->options_.block_size,
+                                  cache_->options_.block_size);
+        block->data_block_alloc = false;
+      } else if (begin == block->block_idx) { // Tera spec: Read Read only conflict occurs here
+        char* buf = new char[cache_->options_.block_size];
+        block->data_block = Slice(buf, cache_->options_.block_size);
+        block->data_block_alloc = true;
+        first_block = block;
+      } else if (block->block_idx == end) { // Tera spec: Read Read conflict occurs here
+        char* buf = new char[cache_->options_.block_size];
+        block->data_block = Slice(buf, cache_->options_.block_size);
+        block->data_block_alloc = true;
+        last_block = block;
+      }
+
+      if (block->state != kCacheBlockValid && block->handle->refs == 2) { // first one access this block
+        c_miss.push_back(block);
+      } else if (block->state == kCacheBlockValid && block->handle->refs == 2) { // first one access this block
+        c_valid.push_back(block);
+      } else {
+        c_locked.push_back(block);
+      }
+    }
+    cache_->mu_.Unlock();
+
+    // async read miss data
+    std::vector<CacheBlock*> block_vec;
+    for (uint32_t i = 0; i < c_miss.size(); ++i) {
+      block_vec.push_back(c_miss[i]);
+      if ((i + 1 < c_miss.size()) &&
+          c_miss[i + 1]->block_idx == (block_vec.back()->block_idx + 1)) {
+        continue;
+      }
+
+      AsyncDfsReader* reader = new AsyncDfsReader;
+      reader->file = this;
+      reader->offset = block_vec.front()->block_idx * cache_->options_.block_size;
+      reader->n = (block_vec.back()->block_idx - block_vec.front()->block_idx + 1) * cache_->options_.block_size;
+      reader->block_vec = block_vec;
+      cache_->bg_read_.Schedule(&BlockCacheRandomAccessFile::AsyncDfsRead, reader, 10);
+      block_vec.clear(); // start the next run of consecutive blocks
+    }
+
+    // async read valid data
+    for (uint32_t i = 0; i < c_valid.size(); ++i) {
+      CacheBlock* block = c_valid[i];
+      AsyncCacheReader* reader = new AsyncCacheReader;
+      reader->file = this;
+      reader->block = block;
+      cache_->bg_read_.Schedule(&BlockCacheRandomAccessFile::AsyncCacheRead, reader, 10);
+    }
+
+    // wait dfs read done and async cache file
+    for (uint32_t i = 0; i < c_miss.size(); ++i) {
+      MutexLock lockgard(&cache_->mu_);
+      CacheBlock* block = c_miss[i];
+      block->cv.Wait();
+    }
+    for (uint32_t i = 0; i < c_miss.size(); ++i) {
+      CacheBlock* block = c_miss[i];
+      AsyncCacheWriter* writer = new AsyncCacheWriter;
+      writer->file = this;
+      writer->block = block;
+      cache_->bg_fill_.Schedule(&BlockCacheRandomAccessFile::AsyncCacheWrite, writer, 10);
+    }
+    for (uint32_t i = 0; i < c_miss.size(); ++i) { // wait cache fill finish
+      MutexLock lockgard(&cache_->mu_);
+      CacheBlock* block = c_miss[i];
+      block->cv.Wait();
+    }
+
+    // wait cache read done
+    for (uint32_t i = 0; i < c_valid.size(); ++i) {
+      MutexLock lockgard(&cache_->mu_);
+      CacheBlock* block = c_valid[i];
+      block->cv.Wait();
+    }
+
+    // wait other async read finish
+    for (uint32_t i = 0; i < c_locked.size(); ++i) {
+      MutexLock lockgard(&cache_->mu_);
+      CacheBlock* block = c_locked[i];
+      while (block->state != kCacheBlockValid) {
+        assert(block->fid == fid);
+        block->cv.Wait();
+      }
+      assert(block->data_block_alloc); // MUST alloc by block cache itself
+      assert(block->data_block.data() != NULL);
+    }
+
+    cache_->mu_.Lock();
+    // Hand the bytes to the caller before the private head/tail buffers are
+    // freed by ReleaseBlock(); middle blocks were read straight into scratch.
+    // NOTE(review): data_block is per block, not per reader, so two readers
+    // overlapping on one block still race on it -- needs a follow-up.
+    const uint64_t block_size = cache_->options_.block_size;
+    const size_t head_len = (n < first_block_size) ? n : first_block_size;
+    memcpy(scratch, first_block->data_block.data() + offset % block_size, head_len);
+    if (last_block != NULL) {
+      const size_t tail_len = (offset + n) % block_size;
+      memcpy(scratch + n - tail_len, last_block->data_block.data(), tail_len);
+    }
+    *result = Slice(scratch, n);
+    for (uint32_t i = 0; i < c_miss.size(); ++i) {
+      CacheBlock* block = c_miss[i];
+      block->state = kCacheBlockValid;
+      cache_->ReleaseBlock(block);
+    }
+    for (uint32_t i = 0; i < c_valid.size(); ++i) {
+      CacheBlock* block = c_valid[i];
+      block->state = kCacheBlockValid;
+      cache_->ReleaseBlock(block);
+    }
+    for (uint32_t i = 0; i < c_locked.size(); ++i) {
+      CacheBlock* block = c_locked[i];
+      block->state = kCacheBlockValid;
+      cache_->ReleaseBlock(block);
+    }
+    return Status::OK();
+  }
+
+private:
+  struct AsyncDfsReader {
+    BlockCacheRandomAccessFile* file;
+    std::vector<CacheBlock*> block_vec;
+    uint64_t offset;
+    size_t n;
+    //char* scratch;
+    //Slice result;
+    Status s;
+  };
+  static void AsyncDfsRead(void* arg) {
+    AsyncDfsReader* reader = (AsyncDfsReader*)arg;
+    reader->file->HandleDfsRead(reader);
+    delete reader;
+    return;
+  }
+  void HandleDfsRead(AsyncDfsReader* reader) {
+    std::vector<CacheBlock*>& block_vec = reader->block_vec;
+    char* scratch = NULL;
+    if (block_vec.size() == 1) {
+      scratch = (char*)(block_vec[0])->data_block.data();
+      assert((block_vec[0])->data_block.size() == reader->n);
+    } else {
+      scratch = new char[reader->n];
+    }
+    Slice result;
+    reader->s = dfs_file_->Read(reader->offset, reader->n, &result, scratch);
+
+    MutexLock lockgard(&cache_->mu_);
+    if (block_vec.size() > 1) {
+      for (uint32_t i = 0; i < block_vec.size(); ++i) {
+        CacheBlock* block = block_vec[i];
+        // a read that ends at EOF returns less than the requested bytes
+        size_t len = result.size() < block->data_block.size() ? result.size() : block->data_block.size();
+        memcpy((char*)block->data_block.data(), result.data(), len);
+        result.remove_prefix(len);
+        block->cv.SignalAll();
+      }
+      delete[] scratch;
+    } else {
+      (block_vec[0])->cv.SignalAll();
+    }
+    return;
+  }
+
+  struct AsyncCacheReader {
+    BlockCacheRandomAccessFile* file;
+    CacheBlock* block;
+  };
+  static void AsyncCacheRead(void* arg) {
+    AsyncCacheReader* reader = (AsyncCacheReader*)arg;
+    reader->file->HandleCacheRead(reader);
+    delete reader;
+    return;
+  }
+  void HandleCacheRead(AsyncCacheReader* reader) {
+    CacheBlock* block = reader->block;
+    cache_->ReadCache(block);
+
+    MutexLock lockgard(&cache_->mu_);
+    block->cv.SignalAll();
+    return;
+  }
+
+  struct AsyncCacheWriter {
+    BlockCacheRandomAccessFile* file;
+    CacheBlock* block;
+  };
+  static void AsyncCacheWrite(void* arg) {
+    AsyncCacheWriter* writer = (AsyncCacheWriter*)arg;
+    writer->file->HandleCacheWrite(writer);
+    delete writer;
+    return;
+  }
+  void HandleCacheWrite(AsyncCacheWriter* writer) {
+    CacheBlock* block = writer->block;
+    block->s = cache_->LogRecord(block);
+    block->s = cache_->FillCache(block);
+
+    MutexLock lockgard(&cache_->mu_);
+    block->cv.SignalAll();
+    return;
+  }
+
+private:
+  BlockCacheImpl* cache_;
+  RandomAccessFile* dfs_file_;
+  std::string fname_;
+};
+
+} // namespace leveldb
+
diff --git a/src/leveldb/util/cache.cc b/src/leveldb/util/cache.cc
index 6eab478a1..cc81bbb7c 100644
--- a/src/leveldb/util/cache.cc
+++ b/src/leveldb/util/cache.cc
@@ -37,6 +37,7 @@ struct LRUHandle {
   size_t key_length;
   uint32_t refs;
   uint32_t hash;      // Hash of key(); used for fast sharding and comparisons
+  uint64_t cache_id;  // cache id, user spec
   char key_data[1];   // Beginning of key
 
   Slice key() const {
@@ -286,6 +287,163 @@ size_t LRUCache::TotalCharge() {
   return usage_;
 }
 
+class LRU2QCache: public Cache {
+ public:
+  LRU2QCache(size_t capacity)
+    : capacity_(capacity),
+      usage_(0) {
+  }
+
+  ~LRU2QCache();
+
+  // Like Cache methods, but with an extra "hash" parameter.
+  Handle* Insert(const Slice& key, void* value, size_t charge,
+                 void (*deleter)(const Slice& key, void* value)) {
+    const uint32_t hash = Hash(key.data(), key.size(), 0);
+    MutexLock l(&mutex_);
+    // Insert-if-absent: an existing key is left untouched and NULL is returned.
+    LRUHandle* e = table_.Lookup(key, hash);
+    if (e != NULL) {
+      return reinterpret_cast<Cache::Handle*>(NULL);
+    }
+
+    if (usage_ < capacity_) {  // a free slot is still available
+      e = reinterpret_cast<LRUHandle*>(
+          malloc(sizeof(LRUHandle)-1 + key.size()));
+      e->value = value;
+      e->deleter = deleter;
+      e->charge = 1;  // one slot per entry; the `charge` argument is not used
+      e->key_length = key.size();
+      e->hash = hash;
+      e->refs = 2;  // One from the cache, one for the returned handle
+      e->cache_id = usage_;  // NOTE(review): can repeat a live id once Erase() has lowered usage_
+      memcpy(e->key_data, key.data(), key.size());
+
+      table_.Insert(e);  // kept outside assert() so NDEBUG builds still insert
+      LRU_Append(e);
+      usage_++;
+      return reinterpret_cast<Cache::Handle*>(e);
+    }
+
+    // cache full, reuse the slot (cache_id) of the oldest unreferenced item
+    LRUHandle* old = lru_.next;
+    while (old != &lru_) {
+      if (old->refs > 1) {  // a client still holds a handle: not evictable
+        old = old->next;
+        continue;
+      }
+      e = reinterpret_cast<LRUHandle*>(
+          malloc(sizeof(LRUHandle)-1 + key.size()));
+      e->value = value;
+      e->deleter = deleter;
+      e->charge = 1;
+      e->key_length = key.size();
+      e->hash = hash;
+      e->refs = 2;  // One from the cache, one for the returned handle
+      e->cache_id = old->cache_id;  // read before Unref(old) frees it
+      memcpy(e->key_data, key.data(), key.size());
+
+      LRU_Remove(old);
+      table_.Remove(old->key(), old->hash);
+      Unref(old);
+      usage_++;  // Unref(old) lowered usage_ by one; the replacement takes that slot
+      table_.Insert(e);  // kept outside assert() so NDEBUG builds still insert
+      LRU_Append(e);
+      return reinterpret_cast<Cache::Handle*>(e);
+    }
+    // TODO: try wait finish
+    return reinterpret_cast<Cache::Handle*>(NULL);  // every entry is pinned
+  }
+
+  Handle* Lookup(const Slice& key) {
+    const uint32_t hash = Hash(key.data(), key.size(), 0);
+    MutexLock l(&mutex_);
+    LRUHandle* e = table_.Lookup(key, hash);
+    if (e != NULL) {
+      e->refs++;
+      LRU_Remove(e);  // move to the newest end of the list
+      LRU_Append(e);
+    }
+    return reinterpret_cast<Cache::Handle*>(e);
+  }
+
+  void Erase(const Slice& key) {
+    const uint32_t hash = Hash(key.data(), key.size(), 0);
+    MutexLock l(&mutex_);
+    LRUHandle* e = table_.Remove(key, hash);
+    if (e != NULL) {
+      LRU_Remove(e);
+      Unref(e);  // drops the cache's own reference
+    }
+  }
+
+  void Release(Handle* handle) {
+    MutexLock l(&mutex_);
+    Unref(reinterpret_cast<LRUHandle*>(handle));
+  }
+
+  void* Value(Handle* handle) {
+    return reinterpret_cast<LRUHandle*>(handle)->value;
+  }
+
+  uint64_t NewId() {
+    return 0;
+  }
+
+  double HitRate(bool force_clear = false) {
+    return 99.9999;  // placeholder: no hit statistics are collected
+  }
+
+  size_t Entries() {
+    MutexLock l(&mutex_);
+    return usage_;
+  }
+
+  size_t TotalCharge() {
+    MutexLock l(&mutex_);
+    return usage_;
+  }
+
+ private:
+  void LRU_Remove(LRUHandle* e) {
+    e->next->prev = e->prev;
+    e->prev->next = e->next;
+  }
+
+  void LRU_Append(LRUHandle* e) {
+    // Make "e" newest entry by inserting just before lru_
+    e->next = &lru_;
+    e->prev = lru_.prev;
+    e->prev->next = e;
+    e->next->prev = e;
+  }
+
+  void Unref(LRUHandle* e) {
+    assert(e->refs > 0);
+    e->refs--;
+    if (e->refs <= 0) {  // last reference gone: give the slot back and destroy the entry
+      usage_ -= e->charge;
+      (*e->deleter)(e->key(), e->value);
+      free(e);
+    }
+  }
+
+  // Initialized before use.
+  size_t capacity_;
+
+  // mutex_ protects the following state.
+  port::Mutex mutex_;
+  size_t usage_;
+
+  // Dummy head of LRU list.
+  // lru.prev is newest entry, lru.next is oldest entry.
+  //LRUHandle hot_lru_;
+  //LRUHandle cold_lru_;
+  LRUHandle lru_;
+
+  HandleTable table_;
+};
+
 static const int kNumShardBits = 4;
 static const int kNumShards = 1 << kNumShardBits;
 
@@ -378,8 +536,12 @@ class ShardedLRUCache : public Cache {
 
 }  // end anonymous namespace
 
-Cache* NewLRUCache(size_t capacity) {
-  return new ShardedLRUCache(capacity);
+Cache* NewLRUCache(size_t capacity, LRUCacheType type) {
+  if (type == kLRUCacheDefault) {
+    return new ShardedLRUCache(capacity);
+  } else {
+    return new LRU2QCache(capacity);
+  }
 }
 
 }  // namespace leveldb
diff --git a/src/leveldb/util/coding_test.cc b/src/leveldb/util/coding_test.cc
index fc8fbf5c9..17848377b 100644
--- a/src/leveldb/util/coding_test.cc
+++ b/src/leveldb/util/coding_test.cc
@@ -219,6 +219,17 @@ TEST(Coding, PutLG_ugly) {
   ASSERT_EQ(a_slice.ToString(), b_slice.ToString());
 }
 
+TEST(Coding, PutFixed64Cmp) {
+  std::string sa, sb;
+  PutFixed64(&sa, 100);
+  PutFixed64(&sb, 50);
+  ASSERT_TRUE(sa > sb);
+  uint64_t a = DecodeFixed64(sa.c_str());
+  uint64_t b = DecodeFixed64(sb.c_str());
+  ASSERT_TRUE(a == 100);
+  ASSERT_TRUE(b == 50);
+}
+
 }  // namespace leveldb
 
 int main(int argc, char** argv) {
diff --git a/src/tabletnode/tabletnode_impl.cc b/src/tabletnode/tabletnode_impl.cc
index 0ed32fd14..de560ec52 100644
--- a/src/tabletnode/tabletnode_impl.cc
+++ b/src/tabletnode/tabletnode_impl.cc
@@ -148,11 +148,7 @@ TabletNodeImpl::TabletNodeImpl()
     sysinfo_.SetProcessStartTime(get_micros());
 }
 
-TabletNodeImpl::~TabletNodeImpl() {
-    if (FLAGS_tera_tabletnode_cache_enabled) {
-        leveldb::ThreeLevelCacheEnv::RemoveCachePaths();
-    }
-}
+TabletNodeImpl::~TabletNodeImpl() {}
 
 bool TabletNodeImpl::Init() {
     if (FLAGS_tera_zk_enabled) {
@@ -171,32 +167,32 @@ bool TabletNodeImpl::Init() {
 }
 
 void TabletNodeImpl::InitCacheSystem() {
-    if (!FLAGS_tera_tabletnode_cache_enabled) {
-        // compitable with legacy FlashEnv
-        leveldb::FlashEnv* flash_env = (leveldb::FlashEnv*)io::LeveldbFlashEnv();
-        flash_env->SetFlashPath(FLAGS_tera_tabletnode_cache_paths,
-                                FLAGS_tera_io_cache_path_vanish_allowed);
-        flash_env->SetUpdateFlashThreadNumber(FLAGS_tera_tabletnode_cache_update_thread_num);
-        flash_env->SetIfForceReadFromCache(FLAGS_tera_tabletnode_cache_force_read_from_cache);
-        return;
-    }
+    if (FLAGS_tera_tabletnode_block_cache_enabled) {
+        LOG(INFO) << "Tcache: set flash path: " << FLAGS_tera_tabletnode_cache_paths;
+        std::vector<std::string> path_list;
+        SplitString(FLAGS_tera_tabletnode_cache_paths, ";", &path_list);
+        cache_paths_.swap(path_list);
+
+        leveldb::Env* posix_env = leveldb::Env::Default();
+        for (uint32_t i = 0; i < cache_paths_.size(); ++i) {
+            posix_env->CreateDir(cache_paths_[i]); // returned status is not checked
+        }
 
-    LOG(INFO) << "activate new cache system";
-    // new cache mechanism
-    leveldb::ThreeLevelCacheEnv::SetCachePaths(FLAGS_tera_tabletnode_cache_paths);
-    leveldb::ThreeLevelCacheEnv::s_mem_cache_size_in_KB_ = FLAGS_tera_tabletnode_cache_mem_size;
-    leveldb::ThreeLevelCacheEnv::s_disk_cache_size_in_MB_ = FLAGS_tera_tabletnode_cache_disk_size;
-    leveldb::ThreeLevelCacheEnv::s_block_size_ = FLAGS_tera_tabletnode_cache_block_size;
-    leveldb::ThreeLevelCacheEnv::s_disk_cache_file_num_ = FLAGS_tera_tabletnode_cache_disk_filenum;
-    leveldb::ThreeLevelCacheEnv::s_disk_cache_file_name_ = FLAGS_tera_tabletnode_cache_name;
-
-    if (FLAGS_tera_tabletnode_cache_log_level < 3) {
-        LEVELDB_SET_LOG_LEVEL(WARNING);
-    } else if (FLAGS_tera_tabletnode_cache_log_level < 4) {
-        LEVELDB_SET_LOG_LEVEL(INFO);
-    } else {
-        LEVELDB_SET_LOG_LEVEL(DEBUG);
+        LOG(INFO) << "activate Tcache system";
+        leveldb::Env* block_cache_env = io::DefaultBlockCacheEnv();
+        for (uint32_t i = 0; i < cache_paths_.size(); ++i) {
+            leveldb::BlockCacheOptions opts; // default options for every cache path
+            block_cache_env->LoadCache(opts, cache_paths_[i]);
+        }
+        return;
     }
+    // compatible with legacy FlashEnv
+    leveldb::FlashEnv* flash_env = (leveldb::FlashEnv*)io::LeveldbFlashEnv();
+    flash_env->SetFlashPath(FLAGS_tera_tabletnode_cache_paths,
+                            FLAGS_tera_io_cache_path_vanish_allowed);
+    flash_env->SetUpdateFlashThreadNumber(FLAGS_tera_tabletnode_cache_update_thread_num);
+    flash_env->SetIfForceReadFromCache(FLAGS_tera_tabletnode_cache_force_read_from_cache);
+    return;
 }
 
 bool TabletNodeImpl::Exit() {
diff --git a/src/tera_flags.cc b/src/tera_flags.cc
index d5c51031a..b8aa53a08 100644
--- a/src/tera_flags.cc
+++ b/src/tera_flags.cc
@@ -63,6 +63,7 @@ DEFINE_int32(tera_leveldb_env_dfs_seek_latency, 10000000, "the random access lat
 DEFINE_int32(tera_memenv_table_cache_size, 100, "the max open file number in leveldb table_cache");
 DEFINE_int32(tera_memenv_block_cache_size, 10000, "block cache size for leveldb which do not use share block cache");
 DEFINE_bool(tera_use_flash_for_memenv, true, "Use flashenv for memery lg");
+DEFINE_int32(tera_leveldb_block_cache_env_num_thread, 30, "thread num of Tcache");
 DEFINE_string(tera_leveldb_compact_strategy, "default", "the default strategy to drive consum compaction, should be [default|LG|dummy]");
 DEFINE_bool(tera_leveldb_verify_checksums, true, "enable verify data read from storage against checksums");
 
@@ -200,7 +201,7 @@ DEFINE_string(tera_tabletnode_cpu_affinity_set, "1,2", "the cpu set of cpu affin
 DEFINE_bool(tera_tabletnode_hang_detect_enabled, false, "enable detect read/write hang");
 DEFINE_int32(tera_tabletnode_hang_detect_threshold, 60000, "read/write hang detect threshold (in ms)");
 
-DEFINE_bool(tera_tabletnode_cache_enabled, false, "enable three-level cache mechasism");
+DEFINE_bool(tera_tabletnode_block_cache_enabled, true, "enable Tcache mechasism");
 DEFINE_string(tera_tabletnode_cache_paths, "../data/cache/", "paths for cached data storage. Mutiple definition like: \"./path1/;./path2/\"");
 DEFINE_int32(tera_tabletnode_cache_block_size, 8192, "the block size of cache system");
 DEFINE_string(tera_tabletnode_cache_name, "tera.cache", "prefix name for cache name");