-
Notifications
You must be signed in to change notification settings - Fork 34
/
thread_pool.hpp
107 lines (91 loc) · 3.16 KB
/
thread_pool.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
/**
* Copyright Quadrivium LLC
* All Rights Reserved
* SPDX-License-Identifier: Apache-2.0
*/
#pragma once
#include <boost/asio/io_context.hpp>
#include <soralog/util.hpp>
#include "application/app_state_manager.hpp"
#include "injector/inject.hpp"
#include "log/logger.hpp"
#include "utils/pool_handler.hpp"
#include "utils/watchdog.hpp"
namespace kagome {
struct TestThreadPool {
std::shared_ptr<boost::asio::io_context> io = nullptr;
};
/**
* Creates `io_context` and runs it on `thread_count` threads.
*/
class ThreadPool {
public:
ThreadPool(ThreadPool &&) = delete;
ThreadPool(const ThreadPool &) = delete;
ThreadPool &operator=(ThreadPool &&) = delete;
ThreadPool &operator=(const ThreadPool &) = delete;
DONT_INJECT(ThreadPool);
ThreadPool(std::shared_ptr<Watchdog> watchdog,
std::string_view pool_tag,
size_t thread_count,
std::optional<std::shared_ptr<boost::asio::io_context>> ioc = {})
: log_(log::createLogger(fmt::format("ThreadPool:{}", pool_tag),
"threads")),
ioc_{ioc.has_value() ? std::move(ioc.value())
: std::make_shared<boost::asio::io_context>()},
work_guard_{ioc_->get_executor()} {
BOOST_ASSERT(ioc_);
BOOST_ASSERT(thread_count > 0);
SL_TRACE(log_, "Pool created");
threads_.reserve(thread_count);
for (size_t i = 0; i < thread_count; ++i) {
std::string label(thread_count > 1
? fmt::format("{}.{}", pool_tag, i + 1)
: pool_tag);
threads_.emplace_back(
[log(log_), io{ioc_}, watchdog, label{std::move(label)}] {
soralog::util::setThreadName(label);
SL_TRACE(log, "Thread '{}' started", label);
watchdog->run(io);
SL_TRACE(log, "Thread '{}' stopped", label);
});
}
}
ThreadPool(TestThreadPool test)
: log_{log::createLogger("TestThreadPool")},
ioc_{test.io ? test.io
: std::make_shared<boost::asio::io_context>()} {}
virtual ~ThreadPool() {
for (auto &thread : threads_) {
SL_TRACE(log_, "Joining thread…");
thread.join();
}
SL_TRACE(log_, "Pool destroyed");
}
const std::shared_ptr<boost::asio::io_context> &io_context() const {
return ioc_;
}
std::shared_ptr<PoolHandler> handlerManual() {
BOOST_ASSERT(ioc_);
return std::make_shared<PoolHandler>(ioc_);
}
std::shared_ptr<PoolHandler> handlerStarted() {
auto handler = handlerManual();
handler->start();
return handler;
}
std::shared_ptr<PoolHandler> handler(
application::AppStateManager &app_state_manager) {
auto handler = handlerManual();
app_state_manager.takeControl(*handler);
return handler;
}
private:
log::Logger log_;
std::shared_ptr<boost::asio::io_context> ioc_;
std::optional<boost::asio::executor_work_guard<
boost::asio::io_context::executor_type>>
work_guard_;
std::vector<std::thread> threads_;
};
} // namespace kagome