Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 75 additions & 19 deletions src/concurrent_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
#define ZIM_CONCURRENT_CACHE_H

#include "lrucache.h"
#include "log.h"

#include <chrono>
#include <cstddef>
#include <future>
#include <mutex>
Expand All @@ -39,65 +41,119 @@ namespace zim
safe, and, in case of a cache miss, will block until that element becomes
available.
*/
template <typename Key, typename Value>
template <typename Key, typename Value, typename CostEstimation>
class ConcurrentCache
{
private: // types
typedef std::shared_future<Value> ValuePlaceholder;
typedef lru_cache<Key, ValuePlaceholder> Impl;

public: // types
explicit ConcurrentCache(size_t maxEntries)
: impl_(maxEntries)
struct CacheEntry
{
size_t cost = 0;
ValuePlaceholder value;

bool ready() const {
const auto zeroNs = std::chrono::nanoseconds::zero();
return value.wait_for(zeroNs) == std::future_status::ready;
}
};

struct GetCacheEntryCost
{
static size_t cost(const CacheEntry& x) { return x.cost; }
};

typedef lru_cache<Key, CacheEntry, GetCacheEntryCost> Impl;

public: // functions
  /// Creates a cache that may hold entries with a total cost of up to
  /// maxCost (cost is measured by CostEstimation, not by entry count).
  explicit ConcurrentCache(size_t maxCost)
    : impl_(maxCost)
  {}

// Gets the entry corresponding to the given key. If the entry is not in the
// cache, it is obtained by calling f() (without any arguments) and the
// result is put into the cache.
//
// The cache as a whole is locked only for the duration of accessing
// the respective slot. If, in the case of the a cache miss, the generation
// the respective slot. If, in the case of a cache miss, the generation
// of the missing element takes a long time, only attempts to access that
// element will block - the rest of the cache remains open to concurrent
// access.
  /// Blocking get-or-compute: returns the cached value for key, invoking
  /// f() to produce (and cache) it on a miss. Concurrent callers for the
  /// same key block on the shared future rather than recomputing.
  template<class F>
  Value getOrPut(const Key& key, F f)
  {
    log_debug_func_call("ConcurrentCache::getOrPut", key);

    std::promise<Value> valuePromise;
    // NOTE(review): the next four statements mix the pre-change and
    // post-change versions of this code (diff residue from the PR view):
    // the direct impl_.getOrPut()/unlock() pair and the getCacheSlot()
    // call both declare `x`, which cannot compile. Confirm against the
    // merged file; only the getCacheSlot() line should survive.
    std::unique_lock<std::mutex> l(lock_);
    const auto x = impl_.getOrPut(key, valuePromise.get_future().share());
    l.unlock();
    const auto x = getCacheSlot(key, valuePromise.get_future().share());
    CacheEntry cacheEntry(x.value());
    log_debug("Obtained the cache slot");
    if ( x.miss() ) {
      log_debug("It was a cache miss. Going to obtain the value...");
      try {
        // NOTE(review): the direct set_value() call is the pre-change
        // line; the post-change code delegates to materializeValue()
        // and then records the cost - diff residue, keep only the latter.
        valuePromise.set_value(f());
        cacheEntry.cost = materializeValue(valuePromise, f);
        finalizeCacheMiss(key, cacheEntry);
        log_debug("Done. Cache cost is at " << getCurrentCost() );
      } catch (std::exception& e) {
        log_debug("Evaluation failed. Releasing the cache slot...");
        // On failure, release the slot so a later getOrPut() can retry
        // instead of all future callers receiving the stored exception.
        drop(key);
        throw;
      }
    }

    // NOTE(review): two return statements - the first is the pre-change
    // line; the post-change code logs and returns via the cache entry.
    // Only the last two lines should exist in the merged file.
    return x.value().get();
    log_debug((!cacheEntry.ready() ? "Waiting for result..." : "Returning immediately..."));
    return log_debug_return_value(cacheEntry.value.get());
  }

  /// Removes the entry for key (if any) from the cache, under the
  /// cache-wide lock. Returns the result of lru_cache::drop().
  bool drop(const Key& key)
  {
    // NOTE(review): both the plain unique_lock line and the
    // log_debug_raii_sync_statement(...) line declare the same guard `l`
    // - pre-change/post-change diff residue; only one of the two (the
    // logging variant, per the rest of the PR) belongs in the merged file.
    std::unique_lock<std::mutex> l(lock_);
    log_debug_func_call("ConcurrentCache::drop", key);
    log_debug_raii_sync_statement(std::unique_lock<std::mutex> l(lock_));
    return impl_.drop(key);
  }

  // NOTE(review): each accessor below shows both its pre-change (*Size)
  // and post-change (*Cost) variant on adjacent lines - diff residue from
  // the rename of the size-based API to the cost-based one. Only the
  // *Cost variants should exist in the merged file.

