-
Notifications
You must be signed in to change notification settings - Fork 10
/
eventloopthreadpool.cpp
80 lines (67 loc) · 1.9 KB
/
eventloopthreadpool.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
#include "eventloopthreadpool.h"
// Builds a pool description for `threadCnt` worker threads; no thread is
// created here (Loop() does that).
//
// threadCnt: requested number of worker threads. Values below zero are
//            clamped to 0, because the count is later used as a modulus, a
//            container size and a loop bound, none of which tolerate a
//            negative value.
EventLoopThreadPool::EventLoopThreadPool(int threadCnt)
    : m_threadCnt(threadCnt > 0 ? threadCnt : 0),
      m_robinIndex(0)
{
}
// Shuts the pool down on destruction by delegating to Quit().
EventLoopThreadPool::~EventLoopThreadPool()
{
    this->Quit();
}
// Hands a connection descriptor to one worker, chosen round-robin.
//
// fd: descriptor forwarded to the selected EventLoopThread::PushFd().
//
// If no worker is available (the pool is empty, Loop() has not run yet, or no
// slot holds a published EventLoopThread) the call only logs; `fd` is neither
// dispatched nor closed here.
void EventLoopThreadPool::PushFd(int fd)
{
    // Use the number of slots that actually exist as the modulus. After Loop()
    // this equals m_threadCnt, and it avoids both a modulo by zero and an
    // out-of-range index when the pool was never started.
    const int loopCnt = static_cast<int>(m_eventLoopThreads.size());

    // Probe every slot at most once, starting at the round-robin cursor, so a
    // slot whose worker has not published itself yet is skipped instead of
    // being dereferenced as a null pointer.
    for (int probed = 0; probed < loopCnt; ++probed)
    {
        m_robinIndex %= loopCnt;
        auto *eventLoopThread = m_eventLoopThreads[m_robinIndex];
        if (eventLoopThread != nullptr)
        {
            eventLoopThread->PushFd(fd);
            DEBUGLOG("%s %s %d, EventLoopThread threadId=%d is going to process the connection fd=%d", __FILE__, __func__, __LINE__, m_robinIndex, fd);
            m_robinIndex += 1;
            return;
        }
        m_robinIndex += 1;
    }

    DEBUGLOG("%s %s %d, no EventLoopThread is available, connection fd=%d was not dispatched", __FILE__, __func__, __LINE__, fd);
}
int EventLoopThreadPool::GetThreadCnt()
{
return m_threadCnt;
}
// Reports whether the pool was configured with a worker-thread count of zero.
bool EventLoopThreadPool::Empty()
{
    return m_threadCnt == 0;
}
// Thread entry point: builds one EventLoopThread on this thread's stack,
// configures it, publishes it in m_eventLoopThreads[threadId] and runs its
// loop until the loop returns.
//
// threadId: index of this worker; used as the slot in m_eventLoopThreads and
//           passed to EventLoopThread::SetThreadId().
void EventLoopThreadPool::RunInThread(int threadId)
{
    EventLoopThread eventLoopThread;

    // Configure the loop completely before making it visible to the pool, so
    // no other thread can reach a half-initialised object through the slot.
    eventLoopThread.SetReadCallback(m_readCallback);
    eventLoopThread.SetWriteCallback(m_writeCallback);
    eventLoopThread.SetErrorCallback(m_errorCallback);
    eventLoopThread.SetThreadId(threadId);
    m_eventLoopThreads[threadId] = &eventLoopThread;

    DEBUGLOG("%s %s %d, EventLoopThread threadId=%d Loop is going to start", __FILE__, __func__, __LINE__, threadId);
    eventLoopThread.Loop();

    // The loop object lives on this stack frame and dies when we return;
    // clear the slot so the pool never keeps a dangling pointer to it.
    // NOTE(review): the slot is written here and read by other threads without
    // synchronisation — confirm whether the header should guard it.
    m_eventLoopThreads[threadId] = nullptr;
}
// Creates and starts m_threadCnt worker threads, each running RunInThread()
// with its own index. Does not wait for the workers.
//
// The call is a no-op when the configured count is not positive or when
// worker threads already exist. A second call would otherwise append new
// Thread objects while restarting the old ones by index.
void EventLoopThreadPool::Loop()
{
    if (m_threadCnt <= 0 || !m_threads.empty())
    {
        return;
    }

    // Reserving up front means emplace_back below never reallocates.
    m_threads.reserve(m_threadCnt);

    // Start from all-empty slots; each worker fills in its own slot once its
    // EventLoopThread exists (see RunInThread).
    m_eventLoopThreads.clear();
    m_eventLoopThreads.resize(m_threadCnt);

    for (int i = 0; i < m_threadCnt; ++i)
    {
        m_threads.emplace_back(new Thread(std::bind(&EventLoopThreadPool::RunInThread, this, i)));
        // Start the thread that was just appended.
        m_threads.back()->Start();
    }
}
void EventLoopThreadPool::Quit()
{
for (int i = 0; i < m_threadCnt; ++i)
{
m_eventLoopThreads[i]->Quit();
m_threads[i]->Join();
}
std::vector<std::unique_ptr<Thread>> temp;
m_threads.swap(temp);
}
// Stores the read callback in m_readCallback. RunInThread() passes the stored
// value to each EventLoopThread when that worker starts.
void EventLoopThreadPool::SetReadCallback(CallbackType readCallback)
{
    this->m_readCallback = readCallback;
}
// Stores the write callback in m_writeCallback. RunInThread() passes the
// stored value to each EventLoopThread when that worker starts.
void EventLoopThreadPool::SetWriteCallback(CallbackType writeCallback)
{
    this->m_writeCallback = writeCallback;
}
// Stores the error callback in m_errorCallback. RunInThread() passes the
// stored value to each EventLoopThread when that worker starts.
void EventLoopThreadPool::SetErrorCallback(CallbackType errorCallback)
{
    this->m_errorCallback = errorCallback;
}