Skip to content

Commit

Permalink
add BlockingQueue_bench2.cc for context switching in hot potato scenario
Browse files Browse the repository at this point in the history
  • Loading branch information
chenshuo committed Jul 19, 2021
1 parent 770a323 commit 90920bb
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 10 deletions.
2 changes: 1 addition & 1 deletion muduo/base/BlockingQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class BlockingQueue : noncopyable
mutable MutexLock mutex_;
Condition notEmpty_ GUARDED_BY(mutex_);
queue_type queue_ GUARDED_BY(mutex_);
} __attribute__ ((aligned (64)));;
}; // __attribute__ ((aligned (64)));

} // namespace muduo

Expand Down
32 changes: 23 additions & 9 deletions muduo/base/tests/BlockingQueue_bench.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "muduo/base/BlockingQueue.h"
#include "muduo/base/CountDownLatch.h"
#include "muduo/base/Logging.h"
#include "muduo/base/Thread.h"
#include "muduo/base/Timestamp.h"

Expand All @@ -9,6 +10,9 @@
#include <stdio.h>
#include <unistd.h>

bool g_verbose = false;

// Many threads, one queue.
class Bench
{
public:
Expand All @@ -33,13 +37,15 @@ class Bench
{
printf("waiting for count down latch\n");
latch_.wait();
printf("all threads started\n");
LOG_INFO << threads_.size() << " threads started";
int64_t total_delay = 0;
for (int i = 0; i < times; ++i)
{
muduo::Timestamp now(muduo::Timestamp::now());
queue_.put(now);
usleep(1000);
total_delay += delay_queue_.take();
}
printf("Average delay: %.3fus\n", static_cast<double>(total_delay) / times);
}

void joinAll()
Expand All @@ -53,15 +59,18 @@ class Bench
{
thr->join();
}
LOG_INFO << threads_.size() << " threads stopped";
}

private:

void threadFunc()
{
if (g_verbose) {
printf("tid=%d, %s started\n",
muduo::CurrentThread::tid(),
muduo::CurrentThread::name());
}

std::map<int, int> delays;
latch_.countDown();
Expand All @@ -76,22 +85,27 @@ class Bench
// printf("tid=%d, latency = %d us\n",
// muduo::CurrentThread::tid(), delay);
++delays[delay];
delay_queue_.put(delay);
}
running = t.valid();
}

printf("tid=%d, %s stopped\n",
muduo::CurrentThread::tid(),
muduo::CurrentThread::name());
for (const auto& delay : delays)
if (g_verbose)
{
printf("tid = %d, delay = %d, count = %d\n",
printf("tid=%d, %s stopped\n",
muduo::CurrentThread::tid(),
delay.first, delay.second);
muduo::CurrentThread::name());
for (const auto& delay : delays)
{
printf("tid = %d, delay = %d, count = %d\n",
muduo::CurrentThread::tid(),
delay.first, delay.second);
}
}
}

muduo::BlockingQueue<muduo::Timestamp> queue_;
muduo::BlockingQueue<int> delay_queue_;
muduo::CountDownLatch latch_;
std::vector<std::unique_ptr<muduo::Thread>> threads_;
};
Expand All @@ -101,6 +115,6 @@ int main(int argc, char* argv[])
int threads = argc > 1 ? atoi(argv[1]) : 1;

Bench t(threads);
t.run(10000);
t.run(100000);
t.joinAll();
}
123 changes: 123 additions & 0 deletions muduo/base/tests/BlockingQueue_bench2.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
#include "muduo/base/BlockingQueue.h"
#include "muduo/base/CountDownLatch.h"
#include "muduo/base/Thread.h"
#include "muduo/base/Timestamp.h"

#include <map>
#include <string>
#include <vector>
#include <stdio.h>
#include <unistd.h>

