Skip to content

Commit

Permalink
Merge pull request #176 from caozhiyi/dev
Browse files Browse the repository at this point in the history
add connection time out check.
  • Loading branch information
caozhiyi authored Jan 23, 2025
2 parents 24c09d6 + 6ac2864 commit 160d02d
Show file tree
Hide file tree
Showing 10 changed files with 80 additions and 13 deletions.
2 changes: 1 addition & 1 deletion src/quic/connection/connection_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ bool ClientConnection::Dial(const common::Address& addr, const std::string& alpn
common::LOG_ERROR("add alpn failed. alpn:%s", alpn.c_str());
return false;
}

SetPeerAddress(std::move(addr));

// set transport param. TODO define tp length
Expand Down
30 changes: 30 additions & 0 deletions src/quic/connection/error.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#include <unordered_map>
#include "quic/connection/error.h"

namespace quicx {
namespace quic {

static const std::unordered_map<QUIC_ERROR_CODE, std::string> ErrorCodeReasons__ = {
{QEC_NO_ERROR, "success"},
{QEC_INTERNAL_ERROR, "internal error"},
{QEC_SERVER_BUSY, "server busy"},
{QEC_FLOW_CONTROL_ERROR, "flow control error"},
{QEC_STREAM_LIMIT_ERROR, "stream limit error"},
{QEC_STREAM_STATE_ERROR, "stream state error"},
{QEC_FINAL_SIZE_ERROR, "final size error"},
{QEC_FRAME_ENCODING_ERROR, "frame encoding error"},
{QEC_TRANSPORT_PARAMETER_ERROR, "transport parameter error"},
{QEC_CONNECTION_ID_LIMIT_ERROR, "connection id limit error"},
{QEC_PROTOCOL_VIOLATION, "protocol violation"},
{QEC_INVALID_TOKEN, "invalid token"},
{QEC_CRYPTO_BUFFER_EXCEEDED, "crypto buffer exceeded"},
{QEC_CRYPTO_ERROR, "crypto error"},
{QEC_CONNECTION_TIMEOUT, "connection timeout"},
};

const std::string& GetErrorString(QUIC_ERROR_CODE code) {
return ErrorCodeReasons__.at(code);
}

}
}
8 changes: 7 additions & 1 deletion src/quic/connection/error.h
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
#ifndef QUIC_CONNECTION_ERROR
#define QUIC_CONNECTION_ERROR

#include <string>
#include <cstdint>

