@@ -198,15 +198,15 @@ http_request_completion_t http_request::run(cluster* owner) {
198
198
request_queue::request_queue (class cluster * owner, uint32_t request_threads) : creator(owner), terminating(false ), globally_ratelimited(false ), globally_limited_for(0 ), in_thread_pool_size(request_threads)
199
199
{
200
200
for (uint32_t in_alloc = 0 ; in_alloc < in_thread_pool_size; ++in_alloc) {
201
- requests_in.push_back (new in_thread (owner, this , in_alloc));
201
+ requests_in.push_back (std::make_unique< in_thread> (owner, this , in_alloc));
202
202
}
203
203
out_thread = new std::thread (&request_queue::out_loop, this );
204
204
}
205
205
206
206
request_queue& request_queue::add_request_threads (uint32_t request_threads)
207
207
{
208
208
for (uint32_t in_alloc_ex = 0 ; in_alloc_ex < request_threads; ++in_alloc_ex) {
209
- requests_in.push_back (new in_thread (creator, this , in_alloc_ex + in_thread_pool_size));
209
+ requests_in.push_back (std::make_unique< in_thread> (creator, this , in_alloc_ex + in_thread_pool_size));
210
210
}
211
211
in_thread_pool_size += request_threads;
212
212
return *this ;
@@ -224,16 +224,24 @@ in_thread::in_thread(class cluster* owner, class request_queue* req_q, uint32_t
224
224
225
225
in_thread::~in_thread ()
226
226
{
227
- terminating = true ;
228
- in_ready.notify_one ();
227
+ terminate ();
229
228
in_thr->join ();
230
229
delete in_thr;
231
230
}
232
231
232
+ void in_thread::terminate ()
233
+ {
234
+ terminating.store (true , std::memory_order_relaxed);
235
+ in_ready.notify_one ();
236
+ }
237
+
233
238
request_queue::~request_queue ()
234
239
{
235
- terminating = true ;
240
+ terminating. store ( true , std::memory_order_relaxed) ;
236
241
out_ready.notify_one ();
242
+ for (auto & in_thr : requests_in) {
243
+ in_thr->terminate (); // signal all of them here, otherwise they will all join 1 by 1 and it will take forever
244
+ }
237
245
out_thread->join ();
238
246
delete out_thread;
239
247
}
@@ -281,7 +289,7 @@ struct compare_request {
281
289
void in_thread::in_loop (uint32_t index)
282
290
{
283
291
utility::set_thread_name (std::string (" http_req/" ) + std::to_string (index ));
284
- while (!terminating) {
292
+ while (!terminating. load (std::memory_order_relaxed) ) {
285
293
std::mutex mtx;
286
294
std::unique_lock<std::mutex> lock{ mtx };
287
295
in_ready.wait_for (lock, std::chrono::seconds (1 ));
@@ -394,7 +402,7 @@ bool request_queue::queued_deleting_request::operator<(time_t time) const noexce
394
402
void request_queue::out_loop ()
395
403
{
396
404
utility::set_thread_name (" req_callback" );
397
- while (!terminating) {
405
+ while (!terminating. load (std::memory_order_relaxed) ) {
398
406
399
407
std::mutex mtx;
400
408
std::unique_lock lock{ mtx };
0 commit comments