// hot potato benchmarking https://en.wikipedia.org/wiki/Hot_potato
// N threads, one hot potato.
class Bench
{
public:
Bench(int numThreads)
: startLatch_(numThreads),
stopLatch_(1)
{
queues_.reserve(numThreads);
threads_.reserve(numThreads);
for (int i = 0; i < numThreads; ++i)
{
queues_.emplace_back(new muduo::BlockingQueue<int>());
char name[32];
snprintf(name, sizeof name, "work thread %d", i);
threads_.emplace_back(new muduo::Thread(
[this, i] { threadFunc(i); },
muduo::string(name)));
}
}

void Start()
{
muduo::Timestamp start = muduo::Timestamp::now();
for (auto& thr : threads_)
{
thr->start();
}
startLatch_.wait();
muduo::Timestamp started = muduo::Timestamp::now();
printf("all %zd threads started, %.3fms\n",
threads_.size(), 1e3 * timeDifference(started, start));
}

void Run()
{
muduo::Timestamp start = muduo::Timestamp::now();
const int rounds = 100003;
queues_[0]->put(rounds);

auto done = done_.take();
double elapsed = timeDifference(done.second, start);
printf("thread id=%d done, total %.3fms, %.3fus / round\n",
done.first, 1e3 * elapsed, 1e6 * elapsed / rounds);
}

void Stop()
{
muduo::Timestamp stop = muduo::Timestamp::now();
for (const auto& queue : queues_)
{
queue->put(-1);
}
for (auto& thr : threads_)
{
thr->join();
}

muduo::Timestamp t2 = muduo::Timestamp::now();
printf("all %zd threads joined, %.3fms\n",
threads_.size(), 1e3 * timeDifference(t2, stop));
}

private:
void threadFunc(int id)
{
startLatch_.countDown();

muduo::BlockingQueue<int>* input = queues_[id].get();
muduo::BlockingQueue<int>* output = queues_[(id+1) % queues_.size()].get();
while (true)
{
int value = input->take();
if (value > 0)
{
output->put(value - 1);
if (verbose_)
{
// printf("thread %d, got %d\n", id, value);
}
continue;
}

if (value == 0)
{
done_.put(std::make_pair(id, muduo::Timestamp::now()));
}
break;
}
}

using TimestampQueue = muduo::BlockingQueue<std::pair<int, muduo::Timestamp>>;
TimestampQueue done_;
muduo::CountDownLatch startLatch_, stopLatch_;
std::vector<std::unique_ptr<muduo::BlockingQueue<int>>> queues_;
std::vector<std::unique_ptr<muduo::Thread>> threads_;
const bool verbose_ = true;
};

int main(int argc, char* argv[])
{
int threads = argc > 1 ? atoi(argv[1]) : 1;

printf("sizeof BlockingQueue = %zd\n", sizeof(muduo::BlockingQueue<int>));
printf("sizeof deque<int> = %zd\n", sizeof(std::deque<int>));
Bench t(threads);
t.Start();
t.Run();
t.Stop();
// exit(0);
}
4 changes: 4 additions & 0 deletions muduo/base/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ target_link_libraries(blockingqueue_test muduo_base)
add_executable(blockingqueue_bench BlockingQueue_bench.cc)
target_link_libraries(blockingqueue_bench muduo_base)

add_executable(blockingqueue_bench2 BlockingQueue_bench2.cc)
target_link_libraries(blockingqueue_bench2 muduo_base)
# set_target_properties(blockingqueue_bench2 PROPERTIES COMPILE_FLAGS "-std=c++17")

add_executable(boundedblockingqueue_test BoundedBlockingQueue_test.cc)
target_link_libraries(boundedblockingqueue_test muduo_base)

Expand Down
4 changes: 4 additions & 0 deletions muduo/base/tests/Mutex_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ int foo()

int main()
{
printf("sizeof pthread_mutex_t: %zd\n", sizeof(pthread_mutex_t));
printf("sizeof Mutex: %zd\n", sizeof(MutexLock));
printf("sizeof pthread_cond_t: %zd\n", sizeof(pthread_cond_t));
printf("sizeof Condition: %zd\n", sizeof(Condition));
MCHECK(foo());
if (g_count != 1)
{
Expand Down

0 comments on commit 90920bb

Please sign in to comment.