  /// Returns the configured cost limit of the cache.
  size_t getMaxSize() const {
  size_t getMaxCost() const {
    std::unique_lock<std::mutex> l(lock_);
    return impl_.getMaxSize();
    return impl_.getMaxCost();
  }

  /// Returns the current total cost of all cached entries.
  size_t getCurrentSize() const {
  size_t getCurrentCost() const {
    std::unique_lock<std::mutex> l(lock_);
    return impl_.size();
    return impl_.cost();
  }

  /// Sets a new cost limit on the underlying lru_cache.
  void setMaxSize(size_t newSize) {
  void setMaxCost(size_t newSize) {
    log_debug_func_call("ConcurrentCache::setMaxCost", newSize);
    log_debug_raii_sync_statement(std::unique_lock<std::mutex> l(lock_));
    return impl_.setMaxCost(newSize);
  }

private: // functions
  /// Looks up - or inserts a zero-cost placeholder for - the slot
  /// associated with key, while holding the cache-wide lock. The real
  /// cost is accounted later via finalizeCacheMiss().
  typename Impl::AccessResult getCacheSlot(const Key& key, const ValuePlaceholder& v)
  {
    log_debug_func_call("ConcurrentCache::getCacheSlot", key);
    log_debug_raii_sync_statement(std::unique_lock<std::mutex> l(lock_));
    const CacheEntry placeholderEntry{0, v};
    return impl_.getOrPut(key, placeholderEntry);
  }

template<class F>
static size_t materializeValue(std::promise<Value>& valuePromise, F f)
{
const auto materializedValue = f();
log_debug("Value was successfully obtained.");
valuePromise.set_value(materializedValue);
log_debug("Made the value available for concurrent access.");
log_debug("Computing the cost of the new entry...");
auto cost = CostEstimation::cost(materializedValue);
log_debug("cost=" << cost);
return cost;
}

  /// Re-inserts the now-complete entry (carrying its final cost) under
  /// the cache lock, so the LRU cost accounting reflects the real value.
  void finalizeCacheMiss(const Key& key, const CacheEntry& cacheEntry)
  {
    log_debug_func_call("ConcurrentCache::finalizeCacheMiss", key);
    log_debug_raii_sync_statement(std::unique_lock<std::mutex> l(lock_));
    impl_.put(key, cacheEntry);
  }

private: // data
Expand Down
8 changes: 4 additions & 4 deletions src/dirent_accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ class LIBZIM_PRIVATE_API DirectDirentAccessor
std::shared_ptr<const Dirent> getDirent(entry_index_t idx) const;
entry_index_t getDirentCount() const { return m_direntCount; }

size_t getMaxCacheSize() const { return m_direntCache.getMaxSize(); }
size_t getCurrentCacheSize() const { return m_direntCache.size(); }
void setMaxCacheSize(size_t nbDirents) const { m_direntCache.setMaxSize(nbDirents); }
size_t getMaxCacheSize() const { return m_direntCache.getMaxCost(); }
size_t getCurrentCacheSize() const { return m_direntCache.cost(); }
void setMaxCacheSize(size_t nbDirents) const { m_direntCache.setMaxCost(nbDirents); }

private: // functions
std::shared_ptr<const Dirent> readDirent(offset_t) const;
Expand All @@ -67,7 +67,7 @@ class LIBZIM_PRIVATE_API DirectDirentAccessor
std::unique_ptr<const Reader> mp_pathPtrReader;
entry_index_t m_direntCount;

mutable lru_cache<entry_index_type, std::shared_ptr<const Dirent>> m_direntCache;
mutable lru_cache<entry_index_type, std::shared_ptr<const Dirent>, UnitCostEstimation> m_direntCache;
mutable std::mutex m_direntCacheLock;

mutable std::vector<char> m_bufferDirentZone;
Expand Down
6 changes: 3 additions & 3 deletions src/fileimpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -805,13 +805,13 @@ bool checkTitleListing(const IndirectDirentAccessor& accessor, entry_index_type


// NOTE(review): each function below contains both the pre-change
// (getMaxSize/getCurrentSize/setMaxSize) and post-change
// (getMaxCost/getCurrentCost/setMaxCost) call on adjacent lines - diff
// residue; only the *Cost calls belong in the merged file.

/// Maximum configured cost of the cluster cache.
size_t FileImpl::getClusterCacheMaxSize() const {
  return clusterCache.getMaxSize();
  return clusterCache.getMaxCost();
}
/// Current accumulated cost of the cluster cache.
size_t FileImpl::getClusterCacheCurrentSize() const {
  return clusterCache.getCurrentSize();
  return clusterCache.getCurrentCost();
}
/// Reconfigures the cluster cache's maximum cost.
void FileImpl::setClusterCacheMaxSize(size_t nbClusters) {
  clusterCache.setMaxSize(nbClusters);
  clusterCache.setMaxCost(nbClusters);
}

size_t FileImpl::getDirentCacheMaxSize() const {
Expand Down
3 changes: 2 additions & 1 deletion src/fileimpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "file_compound.h"
#include "fileheader.h"
#include "zim/archive.h"
#include "lrucache.h"
#include "zim_types.h"
#include "direntreader.h"

Expand All @@ -59,7 +60,7 @@ namespace zim
std::unique_ptr<const IndirectDirentAccessor> mp_titleDirentAccessor;

typedef std::shared_ptr<const Cluster> ClusterHandle;
mutable ConcurrentCache<cluster_index_type, ClusterHandle> clusterCache;
mutable ConcurrentCache<cluster_index_type, ClusterHandle, UnitCostEstimation> clusterCache;

const bool m_hasFrontArticlesIndex;
const entry_index_t m_startUserEntry;
Expand Down
5 changes: 5 additions & 0 deletions src/log.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
*
*/

#ifndef ZIM_LOG_H
#define ZIM_LOG_H

#include "config.h"

// Should we keep the dependence on cxxtools logging framework?
Expand Down Expand Up @@ -206,3 +209,5 @@ namespace LoggingImpl
#define log_init()

#endif

#endif // ZIM_LOG_H
Loading