Skip to content

Commit

Permalink
Merge pull request #175 from caozhiyi/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
caozhiyi authored Jan 22, 2025
2 parents 7c9e04d + cbd7587 commit 24c09d6
Show file tree
Hide file tree
Showing 13 changed files with 76 additions and 26 deletions.
6 changes: 2 additions & 4 deletions src/common/timer/timer_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,11 @@
namespace quicx {
namespace common {

typedef std::function<void()> TimerCallback;

class TimerTask {
public:
TimerCallback tcb_;
std::function<void()> tcb_;
TimerTask() {}
TimerTask(TimerCallback tcb): tcb_(tcb) {}
TimerTask(std::function<void()> tcb): tcb_(tcb) {}
TimerTask(const TimerTask& t): tcb_(t.tcb_), time_(t.time_), id_(t.id_) {}
private:
uint64_t time_;
Expand Down
4 changes: 2 additions & 2 deletions src/quic/include/type.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ struct QuicTransportParams {
uint32_t initial_max_stream_data_bidi_local_ = 1472*10;
uint32_t initial_max_stream_data_bidi_remote_ = 1472*10;
uint32_t initial_max_stream_data_uni_ = 1472*10;
uint32_t initial_max_streams_bidi_ = 6;
uint32_t initial_max_streams_uni_ = 6;
uint32_t initial_max_streams_bidi_ = 20;
uint32_t initial_max_streams_uni_ = 20;
uint32_t ack_delay_exponent_ms_ = 5;
uint32_t max_ack_delay_ms_ = 5;
bool disable_active_migration_ = false;
Expand Down
6 changes: 6 additions & 0 deletions src/quic/quicx/processor_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "quic/common/version.h"
#include "quic/common/constants.h"
#include "common/network/address.h"
#include "common/timer/timer_task.h"
#include "quic/packet/init_packet.h"
#include "quic/packet/packet_decode.h"
#include "quic/quicx/processor_base.h"
Expand Down Expand Up @@ -56,6 +57,11 @@ void ProcessorBase::AddReceiver(const std::string& ip, uint16_t port) {
receiver_->AddReceiver(ip, port);
}

void ProcessorBase::AddTimer(uint32_t interval_ms, timer_callback cb) {
common::TimerTask task(cb);
time_->AddTimer(task, interval_ms);
}

void ProcessorBase::ProcessRecv(uint32_t timeout_ms) {
uint8_t recv_buf[__max_v4_packet_size] = {0};
std::shared_ptr<INetPacket> packet = std::make_shared<INetPacket>();
Expand Down
1 change: 1 addition & 0 deletions src/quic/quicx/processor_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class ProcessorBase:
virtual void Process();
virtual void AddReceiver(uint64_t socket_fd);
virtual void AddReceiver(const std::string& ip, uint16_t port);
virtual void AddTimer(uint32_t interval_ms, timer_callback cb);

protected:
void ProcessRecv(uint32_t timeout_ms);
Expand Down
16 changes: 12 additions & 4 deletions src/quic/quicx/quic_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,29 @@ QuicBase::~QuicBase() {
}

void QuicBase::Join() {
for (auto& processor : processors_) {
processor->Join();
for (auto& processor : processors_map_) {
processor.second->Join();
}
}

void QuicBase::Destroy() {
for (auto& processor : processors_) {
processor->Stop();
for (auto& processor : processors_map_) {
processor.second->Stop();
}
}

void QuicBase::SetConnectionStateCallBack(connection_state_callback cb) {
connection_state_cb_ = cb;
}

void QuicBase::AddTimer(uint32_t interval_ms, timer_callback cb) {
auto iter = processors_map_.find(std::this_thread::get_id());
if (iter != processors_map_.end()) {
iter->second->AddTimer(interval_ms, cb);
return;
}
common::LOG_ERROR("no processor found for thread id: {}", std::this_thread::get_id());
}

void QuicBase::InitLogger(LogLevel level) {
std::shared_ptr<common::Logger> log = std::make_shared<common::StdoutLogger>();
Expand Down
6 changes: 5 additions & 1 deletion src/quic/quicx/quic_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

#include <memory>
#include <vector>
#include <thread>
#include <unordered_map>

#include "quic/include/type.h"
#include "quic/quicx/if_processor.h"
Expand All @@ -23,12 +25,14 @@ class QuicBase {

virtual void SetConnectionStateCallBack(connection_state_callback cb);

void AddTimer(uint32_t interval_ms, timer_callback cb);

protected:
void InitLogger(LogLevel level);

protected:
std::shared_ptr<TLSCtx> tls_ctx_;
std::vector<std::shared_ptr<ProcessorBase>> processors_;
std::unordered_map<std::thread::id, std::shared_ptr<ProcessorBase>> processors_map_;

connection_state_callback connection_state_cb_;
QuicTransportParams params_;
Expand Down
15 changes: 10 additions & 5 deletions src/quic/quicx/quic_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ bool QuicClient::Init(uint16_t thread_num, LogLevel level) {
common::LOG_ERROR("tls ctx init faliled.");
return false;
}
processors_.reserve(thread_num);
processors_map_.reserve(thread_num);
for (size_t i = 0; i < thread_num; i++) {
auto processor = std::make_shared<ProcessorClient>(tls_ctx_, params_, connection_state_cb_);
processor->Start();
processors_.emplace_back(processor);
processors_map_.emplace(processor->GetCurrentThreadId(), processor);
}
return true;
}
Expand All @@ -44,11 +44,16 @@ void QuicClient::Destroy() {
QuicBase::Destroy();
}

void QuicClient::AddTimer(uint32_t interval_ms, timer_callback cb) {
QuicBase::AddTimer(interval_ms, cb);
}

bool QuicClient::Connection(const std::string& ip, uint16_t port,
const std::string& alpn, int32_t timeout_ms) {
if (!processors_.empty()) {
// TODO: random select processor
std::dynamic_pointer_cast<ProcessorClient>(processors_[0])->Connect(ip, port, alpn, timeout_ms);
if (!processors_map_.empty()) {
auto iter = processors_map_.begin();
std::advance(iter, rand() % processors_map_.size());
std::dynamic_pointer_cast<ProcessorClient>(iter->second)->Connect(ip, port, alpn, timeout_ms);
return true;
}
return false;
Expand Down
2 changes: 1 addition & 1 deletion src/quic/quicx/quic_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class QuicClient:

virtual void Destroy();

virtual void AddTimer(uint32_t interval_ms, timer_callback cb) { /* TODO: implement this */ }
virtual void AddTimer(uint32_t interval_ms, timer_callback cb);

virtual bool Connection(const std::string& ip, uint16_t port,
const std::string& alpn, int32_t timeout_ms);
Expand Down
16 changes: 10 additions & 6 deletions src/quic/quicx/quic_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ bool QuicServer::Init(const std::string& cert_file, const std::string& key_file,
}
tls_ctx_ = tls_ctx;

processors_.reserve(thread_num);
processors_map_.reserve(thread_num);
for (size_t i = 0; i < thread_num; i++) {
auto processor = std::make_shared<ProcessorServer>(tls_ctx_, params_, connection_state_cb_);
processor->SetServerAlpn(alpn);
processor->Start();
processors_.emplace_back(processor);
processors_map_.emplace(processor->GetCurrentThreadId(), processor);
}
return true;
}
Expand All @@ -49,12 +49,12 @@ bool QuicServer::Init(const char* cert_pem, const char* key_pem, const std::stri
}
tls_ctx_ = tls_ctx;

processors_.reserve(thread_num);
processors_map_.reserve(thread_num);
for (size_t i = 0; i < thread_num; i++) {
auto processor = std::make_shared<ProcessorServer>(tls_ctx_, params_, connection_state_cb_);
processor->SetServerAlpn(alpn);
processor->Start();
processors_.emplace_back(processor);
processors_map_.emplace(processor->GetCurrentThreadId(), processor);
}
return true;
}
Expand All @@ -67,9 +67,13 @@ void QuicServer::Destroy() {
QuicBase::Destroy();
}

void QuicServer::AddTimer(uint32_t interval_ms, timer_callback cb) {
QuicBase::AddTimer(interval_ms, cb);
}

bool QuicServer::ListenAndAccept(const std::string& ip, uint16_t port) {
for (auto& processor : processors_) {
processor->AddReceiver(ip, port);
for (auto& processor : processors_map_) {
processor.second->AddReceiver(ip, port);
}
return true;
}
Expand Down
2 changes: 1 addition & 1 deletion src/quic/quicx/quic_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class QuicServer:

virtual void Destroy();

virtual void AddTimer(uint32_t interval_ms, timer_callback cb) { /* TODO: implement this */ }
virtual void AddTimer(uint32_t interval_ms, timer_callback cb);

virtual bool ListenAndAccept(const std::string& ip, uint16_t port);

Expand Down
11 changes: 11 additions & 0 deletions src/quic/quicx/thread_processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ void ThreadProcessor::Run() {
// register processor in woker thread
processor_map__[std::this_thread::get_id()] = this;
connection_transfor_ = std::make_shared<ConnectionTransfor>();
current_thread_id_ = std::this_thread::get_id();

current_thread_id_set_ = true;
current_thread_id_cv_.notify_all();

while (!IsStop()) {
Process();

Expand All @@ -51,6 +56,12 @@ void ThreadProcessor::Weakeup() {
receiver_->Weakup();
}

std::thread::id ThreadProcessor::GetCurrentThreadId() {
std::unique_lock<std::mutex> lock(current_thread_id_mutex_);
current_thread_id_cv_.wait(lock, [this]() { return current_thread_id_set_; });
return current_thread_id_;
}

void ThreadProcessor::ConnectionIDNoexist(uint64_t cid_hash, std::shared_ptr<IConnection>& conn) {
// do nothing
}
Expand Down
8 changes: 7 additions & 1 deletion src/quic/quicx/thread_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ class ThreadProcessor:
virtual void Stop();

void Weakeup();

std::thread::id GetCurrentThreadId();

protected:
// transfer a connection from other processor
Expand All @@ -42,10 +44,14 @@ class ThreadProcessor:
std::shared_ptr<IReceiver> receiver_;
std::unordered_map<uint64_t, std::shared_ptr<IConnection>> conn_map_; // all connections


friend class ConnectionTransfor;
std::shared_ptr<ConnectionTransfor> connection_transfor_;
static std::unordered_map<std::thread::id, ThreadProcessor*> processor_map__;
std::thread::id current_thread_id_;

bool current_thread_id_set_;
std::mutex current_thread_id_mutex_;
std::condition_variable current_thread_id_cv_;
};

}
Expand Down
9 changes: 8 additions & 1 deletion utest/quic/connection/connection_flow_control_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,14 @@ class TransportParamTest:
public common::Singleton<TransportParamTest> {
public:
TransportParamTest() {
tp_.Init(DEFAULT_QUIC_TRANSPORT_PARAMS);
QuicTransportParams tp;
tp.initial_max_data_ = 10000;
tp.initial_max_stream_data_bidi_local_ = 8;
tp.initial_max_stream_data_bidi_remote_ = 8;
tp.initial_max_stream_data_uni_ = 8;
tp.initial_max_streams_bidi_ = 8;
tp.initial_max_streams_uni_ = 8;
tp_.Init(tp);
}

~TransportParamTest() {}
Expand Down

0 comments on commit 24c09d6

Please sign in to comment.