namespace quicx {
namespace quic {

enum QUIC_ERROR_CODE {
enum QUIC_ERROR_CODE: uint32_t {
QEC_NO_ERROR = 0x00, // an endpoint uses this with CONNECTION_CLOSE to signal that the connection is being closed abruptly in the absence of any error.
QEC_INTERNAL_ERROR = 0x01, // the endpoint encountered an internal error and cannot continue with the connection.
QEC_SERVER_BUSY = 0x02, // the server is currently busy and does not accept any new connections.
Expand All @@ -19,8 +22,11 @@ enum QUIC_ERROR_CODE {
QEC_INVALID_TOKEN = 0x0b, // received a Retry Token in a client Initial that is invalid.
QEC_CRYPTO_BUFFER_EXCEEDED = 0x0d, // received more data in CRYPTO frames than it can buffer.
QEC_CRYPTO_ERROR = 0x10, // cryptographic handshake failed.
QEC_CONNECTION_TIMEOUT = 0x11, // connection timeout.
};

const std::string& GetErrorString(QUIC_ERROR_CODE code);

}
}

Expand Down
8 changes: 5 additions & 3 deletions src/quic/quicx/processor_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,11 @@ void ProcessorBase::ProcessSend() {
}

void ProcessorBase::HandleHandshakeDone(std::shared_ptr<IConnection> conn) {
connecting_map_.erase(conn->GetConnectionIDHash());
conn_map_[conn->GetConnectionIDHash()] = conn;
connection_handler_(conn, 0, "");
if (connecting_map_.find(conn->GetConnectionIDHash()) != connecting_map_.end()) {
connecting_map_.erase(conn->GetConnectionIDHash());
conn_map_[conn->GetConnectionIDHash()] = conn;
connection_handler_(conn, 0, "");
}
}

void ProcessorBase::HandleActiveSendConnection(std::shared_ptr<IConnection> conn) {
Expand Down
16 changes: 14 additions & 2 deletions src/quic/quicx/processor_client.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "common/log/log.h"
#include "quic/connection/error.h"
#include "quic/quicx/processor_client.h"
#include "quic/connection/connection_client.h"
#include "quic/connection/connection_id_generator.h"
Expand All @@ -12,7 +13,7 @@ ProcessorClient::ProcessorClient(std::shared_ptr<TLSCtx> ctx,
ProcessorBase(ctx, params, connection_handler) {

}

ProcessorClient::~ProcessorClient() {

}
Expand All @@ -28,7 +29,11 @@ void ProcessorClient::Connect(const std::string& ip, uint16_t port,

connecting_map_[conn->GetConnectionIDHash()] = conn;
conn->Dial(common::Address(ip, port), alpn, params_);
// TODO add timer to check connection status

common::TimerTask task([conn, this]() {
HandleConnectionTimeout(conn);
});
time_->AddTimer(task, timeout_ms);
}

bool ProcessorClient::HandlePacket(std::shared_ptr<INetPacket> packet) {
Expand Down Expand Up @@ -69,5 +74,12 @@ bool ProcessorClient::HandlePacket(std::shared_ptr<INetPacket> packet) {
return false;
}

void ProcessorClient::HandleConnectionTimeout(std::shared_ptr<IConnection> conn) {
if (conn_map_.find(conn->GetConnectionIDHash()) != conn_map_.end()) {
conn_map_.erase(conn->GetConnectionIDHash());
connection_handler_(conn, QUIC_ERROR_CODE::QEC_CONNECTION_TIMEOUT, GetErrorString(QUIC_ERROR_CODE::QEC_CONNECTION_TIMEOUT));
}
}

}
}
1 change: 1 addition & 0 deletions src/quic/quicx/processor_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class ProcessorClient:
const std::string& alpn, int32_t timeout_ms);
private:
bool HandlePacket(std::shared_ptr<INetPacket> packet);
void HandleConnectionTimeout(std::shared_ptr<IConnection> conn);
};

}
Expand Down
8 changes: 7 additions & 1 deletion src/quic/quicx/processor_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,13 @@ bool ProcessorServer::HandlePacket(std::shared_ptr<INetPacket> packet) {
new_conn->SetPeerAddress(packet->GetAddress());
new_conn->OnPackets(packet->GetTime(), packets);

// TODO add timer to check connection status
common::TimerTask task([new_conn, this]() {
if (connecting_map_.find(new_conn->GetConnectionIDHash()) != connecting_map_.end()) {
connecting_map_.erase(new_conn->GetConnectionIDHash());
common::LOG_DEBUG("connection timeout. cid:%llu", new_conn->GetConnectionIDHash());
}
});
time_->AddTimer(task, 1000); // TODO add timeout to config
return true;
}

Expand Down
12 changes: 11 additions & 1 deletion src/quic/quicx/quic_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,17 @@ bool QuicClient::Connection(const std::string& ip, uint16_t port,
if (!processors_map_.empty()) {
auto iter = processors_map_.begin();
std::advance(iter, rand() % processors_map_.size());
std::dynamic_pointer_cast<ProcessorClient>(iter->second)->Connect(ip, port, alpn, timeout_ms);
auto processor = std::dynamic_pointer_cast<ProcessorClient>(iter->second);
// if the current thread is the same as the processor's thread, then connect directly
if (std::this_thread::get_id() == iter->first) {
processor->Connect(ip, port, alpn, timeout_ms);

} else {
iter->second->Push([ip, port, alpn, timeout_ms, processor]() {
processor->Connect(ip, port, alpn, timeout_ms);
});
iter->second->Weakup();
}
return true;
}
return false;
Expand Down
6 changes: 3 additions & 3 deletions src/quic/quicx/thread_processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ void ThreadProcessor::Run() {
processor_map__[std::this_thread::get_id()] = this;
connection_transfor_ = std::make_shared<ConnectionTransfor>();
current_thread_id_ = std::this_thread::get_id();

current_thread_id_set_ = true;
current_thread_id_cv_.notify_all();

Expand All @@ -49,10 +49,10 @@ void ThreadProcessor::Stop() {

// TODO: wait all connections closed
Thread::Stop();
Weakeup();
Weakup();
}

void ThreadProcessor::Weakeup() {
void ThreadProcessor::Weakup() {
receiver_->Weakup();
}

Expand Down
2 changes: 1 addition & 1 deletion src/quic/quicx/thread_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class ThreadProcessor:

virtual void Stop();

void Weakeup();
void Weakup();

std::thread::id GetCurrentThreadId();

Expand Down

0 comments on commit 160d02d

Please sign in to comment.