Skip to content

Commit 72d60cc

Browse files
committed
支持对等连接
1 parent 64cac20 commit 72d60cc

File tree

21 files changed

+1082
-70
lines changed

21 files changed

+1082
-70
lines changed

CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,13 @@ include_directories(${spdlog_SOURCE_DIR}/include)
3131
add_subdirectory(${SOURCE}/core)
3232
add_subdirectory(${SOURCE}/websocket)
3333
add_subdirectory(${SOURCE}/tun)
34+
add_subdirectory(${SOURCE}/peer)
3435
add_subdirectory(${SOURCE}/utility)
3536

3637
target_link_libraries(${CMAKE_PROJECT_NAME} PRIVATE core)
3738
target_link_libraries(${CMAKE_PROJECT_NAME} PRIVATE websocket)
3839
target_link_libraries(${CMAKE_PROJECT_NAME} PRIVATE tun)
40+
target_link_libraries(${CMAKE_PROJECT_NAME} PRIVATE peer)
3941
target_link_libraries(${CMAKE_PROJECT_NAME} PRIVATE utility)
4042

4143
target_link_libraries(${CMAKE_PROJECT_NAME} PRIVATE ixwebsocket)

src/core/client.cc

Lines changed: 134 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,11 @@ int Client::setDynamicAddress(const std::string &cidr) {
4646
return 0;
4747
}
4848

49+
int Client::setStun(const std::string &stun) {
50+
this->stun = stun;
51+
return 0;
52+
}
53+
4954
std::string Client::getAddress() {
5055
return this->localAddress;
5156
}
@@ -73,7 +78,11 @@ int Client::shutdown() {
7378
if (this->tunThread.joinable()) {
7479
this->tunThread.join();
7580
}
81+
if (this->dispatcherThread.joinable()) {
82+
this->dispatcherThread.join();
83+
}
7684

85+
this->dispatcher.shutdown();
7786
this->tun.down();
7887
this->ws.disconnect();
7988
return 0;
@@ -90,7 +99,7 @@ int Client::startWsThread() {
9099
}
91100

92101
// 只需要开 wsThread, 执行过程中会设置 tun 并开 tunThread.
93-
this->wsThread = std::move(std::thread(&Client::handleWebSocketMessage, this));
102+
this->wsThread = std::move(std::thread([&] { this->handleWebSocketMessage(); }));
94103
return 0;
95104
}
96105

@@ -111,12 +120,35 @@ int Client::startTunThread() {
111120
return -1;
112121
}
113122

114-
this->tunThread = std::move(std::thread(&Client::handleTunMessage, this));
123+
this->tunThread = std::move(std::thread([&] { this->handleTunMessage(); }));
115124

116125
sendAuthMessage();
117126
return 0;
118127
}
119128

