Skip to content

Commit

Permalink
Merge pull request #174 from caozhiyi/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
caozhiyi authored Jan 20, 2025
2 parents 590c31f + 0ddde55 commit 7c9e04d
Show file tree
Hide file tree
Showing 65 changed files with 517 additions and 356 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ x64
.VSCodeCounter
gtest
*.log
quicx-utest
quicx_utest
third
libquicx.a
libhttp3.a
Expand Down
5 changes: 3 additions & 2 deletions TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
- [x] stream unit test
- [x] manage conection id
- [x] bbr
- [x] http3 push implement
- [ ] ack packet enc
- [ ] http3 push implement
- [ ] http3 qpack dynamic table update implement
- [ ] zero memory copy
- [ ] 0 rtt handshake
- [ ] 0 rtt handshake
- [ ] http3 settings control
39 changes: 34 additions & 5 deletions src/http3/connection/connection_client.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "common/log/log.h"
#include "http3/http/error.h"
#include "http3/connection/type.h"
#include "http3/stream/request_stream.h"
#include "http3/stream/control_receiver_stream.h"
#include "http3/stream/push_receiver_stream.h"
Expand All @@ -9,21 +10,35 @@ namespace quicx {
namespace http3 {

ClientConnection::ClientConnection(const std::string& unique_id,
const Http3Settings& settings,
const std::shared_ptr<quic::IQuicConnection>& quic_connection,
const std::function<void(const std::string& unique_id, uint32_t error_code)>& error_handler,
const std::function<void(std::unordered_map<std::string, std::string>& headers)>& push_promise_handler,
const std::function<bool(std::unordered_map<std::string, std::string>& headers)>& push_promise_handler,
const http_response_handler& push_handler):
IConnection(unique_id, quic_connection, error_handler),
push_promise_handler_(push_promise_handler),
push_handler_(push_handler) {

// create control streams
auto control_stream = quic_connection_->MakeStream(quic::SD_SEND);
control_sender_stream_ = std::make_shared<ControlClientSenderStream>(
std::dynamic_pointer_cast<quic::IQuicSendStream>(control_stream),
std::bind(&ClientConnection::HandleError, this, std::placeholders::_1, std::placeholders::_2));

settings_ = IConnection::AdaptSettings(settings);
control_sender_stream_->SendSettings(settings_);
}

ClientConnection::~ClientConnection() {
Close(0);
}

bool ClientConnection::DoRequest(std::shared_ptr<IRequest> request, const http_response_handler& handler) {
if (streams_.size() >= settings_[SETTINGS_TYPE::ST_MAX_CONCURRENT_STREAMS]) {
common::LOG_ERROR("ClientConnection::DoRequest max concurrent streams reached");
return false;
}

// create request stream
auto stream = quic_connection_->MakeStream(quic::SD_BIDI);
if (!stream) {
Expand All @@ -35,7 +50,7 @@ bool ClientConnection::DoRequest(std::shared_ptr<IRequest> request, const http_r
std::dynamic_pointer_cast<quic::IQuicBidirectionStream>(stream),
std::bind(&ClientConnection::HandleError, this, std::placeholders::_1, std::placeholders::_2),
handler,
std::bind(&ClientConnection::HandlePushPromise, this, std::placeholders::_1));
std::bind(&ClientConnection::HandlePushPromise, this, std::placeholders::_1, std::placeholders::_2));

streams_[request_stream->GetStreamID()] = request_stream;

Expand All @@ -54,13 +69,23 @@ void ClientConnection::CancelPush(uint64_t push_id) {
void ClientConnection::HandleStream(std::shared_ptr<quic::IQuicStream> stream, uint32_t error_code) {
if (error_code != 0) {
common::LOG_ERROR("ClientConnection::HandleStream error: %d", error_code);
if (stream) {
streams_.erase(stream->GetStreamID());
}
return;
}

if (stream->GetDirection() == quic::SD_BIDI) {
quic_connection_->Reset(HTTP3_ERROR_CODE::H3EC_STREAM_CREATION_ERROR);
return;
}

// TODO: implement stand line to create stream
if (streams_.size() >= settings_[SETTINGS_TYPE::ST_MAX_CONCURRENT_STREAMS]) {
common::LOG_ERROR("ClientConnection::HandleStream max concurrent streams reached");
Close(HTTP3_ERROR_CODE::H3EC_STREAM_CREATION_ERROR);
return;
}

if (stream->GetDirection() == quic::SD_RECV) {
if (stream->GetStreamID() == 1) {
Expand Down Expand Up @@ -101,9 +126,13 @@ void ClientConnection::HandleError(uint64_t stream_id, uint32_t error_code) {
}
}

void ClientConnection::HandlePushPromise(std::unordered_map<std::string, std::string>& headers) {
if (push_promise_handler_) {
push_promise_handler_(headers);
void ClientConnection::HandlePushPromise(std::unordered_map<std::string, std::string>& headers, uint64_t push_id) {
if (!push_promise_handler_) {
return;
}
bool do_recv = push_promise_handler_(headers);
if (!do_recv) {
CancelPush(push_id);
}
}

Expand Down
10 changes: 7 additions & 3 deletions src/http3/connection/connection_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ class ClientConnection
:public IConnection {
public:
ClientConnection(const std::string& unique_id,
const Http3Settings& settings,
const std::shared_ptr<quic::IQuicConnection>& quic_connection,
const std::function<void(const std::string& unique_id, uint32_t error_code)>& error_handler,
const std::function<void(std::unordered_map<std::string, std::string>& headers)>& push_promise_handler,
const std::function<bool(std::unordered_map<std::string, std::string>& headers)>& push_promise_handler,
const http_response_handler& push_handler);
virtual ~ClientConnection();

Expand All @@ -38,11 +39,14 @@ class ClientConnection
// handle error
void HandleError(uint64_t stream_id, uint32_t error_code);
// handle push promise
void HandlePushPromise(std::unordered_map<std::string, std::string>& headers);
void HandlePushPromise(std::unordered_map<std::string, std::string>& headers, uint64_t push_id);

private:
http_response_handler push_handler_;
std::function<void(std::unordered_map<std::string, std::string>&)> push_promise_handler_;
std::function<bool(std::unordered_map<std::string, std::string>&)> push_promise_handler_;

std::shared_ptr<ControlClientSenderStream> control_sender_stream_;
std::shared_ptr<ControlReceiverStream> control_recv_stream_;
};

}
Expand Down
94 changes: 80 additions & 14 deletions src/http3/connection/connection_server.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#include "common/log/log.h"
#include "http3/http/error.h"
#include "http3/connection/type.h"
#include "http3/stream/response_stream.h"
#include "http3/stream/push_sender_stream.h"
#include "http3/connection/connection_server.h"
Expand All @@ -8,47 +10,91 @@ namespace quicx {
namespace http3 {

ServerConnection::ServerConnection(const std::string& unique_id,
const Http3Settings& settings,
std::shared_ptr<quic::IQuicServer> quic_server,
const std::shared_ptr<quic::IQuicConnection>& quic_connection,
const std::function<void(const std::string& unique_id, uint32_t error_code)>& error_handler,
const http_handler& http_handler):
IConnection(unique_id, quic_connection, error_handler),
quic_server_(quic_server),
http_handler_(http_handler),
max_push_id_(0) {

// create control streams
auto control_stream = quic_connection_->MakeStream(quic::SD_SEND);
control_sender_stream_ = std::make_shared<ControlClientSenderStream>(
std::dynamic_pointer_cast<quic::IQuicSendStream>(control_stream),
std::bind(&ServerConnection::HandleError, this, std::placeholders::_1, std::placeholders::_2));

settings_ = IConnection::AdaptSettings(settings);
control_sender_stream_->SendSettings(settings_);
}

ServerConnection::~ServerConnection() {
Close(0);
}

bool ServerConnection::SendPushPromise(const std::unordered_map<std::string, std::string>& headers) {
if (max_push_id_ == 0) {
return false;
}

// TODO: implement push promise
return true;
}

bool ServerConnection::SendPush(std::shared_ptr<IResponse> response) {
if (max_push_id_ == 0) {
if (streams_.size() >= settings_[SETTINGS_TYPE::ST_MAX_CONCURRENT_STREAMS]) {
common::LOG_ERROR("ServerConnection::SendPush max concurrent streams reached");
return false;
}

auto stream = quic_connection_->MakeStream(quic::SD_SEND);
if (!stream) {
common::LOG_ERROR("ServerConnection::SendPush make stream failed");
return false;
}

std::shared_ptr<PushSenderStream> push_stream = std::make_shared<PushSenderStream>(qpack_encoder_,
std::dynamic_pointer_cast<quic::IQuicSendStream>(stream),
std::bind(&ServerConnection::HandleError, this, std::placeholders::_1, std::placeholders::_2),
0); // TODO: implement push id
std::bind(&ServerConnection::HandleError, this, std::placeholders::_1, std::placeholders::_2));

push_stream->SendPushResponse(response);
return true;
}

void ServerConnection::HandleHttp(std::shared_ptr<IRequest> request, std::shared_ptr<IResponse> response, std::shared_ptr<ResponseStream> response_stream) {
if (http_handler_) {
http_handler_(request, response);
}

if (!IsEnabledPush()) {
return;
}

// check if push is enabled
auto push_responses = response->GetPushResponses();
for (auto& push_response : push_responses) {
if (!CanPush()) {
break;
}

// send push promise
response_stream->SendPushPromise(push_response->GetHeaders(), next_push_id_++);
push_responses_[next_push_id_] = push_response;
}
if (send_limit_push_id_ < next_push_id_) {
send_limit_push_id_ = next_push_id_;

// TODO: set time out time to config
quic_server_->AddTimer(20, std::bind(&ServerConnection::HandleTimer, this));
}
}

void ServerConnection::HandleStream(std::shared_ptr<quic::IQuicStream> stream, uint32_t error) {
if (error != 0) {
common::LOG_ERROR("ServerConnection::HandleStream error: %d", error);
if (stream) {
streams_.erase(stream->GetStreamID());
}
return;
}

// TODO: implement stand line to create stream
if (streams_.size() >= settings_[SETTINGS_TYPE::ST_MAX_CONCURRENT_STREAMS]) {
common::LOG_ERROR("ServerConnection::HandleStream max concurrent streams reached");
Close(HTTP3_ERROR_CODE::H3EC_STREAM_CREATION_ERROR);
return;
}

Expand All @@ -57,7 +103,7 @@ void ServerConnection::HandleStream(std::shared_ptr<quic::IQuicStream> stream, u
std::shared_ptr<ResponseStream> response_stream = std::make_shared<ResponseStream>(qpack_encoder_,
std::dynamic_pointer_cast<quic::IQuicBidirectionStream>(stream),
std::bind(&ServerConnection::HandleError, this, std::placeholders::_1, std::placeholders::_2),
http_handler_);
std::bind(&ServerConnection::HandleHttp, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
streams_[response_stream->GetStreamID()] = response_stream;

} else if (stream->GetDirection() == quic::SD_RECV) {
Expand All @@ -83,7 +129,7 @@ void ServerConnection::HandleMaxPushId(uint64_t max_push_id) {
}

void ServerConnection::HandleCancelPush(uint64_t push_id) {
// TODO: implement cancel push
push_responses_.erase(push_id);
}

void ServerConnection::HandleError(uint64_t stream_id, uint32_t error) {
Expand All @@ -99,5 +145,25 @@ void ServerConnection::HandleError(uint64_t stream_id, uint32_t error) {
}
}

void ServerConnection::HandleTimer() {
for (auto iter = push_responses_.begin(); iter != push_responses_.end();) {
if (iter->first < send_limit_push_id_) {
SendPush(iter->second);
iter = push_responses_.erase(iter);
continue;
}
break;
}
}

bool ServerConnection::IsEnabledPush() const {
return settings_.find(SETTINGS_TYPE::ST_ENABLE_PUSH) != settings_.end()
&& settings_.at(SETTINGS_TYPE::ST_ENABLE_PUSH) == 1;
}

bool ServerConnection::CanPush() const {
return next_push_id_ < max_push_id_;
}

}
}
31 changes: 24 additions & 7 deletions src/http3/connection/connection_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@
#include "http3/include/if_server.h"
#include "http3/qpack/qpack_encoder.h"
#include "quic/include/if_quic_stream.h"
#include "quic/include/if_quic_server.h"
#include "http3/stream/response_stream.h"
#include "http3/connection/if_connection.h"
#include "quic/include/if_quic_connection.h"
#include "http3/stream/control_sender_stream.h"
#include "http3/stream/control_receiver_stream.h"
#include "http3/stream/control_server_receiver_stream.h"

namespace quicx {
namespace http3 {
Expand All @@ -21,18 +23,19 @@ class ServerConnection:
public IConnection {
public:
ServerConnection(const std::string& unique_id,
const Http3Settings& settings,
std::shared_ptr<quic::IQuicServer> quic_server,
const std::shared_ptr<quic::IQuicConnection>& quic_connection,
const std::function<void(const std::string& unique_id, uint32_t error_code)>& error_handler,
const http_handler& http_handler);
virtual ~ServerConnection();

// send push promise
virtual bool SendPushPromise(const std::unordered_map<std::string, std::string>& headers);

// send push
virtual bool SendPush(std::shared_ptr<IResponse> response);

private:
// send push
bool SendPush(std::shared_ptr<IResponse> response);
// handle http request
void HandleHttp(std::shared_ptr<IRequest> request, std::shared_ptr<IResponse> response, std::shared_ptr<ResponseStream> response_stream);
// handle stream status
void HandleStream(std::shared_ptr<quic::IQuicStream> stream, uint32_t error_code);
// handle goaway frame
void HandleGoaway(uint64_t id);
Expand All @@ -42,10 +45,24 @@ class ServerConnection:
void HandleCancelPush(uint64_t push_id);
// handle error
void HandleError(uint64_t stream_id, uint32_t error_code);
// handle timer
void HandleTimer();

private:
bool IsEnabledPush() const;
bool CanPush() const;

private:
uint64_t max_push_id_;
uint64_t next_push_id_;
uint64_t send_limit_push_id_;
http_handler http_handler_;
std::shared_ptr<quic::IQuicServer> quic_server_;
// push responses, push id -> response
std::unordered_map<uint64_t, std::shared_ptr<IResponse>> push_responses_;

std::shared_ptr<ControlSenderStream> control_sender_stream_;
std::shared_ptr<ControlServerReceiverStream> control_recv_stream_;
};

}
Expand Down
Loading

0 comments on commit 7c9e04d

Please sign in to comment.