From d18871b14b5b02ae7cd69cc8be803e3b84d593d3 Mon Sep 17 00:00:00 2001 From: Sekwon Lee Date: Sun, 6 Sep 2020 20:53:23 +0000 Subject: [PATCH 1/9] remove the document of large workloads --- large_workload.md | 5 ----- 1 file changed, 5 deletions(-) delete mode 100644 large_workload.md diff --git a/large_workload.md b/large_workload.md deleted file mode 100644 index 3fb44dc1..00000000 --- a/large_workload.md +++ /dev/null @@ -1,5 +0,0 @@ -## Experiments with large workload - -We have conducted the additional experiments with large workloads -with 1 billion keys to make more reliable performance results on Intel -Optane DC persistent memory. Those results will be posted here soon. From 38f340b02d73bb1964d2185fda491ce181816bb4 Mon Sep 17 00:00:00 2001 From: Sekwon Lee Date: Thu, 31 Dec 2020 11:34:35 +0900 Subject: [PATCH 2/9] Remove unnecessary header file --- P-CLHT/include/clht.h | 379 ------------------------------------------ 1 file changed, 379 deletions(-) delete mode 100644 P-CLHT/include/clht.h diff --git a/P-CLHT/include/clht.h b/P-CLHT/include/clht.h deleted file mode 100644 index b937d9e1..00000000 --- a/P-CLHT/include/clht.h +++ /dev/null @@ -1,379 +0,0 @@ -/* - * File: clht.h - * Author: Vasileios Trigonakis - * Description: CLHT common interface - * clht.h is part of ASCYLIB - * - * The MIT License (MIT) - * - * Copyright (c) 2014 Vasileios Trigonakis - * Distributed Programming Lab (LPD), EPFL - * - * Permission is hereby granted, free of charge, to any person obtaining a copy of - * this software and associated documentation files (the "Software"), to deal in - * the Software without restriction, including without limitation the rights to - * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of - * the Software, and to permit persons to whom the Software is furnished to do so, - * subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS - * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR - * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER - * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN - * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - * - */ - -#ifndef _CLHT_H_ -#define _CLHT_H_ - -#include -#include -#include - -#ifdef __cplusplus -extern "C" { -#endif - -#define true 1 -#define false 0 - -/* #define DEBUG */ - -#if defined(DEBUG) -# define DPP(x) x++ -#else -# define DPP(x) -#endif - -#define CACHE_LINE_SIZE 64 - -#define MAP_INVLD 0 -#define MAP_VALID 1 -#define MAP_INSRT 2 - -#define KEY_BUCKT 3 -#define ENTRIES_PER_BUCKET KEY_BUCKT - -#define CLHT_DO_GC 1 -#define CLHT_PERC_FULL_HALVE 2 -#define CLHT_PERC_FULL_DOUBLE 15 -#define CLHT_OCCUP_AFTER_RES 40 -#define CLHT_INC_EMERGENCY 2 -#define CLHT_NO_EMPTY_SLOT_TRIES 16 -#define CLHT_GC_HT_VERSION_USED(ht) clht_gc_thread_version(ht) -#define LOAD_FACTOR 2 - -#ifndef ALIGNED -# if __GNUC__ && !SCC -# define ALIGNED(N) __attribute__ ((aligned (N))) -# else -# define ALIGNED(N) -# endif -#endif - -#define likely(x) __builtin_expect((x), 1) -#define unlikely(x) __builtin_expect((x), 0) - -#if defined(__sparc__) -# define PREFETCHW(x) -# define PREFETCH(x) -# define PREFETCHNTA(x) -# define PREFETCHT0(x) -# define PREFETCHT1(x) -# define PREFETCHT2(x) - -# define PAUSE asm volatile("rd %%ccr, %%g0\n\t" \ - ::: "memory") -# define _mm_pause() PAUSE -# define _mm_mfence() __asm__ __volatile__("membar #LoadLoad | #LoadStore | #StoreLoad | #StoreStore"); -# define _mm_lfence() __asm__ __volatile__("membar #LoadLoad | #LoadStore"); -# define _mm_sfence() __asm__ __volatile__("membar #StoreLoad | #StoreStore"); - - -#elif defined(__tile__) -# define _mm_lfence() arch_atomic_read_barrier() -# define _mm_sfence() arch_atomic_write_barrier() -# define _mm_mfence() arch_atomic_full_barrier() -# define _mm_pause() cycle_relax() -#endif - -#define CAS_U64_BOOL(a, b, c) (CAS_U64(a, b, c) == b) -static inline int -is_power_of_two (unsigned int x) -{ - return ((x != 0) && !(x & (x - 1))); -} - - -typedef uintptr_t clht_addr_t; -typedef volatile uintptr_t clht_val_t; -typedef uint64_t clht_snapshot_all_t; - -typedef union -{ - volatile uint64_t snapshot; - struct - { -#if KEY_BUCKT == 4 - uint32_t version; -#elif KEY_BUCKT == 6 - uint16_t version; -#else - uint32_t version; -#endif - uint8_t map[KEY_BUCKT]; - }; -} clht_snapshot_t; - -//#if __GNUC__ > 4 && __GNUC_MINOR__ > 4 -//_Static_assert (sizeof(clht_snapshot_t) == 8, "sizeof(clht_snapshot_t) == 8"); -//#endif - -typedef volatile struct ALIGNED(CACHE_LINE_SIZE) bucket_s -{ - union - { - volatile uint64_t snapshot; - struct - { -#if KEY_BUCKT == 4 - uint32_t version; -#elif KEY_BUCKT == 6 - uint16_t version; -#else - uint32_t version; -/* # error "KEY_BUCKT should be either 4 or 6" */ -#endif - uint8_t map[KEY_BUCKT]; - }; - }; - clht_addr_t key[KEY_BUCKT]; - clht_val_t val[KEY_BUCKT]; - volatile struct bucket_s* padding; -} bucket_t; - -//#if __GNUC__ > 4 && __GNUC_MINOR__ > 4 -//_Static_assert (sizeof(bucket_t) % 64 == 0, "sizeof(bucket_t) == 64"); -//#endif - -#if defined(__tile__) -typedef volatile uint32_t clht_lock_t; -#else -typedef volatile uint8_t clht_lock_t; -#endif -/* typedef volatile uint64_t clht_lock_t; */ -#define CLHT_LOCK_FREE 0 -#define CLHT_LOCK_ACQR 1 - -#define CLHT_CHECK_RESIZE(w) \ - while (unlikely(w->resize_lock == CLHT_LOCK_ACQR)) \ - { \ - _mm_pause(); \ - CLHT_GC_HT_VERSION_USED(w->ht); \ - } - -#define CLHT_LOCK_RESIZE(w) \ - (CAS_U8(&w->resize_lock, CLHT_LOCK_FREE, CLHT_LOCK_ACQR) == CLHT_LOCK_FREE) - -#define CLHT_RLS_RESIZE(w) \ - w->resize_lock = CLHT_LOCK_FREE - -#define TRYLOCK_ACQ(lock) \ - TAS_U8(lock) - -#define TRYLOCK_RLS(lock) \ - lock = CLHT_LOCK_FREE - - -typedef struct ALIGNED(CACHE_LINE_SIZE) clht -{ - union - { - struct - { - struct clht_hashtable_s* ht; - uint8_t next_cache_line[CACHE_LINE_SIZE - (sizeof(void*))]; - struct clht_hashtable_s* ht_oldest; - struct ht_ts* version_list; - size_t version_min; - volatile clht_lock_t resize_lock; - volatile clht_lock_t gc_lock; - volatile clht_lock_t status_lock; - }; - uint8_t padding[2 * CACHE_LINE_SIZE]; - }; -} clht_t; - -typedef struct ALIGNED(CACHE_LINE_SIZE) clht_hashtable_s -{ - union - { - struct - { - size_t num_buckets; - bucket_t* table; - size_t hash; - size_t version; - uint8_t next_cache_line[CACHE_LINE_SIZE - (3 * sizeof(size_t)) - (sizeof(void*))]; - struct clht_hashtable_s* table_tmp; - struct clht_hashtable_s* table_prev; - struct clht_hashtable_s* table_new; - volatile uint32_t num_expands; - union - { - volatile uint32_t num_expands_threshold; - uint32_t num_buckets_prev; - }; - volatile int32_t is_helper; - volatile int32_t helper_done; - size_t version_min; - }; - uint8_t padding[2*CACHE_LINE_SIZE]; - }; -} clht_hashtable_t; - -extern uint64_t __ac_Jenkins_hash_64(uint64_t key); - -/* Hash a key for a particular hashtable. */ -uint64_t clht_hash(clht_hashtable_t* hashtable, clht_addr_t key); - - -static inline int -snap_get_empty_index(uint64_t snap) -{ - clht_snapshot_t s = { .snapshot = snap }; - int i; - for (i = 0; i < KEY_BUCKT; i++) - { - if (s.map[i] == MAP_INVLD) - { - return i; - } - } - return -1; -} - -static inline int -keys_get_empty_index(clht_addr_t* keys) -{ - int i; - for (i = 0; i < KEY_BUCKT; i++) - { - if (keys[i] == 0) - { - return i; - } - } - return -1; -} - -static inline int -buck_get_empty_index(bucket_t* b, uint64_t snap) -{ - clht_snapshot_t s = { .snapshot = snap }; - - int i; - for (i = 0; i < KEY_BUCKT; i++) - { - if (b->key[i] == 0 && s.map[i] != MAP_INSRT) - { - return i; - } - } - return -1; -} - - -static inline int -vals_get_empty_index(clht_val_t* vals, clht_snapshot_all_t snap) -{ - clht_snapshot_t s = { .snapshot = snap }; - - int i; - for (i = 0; i < KEY_BUCKT; i++) - { - if (vals[i] == 0 && s.map[i] != MAP_INSRT) - { - return i; - } - } - return -1; -} - - -static inline uint64_t -snap_set_map(uint64_t s, int index, int val) -{ - clht_snapshot_t s1 = { .snapshot = s }; - s1.map[index] = val; - return s1.snapshot; -} - -static inline uint64_t -snap_set_map_and_inc_version(uint64_t s, int index, int val) -{ - clht_snapshot_t s1 = { .snapshot = s}; - s1.map[index] = val; - s1.version++; - return s1.snapshot; -} - -static inline void -_mm_pause_rep(uint64_t w) -{ - while (w--) - { - _mm_pause(); - } -} - - - -/* ******************************************************************************** */ -/* inteface */ -/* ******************************************************************************** */ - -/* Create a new hashtable. */ -clht_t* clht_create(uint64_t num_buckets); -/* initializes the necessary per-thread structures for the hash table */ -void clht_gc_thread_init(clht_t* hashtable, int id); - - -/* Insert a key-value pair into a hashtable. */ -int clht_put(clht_t* hashtable, clht_addr_t key, clht_val_t val); -/* Retrieve a key-value pair from a hashtable. */ -clht_val_t clht_get(clht_hashtable_t* hashtable, clht_addr_t key); -/* Remove a key-value pair from a hashtable. */ -clht_val_t clht_remove(clht_t* hashtable, clht_addr_t key); - -/* returns the size of the hash table */ -size_t clht_size(clht_hashtable_t* hashtable); - -/* frees the memory used by the hashtable */ -void clht_gc_destroy(clht_t* hashtable); - -/* prints the contents of the hash table */ -void clht_print(clht_hashtable_t* hashtable); - -/* string description of the type of the hash table - For example, CLHT-LB-RESIZE */ -const char* clht_type_desc(); - -void clht_lock_initialization(clht_t *h); - -/* internal */ -extern void clht_gc_thread_version(clht_hashtable_t* h); -extern void clht_gc_thread_version_max(); -extern int clht_gc_get_id(); - -#ifdef __cplusplus -} -#endif - -#endif /* _CLHT_H_ */ - From 2d064763b2eeb29613d57ec38dab30bcc7f5ed70 Mon Sep 17 00:00:00 2001 From: Sekwon Lee Date: Thu, 31 Dec 2020 11:35:47 +0900 Subject: [PATCH 3/9] Update P-CLHT to use pmdk allocator (need to test more) --- P-CLHT/CMakeLists.txt | 2 +- P-CLHT/example.cpp | 4 +- P-CLHT/external/ssmem/src/ssmem.c | 5 +- P-CLHT/include/clht_lb_res.h | 59 ++-- P-CLHT/src/clht_gc.c | 235 ++++++++-------- P-CLHT/src/clht_lb_res.c | 432 +++++++++++++++--------------- 6 files changed, 385 insertions(+), 352 deletions(-) diff --git a/P-CLHT/CMakeLists.txt b/P-CLHT/CMakeLists.txt index 1697d1cd..8d76ac44 100644 --- a/P-CLHT/CMakeLists.txt +++ b/P-CLHT/CMakeLists.txt @@ -43,4 +43,4 @@ set(P_CLHT_TEST example.cpp src/clht_lb_res.c src/clht_gc.c add_executable(example ${P_CLHT_TEST}) target_link_libraries(example ${TbbLib} ${JemallocLib} boost_system - boost_thread pthread) + boost_thread pthread pmemobj pmem) diff --git a/P-CLHT/example.cpp b/P-CLHT/example.cpp index e76b5bb6..90089c0b 100644 --- a/P-CLHT/example.cpp +++ b/P-CLHT/example.cpp @@ -8,7 +8,7 @@ using namespace std; -#include "clht.h" +#include "clht_lb_res.h" #include "ssmem.h" typedef struct thread_data { @@ -117,7 +117,7 @@ void run(char **argv) { barrier_cross(&barrier); for (uint64_t i = start_key; i < end_key; i++) { - uintptr_t val = clht_get(tds[thread_id].ht->ht, keys[i]); + uintptr_t val = clht_get(tds[thread_id].ht, keys[i]); if (val != keys[i]) { std::cout << "[CLHT] wrong key read: " << val << "expected: " << keys[i] << std::endl; exit(1); diff --git a/P-CLHT/external/ssmem/src/ssmem.c b/P-CLHT/external/ssmem/src/ssmem.c index e0555f06..7378df4f 100644 --- a/P-CLHT/external/ssmem/src/ssmem.c +++ b/P-CLHT/external/ssmem/src/ssmem.c @@ -36,6 +36,8 @@ #include #include +#include + ssmem_ts_t* ssmem_ts_list = NULL; volatile uint32_t ssmem_ts_list_len = 0; __thread volatile ssmem_ts_t* ssmem_ts_local = NULL; @@ -542,7 +544,8 @@ ssmem_mem_reclaim(ssmem_allocator_t* a) do { rel_cur = rel_nxt; - free(rel_cur->mem); + PMEMoid cur_mem_oid = pmemobj_oid((void *)rel_cur->mem); + pmemobj_free(&cur_mem_oid); free(rel_cur); rel_nxt = rel_nxt->next; } diff --git a/P-CLHT/include/clht_lb_res.h b/P-CLHT/include/clht_lb_res.h index 94a1108d..13df6999 100644 --- a/P-CLHT/include/clht_lb_res.h +++ b/P-CLHT/include/clht_lb_res.h @@ -37,6 +37,9 @@ #include "atomic_ops.h" #include "utils.h" +#include +#include + #ifdef __cplusplus extern "C" { #endif @@ -61,7 +64,7 @@ extern __thread ssmem_allocator_t* clht_alloc; #define CLHT_RATIO_HALVE 8 #define CLHT_MIN_CLHT_SIZE 8 #define CLHT_DO_CHECK_STATUS 0 -#define CLHT_DO_GC 0 +#define CLHT_DO_GC 1 #define CLHT_STATUS_INVOK 500000 #define CLHT_STATUS_INVOK_IN 500000 #define LOAD_FACTOR 2 @@ -160,7 +163,7 @@ typedef struct ALIGNED(CACHE_LINE_SIZE) bucket_s volatile uint32_t hops; clht_addr_t key[ENTRIES_PER_BUCKET]; clht_val_t val[ENTRIES_PER_BUCKET]; - volatile struct bucket_s* next; + uint64_t next_off; } bucket_t; //#if __GNUC__ > 4 && __GNUC_MINOR__ > 4 @@ -173,7 +176,7 @@ typedef struct ALIGNED(CACHE_LINE_SIZE) clht { struct { - struct clht_hashtable_s* ht; + uint64_t ht_off; uint8_t next_cache_line[CACHE_LINE_SIZE - (sizeof(void*))]; struct clht_hashtable_s* ht_oldest; struct ht_ts* version_list; @@ -193,7 +196,7 @@ typedef struct ALIGNED(CACHE_LINE_SIZE) clht_hashtable_s struct { size_t num_buckets; - bucket_t* table; + uint64_t table_off; size_t hash; size_t version; uint8_t next_cache_line[CACHE_LINE_SIZE - (3 * sizeof(size_t)) - (sizeof(void*))]; @@ -407,40 +410,64 @@ clht_hashtable_t* clht_hashtable_create(uint64_t num_buckets); clht_t* clht_create(uint64_t num_buckets); /* Insert a key-value pair into a hashtable. */ -int clht_put(clht_t* hashtable, clht_addr_t key, clht_val_t val); +int clht_put(clht_t* h, clht_addr_t key, clht_val_t val); + +/* Update a value associated with a key */ +int clht_update(clht_t* h, clht_addr_t key, clht_val_t val); /* Retrieve a key-value pair from a hashtable. */ -clht_val_t clht_get(clht_hashtable_t* hashtable, clht_addr_t key); +clht_val_t clht_get(clht_t* h, clht_addr_t key); /* Remove a key-value pair from a hashtable. */ -clht_val_t clht_remove(clht_t* hashtable, clht_addr_t key); +clht_val_t clht_remove(clht_t* h, clht_addr_t key); size_t clht_size(clht_hashtable_t* hashtable); -size_t clht_size_mem(clht_hashtable_t* hashtable); -size_t clht_size_mem_garbage(clht_hashtable_t* hashtable); +size_t clht_size_mem(clht_hashtable_t* h); +size_t clht_size_mem_garbage(clht_hashtable_t* h); -void clht_gc_thread_init(clht_t* hashtable, int id); +void clht_gc_thread_init(clht_t* h, int id); extern void clht_gc_thread_version(clht_hashtable_t* h); extern int clht_gc_get_id(); -int clht_gc_collect(clht_t* h); -int clht_gc_release(clht_hashtable_t* h); -int clht_gc_collect_all(clht_t* h); +int clht_gc_collect(clht_t* hashtable); +int clht_gc_release(clht_hashtable_t* hashtable); +int clht_gc_collect_all(clht_t* hashtable); int clht_gc_free(clht_hashtable_t* hashtable); void clht_gc_destroy(clht_t* hashtable); void clht_print(clht_hashtable_t* hashtable); #if defined(CLHT_LB_LINKED) /* emergency_increase, grabs the lock and forces an increase by *emergency_increase times */ -size_t ht_status(clht_t* hashtable, int resize_increase, int emergency_increase, int just_print); +size_t ht_status(clht_t* h, int resize_increase, int emergency_increase, int just_print); #else -size_t ht_status(clht_t* hashtable, int resize_increase, int just_print); +size_t ht_status(clht_t* h, int resize_increase, int just_print); #endif bucket_t* clht_bucket_create(); -int ht_resize_pes(clht_t* hashtable, int is_increase, int by); +int ht_resize_pes(clht_t* h, int is_increase, int by); const char* clht_type_desc(); void clht_lock_initialization(clht_t *h); + +extern void *clht_ptr_from_off(uint64_t offset, bool alignment); + +// Initialize the persistent memory pool +POBJ_LAYOUT_BEGIN(clht); +POBJ_LAYOUT_ROOT(clht, clht_t); +POBJ_LAYOUT_TOID(clht, clht_hashtable_t); +POBJ_LAYOUT_TOID(clht, bucket_t); +POBJ_LAYOUT_END(clht); + +// Global pool uuid +uint64_t pool_uuid; + +// Global pool pointer +PMEMobjpool *pop; + +// pmemobj header size (presume using default compact header) +#define POBJ_HEADER_SIZE 16 + +#define ALIGNMENT_PADDING (CACHE_LINE_SIZE - POBJ_HEADER_SIZE) + #ifdef __cplusplus } #endif diff --git a/P-CLHT/src/clht_gc.c b/P-CLHT/src/clht_gc.c index 797dbfe8..2538b708 100644 --- a/P-CLHT/src/clht_gc.c +++ b/P-CLHT/src/clht_gc.c @@ -37,35 +37,36 @@ static __thread ht_ts_t* clht_ts_thread = NULL; /* * initialize thread metadata for GC */ -void + void clht_gc_thread_init(clht_t* h, int id) { - clht_alloc = (ssmem_allocator_t*) malloc(sizeof(ssmem_allocator_t)); - assert(clht_alloc != NULL); - ssmem_alloc_init_fs_size(clht_alloc, SSMEM_DEFAULT_MEM_SIZE, SSMEM_GC_FREE_SET_SIZE, id); + clht_alloc = (ssmem_allocator_t*) malloc(sizeof(ssmem_allocator_t)); + assert(clht_alloc != NULL); + ssmem_alloc_init_fs_size(clht_alloc, SSMEM_DEFAULT_MEM_SIZE, SSMEM_GC_FREE_SET_SIZE, id); - ht_ts_t* ts = (ht_ts_t*) memalign(CACHE_LINE_SIZE, sizeof(ht_ts_t)); - assert(ts != NULL); + ht_ts_t* ts = (ht_ts_t*) memalign(CACHE_LINE_SIZE, sizeof(ht_ts_t)); + assert(ts != NULL); - ts->version = h->ht->version; - ts->id = id; + clht_hashtable_t *ht_ptr = clht_ptr_from_off(h->ht_off, false); + ts->version = ht_ptr->version; + ts->id = id; - do + do { - ts->next = h->version_list; + ts->next = h->version_list; } - while (CAS_U64((volatile size_t*) &h->version_list, (size_t) ts->next, (size_t) ts) != (size_t) ts->next); + while (CAS_U64((volatile size_t*) &h->version_list, (size_t) ts->next, (size_t) ts) != (size_t) ts->next); - clht_ts_thread = ts; + clht_ts_thread = ts; } /* * set the ht version currently used by the current thread */ -inline void + inline void clht_gc_thread_version(clht_hashtable_t* h) { - clht_ts_thread->version = h->version; + clht_ts_thread->version = h->version; } /* @@ -73,20 +74,20 @@ clht_gc_thread_version(clht_hashtable_t* h) * to maximum to indicate that there is no ongoing update * operation. */ -void + void clht_gc_thread_version_max() { - clht_ts_thread->version = -1; + clht_ts_thread->version = -1; } /* * get the GC id of the current thread */ -inline int + inline int clht_gc_get_id() { - return clht_ts_thread->id; + return clht_ts_thread->id; } static int clht_gc_collect_cond(clht_t* hashtable, int collect_not_referenced_only); @@ -95,14 +96,14 @@ static int clht_gc_collect_cond(clht_t* hashtable, int collect_not_referenced_on * perform a GC of the versions of the ht that are not currently used by any * of the participating threads */ -inline int + inline int clht_gc_collect(clht_t* hashtable) { #if CLHT_DO_GC == 1 - CLHT_GC_HT_VERSION_USED(hashtable->ht); - return clht_gc_collect_cond(hashtable, 1); + CLHT_GC_HT_VERSION_USED(clht_ptr_from_off(hashtable->ht_off, false)); + return clht_gc_collect_cond(hashtable, 1); #else - return 0; + return 0; #endif } @@ -110,10 +111,10 @@ clht_gc_collect(clht_t* hashtable) * perform a GC of ALL old versions of the ht, regardless if they are * referenced by any of the threads */ -int + int clht_gc_collect_all(clht_t* hashtable) { - return clht_gc_collect_cond(hashtable, 0); + return clht_gc_collect_cond(hashtable, 0); } #define GET_ID(x) x ? clht_gc_get_id() : 99 @@ -123,22 +124,23 @@ clht_gc_collect_all(clht_t* hashtable) * version that is currently used. In other words, all versions, less * than the returned value, can be GCed */ -size_t + size_t clht_gc_min_version_used(clht_t* h) { - volatile ht_ts_t* cur = h->version_list; + volatile ht_ts_t* cur = h->version_list; - size_t min = h->ht->version; - while (cur != NULL) + clht_hashtable_t *ht_ptr = clht_ptr_from_off(h->ht_off, false); + size_t min = ht_ptr->version; + while (cur != NULL) { - if (cur->version < min) - { - min = cur->version; - } - cur = cur->next; + if (cur->version < min) + { + min = cur->version; + } + cur = cur->next; } - return min; + return min; } /* @@ -146,112 +148,117 @@ clht_gc_min_version_used(clht_t* h) * collect_not_referenced_only == 0 -> clht_gc_collect_all(); * collect_not_referenced_only != 0 -> clht_gc_collect(); */ -static int + static int clht_gc_collect_cond(clht_t* hashtable, int collect_not_referenced_only) { - /* if version_min >= current version there is nothing to collect! */ - if ((hashtable->version_min >= hashtable->ht->version) || TRYLOCK_ACQ(&hashtable->gc_lock)) + clht_hashtable_t *ht_ptr = (clht_hashtable_t *)clht_ptr_from_off(hashtable->ht_off, false); + /* if version_min >= current version there is nothing to collect! */ + if ((hashtable->version_min >= ht_ptr->version) || TRYLOCK_ACQ(&hashtable->gc_lock)) { - /* printf("** someone else is performing gc\n"); */ - return 0; + /* printf("** someone else is performing gc\n"); */ + return 0; } - ticks s = getticks(); + //ticks s = getticks(); - /* printf("[GCOLLE-%02d] LOCK : %zu\n", GET_ID(collect_not_referenced_only), hashtable->version); */ + /* printf("[GCOLLE-%02d] LOCK : %zu\n", GET_ID(collect_not_referenced_only), hashtable->version); */ - size_t version_min = hashtable->ht->version; - if (collect_not_referenced_only) + size_t version_min = ht_ptr->version; + if (collect_not_referenced_only) { - version_min = clht_gc_min_version_used(hashtable); + version_min = clht_gc_min_version_used(hashtable); } - /* printf("[GCOLLE-%02d] gc collect versions < %3zu - current: %3zu - oldest: %zu\n", */ - /* GET_ID(collect_not_referenced_only), version_min, hashtable->version, hashtable->version_min); */ + /* printf("[GCOLLE-%02d] gc collect versions < %3zu - current: %3zu - oldest: %zu\n", */ + /* GET_ID(collect_not_referenced_only), version_min, hashtable->version, hashtable->version_min); */ - int gced_num = 0; + int gced_num = 0; - if (hashtable->version_min >= version_min) + if (hashtable->version_min >= version_min) { - /* printf("[GCOLLE-%02d] UNLOCK: %zu (nothing to collect)\n", GET_ID(collect_not_referenced_only), hashtable->ht->version); */ - TRYLOCK_RLS(hashtable->gc_lock); + /* printf("[GCOLLE-%02d] UNLOCK: %zu (nothing to collect)\n", GET_ID(collect_not_referenced_only), hashtable->ht->version); */ + TRYLOCK_RLS(hashtable->gc_lock); } - else + else { - /* printf("[GCOLLE-%02d] collect from %zu to %zu\n", GET_ID(collect_not_referenced_only), hashtable->version_min, version_min); */ - - clht_hashtable_t* cur = hashtable->ht_oldest; - while (cur != NULL && cur->version < version_min) - { - gced_num++; - clht_hashtable_t* nxt = cur->table_new; - /* printf("[GCOLLE-%02d] gc_free version: %6zu | current version: %6zu\n", GET_ID(collect_not_referenced_only), */ - /* cur->version, hashtable->ht->version); */ - nxt->table_prev = NULL; - clht_gc_free(cur); - cur = nxt; - } - - hashtable->version_min = cur->version; - hashtable->ht_oldest = cur; - - TRYLOCK_RLS(hashtable->gc_lock); - /* printf("[GCOLLE-%02d] UNLOCK: %zu\n", GET_ID(collect_not_referenced_only), cur->version); */ + /* printf("[GCOLLE-%02d] collect from %zu to %zu\n", GET_ID(collect_not_referenced_only), hashtable->version_min, version_min); */ + + clht_hashtable_t* cur = hashtable->ht_oldest; + while (cur != NULL && cur->version < version_min) + { + gced_num++; + clht_hashtable_t* nxt = cur->table_new; + /* printf("[GCOLLE-%02d] gc_free version: %6zu | current version: %6zu\n", GET_ID(collect_not_referenced_only), */ + /* cur->version, hashtable->ht->version); */ + nxt->table_prev = NULL; + clht_gc_free(cur); + cur = nxt; + } + + hashtable->version_min = cur->version; + hashtable->ht_oldest = cur; + + TRYLOCK_RLS(hashtable->gc_lock); + /* printf("[GCOLLE-%02d] UNLOCK: %zu\n", GET_ID(collect_not_referenced_only), cur->version); */ } - ticks e = getticks() - s; - printf("[GCOLLE-%02d] collected: %-3d | took: %13llu ti = %8.6f s\n", - GET_ID(collect_not_referenced_only), gced_num, (unsigned long long) e, e / 2.1e9); + //ticks e = getticks() - s; + //printf("[GCOLLE-%02d] collected: %-3d | took: %13llu ti = %8.6f s\n", + // GET_ID(collect_not_referenced_only), gced_num, (unsigned long long) e, e / 2.1e9); - return gced_num; + return gced_num; } /* * free the given hashtable */ -int + int clht_gc_free(clht_hashtable_t* hashtable) { - /* the CLHT_LINKED version does not allocate any extra buckets! */ + /* the CLHT_LINKED version does not allocate any extra buckets! */ #if !defined(CLHT_LB_LINKED) && !defined(LOCKFREE_RES) - uint64_t num_buckets = hashtable->num_buckets; - volatile bucket_t* bucket = NULL; + uint64_t num_buckets = hashtable->num_buckets; + volatile bucket_t* bucket = NULL; - uint64_t bin; - for (bin = 0; bin < num_buckets; bin++) + uint64_t bin; + for (bin = 0; bin < num_buckets; bin++) { - bucket = hashtable->table + bin; - bucket = bucket->next; - while (bucket != NULL) - { - volatile bucket_t* cur = bucket; - bucket = bucket->next; - free((void*) cur); - } + bucket = ((bucket_t *)clht_ptr_from_off(hashtable->table_off, true)) + bin; + bucket = (bucket_t *)(clht_ptr_from_off(bucket->next_off, true)); + while (bucket != NULL) + { + volatile bucket_t* cur = (bucket_t *)((uint64_t)bucket - ALIGNMENT_PADDING); + bucket = clht_ptr_from_off(bucket->next_off, true); + PMEMoid cur_oid = pmemobj_oid((void *)cur); + pmemobj_free(&cur_oid); + } } #endif - free(hashtable->table); - free(hashtable); + PMEMoid table_oid = {pool_uuid, hashtable->table_off}; + pmemobj_free(&table_oid); + PMEMoid ht_oid = pmemobj_oid((void *)hashtable); + pmemobj_free(&ht_oid); - return 1; + return 1; } /* * free all hashtable version (inluding the latest) */ -void + void clht_gc_destroy(clht_t* hashtable) { #if !defined(CLHT_LINKED) - clht_gc_collect_all(hashtable); - clht_gc_free(hashtable->ht); - free(hashtable); + clht_gc_collect_all(hashtable); + clht_gc_free(clht_ptr_from_off(hashtable->ht_off, false)); + PMEMoid ht_oid = pmemobj_oid((void *)hashtable); + pmemobj_free(&ht_oid); #endif - // ssmem_alloc_term(clht_alloc); - free(clht_alloc); + // ssmem_alloc_term(clht_alloc); + free(clht_alloc); } /* @@ -259,31 +266,31 @@ clht_gc_destroy(clht_t* hashtable) * to the OS (free), when it is safe (nobody is using it * anymore) */ -inline int + inline int clht_gc_release(clht_hashtable_t* hashtable) { - /* the CLHT_LINKED version does not allocate any extra buckets! */ + /* the CLHT_LINKED version does not allocate any extra buckets! */ #if !defined(CLHT_LINKED) && !defined(LOCKFREE_RES) - uint64_t num_buckets = hashtable->num_buckets; - volatile bucket_t* bucket = NULL; + uint64_t num_buckets = hashtable->num_buckets; + volatile bucket_t* bucket = NULL; - uint64_t bin; - for (bin = 0; bin < num_buckets; bin++) + uint64_t bin; + for (bin = 0; bin < num_buckets; bin++) { - bucket = hashtable->table + bin; - bucket = bucket->next; - while (bucket != NULL) - { - volatile bucket_t* cur = bucket; - bucket = bucket->next; - ssmem_release(clht_alloc, (void*) cur); - } + bucket = ((bucket_t *)clht_ptr_from_off(hashtable->table_off, true)) + bin; + bucket = (bucket_t *)clht_ptr_from_off(bucket->next_off, true); + while (bucket != NULL) + { + volatile bucket_t* cur = (bucket_t *)((uint64_t)bucket - ALIGNMENT_PADDING); + bucket = (bucket_t *)clht_ptr_from_off(bucket->next_off, true); + ssmem_release(clht_alloc, (void*) cur); + } } #endif - ssmem_release(clht_alloc, hashtable->table); - ssmem_release(clht_alloc, hashtable); - return 1; + ssmem_release(clht_alloc, clht_ptr_from_off(hashtable->table_off, false)); + ssmem_release(clht_alloc, hashtable); + return 1; } diff --git a/P-CLHT/src/clht_lb_res.c b/P-CLHT/src/clht_lb_res.c index 16650c33..ff1668fc 100644 --- a/P-CLHT/src/clht_lb_res.c +++ b/P-CLHT/src/clht_lb_res.c @@ -32,7 +32,6 @@ #include #include #include -#include #include #include #include @@ -40,9 +39,6 @@ #include "clht_lb_res.h" //#define CLHTDEBUG -//#define CRASH_AFTER_SWAP_CLHT -//#define CRASH_BEFORE_SWAP_CLHT -//#define CRASH_DURING_NODE_CREATE __thread ssmem_allocator_t* clht_alloc; @@ -57,7 +53,6 @@ __thread size_t check_ht_status_steps = CLHT_STATUS_INVOK_IN; #include "stdlib.h" #include "assert.h" - #if defined(CLHTDEBUG) #define DEBUG_PRINT(fmt, args...) fprintf(stderr, "CLHT_DEBUG: %s:%d:%s(): " fmt, \ __FILE__, __LINE__, __func__, ##args) @@ -65,14 +60,6 @@ __thread size_t check_ht_status_steps = CLHT_STATUS_INVOK_IN; #define DEBUG_PRINT(fmt, args...) #endif -/* -#ifdef CLHTDEBUG - #define DEBUG_PRINT(x) printf x -#else - #define DEBUG_PRINT(x) do {} while (0) -#endif -*/ - const char* clht_type_desc() { @@ -106,23 +93,18 @@ __ac_Jenkins_hash_64(uint64_t key) return key; } -static uint64_t write_latency = 0; -static uint64_t CPU_FREQ_MHZ = 2100; - -static inline void cpu_pause() -{ - __asm__ volatile ("pause" ::: "memory"); -} - -static inline unsigned long read_tsc(void) +inline void *clht_ptr_from_off(uint64_t offset, bool alignment) { - unsigned long var; - unsigned int hi, lo; - - asm volatile ("rdtsc" : "=a" (lo), "=d" (hi)); - var = ((unsigned long long int) hi << 32) | lo; - - return var; + PMEMoid oid = {pool_uuid, offset}; + void *vaddr = pmemobj_direct(oid); + if (!alignment) + return vaddr; + else { + if (vaddr) + return (void *)((uint64_t)vaddr + ALIGNMENT_PADDING); + else + return vaddr; + } } static inline void mfence() { @@ -135,7 +117,6 @@ static inline void clflush(char *data, int len, bool front, bool back) if (front) mfence(); for(; ptrnext) - clflush_next_check((char *)(((bucket_t *)ptr)->next), sizeof(bucket_t), false); - while(read_tsc() < etsc) cpu_pause(); + if (((bucket_t *)ptr)->next_off != OID_NULL.off) + clflush_next_check((char *)clht_ptr_from_off((((bucket_t *)ptr)->next_off), true), sizeof(bucket_t), false); } if (fence) mfence(); } +// Implemented without assuming cacheline-aligned allocation +static inline void clflush_new_hashtable(bucket_t *buckets, size_t num_buckets) { + bucket_t *bucket; + + clflush((char *)buckets, num_buckets * sizeof(bucket_t), false, false); + for (uint64_t i = 0; i < num_buckets; i++) { + bucket = &buckets[i]; + while (bucket->next_off != OID_NULL.off) { + bucket = (bucket_t *)clht_ptr_from_off(bucket->next_off, true); + clflush((char *)bucket, sizeof(bucket_t), false, false); + } + } +} + static inline void movnt64(uint64_t *dest, uint64_t const src, bool front, bool back) { assert(((uint64_t)dest & 7) == 0); if (front) mfence(); @@ -178,33 +171,42 @@ static inline void movnt64(uint64_t *dest, uint64_t const src, bool front, bool if (back) mfence(); } -/* Create a new bucket. */ - bucket_t* -clht_bucket_create() +static int bucket_init(PMEMobjpool *pop_arg, void *ptr, void *arg) { - bucket_t* bucket = NULL; - bucket = (bucket_t *) memalign(CACHE_LINE_SIZE, sizeof(bucket_t)); - if (bucket == NULL) - { - return NULL; - } - + bucket_t *bucket = (bucket_t *)((uint64_t)ptr + ALIGNMENT_PADDING); bucket->lock = 0; uint32_t j; for (j = 0; j < ENTRIES_PER_BUCKET; j++) - { bucket->key[j] = 0; + bucket->next_off = OID_NULL.off; + return 0; +} + +/* Create a new bucket. */ + bucket_t* +clht_bucket_create() +{ + bucket_t* bucket = NULL; + PMEMoid bucket_oid; + if (pmemobj_alloc(pop, &bucket_oid, sizeof(bucket_t) + ALIGNMENT_PADDING, TOID_TYPE_NUM(bucket_t), bucket_init, 0)) { + fprintf(stderr, "pmemobj_alloc failed for clht_bucket_create\n"); + assert(0); } - bucket->next = NULL; - return bucket; + bucket = pmemobj_direct(bucket_oid); + if (bucket == NULL) + return NULL; + return (bucket_t *)((uint64_t)bucket + ALIGNMENT_PADDING); } bucket_t* clht_bucket_create_stats(clht_hashtable_t* h, int* resize) { bucket_t* b = clht_bucket_create(); + if (((uint64_t)b & (CACHE_LINE_SIZE - 1)) != 0) + fprintf(stderr, "cacheline-unaligned bucket allocation\n"); + //if (IAF_U32(&h->num_expands) == h->num_expands_threshold) if (IAF_U32(&h->num_expands) >= h->num_expands_threshold) { @@ -219,29 +221,55 @@ clht_hashtable_t* clht_hashtable_create(uint64_t num_buckets); clht_t* clht_create(uint64_t num_buckets) { - clht_t* w = (clht_t*) memalign(CACHE_LINE_SIZE, sizeof(clht_t)); + // Enable prefault + int arg_open = 1, arg_create = 1; + if ((pmemobj_ctl_set(pop, "prefault.at_open", &arg_open)) != 0) + perror("failed to configure prefaults at open\n"); + if ((pmemobj_ctl_set(pop, "prefault.at_create", &arg_create)) != 0) + perror("failed to configure prefaults at create\n"); + + // Open the PMEMpool if it exists, otherwise create it + size_t pool_size = 32*1024*1024*1024UL; + if (access("/dev/shm/pool", F_OK) != -1) + pop = pmemobj_open("/dev/shm/pool", POBJ_LAYOUT_NAME(clht)); + else + pop = pmemobj_create("/dev/shm/pool", POBJ_LAYOUT_NAME(clht), pool_size, 0666); + + if (pop == NULL) + perror("failed to open the pool\n"); + + // Create the root pointer + PMEMoid my_root = pmemobj_root(pop, sizeof(clht_t)); + if (pmemobj_direct(my_root) == NULL) + perror("root pointer is null\n"); + pool_uuid = my_root.pool_uuid_lo; + + clht_t *w = pmemobj_direct(my_root); if (w == NULL) - { - printf("** malloc @ hatshtalbe\n"); return NULL; - } - w->ht = clht_hashtable_create(num_buckets); - if (w->ht == NULL) - { - free(w); - return NULL; + if (w->ht_off == 0) { + clht_hashtable_t *ht_ptr; + ht_ptr = clht_hashtable_create(num_buckets); + assert(ht_ptr != NULL); + + w->ht_off = pmemobj_oid(ht_ptr).off; + w->resize_lock = LOCK_FREE; + w->gc_lock = LOCK_FREE; + w->status_lock = LOCK_FREE; + w->version_list = NULL; + w->version_min = 0; + w->ht_oldest = ht_ptr; + + // This should flush everything to persistent memory + clflush((char *)clht_ptr_from_off(ht_ptr->table_off, true), num_buckets * sizeof(bucket_t), false, true); + clflush((char *)ht_ptr, sizeof(clht_hashtable_t), false, true); + clflush((char *)w, sizeof(clht_t), false, true); + } else { + w->resize_lock = LOCK_FREE; + w->gc_lock = LOCK_FREE; + w->status_lock = LOCK_FREE; } - w->resize_lock = LOCK_FREE; - w->gc_lock = LOCK_FREE; - w->status_lock = LOCK_FREE; - w->version_list = NULL; - w->version_min = 0; - w->ht_oldest = w->ht; - - clflush((char *)w->ht->table, num_buckets * sizeof(bucket_t), false, true); - clflush((char *)w->ht, sizeof(clht_hashtable_t), false, true); - clflush((char *)w, sizeof(clht_t), false, true); return w; } @@ -257,33 +285,40 @@ clht_hashtable_create(uint64_t num_buckets) } /* Allocate the table itself. */ - hashtable = (clht_hashtable_t*) memalign(CACHE_LINE_SIZE, sizeof(clht_hashtable_t)); - if (hashtable == NULL) - { - printf("** malloc @ hatshtalbe\n"); - return NULL; + PMEMoid ht_oid; + if (pmemobj_alloc(pop, &ht_oid, sizeof(clht_hashtable_t), TOID_TYPE_NUM(clht_hashtable_t), 0, 0)) { + fprintf(stderr, "pmemobj_alloc failed for clht_hashtable_create\n"); + assert(0); } - /* hashtable->table = calloc(num_buckets, (sizeof(bucket_t))); */ - hashtable->table = (bucket_t*) memalign(CACHE_LINE_SIZE, num_buckets * (sizeof(bucket_t))); - if (hashtable->table == NULL) - { - printf("** alloc: hashtable->table\n"); fflush(stdout); - free(hashtable); + hashtable = pmemobj_direct(ht_oid); + if (hashtable == NULL) { + printf("** malloc @ hashtable\n"); return NULL; } - memset(hashtable->table, 0, num_buckets * (sizeof(bucket_t))); + PMEMoid table_oid; + if (pmemobj_zalloc(pop, &table_oid, (num_buckets * sizeof(bucket_t)) + ALIGNMENT_PADDING, TOID_TYPE_NUM(bucket_t))) { + fprintf(stderr, "pmemobj_alloc failed for table_oid in clht_hashtable_create\n"); + assert(0); + } + + hashtable->table_off = table_oid.off; + bucket_t *bucket_ptr = (bucket_t *)clht_ptr_from_off(hashtable->table_off, true); + if (bucket_ptr == NULL) { + fprintf(stderr, "** alloc: hashtable->table\n"); + perror("bucket_ptr is null\n"); + return NULL; + } else if (((uint64_t)bucket_ptr & (CACHE_LINE_SIZE -1)) != 0) { + fprintf(stderr, "cacheline-unaligned hash table allocation\n"); + } uint64_t i; - for (i = 0; i < num_buckets; i++) - { - hashtable->table[i].lock = LOCK_FREE; + for (i = 0; i < num_buckets; i++) { + bucket_ptr[i].lock = LOCK_FREE; uint32_t j; for (j = 0; j < ENTRIES_PER_BUCKET; j++) - { - hashtable->table[i].key[j] = 0; - } + bucket_ptr[i].key[j] = 0; } hashtable->num_buckets = num_buckets; @@ -317,21 +352,23 @@ clht_hash(clht_hashtable_t* hashtable, clht_addr_t key) return key & (hashtable->hash); } - /* Retrieve a key-value entry from a hash table. */ clht_val_t -clht_get(clht_hashtable_t* hashtable, clht_addr_t key) +clht_get(clht_t* h, clht_addr_t key) { + clht_hashtable_t *hashtable = clht_ptr_from_off(h->ht_off, false); size_t bin = clht_hash(hashtable, key); CLHT_GC_HT_VERSION_USED(hashtable); - volatile bucket_t* bucket = hashtable->table + bin; + volatile bucket_t *bucket = ((bucket_t *)clht_ptr_from_off(hashtable->table_off, true)) + bin; uint32_t j; + clht_val_t val; do { for (j = 0; j < ENTRIES_PER_BUCKET; j++) { - clht_val_t val = bucket->val[j]; +retry: + val = bucket->val[j]; #ifdef __tile__ _mm_lfence(); #endif @@ -343,12 +380,12 @@ clht_get(clht_hashtable_t* hashtable, clht_addr_t key) } else { - return 0; + goto retry; } } } - bucket = bucket->next; + bucket = (bucket_t *)clht_ptr_from_off(bucket->next_off, true); } while (unlikely(bucket != NULL)); return 0; @@ -367,7 +404,7 @@ bucket_exists(volatile bucket_t* bucket, clht_addr_t key) return true; } } - bucket = bucket->next; + bucket = (bucket_t *)clht_ptr_from_off(bucket->next_off, true); } while (unlikely(bucket != NULL)); return false; @@ -377,9 +414,10 @@ bucket_exists(volatile bucket_t* bucket, clht_addr_t key) int clht_put(clht_t* h, clht_addr_t key, clht_val_t val) { - clht_hashtable_t* hashtable = h->ht; + clht_hashtable_t* hashtable = clht_ptr_from_off(h->ht_off, false); size_t bin = clht_hash(hashtable, key); - volatile bucket_t* bucket = hashtable->table + bin; + volatile bucket_t *bucket = ((bucket_t *)clht_ptr_from_off(hashtable->table_off, true)) + bin; + #if CLHT_READ_ONLY_FAIL == 1 if (bucket_exists(bucket, key)) { @@ -390,10 +428,10 @@ clht_put(clht_t* h, clht_addr_t key, clht_val_t val) clht_lock_t* lock = &bucket->lock; while (!LOCK_ACQ(lock, hashtable)) { - hashtable = h->ht; + hashtable = clht_ptr_from_off(h->ht_off, false); size_t bin = clht_hash(hashtable, key); - bucket = hashtable->table + bin; + bucket = ((bucket_t *)clht_ptr_from_off(hashtable->table_off, true)) + bin; lock = &bucket->lock; } @@ -420,7 +458,7 @@ clht_put(clht_t* h, clht_addr_t key, clht_val_t val) } int resize = 0; - if (likely(bucket->next == NULL)) + if (likely(clht_ptr_from_off(bucket->next_off, true) == NULL)) { if (unlikely(empty == NULL)) { @@ -438,7 +476,7 @@ clht_put(clht_t* h, clht_addr_t key, clht_val_t val) _mm_sfence(); #endif clflush((char *)b, sizeof(bucket_t), false, true); - movnt64((uint64_t *)&bucket->next, (uint64_t)b, false, true); + movnt64((uint64_t *)&bucket->next_off, (uint64_t)(pmemobj_oid(b).off - ALIGNMENT_PADDING), false, true); } else { @@ -464,19 +502,64 @@ clht_put(clht_t* h, clht_addr_t key, clht_val_t val) } return true; } - bucket = bucket->next; + bucket = (bucket_t *)clht_ptr_from_off(bucket->next_off, true); } while (true); } +/* Insert a key-value entry into a hash table. */ + int +clht_update(clht_t* h, clht_addr_t key, clht_val_t val) +{ + clht_hashtable_t* hashtable = (clht_hashtable_t *)clht_ptr_from_off(h->ht_off, false); + size_t bin = clht_hash(hashtable, key); + volatile bucket_t *bucket = ((bucket_t *)clht_ptr_from_off(hashtable->table_off, true)) + bin; + +#if CLHT_READ_ONLY_FAIL == 1 + if (!bucket_exists(bucket, key)) + { + return false; + } +#endif + + clht_lock_t* lock = &bucket->lock; + while (!LOCK_ACQ(lock, hashtable)) + { + hashtable = (clht_hashtable_t *)clht_ptr_from_off(h->ht_off, false); + size_t bin = clht_hash(hashtable, key); + + bucket = ((bucket_t *)clht_ptr_from_off(hashtable->table_off, true)) + bin; + lock = &bucket->lock; + } + + CLHT_GC_HT_VERSION_USED(hashtable); + CLHT_CHECK_STATUS(h); + + uint32_t j; + do + { + for (j = 0; j < ENTRIES_PER_BUCKET; j++) + { + if (bucket->key[j] == key) + { + movnt64((uint64_t *)&bucket->val[j], val, true, true); + return true; + } + } + bucket = (bucket_t *)clht_ptr_from_off(bucket->next_off, true); + } + while (unlikely(bucket != NULL)); + + return false; +} /* Remove a key-value entry from a hash table. */ clht_val_t clht_remove(clht_t* h, clht_addr_t key) { - clht_hashtable_t* hashtable = h->ht; + clht_hashtable_t* hashtable = (clht_hashtable_t *)clht_ptr_from_off(h->ht_off, false); size_t bin = clht_hash(hashtable, key); - volatile bucket_t* bucket = hashtable->table + bin; + volatile bucket_t* bucket = ((bucket_t *)clht_ptr_from_off(hashtable->table_off, true)) + bin; #if CLHT_READ_ONLY_FAIL == 1 if (!bucket_exists(bucket, key)) @@ -488,10 +571,10 @@ clht_remove(clht_t* h, clht_addr_t key) clht_lock_t* lock = &bucket->lock; while (!LOCK_ACQ(lock, hashtable)) { - hashtable = h->ht; + hashtable = (clht_hashtable_t *)clht_ptr_from_off(h->ht_off, false); size_t bin = clht_hash(hashtable, key); - bucket = hashtable->table + bin; + bucket = ((bucket_t *)clht_ptr_from_off(hashtable->table_off, true)) + bin; lock = &bucket->lock; } @@ -512,7 +595,7 @@ clht_remove(clht_t* h, clht_addr_t key) return val; } } - bucket = bucket->next; + bucket = (bucket_t *)clht_ptr_from_off(bucket->next_off, true); } while (unlikely(bucket != NULL)); LOCK_RLS(lock); @@ -522,7 +605,7 @@ clht_remove(clht_t* h, clht_addr_t key) static uint32_t clht_put_seq(clht_hashtable_t* hashtable, clht_addr_t key, clht_val_t val, uint64_t bin) { - volatile bucket_t* bucket = hashtable->table + bin; + volatile bucket_t* bucket = ((bucket_t *)clht_ptr_from_off(hashtable->table_off, true)) + bin; uint32_t j; do @@ -537,17 +620,18 @@ clht_put_seq(clht_hashtable_t* hashtable, clht_addr_t key, clht_val_t val, uint6 } } - if (bucket->next == NULL) + if (clht_ptr_from_off(bucket->next_off, true) == NULL) { DPP(put_num_failed_expand); int null; - bucket->next = clht_bucket_create_stats(hashtable, &null); - bucket->next->val[0] = val; - bucket->next->key[0] = key; + bucket->next_off = pmemobj_oid(clht_bucket_create_stats(hashtable, &null)).off - ALIGNMENT_PADDING; + bucket_t *bucket_ptr = (bucket_t *)clht_ptr_from_off(bucket->next_off, true); + bucket_ptr->val[0] = val; + bucket_ptr->key[0] = key; return true; } - bucket = bucket->next; + bucket = (bucket_t *)clht_ptr_from_off(bucket->next_off, true); } while (true); } @@ -569,39 +653,10 @@ bucket_cpy(clht_t* h, volatile bucket_t* bucket, clht_hashtable_t* ht_new) if (key != 0) { uint64_t bin = clht_hash(ht_new, key); - -#if defined(CRASH_DURING_NODE_CREATE) - pid_t pid = fork(); - - if (pid == 0) { - // Crash state soon after pointer swap. - // This state will verify that all structural changes - // have been performed before the final pointer swap - clht_lock_initialization(h); - DEBUG_PRINT("Child process returned during new bucket creation\n"); - DEBUG_PRINT("-------------ht new------------\n"); - clht_print(ht_new); - DEBUG_PRINT("-------------ht current------------\n"); - clht_print(h->ht); - DEBUG_PRINT("-------------------------\n"); - return -1; - } - - else if (pid > 0){ - int returnStatus; - waitpid(pid, &returnStatus, 0); - DEBUG_PRINT("Continuing in parent process to finish bucket copy\n"); - } - else { - DEBUG_PRINT("Fork failed"); - return 0; - } -#endif - clht_put_seq(ht_new, key, bucket->val[j], bin); } } - bucket = bucket->next; + bucket = (bucket_t *)clht_ptr_from_off(bucket->next_off, true); } while (bucket != NULL); @@ -621,7 +676,7 @@ ht_resize_help(clht_hashtable_t* h) /* hash = num_buckets - 1 */ for (b = h->hash; b >= 0; b--) { - bucket_t* bu_cur = h->table + b; + bucket_t* bu_cur = ((bucket_t *)clht_ptr_from_off(h->table_off, true)) + b; if (!bucket_cpy((clht_t *)h, bu_cur, h->table_tmp)) { /* reached a point where the resizer is handling */ /* printf("[GC-%02d] helped #buckets: %10zu = %5.1f%%\n", */ @@ -633,7 +688,6 @@ ht_resize_help(clht_hashtable_t* h) h->helper_done = 1; } - // return -1 if crash is simulated. int ht_resize_pes(clht_t* h, int is_increase, int by) @@ -642,7 +696,7 @@ ht_resize_pes(clht_t* h, int is_increase, int by) check_ht_status_steps = CLHT_STATUS_INVOK; - clht_hashtable_t* ht_old = h->ht; + clht_hashtable_t* ht_old = (clht_hashtable_t *)clht_ptr_from_off(h->ht_off, false); if (TRYLOCK_ACQ(&h->resize_lock)) { @@ -674,7 +728,7 @@ ht_resize_pes(clht_t* h, int is_increase, int by) size_t b; for (b = 0; b < ht_old->num_buckets; b++) { - bucket_t* bu_cur = ht_old->table + b; + bucket_t* bu_cur = (bucket_t *)(clht_ptr_from_off(ht_old->table_off, true)) + b; int ret = bucket_cpy(h, bu_cur, ht_new); /* reached a point where the helper is handling */ if (ret == -1) return -1; @@ -696,7 +750,7 @@ ht_resize_pes(clht_t* h, int is_increase, int by) size_t b; for (b = 0; b < ht_old->num_buckets; b++) { - bucket_t* bu_cur = ht_old->table + b; + bucket_t* bu_cur = ((bucket_t *)clht_ptr_from_off(ht_old->table_off, true)) + b; int ret = bucket_cpy(h, bu_cur, ht_new); if (ret == -1) return -1; @@ -722,72 +776,14 @@ ht_resize_pes(clht_t* h, int is_increase, int by) mfence(); clflush((char *)ht_new, sizeof(clht_hashtable_t), false, false); - clflush_next_check((char *)ht_new->table, num_buckets_new * sizeof(bucket_t), false); + clflush_new_hashtable((bucket_t *)clht_ptr_from_off(ht_new->table_off, true), num_buckets_new); mfence(); -#if defined(CRASH_BEFORE_SWAP_CLHT) - pid_t pid = fork(); - - if (pid == 0) { - // Crash state soon after pointer swap. - // This state will verify that all structural changes - // have been performed before the final pointer swap - clht_lock_initialization(h); - DEBUG_PRINT("Child process returned before root swap\n"); - DEBUG_PRINT("-------------ht old------------\n"); - clht_print(ht_old); - DEBUG_PRINT("-------------ht new------------\n"); - clht_print(ht_new); - DEBUG_PRINT("-------------ht current------------\n"); - clht_print(h->ht); - DEBUG_PRINT("-------------------------\n"); - return -1; - } - - else if (pid > 0){ - int returnStatus; - waitpid(pid, &returnStatus, 0); - DEBUG_PRINT("Continuing in parent process to finish resizing during ins\n"); - } - else { - DEBUG_PRINT("Fork failed"); - return 0; - } -#endif - // atomically swap the root pointer - SWAP_U64((uint64_t*) h, (uint64_t) ht_new); + // Presume the head of "h" contains the pointer (offset) to the hash table + SWAP_U64((uint64_t*) h, (uint64_t) pmemobj_oid(ht_new).off); clflush((char *)h, sizeof(uint64_t), false, true); -#if defined(CRASH_AFTER_SWAP_CLHT) - pid_t pid1 = fork(); - - if (pid1 == 0) { - // Crash state soon after pointer swap. - // This state will verify that all structural changes - // have been performed before the final pointer swap - clht_lock_initialization(h); - DEBUG_PRINT("Child process returned soon after root swap\n"); - DEBUG_PRINT("-------------ht old------------\n"); - clht_print(ht_old); - DEBUG_PRINT("-------------ht new------------\n"); - clht_print(ht_new); - DEBUG_PRINT("-------------ht current------------\n"); - clht_print(h->ht); - DEBUG_PRINT("-------------------------\n"); - return -1; - } - - else if (pid1 > 0){ - int returnStatus; - waitpid(pid1, &returnStatus, 0); - DEBUG_PRINT("Continuing in parent process to finish resizing during ins\n"); - } - else { - DEBUG_PRINT("Fork failed"); - return 0; - } -#endif DEBUG_PRINT("Parent reached correctly\n"); ht_old->table_new = ht_new; TRYLOCK_RLS(h->resize_lock); @@ -830,7 +826,7 @@ clht_size(clht_hashtable_t* hashtable) uint64_t bin; for (bin = 0; bin < num_buckets; bin++) { - bucket = hashtable->table + bin; + bucket = ((bucket_t *)clht_ptr_from_off(hashtable->table_off, true)) + bin; uint32_t j; do @@ -843,15 +839,13 @@ clht_size(clht_hashtable_t* hashtable) } } - bucket = bucket->next; + bucket = (bucket_t *)clht_ptr_from_off(bucket->next_off, true); } while (bucket != NULL); } return size; } - -// returns 0 on crash size_t ht_status(clht_t* h, int resize_increase, int just_print) { @@ -860,7 +854,7 @@ ht_status(clht_t* h, int resize_increase, int just_print) return 0; } - clht_hashtable_t* hashtable = h->ht; + clht_hashtable_t* hashtable = (clht_hashtable_t *)clht_ptr_from_off(h->ht_off, false); uint64_t num_buckets = hashtable->num_buckets; volatile bucket_t* bucket = NULL; size_t size = 0; @@ -870,7 +864,7 @@ ht_status(clht_t* h, int resize_increase, int just_print) uint64_t bin; for (bin = 0; bin < num_buckets; bin++) { - bucket = hashtable->table + bin; + bucket = ((bucket_t *)clht_ptr_from_off(hashtable->table_off, true)) + bin; int expands_cont = -1; expands--; @@ -887,7 +881,7 @@ ht_status(clht_t* h, int resize_increase, int just_print) } } - bucket = bucket->next; + bucket = (bucket_t *)clht_ptr_from_off(bucket->next_off, true); } while (bucket != NULL); @@ -986,7 +980,7 @@ clht_print(clht_hashtable_t* hashtable) uint64_t bin; for (bin = 0; bin < num_buckets; bin++) { - bucket = hashtable->table + bin; + bucket = ((bucket_t *)clht_ptr_from_off(hashtable->table_off, true)) + bin; printf("[[%05zu]] ", bin); @@ -1001,7 +995,7 @@ clht_print(clht_hashtable_t* hashtable) } } - bucket = bucket->next; + bucket = (bucket_t *)clht_ptr_from_off(bucket->next_off, true); printf(" ** -> "); } while (bucket != NULL); @@ -1013,7 +1007,7 @@ clht_print(clht_hashtable_t* hashtable) void clht_lock_initialization(clht_t *h) { DEBUG_PRINT("Performing Lock initialization\n"); - clht_hashtable_t *ht = h->ht; + clht_hashtable_t *ht = (clht_hashtable_t *)clht_ptr_from_off(h->ht_off, false); volatile bucket_t *next; h->resize_lock = LOCK_FREE; @@ -1021,9 +1015,11 @@ void clht_lock_initialization(clht_t *h) h->gc_lock = LOCK_FREE; int i; + bucket_t *buckets = (bucket_t *)clht_ptr_from_off(ht->table_off, true); for (i = 0; i < ht->num_buckets; i++) { - ht->table[i].lock = LOCK_FREE; - for (next = ht->table[i].next; next != NULL; next = next->next) { + buckets[i].lock = LOCK_FREE; + for (next = clht_ptr_from_off(buckets[i].next_off, true); + next != NULL; next = clht_ptr_from_off(next->next_off, true)) { next->lock = LOCK_FREE; } } From 70bf21c6240327f4f8fac343aba708f194fe19f4 Mon Sep 17 00:00:00 2001 From: Sekwon Lee Date: Sun, 3 Jan 2021 13:41:47 +0900 Subject: [PATCH 4/9] Fix missed unlock --- P-CLHT/src/clht_lb_res.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/P-CLHT/src/clht_lb_res.c b/P-CLHT/src/clht_lb_res.c index ff1668fc..7411f12b 100644 --- a/P-CLHT/src/clht_lb_res.c +++ b/P-CLHT/src/clht_lb_res.c @@ -507,7 +507,7 @@ clht_put(clht_t* h, clht_addr_t key, clht_val_t val) while (true); } -/* Insert a key-value entry into a hash table. */ +/* Update a value entry associated with given key. */ int clht_update(clht_t* h, clht_addr_t key, clht_val_t val) { @@ -543,6 +543,7 @@ clht_update(clht_t* h, clht_addr_t key, clht_val_t val) if (bucket->key[j] == key) { movnt64((uint64_t *)&bucket->val[j], val, true, true); + LOCK_RLS(lock); return true; } } From 4c5a75234549e22ee0eabd0789ec0b02c495146c Mon Sep 17 00:00:00 2001 From: Sekwon Lee Date: Sun, 18 Apr 2021 22:35:43 -0500 Subject: [PATCH 5/9] Fix a bug not releasing locks in clht_update --- P-CLHT/src/clht_lb_res.c | 1 + 1 file changed, 1 insertion(+) diff --git a/P-CLHT/src/clht_lb_res.c b/P-CLHT/src/clht_lb_res.c index 7411f12b..6f6e56a3 100644 --- a/P-CLHT/src/clht_lb_res.c +++ b/P-CLHT/src/clht_lb_res.c @@ -551,6 +551,7 @@ clht_update(clht_t* h, clht_addr_t key, clht_val_t val) } while (unlikely(bucket != NULL)); + LOCK_RLS(lock); return false; } From fddae5d7d5839273c2a765f189f8eaa0c33e34bb Mon Sep 17 00:00:00 2001 From: Sekwon Lee Date: Sun, 18 Apr 2021 22:38:07 -0500 Subject: [PATCH 6/9] Comments on the assumption releasing locks after a crash --- P-CLHT/include/clht_lb_res.h | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/P-CLHT/include/clht_lb_res.h b/P-CLHT/include/clht_lb_res.h index 13df6999..545cd7cc 100644 --- a/P-CLHT/include/clht_lb_res.h +++ b/P-CLHT/include/clht_lb_res.h @@ -159,6 +159,8 @@ typedef volatile uint8_t clht_lock_t; typedef struct ALIGNED(CACHE_LINE_SIZE) bucket_s { + // Although our current implementation does not provide post-crash mechanism, + // the locks should be released after a crash (Please refer to the function clht_lock_initialization()) clht_lock_t lock; volatile uint32_t hops; clht_addr_t key[ENTRIES_PER_BUCKET]; @@ -181,6 +183,8 @@ typedef struct ALIGNED(CACHE_LINE_SIZE) clht struct clht_hashtable_s* ht_oldest; struct ht_ts* version_list; size_t version_min; + // Although our current implementation does not provide post-crash mechanism, + // the locks should be released after a crash (Please refer to the function clht_lock_initialization()) volatile clht_lock_t resize_lock; volatile clht_lock_t gc_lock; volatile clht_lock_t status_lock; From 39c58ccef137813069b870df95c23e48f2a0f4de Mon Sep 17 00:00:00 2001 From: Sekwon Lee Date: Sun, 18 Apr 2021 22:43:27 -0500 Subject: [PATCH 7/9] Comments on the duplicated flush and initialization to the hash table --- P-CLHT/src/clht_lb_res.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/P-CLHT/src/clht_lb_res.c b/P-CLHT/src/clht_lb_res.c index 6f6e56a3..9f17f2af 100644 --- a/P-CLHT/src/clht_lb_res.c +++ b/P-CLHT/src/clht_lb_res.c @@ -262,6 +262,8 @@ clht_create(uint64_t num_buckets) w->ht_oldest = ht_ptr; // This should flush everything to persistent memory + // Actually, the following flush to the buckets is unnecessary as we are using pmemobj_zalloc + // to allocate the hash table. However, we just leave the flush for a reference clflush((char *)clht_ptr_from_off(ht_ptr->table_off, true), num_buckets * sizeof(bucket_t), false, true); clflush((char *)ht_ptr, sizeof(clht_hashtable_t), false, true); clflush((char *)w, sizeof(clht_t), false, true); @@ -313,6 +315,9 @@ clht_hashtable_create(uint64_t num_buckets) fprintf(stderr, "cacheline-unaligned hash table allocation\n"); } + // Note that in practice, the initializing procedures from 320 to 325 are unnecessary + // since we are allocating the hash table using pmemobj_zalloc. However, we leave those + // for a reference. uint64_t i; for (i = 0; i < num_buckets; i++) { bucket_ptr[i].lock = LOCK_FREE; From 537f41e9273f1805705de1c0818cffbad5de3e9d Mon Sep 17 00:00:00 2001 From: Sekwon Lee Date: Fri, 30 Sep 2022 19:02:13 +0000 Subject: [PATCH 8/9] Resolve compile error: multiple definition --- P-CLHT/include/clht_lb_res.h | 4 ++-- P-CLHT/src/clht_lb_res.c | 4 ++++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/P-CLHT/include/clht_lb_res.h b/P-CLHT/include/clht_lb_res.h index 545cd7cc..1f1c3236 100644 --- a/P-CLHT/include/clht_lb_res.h +++ b/P-CLHT/include/clht_lb_res.h @@ -462,10 +462,10 @@ POBJ_LAYOUT_TOID(clht, bucket_t); POBJ_LAYOUT_END(clht); // Global pool uuid -uint64_t pool_uuid; +extern uint64_t pool_uuid; // Global pool pointer -PMEMobjpool *pop; +extern PMEMobjpool *pop; // pmemobj header size (presume using default compact header) #define POBJ_HEADER_SIZE 16 diff --git a/P-CLHT/src/clht_lb_res.c b/P-CLHT/src/clht_lb_res.c index 050e2a22..2eb1bc68 100644 --- a/P-CLHT/src/clht_lb_res.c +++ b/P-CLHT/src/clht_lb_res.c @@ -38,6 +38,10 @@ #include "clht_lb_res.h" +uint64_t pool_uuid; + +PMEMobjpool *pop; + //#define CLHTDEBUG __thread ssmem_allocator_t* clht_alloc; From 145f51071627b76db7069ed831cf933273daf129 Mon Sep 17 00:00:00 2001 From: Sekwon Lee Date: Wed, 19 Apr 2023 03:06:43 +0000 Subject: [PATCH 9/9] Fix type-casting error --- P-CLHT/src/clht_lb_res.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/P-CLHT/src/clht_lb_res.c b/P-CLHT/src/clht_lb_res.c index 2eb1bc68..d302b2fa 100644 --- a/P-CLHT/src/clht_lb_res.c +++ b/P-CLHT/src/clht_lb_res.c @@ -794,7 +794,7 @@ ht_resize_pes(clht_t* h, int is_increase, int by) // Presume the head of "h" contains the pointer (offset) to the hash table //SWAP_U64((uint64_t*) h, (uint64_t) pmemobj_oid(ht_new).off); //clflush((char *)h, sizeof(uint64_t), false, true); - movnt64((uint64_t)&h->ht_off, (uint64_t)pmemobj_oid(ht_new).off, false, true); + movnt64((uint64_t *)&h->ht_off, (uint64_t)pmemobj_oid(ht_new).off, false, true); DEBUG_PRINT("Parent reached correctly\n"); ht_old->table_new = ht_new;