129+
int Client::startDispatcherThread() {
130+
if (this->stun.empty()) {
131+
spdlog::info("stun is empty, peer-to-peer connections are not enabled");
132+
return 0;
133+
}
134+
if (this->dispatcher.setPassword(this->password)) {
135+
return -1;
136+
}
137+
if (this->dispatcher.setStun(this->stun)) {
138+
return -1;
139+
}
140+
if (this->dispatcher.setTunIP(this->tun.getIP())) {
141+
return -1;
142+
}
143+
if (this->dispatcher.run()) {
144+
return -1;
145+
}
146+
147+
this->dispatcherThread = std::move(std::thread([&] { this->handleDispatcherMessage(); }));
148+
149+
return 0;
150+
}
151+
120152
void Client::handleWebSocketMessage() {
121153
int error;
122154
WebSocketMessage message;
@@ -132,17 +164,22 @@ void Client::handleWebSocketMessage() {
132164
break;
133165
}
134166
if (message.type == WebSocketMessageType::Message) {
135-
// TYPE_FORWARD, 拆包后转发给 TUN 设备
136-
if (message.buffer.front() == MessageType::TYPE_FORWARD) {
167+
// FORWARD, 拆包后转发给 TUN 设备
168+
if (message.buffer.front() == MessageType::FORWARD) {
137169
handleForwardMessage(message);
138170
continue;
139171
}
140172
// 收到动态地址响应包,启动 TUN 设备并发送 Auth 包
141-
if (message.buffer.front() == MessageType::TYPE_DYNAMIC_ADDRESS) {
173+
if (message.buffer.front() == MessageType::DHCP) {
142174
handleDynamicAddressMessage(message);
143175
continue;
144176
}
145-
spdlog::warn("unknown message type. type {}", message.buffer.front());
177+
// 收到对端连接请求包
178+
if (message.buffer.front() == MessageType::PEER) {
179+
handlePeerConnMessage(message);
180+
continue;
181+
}
182+
spdlog::warn("unknown message: {:n}", spdlog::to_hex(message.buffer));
146183
continue;
147184
}
148185

@@ -153,6 +190,11 @@ void Client::handleWebSocketMessage() {
153190
Candy::shutdown();
154191
break;
155192
}
193+
if (startDispatcherThread()) {
194+
spdlog::critical("start dispatcher thread failed");
195+
Candy::shutdown();
196+
break;
197+
}
156198
continue;
157199
}
158200

@@ -181,7 +223,7 @@ void Client::handleWebSocketMessage() {
181223

182224
void Client::handleTunMessage() {
183225
int error;
184-
WebSocketMessage message;
226+
185227
std::string buffer;
186228
IPv4Header *header;
187229

@@ -208,17 +250,53 @@ void Client::handleTunMessage() {
208250
continue;
209251
}
210252

211-
// 目前客户端只与服务端通信,所以可以不加判断的直接把数据发给服务端.
212-
// 未来支持端到端通信后,发送数据前先判断到目的地址能否直连,并通过直连的连接发送
213-
message.buffer.clear();
214-
message.buffer.push_back(MessageType::TYPE_FORWARD);
253+
// 获取当前对端状态机的状态
254+
uint32_t peerIp = Address::netToHost(header->daddr);
255+
PeerConnState state = this->dispatcher.getPeerState(peerIp);
256+
257+
// 处于连接状态,直接发送,不需要其他操作
258+
if (state == PeerConnState::CONNECTED) {
259+
this->dispatcher.write(buffer);
260+
continue;
261+
}
262+
263+
// 通过 WebSocket 转发
264+
WebSocketMessage message;
265+
message.buffer.push_back(MessageType::FORWARD);
215266
message.buffer.append(buffer);
216267
ws.write(message);
268+
269+
if (state == PeerConnState::INIT) {
270+
uint32_t pubIp;
271+
uint16_t pubPort;
272+
if (this->dispatcher.fetchPublicInfo(pubIp, pubPort)) {
273+
continue;
274+
}
275+
sendPeerMessage(this->tun.getIP(), peerIp, pubIp, pubPort);
276+
this->dispatcher.createPeerPublicInfo(peerIp);
277+
}
217278
}
218279
Candy::shutdown();
219280
return;
220281
}
221282

283+
void Client::handleDispatcherMessage() {
284+
std::string buffer;
285+
int len;
286+
while (this->running) {
287+
len = this->dispatcher.read(buffer);
288+
if (len == 0) {
289+
continue;
290+
}
291+
if (len < 0) {
292+
spdlog::error("handle dispatcher message error");
293+
continue;
294+
}
295+
this->tun.write(buffer);
296+
}
297+
return;
298+
}
299+
222300
void Client::sendDynamicAddressMessage() {
223301
Address address;
224302
if (address.cidrUpdate(this->dynamicAddress)) {
@@ -227,11 +305,11 @@ void Client::sendDynamicAddressMessage() {
227305
return;
228306
}
229307

230-
DynamicAddressHeader header(address.getCidr());
308+
DynamicAddressMessage header(address.getCidr());
231309
header.updateHash(this->password);
232310

233311
WebSocketMessage message;
234-
message.buffer.assign((char *)(&header), sizeof(DynamicAddressHeader));
312+
message.buffer.assign((char *)(&header), sizeof(DynamicAddressMessage));
235313
this->ws.write(message);
236314
return;
237315
}
@@ -253,14 +331,29 @@ void Client::sendAuthMessage() {
253331
return;
254332
}
255333

334+
void Client::sendPeerMessage(uint32_t src, uint32_t dst, uint32_t pubIp, uint16_t pubPort) {
335+
PeerConnMessage header;
336+
header.tunSrcIp = Address::hostToNet(src);
337+
header.tunDestIp = Address::hostToNet(dst);
338+
header.pubIp = Address::hostToNet(pubIp);
339+
header.pubPort = Address::hostToNet(pubPort);
340+
341+
WebSocketMessage message;
342+
message.buffer.assign((char *)(&header), sizeof(PeerConnMessage));
343+
this->ws.write(message);
344+
345+
spdlog::debug("send peer message: src {:x} dst {:x} ip {:x} port {}", src, dst, pubIp, pubPort);
346+
return;
347+
}
348+
256349
void Client::handleDynamicAddressMessage(WebSocketMessage &message) {
257-
if (message.buffer.size() != sizeof(DynamicAddressHeader)) {
258-
spdlog::warn("invalid dynamic address package: len {}", message.buffer.length());
350+
if (message.buffer.size() < sizeof(DynamicAddressMessage)) {
351+
spdlog::warn("invalid dynamic address message: len {}", message.buffer.length());
259352
spdlog::debug("dynamic address buffer: {:n}", spdlog::to_hex(message.buffer));
260353
return;
261354
}
262355

263-
DynamicAddressHeader *header = (DynamicAddressHeader *)message.buffer.c_str();
356+
DynamicAddressMessage *header = (DynamicAddressMessage *)message.buffer.c_str();
264357

265358
Address address;
266359
if (address.cidrUpdate(header->cidr)) {
@@ -272,17 +365,41 @@ void Client::handleDynamicAddressMessage(WebSocketMessage &message) {
272365
if (startTunThread()) {
273366
spdlog::critical("start tun thread with dynamic address failed");
274367
Candy::shutdown();
368+
return;
369+
}
370+
if (startDispatcherThread()) {
371+
spdlog::critical("start dispatcher thread failed");
372+
Candy::shutdown();
373+
return;
275374
}
276375
}
277376

278377
void Client::handleForwardMessage(WebSocketMessage &message) {
279378
if (message.buffer.size() < sizeof(ForwardHeader)) {
280-
spdlog::warn("invalid forward package: {:n}", spdlog::to_hex(message.buffer));
379+
spdlog::warn("invalid forward message: {:n}", spdlog::to_hex(message.buffer));
281380
}
282381

283382
const char *src = message.buffer.c_str() + sizeof(ForwardHeader::type);
284383
const size_t len = message.buffer.length() - sizeof(ForwardHeader::type);
285384
this->tun.write(std::string(src, len));
286385
}
287386

387+
void Client::handlePeerConnMessage(WebSocketMessage &message) {
388+
if (message.buffer.size() < sizeof(PeerConnMessage)) {
389+
spdlog::warn("invalid peer message: {:n}", spdlog::to_hex(message.buffer));
390+
}
391+
PeerConnMessage *header = (PeerConnMessage *)message.buffer.c_str();
392+
393+
uint32_t tunSrcIp = Address::netToHost(header->tunSrcIp);
394+
uint32_t tunDestIp = Address::netToHost(header->tunDestIp);
395+
uint32_t pubIp = Address::netToHost(header->pubIp);
396+
uint16_t pubPort = Address::netToHost(header->pubPort);
397+
398+
if (tunDestIp != this->tun.getIP()) {
399+
spdlog::warn("peer message dest not match: {:n}", spdlog::to_hex(message.buffer));
400+
}
401+
this->dispatcher.updatePeerPublicInfo(tunSrcIp, pubIp, pubPort);
402+
return;
403+
}
404+
288405
}; // namespace Candy

src/core/client.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#ifndef CANDY_CORE_CLIENT_H
33
#define CANDY_CORE_CLIENT_H
44

5+
#include "peer/dispatcher.h"
56
#include "tun/tun.h"
67
#include "websocket/client.h"
78
#include <string>
@@ -26,6 +27,9 @@ class Client {
2627
// 设置默认动态 IP 地址,向服务端建议使用这个地址,这个地址不可用时服务端将返回一个可用的新地址
2728
int setDynamicAddress(const std::string &cidr);
2829

30+
// 设置 STUN 服务端,用于开启对等连接
31+
int setStun(const std::string &stun);
32+
2933
// 获取当前地址,设置的静态地址或者由服务端分发的动态地址
3034
std::string getAddress();
3135

@@ -36,30 +40,41 @@ class Client {
3640
private:
3741
int startWsThread();
3842
int startTunThread();
43+
int startDispatcherThread();
3944

4045
// 处理来自 WebSocket 服务端的消息
4146
void handleWebSocketMessage();
4247
// 处理来自 TUN 设备的消息
4348
void handleTunMessage();
49+
// 处理来自 Dispatcher 的消息
50+
void handleDispatcherMessage();
4451

4552
void sendDynamicAddressMessage();
4653
void sendAuthMessage();
54+
void sendPeerMessage(uint32_t src, uint32_t dst, uint32_t pubIp, uint16_t pubPort);
4755

4856
void handleForwardMessage(WebSocketMessage &message);
4957
void handleDynamicAddressMessage(WebSocketMessage &message);
58+
void handlePeerConnMessage(WebSocketMessage &message);
5059

5160
std::string tunName;
5261
std::string password;
5362
std::string wsUri;
63+
// tunThread 和 dispatcherThread 运行依赖正确的 localAddress, 赋值后才能启动这两个线程
5464
std::string localAddress;
5565
std::string dynamicAddress;
5666

67+
std::string stun;
68+
5769
std::thread wsThread;
5870
std::thread tunThread;
71+
std::thread dispatcherThread;
72+
5973
bool running = false;
6074

6175
Tun tun;
6276
WebSocketClient ws;
77+
Dispatcher dispatcher;
6378
};
6479

6580
}; // namespace Candy

src/core/message.cc

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
namespace Candy {
77

88
AuthHeader::AuthHeader(uint32_t ip) {
9-
this->type = MessageType::TYPE_AUTH;
9+
this->type = MessageType::AUTH;
1010
this->ip = Address::hostToNet(ip);
1111
this->timestamp = Time::hostToNet(Time::unixTime());
1212
}
@@ -42,23 +42,23 @@ bool AuthHeader::check(const std::string &password) {
4242
}
4343

4444
ForwardHeader::ForwardHeader() {
45-
this->type = MessageType::TYPE_FORWARD;
45+
this->type = MessageType::FORWARD;
4646
}
4747

48-
DynamicAddressHeader::DynamicAddressHeader(const std::string &cidr) {
49-
this->type = MessageType::TYPE_DYNAMIC_ADDRESS;
48+
DynamicAddressMessage::DynamicAddressMessage(const std::string &cidr) {
49+
this->type = MessageType::DHCP;
5050
this->timestamp = Time::hostToNet(Time::unixTime());
5151
std::strcpy(this->cidr, cidr.c_str());
5252
}
5353

54-
void DynamicAddressHeader::updateHash(const std::string &password) {
54+
void DynamicAddressMessage::updateHash(const std::string &password) {
5555
std::string data;
5656
data.append(password);
5757
data.append((char *)&timestamp, sizeof(timestamp));
5858
SHA256((unsigned char *)data.data(), data.size(), this->hash);
5959
}
6060

61-
bool DynamicAddressHeader::check(const std::string &password) {
61+
bool DynamicAddressMessage::check(const std::string &password) {
6262
// 检查时间戳
6363
if (std::abs(Time::unixTime() - (int64_t)Time::netToHost(this->timestamp)) > 30) {
6464
spdlog::warn("dynamic address header timestamp check failed: timestamp {}", Time::netToHost(this->timestamp));
@@ -80,4 +80,8 @@ bool DynamicAddressHeader::check(const std::string &password) {
8080
return true;
8181
}
8282

83+
PeerConnMessage::PeerConnMessage() {
84+
this->type = MessageType::PEER;
85+
}
86+
8387
}; // namespace Candy

0 commit comments

Comments
 (0)