diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..8db1ff4
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,3 @@
+/build
+/.cache
+/.vscode
\ No newline at end of file
diff --git a/CMakeLists.txt b/CMakeLists.txt
index b4fae6b..137347e 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -15,25 +15,6 @@ option(PIBENCH_BUILD_LEVELDB "Build LevelDB wrapper" OFF)
 
 include(CTest)
 
-find_package(OpenMP REQUIRED)
-
-if(CMAKE_C_COMPILER_ID MATCHES "Clang")
-  set(OpenMP_C "${CMAKE_C_COMPILER}")
-  set(OpenMP_C_FLAGS "-fopenmp=libomp")
-  set(OpenMP_C_LIB_NAMES "libomp" "libgomp" "libiomp5")
-  set(OpenMP_libomp_LIBRARY ${OpenMP_C_LIB_NAMES})
-  set(OpenMP_libgomp_LIBRARY ${OpenMP_C_LIB_NAMES})
-  set(OpenMP_libiomp5_LIBRARY ${OpenMP_C_LIB_NAMES})
-endif()
-if(CMAKE_CXX_COMPILER_ID MATCHES "Clang")
-  set(OpenMP_CXX "${CMAKE_CXX_COMPILER}")
-  set(OpenMP_CXX_FLAGS "-fopenmp=libomp")
-  set(OpenMP_CXX_LIB_NAMES "libomp" "libgomp" "libiomp5")
-  set(OpenMP_libomp_LIBRARY ${OpenMP_CXX_LIB_NAMES})
-  set(OpenMP_libgomp_LIBRARY ${OpenMP_CXX_LIB_NAMES})
-  set(OpenMP_libiomp5_LIBRARY ${OpenMP_CXX_LIB_NAMES})
-endif()
-
 ######################## Intel PCM ########################
 add_custom_command(OUTPUT libPCM.a
     COMMAND make lib
diff --git a/README.md b/README.md
index 1c9fbc2..d260607 100644
--- a/README.md
+++ b/README.md
@@ -72,20 +72,6 @@ If so, you can comment the following line in `pcm/Makefile`:
 CXXFLAGS += -DPCM_USE_PERF
 ```
 
-# OpenMP
-PiBench uses OpenMP internally for multithreading.
-The environment variable `OMP_NESTED=true` must be set to guarantee correctness.
-Check [here](https://docs.microsoft.com/en-us/cpp/parallel/openmp/reference/openmp-environment-variables?view=vs-2019#omp-nested) for details.
-
-Other environment variables such as [`OMP_PLACES`](https://gnu.huihoo.org/gcc/gcc-4.9.4/libgomp/OMP_005fPLACES.html#OMP_005fPLACES) and [`OMP_PROC_BIND`](https://gnu.huihoo.org/gcc/gcc-4.9.4/libgomp/OMP_005fPROC_005fBIND.html) can be set to control the multithreaded behavior.
-
-For example:
-
-`$ OMP_PLACES=cores OMP_PROC_BIND=true OMP_NESTED=true ./PiBench [...]`
-
-Note for Clang users: you may need to additionally install OpenMP runtime, on Arch Linux this can be done by installing the package `extra/openmp`.
-
-
 # Running
 The `PiBench` executable is generated and supports the following arguments:
 ```
diff --git a/azure-pipelines.yml b/azure-pipelines.yml
index 9589af2..22f24e5 100644
--- a/azure-pipelines.yml
+++ b/azure-pipelines.yml
@@ -46,13 +46,13 @@ jobs:
 
       Clang-6.0 Debug:
         CC: clang-6.0
        CXX: clang++-6.0
-        Packages: clang-6.0 libomp5 libomp-dev
+        Packages: clang-6.0
         BuildType: Debug
       Clang-6.0 Release:
         CC: clang-6.0
         CXX: clang++-6.0
-        Packages: clang-6.0 libomp5 libomp-dev
+        Packages: clang-6.0
         BuildType: Release
 
   steps:
diff --git a/include/utils.hpp b/include/utils.hpp
index 9c3c2e3..756632d 100644
--- a/include/utils.hpp
+++ b/include/utils.hpp
@@ -1,7 +1,18 @@
 #ifndef __UTILS_HPP__
 #define __UTILS_HPP__
 
+#include <sched.h>
+#include <algorithm>
+#include <condition_variable>
 #include <cstdint>
+#include <functional>
+#include <iostream>
+#include <iterator>
+#include <mutex>
+#include <sstream>
+#include <thread>
+#include <utility>
+#include <vector>
 
 namespace PiBench
 {
@@ -150,6 +161,161 @@ namespace utils
         asm volatile("" : : "g"(vptr) : "memory");
     }
 }
+
+/**
+ * @brief Implements a reusable thread barrier.
+ */
+class barrier {
+ public:
+  /**
+   * @brief Construct a new barrier object.
+   *
+   * @param Threshold number of threads the barrier waits for
+   */
+  explicit barrier(std::uint64_t Threshold)
+      : threshold(Threshold), capacity(Threshold) {}
+
+  /**
+   * @brief Blocks the calling threads until the specified number of threads
+   * arrive at the barrier, then releases all waiting threads at once.
+   */
+  void arriveAndWait() {
+    std::unique_lock<std::mutex> lock(mtx);
+    uint64_t localGeneration = generation;
+    capacity--;
+
+    if (capacity == 0) {
+      generation++;
+      capacity = threshold;
+      cv.notify_all();
+    } else {
+      cv.wait(lock, [this, localGeneration] {
+        return localGeneration != generation;
+      });
+    }
+  }
+
+ private:
+  std::uint64_t threshold;
+  std::uint64_t capacity;
+  // The generation counter guards against spurious wakeups and makes the
+  // barrier safe to reuse across rounds.
+  std::uint64_t generation = 0;
+  std::mutex mtx;
+  std::condition_variable cv;
+};
+
+/**
+ * @brief Performs integer division.
+ *
+ * @param dividend dividend
+ * @param divisor divisor
+ * @return std::pair<uint64_t, uint64_t> pair containing quotient and
+ * remainder, respectively
+ */
+inline std::pair<uint64_t, uint64_t> divide(const uint64_t dividend,
+                                            const uint64_t divisor) {
+  return {dividend / divisor, dividend % divisor};
+}
+
+/**
+ * @brief Get an integer id for the current thread.
+ *
+ * @return uint32_t thread id (hash of std::thread::id, truncated)
+ */
+inline uint32_t getThreadId() {
+  return std::hash<std::thread::id>{}(std::this_thread::get_id());
+}
+
+//! Cores threads should be pinned to. Lives in utils rather than in `opt`
+//! to make it easier to call `setAffinity` from outside of PiBench.
+inline std::vector<uint32_t> cores;
+
+/**
+ * @brief Set the CPU affinity of a thread.
+ *
+ * @param threadId id of the thread to pin
+ * @return true if affinity was set successfully
+ * @return false if no cores were given or pinning failed
+ */
+inline bool setAffinity(uint32_t threadId = getThreadId()) {
+  if (cores.empty()) {
+    return false;
+  }
+
+  int myCpuId = cores[threadId % cores.size()];
+  cpu_set_t mySet;
+  CPU_ZERO(&mySet);
+  CPU_SET(myCpuId, &mySet);
+  return sched_setaffinity(0, sizeof(cpu_set_t), &mySet) == 0;
+}
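Reviewer note: a minimal sketch of how `barrier` and `setAffinity` are meant to compose. The `main` harness and the hard-coded core list below are illustrative assumptions, not part of the patch:

```cpp
#include <cstdint>
#include <thread>
#include <vector>

#include "utils.hpp"

int main() {
  // Hypothetical core list; in PiBench this is filled from the new -c flag.
  PiBench::utils::cores = {0, 1, 2, 3};

  constexpr std::uint64_t kThreads = 4;
  PiBench::utils::barrier barr(kThreads);

  std::vector<std::thread> threads;
  for (std::uint64_t i = 0; i < kThreads; ++i) {
    threads.emplace_back([&barr] {
      PiBench::utils::setAffinity();  // pins via the global core list
      barr.arriveAndWait();           // blocks until all kThreads arrive
      // ... timed work starts here, with every thread released together ...
    });
  }
  for (auto &t : threads) {
    t.join();
  }
}
```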
+ * + * @param threadNum number of threads that should spawned for the workload + * @param preLoopTask this function is called right before the loop is + * executed + * @param task this function is called in the for loop + * @param iterations number of iterations for loop should perform + */ + inline void parallelForLoop( + const uint64_t threadNum, const std::vector &cores, + const std::function &preLoopTask, + const std::function &task, const uint64_t iterations) { + std::vector threads; + barrier barr(threadNum); + const auto partitionedIterations = divide(iterations, threadNum); + + for (uint64_t j = 0; j < threadNum; j++) { + threads.emplace_back(std::thread([&, partitionedIterations, j]() { + setAffinity(); + uint64_t localThreadId = j; + barr.arriveAndWait(); + + uint64_t threadLoad = j == 0 ? partitionedIterations.first + + partitionedIterations.second + : partitionedIterations.first; + try { + preLoopTask(localThreadId); + } catch (std::exception &ex) { + std::cerr << "exception thrown in the pre-loop task: " << ex.what() + << '\n'; + }; + for (uint64_t i = 0; i < threadLoad; i++) { + try { + task(localThreadId); + } catch (std::exception &ex) { + std::cerr << "exception thrown in the task: " << ex.what() + << '\n'; + }; + }; + })); + }; + + for (auto &i : threads) { + i.join(); + }; + }; + + + /** + * @brief stringify a vector + * + * @tparam T underlying type of vector + * @param vec vector to stringify + * @return std::string string representation with `,` as delimiter + */ + template + std::string stringify(const std::vector& vec){ + if(vec.empty()){ return {}; }; + + std::ostringstream oss; + std::copy(vec.begin(), vec.end() - 1, std::ostream_iterator(oss, ",")); + oss << vec.back(); + return oss.str(); + } } // namespace utils } // namespace PiBench #endif \ No newline at end of file diff --git a/src/benchmark.cpp b/src/benchmark.cpp index 5b485ab..ca34d47 100644 --- a/src/benchmark.cpp +++ b/src/benchmark.cpp @@ -3,14 +3,14 @@ #include #include -#include -#include -#include // std::bind -#include // std::ceil +#include // std::ceil #include #include -#include // std::regex_replace -#include // uname +#include // std::bind +#include +#include +#include // std::regex_replace +#include // uname namespace PiBench { @@ -137,53 +137,44 @@ void benchmark_t::load() noexcept stopwatch_t sw; sw.start(); - { - #pragma omp parallel num_threads(opt_.num_threads) - { - // Initialize insert id for each thread - key_generator_->current_id_ = opt_.num_records / opt_.num_threads * omp_get_thread_num(); - - #pragma omp for schedule(static) - for (uint64_t i = 0; i < opt_.num_records; ++i) - { - // Generate key in sequence - auto key_ptr = key_generator_->next(true); - - // Generate random value - auto value_ptr = value_generator_.next(); - - auto r = tree_->insert(key_ptr, key_generator_->size(), value_ptr, opt_.value_size); - assert(r); - } - } - - } + PiBench::utils::parallelForLoop( + opt_.num_threads, PiBench::utils::cores, + [this](uint64_t threadNum) { + // Initialize insert id for each thread + key_generator_->current_id_ = + opt_.num_records / opt_.num_threads * threadNum; + }, + [this](uint64_t threadNum) { + // Generate key in sequence + auto key_ptr = key_generator_->next(true); + // Generate random value + auto value_ptr = value_generator_.next(); + auto r = tree_->insert(key_ptr, key_generator_->size(), value_ptr, + opt_.value_size); + assert(r); + }, + opt_.num_records); auto elapsed = sw.elapsed(); - std::cout << "Loading finished in " << elapsed << " milliseconds" << 
diff --git a/src/benchmark.cpp b/src/benchmark.cpp
index 5b485ab..ca34d47 100644
--- a/src/benchmark.cpp
+++ b/src/benchmark.cpp
@@ -3,13 +3,14 @@
 #include <cassert>
 #include <random>
 
-#include <omp.h>
-#include <algorithm>
-#include <functional> // std::bind
-#include <cmath>      // std::ceil
 #include <numeric>
 #include <thread>
-#include <regex>         // std::regex_replace
-#include <sys/utsname.h> // uname
+#include <cmath> // std::ceil
+#include <algorithm>
+#include <chrono>
+#include <functional> // std::bind
+#include <mutex>
+#include <regex>         // std::regex_replace
+#include <sys/utsname.h> // uname
 
 namespace PiBench
 {
@@ -137,53 +137,44 @@ void benchmark_t::load() noexcept
     stopwatch_t sw;
     sw.start();
 
-    {
-        #pragma omp parallel num_threads(opt_.num_threads)
-        {
-            // Initialize insert id for each thread
-            key_generator_->current_id_ = opt_.num_records / opt_.num_threads * omp_get_thread_num();
-
-            #pragma omp for schedule(static)
-            for (uint64_t i = 0; i < opt_.num_records; ++i)
-            {
-                // Generate key in sequence
-                auto key_ptr = key_generator_->next(true);
-
-                // Generate random value
-                auto value_ptr = value_generator_.next();
-
-                auto r = tree_->insert(key_ptr, key_generator_->size(), value_ptr, opt_.value_size);
-                assert(r);
-            }
-        }
-    }
+    PiBench::utils::parallelForLoop(
+        opt_.num_threads, PiBench::utils::cores,
+        [this](uint64_t threadId) {
+            // Initialize insert id for each thread
+            key_generator_->current_id_ =
+                opt_.num_records / opt_.num_threads * threadId;
+        },
+        [this](uint64_t threadId) {
+            // Generate key in sequence
+            auto key_ptr = key_generator_->next(true);
+            // Generate random value
+            auto value_ptr = value_generator_.next();
+            auto r = tree_->insert(key_ptr, key_generator_->size(), value_ptr,
+                                   opt_.value_size);
+            assert(r);
+        },
+        opt_.num_records);
 
     auto elapsed = sw.elapsed();
-    std::cout << "Loading finished in " << elapsed << " milliseconds" << std::endl;
+    std::cout << "Loading finished in " << elapsed << " milliseconds"
+              << std::endl;
 
-    // Verify all keys can be found
-    {
-        #pragma omp parallel num_threads(opt_.num_threads)
-        {
-            // Initialize insert id for each thread
-            auto id = opt_.num_records / opt_.num_threads * omp_get_thread_num();
-
-            #pragma omp for schedule(static)
-            for (uint64_t i = 0; i < opt_.num_records; ++i)
-            {
-                // Generate key in sequence
-                auto key_ptr = key_generator_->hash_id(id++);
-
-                static thread_local char value_out[value_generator_t::VALUE_MAX];
-                bool found = tree_->find(key_ptr, key_generator_->size(), value_out);
-                if (!found) {
-                    exit(1);
-                }
-            }
-        }
-    }
+    // Verify all keys can be found
+    PiBench::utils::parallelForLoop(
+        opt_.num_threads, PiBench::utils::cores, [](uint64_t) {},
+        [this](uint64_t threadId) {
+            // The verification id is per-thread state that must survive
+            // across iterations of the task, so it is thread_local rather
+            // than a local that would reset to the partition start on every
+            // call.
+            static thread_local uint64_t id =
+                opt_.num_records / opt_.num_threads * threadId;
+            // Generate key in sequence
+            auto key_ptr = key_generator_->hash_id(id++);
+
+            static thread_local char value_out[value_generator_t::VALUE_MAX];
+            bool found =
+                tree_->find(key_ptr, key_generator_->size(), value_out);
+            if (!found) {
+                exit(1);
+            }
+        },
+        opt_.num_records);
 
     std::cout << "Load verified; benchmark started." << std::endl;
 }
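Reviewer note: the per-thread start ids above lean on the remainder scheme in `parallelForLoop`. A worked check of the partition arithmetic, with `num_records = 8` and `num_threads = 4` as assumed inputs:

```cpp
#include <cstdint>
#include <iostream>

#include "utils.hpp"  // PiBench::utils::divide

int main() {
  const std::uint64_t num_records = 8, num_threads = 4;
  const auto part = PiBench::utils::divide(num_records, num_threads);  // {2, 0}

  for (std::uint64_t tid = 0; tid < num_threads; ++tid) {
    // Mirrors the preLoopTask: each thread starts inserting at this id.
    std::uint64_t start = num_records / num_threads * tid;
    // Thread 0 additionally absorbs the remainder.
    std::uint64_t load = (tid == 0) ? part.first + part.second : part.first;
    std::cout << "thread " << tid << ": ids [" << start << ", "
              << start + load << ")\n";
  }
  // With 8 % 4 == 0 the ranges tile [0, 8) exactly. If num_records were not
  // divisible by num_threads, thread 0's extra iterations would overlap the
  // next thread's id range, so divisible record counts remain the safe input.
}
```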
@@ -229,131 +220,136 @@ void benchmark_t::run() noexcept
     double elapsed = 0.0;
     stopwatch_t sw;
 
-    omp_set_nested(true);
-    #pragma omp parallel sections num_threads(2)
-    {
-        #pragma omp section // Monitor thread
-        {
-            std::chrono::milliseconds sampling_window(opt_.sampling_ms);
-            auto sample_stats = [&]()
-            {
-                std::this_thread::sleep_for(sampling_window);
-                stats_t s;
-                s.operation_count = std::accumulate(local_stats.begin(), local_stats.end(), 0ull,
-                    [](uint64_t sum, const stats_t& curr) {
-                        return sum + curr.operation_count;
-                    });
-                global_stats.push_back(std::move(s));
-            };
-
-            if (opt_.bm_mode == mode_t::Operation)
-            {
-                while (!finished)
-                {
-                    sample_stats();
-                }
-            }
-            else
-            {
-                uint32_t iterations = opt_.seconds * 1000 / opt_.sampling_ms;
-                uint32_t slept = 0;
-                do {
-                    sample_stats();
-                }
-                while (++slept < iterations);
-                finished = true;
-            }
-        }
-
-        #pragma omp section // Worker threads
-        {
-            #pragma omp parallel num_threads(opt_.num_threads)
-            {
-                auto tid = omp_get_thread_num();
-
-                // Initialize random seed for each thread
-                key_generator_->set_seed(opt_.rnd_seed * (tid + 1));
-
-                // Initialize insert id for each thread
-                key_generator_->current_id_ = current_id + (inserts_per_thread * tid);
-
-                auto random_bool = std::bind(std::bernoulli_distribution(opt_.latency_sampling), std::knuth_b());
-
-                #pragma omp barrier
-
-                #pragma omp single nowait
-                {
-                    sw.start();
-                }
-
-                auto execute_op = [&]()
-                {
-                    // Generate random operation
-                    auto op = op_generator_.next();
-
-                    // Generate random scrambled key
-                    const char *key_ptr = nullptr;
-                    if (op == operation_t::INSERT)
-                    {
-                        key_ptr = key_generator_->next(true);
-                    }
-                    else
-                    {
-                        auto id = key_generator_->next_id();
-                        if (opt_.bm_mode == mode_t::Time)
-                        {
-                            // Scale back to insert amount
-                            id %= (local_stats[tid].success_insert_count * opt_.num_threads + opt_.num_records);
-                            if (id >= opt_.num_records) {
-                                uint64_t ins = id - opt_.num_records;
-                                id = opt_.num_records + inserts_per_thread * (ins / local_stats[tid].success_insert_count) + ins % local_stats[tid].success_insert_count;
-                            }
-                        }
-                        key_ptr = key_generator_->hash_id(id);
-                    }
-
-                    auto measure_latency = random_bool();
-                    if (measure_latency)
-                    {
-                        local_stats[tid].times.push_back(std::chrono::high_resolution_clock::now());
-                    }
-
-                    run_op(op, key_ptr, value_out, values_out, measure_latency, local_stats[tid]);
-
-                    if (measure_latency)
-                    {
-                        local_stats[tid].times.push_back(std::chrono::high_resolution_clock::now());
-                    }
-                };
-
-                if (opt_.bm_mode == mode_t::Operation)
-                {
-                    #pragma omp for schedule(static)
-                    for (uint64_t i = 0; i < opt_.num_ops; ++i)
-                    {
-                        execute_op();
-                    }
-                }
-                else
-                {
-                    uint32_t slept = 0;
-                    do
-                    {
-                        execute_op();
-                    }
-                    while (!finished);
-                }
-
-                // Get elapsed time and signal monitor thread to finish.
-                #pragma omp single nowait
-                {
-                    elapsed = sw.elapsed();
-                    finished = true;
-                }
-            }
-        }
-    }
-    omp_set_nested(false);
+    auto monitorThreadTask = [&]() {
+        PiBench::utils::setAffinity();
+
+        std::chrono::milliseconds sampling_window(opt_.sampling_ms);
+        auto sample_stats = [&]() {
+            std::this_thread::sleep_for(sampling_window);
+            stats_t s;
+            s.operation_count =
+                std::accumulate(local_stats.begin(), local_stats.end(), 0ull,
+                                [](uint64_t sum, const stats_t &curr) {
+                                    return sum + curr.operation_count;
+                                });
+            global_stats.push_back(std::move(s));
+        };
+
+        if (opt_.bm_mode == mode_t::Operation) {
+            while (!finished) {
+                sample_stats();
+            }
+        } else {
+            uint32_t iterations = opt_.seconds * 1000 / opt_.sampling_ms;
+            uint32_t slept = 0;
+            do {
+                sample_stats();
+            } while (++slept < iterations);
+            finished = true;
+        }
+    };
+
+    auto workerThreadTask = [&]() {
+        PiBench::utils::setAffinity();
+
+        uint64_t threadNum = opt_.num_threads;
+        std::vector<std::thread> threads;
+        PiBench::utils::barrier barrier(threadNum);
+        std::once_flag onceFlag1;
+        std::once_flag onceFlag2;
+
+        // Calculate the 'for loop' iteration distribution for each thread:
+        // thread 0 takes the quotient plus the remainder, the rest take the
+        // quotient.
+        std::vector<uint64_t> threadOpsLoads(threadNum);
+        auto loadDivision = PiBench::utils::divide(opt_.num_ops, threadNum);
+        threadOpsLoads.at(0) = loadDivision.first + loadDivision.second;
+        std::fill(threadOpsLoads.begin() + 1, threadOpsLoads.end(),
+                  loadDivision.first);
+
+        const auto workerTask = [&, threadNum](uint64_t i) {
+            PiBench::utils::setAffinity();
+            auto tid = i;
+
+            // Initialize random seed for each thread
+            key_generator_->set_seed(opt_.rnd_seed * (tid + 1));
+
+            // Initialize insert id for each thread
+            key_generator_->current_id_ =
+                current_id + (inserts_per_thread * tid);
+
+            auto random_bool =
+                std::bind(std::bernoulli_distribution(opt_.latency_sampling),
+                          std::knuth_b());
+
+            barrier.arriveAndWait();
+            // The first worker through the barrier starts the stopwatch,
+            // mirroring the old '#pragma omp single nowait'.
+            std::call_once(onceFlag1, [&]() { sw.start(); });
+
+            auto execute_op = [&]() {
+                // Generate random operation
+                auto op = op_generator_.next();
+
+                // Generate random scrambled key
+                const char *key_ptr = nullptr;
+                if (op == operation_t::INSERT) {
+                    key_ptr = key_generator_->next(true);
+                } else {
+                    auto id = key_generator_->next_id();
+                    if (opt_.bm_mode == mode_t::Time) {
+                        // Scale back to insert amount
+                        id %= (local_stats[tid].success_insert_count *
+                                   opt_.num_threads +
+                               opt_.num_records);
+                        if (id >= opt_.num_records) {
+                            uint64_t ins = id - opt_.num_records;
+                            id = opt_.num_records +
+                                 inserts_per_thread *
+                                     (ins /
+                                      local_stats[tid].success_insert_count) +
+                                 ins % local_stats[tid].success_insert_count;
+                        }
+                    }
+                    key_ptr = key_generator_->hash_id(id);
+                }
+
+                auto measure_latency = random_bool();
+                if (measure_latency) {
+                    local_stats[tid].times.push_back(
+                        std::chrono::high_resolution_clock::now());
+                }
+
+                run_op(op, key_ptr, value_out, values_out, measure_latency,
+                       local_stats[tid]);
+
+                if (measure_latency) {
+                    local_stats[tid].times.push_back(
+                        std::chrono::high_resolution_clock::now());
+                }
+            };
+
+            if (opt_.bm_mode == mode_t::Operation) {
+                for (uint64_t j = 0; j < threadOpsLoads[i]; j++) {
+                    execute_op();
+                }
+            } else {
+                do {
+                    execute_op();
+                } while (!finished);
+            }
+
+            // Get elapsed time and signal monitor thread to finish.
+            std::call_once(onceFlag2, [&]() {
+                elapsed = sw.elapsed();
+                finished = true;
+            });
+        };
+
+        for (uint64_t i = 0; i < threadNum; i++) {
+            threads.emplace_back(workerTask, i);
+        }
+        for (auto &i : threads) {
+            i.join();
+        }
+    };
+
+    std::thread monitorThread(monitorThreadTask);
+    std::thread workerThread(workerThreadTask);
+    monitorThread.join();
+    workerThread.join();
 
     std::unique_ptr<SystemCounterState> after_sstate;
     if (opt_.enable_pcm)
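Reviewer note: the shape of the `sections` replacement, reduced to its moving parts. A minimal sketch assuming four workers; `finished` stands in for the atomic flag declared earlier in `benchmark.cpp`, and the first worker to complete stops the monitor via `std::call_once`, mirroring the old `single nowait`:

```cpp
#include <atomic>
#include <chrono>
#include <mutex>
#include <thread>
#include <vector>

int main() {
  std::atomic<bool> finished{false};
  std::once_flag once;
  const unsigned kWorkers = 4;

  // Monitor: samples stats every window until a worker raises the flag.
  std::thread monitor([&] {
    while (!finished) {
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
      // ... aggregate per-thread operation counts here ...
    }
  });

  std::vector<std::thread> workers;
  for (unsigned i = 0; i < kWorkers; ++i) {
    workers.emplace_back([&] {
      // ... execute this worker's share of operations ...
      std::call_once(once, [&] { finished = true; });  // first finisher only
    });
  }

  for (auto &w : workers) {
    w.join();
  }
  monitor.join();
}
```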
@@ -606,6 +602,7 @@ std::ostream& operator<<(std::ostream& os, const PiBench::options_t& opt)
        << "\tKey size: " << opt.key_size << "\n"
        << "\tValue size: " << opt.value_size << "\n"
        << "\tRandom seed: " << opt.rnd_seed << "\n"
+       << "\tCores to pin threads to: " << PiBench::utils::stringify(PiBench::utils::cores) << "\n"
        << "\tKey distribution: " << opt.key_distribution
        << (opt.key_distribution == PiBench::distribution_t::SELFSIMILAR || opt.key_distribution == PiBench::distribution_t::ZIPFIAN
               ? "(" + std::to_string(opt.key_skew) + ")"
diff --git a/src/main.cpp b/src/main.cpp
index c482ab4..c13cde0 100644
--- a/src/main.cpp
+++ b/src/main.cpp
@@ -2,11 +2,13 @@
 #include "benchmark.hpp"
 #include "library_loader.hpp"
 #include "cxxopts.hpp"
+#include "utils.hpp"
 
 #include <iostream>
 #include <string>
 #include <cstdint>
+#include <vector>
 #include <cstdlib>
 
@@ -29,6 +31,7 @@ int main(int argc, char** argv)
             ("n,records", "Number of records to load", cxxopts::value<uint64_t>()->default_value(std::to_string(opt.num_records)))
             ("p,operations", "Number of operations to execute", cxxopts::value<uint64_t>()->default_value(std::to_string(opt.num_ops)))
             ("t,threads", "Number of threads to use", cxxopts::value<uint32_t>()->default_value(std::to_string(opt.num_threads)))
+            ("c,cores", "Comma-separated list of cores threads should be pinned to", cxxopts::value<std::string>()->default_value(""))
             ("f,key_prefix", "Prefix string prepended to every key", cxxopts::value<std::string>()->default_value("\"" + opt.key_prefix + "\""))
             ("k,key_size", "Size of keys in Bytes (without prefix)", cxxopts::value<uint32_t>()->default_value(std::to_string(opt.key_size)))
             ("v,value_size", "Size of values in Bytes", cxxopts::value<uint32_t>()->default_value(std::to_string(opt.value_size)))
@@ -95,6 +98,21 @@ int main(int argc, char** argv)
         if (result.count("threads"))
             opt.num_threads = result["threads"].as<uint32_t>();
 
+        // Parse "cores": a comma-separated list such as "0,2,4".
+        if (result.count("cores")) {
+            std::string cores = result["cores"].as<std::string>();
+
+            size_t last = 0;
+            size_t next = 0;
+            while ((next = cores.find(',', last)) != std::string::npos) {
+                PiBench::utils::cores.emplace_back(static_cast<uint32_t>(
+                    std::stoul(cores.substr(last, next - last))));
+                last = next + 1;
+            }
+            PiBench::utils::cores.emplace_back(
+                static_cast<uint32_t>(std::stoul(cores.substr(last))));
+        }
+
         // Parse "sampling_ms"
         if (result.count("sampling_ms"))
             opt.sampling_ms = result["sampling_ms"].as<uint32_t>();
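Reviewer note: the find/substr/stoul loop above, exercised standalone on the assumed input "0,2,4" (what a user would pass as `-c 0,2,4`):

```cpp
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

int main() {
  const std::string cores = "0,2,4";
  std::vector<std::uint32_t> parsed;

  // Split on ',' and convert each token, exactly as src/main.cpp does.
  std::size_t last = 0;
  std::size_t next = 0;
  while ((next = cores.find(',', last)) != std::string::npos) {
    parsed.emplace_back(static_cast<std::uint32_t>(
        std::stoul(cores.substr(last, next - last))));
    last = next + 1;
  }
  parsed.emplace_back(
      static_cast<std::uint32_t>(std::stoul(cores.substr(last))));

  for (auto c : parsed) {
    std::cout << c << ' ';  // prints: 0 2 4
  }
  std::cout << '\n';
}
```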