Skip to content

Commit

Permalink
[coro_http][improve]support user response (#854)
Browse files Browse the repository at this point in the history
  • Loading branch information
qicosmos authored Dec 16, 2024
1 parent 5d1a580 commit db90069
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 17 deletions.
5 changes: 5 additions & 0 deletions include/ylt/standalone/cinatra/coro_http_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,8 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
}

async_simple::coro::Lazy<resp_data> read_websocket() {
auto time_out_guard =
timer_guard(this, req_timeout_duration_, "websocket timer");
co_return co_await async_read_ws();
}

Expand Down Expand Up @@ -2141,6 +2143,9 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
if (auto [ec, _] = co_await async_read_ws(
sock, read_buf, ws.left_header_len(), has_init_ssl);
ec) {
if (socket_->is_timeout_) {
co_return resp_data{std::make_error_code(std::errc::timed_out), 404};
}
data.net_err = ec;
data.status = 404;

Expand Down
17 changes: 8 additions & 9 deletions include/ylt/standalone/cinatra/coro_http_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,9 @@ class coro_http_connection
}
}
// not found
if (!is_matched_regex_router)
if (!is_matched_regex_router) {
response_.set_status(status_type::not_found);
}
}
}
}
Expand All @@ -305,10 +306,12 @@ class coro_http_connection

if (!response_.get_delay()) {
if (head_buf_.size()) {
if (type == content_type::multipart) {
response_.set_status_and_content(
status_type::not_implemented,
"mutipart handler not implemented or incorrect implemented");
if (type == content_type::multipart ||
type == content_type::chunked) {
if (response_.content().empty())
response_.set_status_and_content(
status_type::not_implemented,
"mutipart handler not implemented or incorrect implemented");
co_await reply();
close();
CINATRA_LOG_ERROR
Expand Down Expand Up @@ -405,10 +408,6 @@ class coro_http_connection
if (need_to_bufffer) {
response_.to_buffers(buffers_, chunk_size_str_);
}
int64_t send_size = 0;
for (auto &buf : buffers_) {
send_size += buf.size();
}
std::tie(ec, size) = co_await async_write(buffers_);
}
else {
Expand Down
11 changes: 5 additions & 6 deletions include/ylt/standalone/cinatra/coro_http_router.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class coro_http_router {
if (ok) {
co_await handler(req, resp);
}
ok = true;
(do_after(asps, req, resp, ok), ...);
};
}
Expand Down Expand Up @@ -113,6 +114,7 @@ class coro_http_router {
if (ok) {
handler(req, resp);
}
ok = true;
(do_after(asps, req, resp, ok), ...);
};
}
Expand Down Expand Up @@ -155,20 +157,17 @@ class coro_http_router {
}
ok = aspect.before(req, resp);
}
else {
ok = true;
}
}

template <typename T>
void do_after(T& aspect, coro_http_request& req, coro_http_response& resp,
bool& ok) {
if constexpr (has_after_v<T>) {
if (!ok) {
return;
}
ok = aspect.after(req, resp);
}
else {
ok = true;
}
}

std::function<void(coro_http_request& req, coro_http_response& resp)>*
Expand Down
2 changes: 2 additions & 0 deletions include/ylt/standalone/cinatra/session.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ class session {
return std::nullopt;
}

const auto &get_all_data() const { return data_; }

const std::string &get_session_id() {
std::unique_lock<std::mutex> lock(mtx_);
return session_id_;
Expand Down
77 changes: 76 additions & 1 deletion src/coro_http/tests/test_cinatra.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -549,15 +549,32 @@ struct add_more_data {
}
};

std::vector<std::string> aspect_test_vec;

struct auth_t {
bool before(coro_http_request &req, coro_http_response &res) { return true; }
bool after(coro_http_request &req, coro_http_response &res) {
aspect_test_vec.push_back("enter auth_t after");
return false;
}
};

struct dely_t {
bool before(coro_http_request &req, coro_http_response &res) {
res.set_status_and_content(status_type::unauthorized, "unauthorized");
return false;
}
bool after(coro_http_request &req, coro_http_response &res) {
aspect_test_vec.push_back("enter delay_t after");
return true;
}
};

struct another_t {
bool after(coro_http_request &req, coro_http_response &res) {
// won't comming
return true;
}
};

TEST_CASE("test aspect") {
Expand Down Expand Up @@ -585,7 +602,7 @@ TEST_CASE("test aspect") {
[](coro_http_request &req, coro_http_response &resp) {
resp.set_status_and_content(status_type::ok, "ok");
},
dely_t{}, auth_t{});
dely_t{}, auth_t{}, another_t{});
server.set_http_handler<GET>(
"/exception", [](coro_http_request &req, coro_http_response &resp) {
throw std::invalid_argument("invalid argument");
Expand Down Expand Up @@ -619,6 +636,7 @@ TEST_CASE("test aspect") {
CHECK(result.status == 200);
result = async_simple::coro::syncAwait(client.async_get("/auth"));
CHECK(result.status == 401);
CHECK(aspect_test_vec.size() == 2);
CHECK(result.resp_body == "unauthorized");
result = async_simple::coro::syncAwait(client.async_get("/exception"));
CHECK(result.status == 503);
Expand Down Expand Up @@ -729,6 +747,9 @@ TEST_CASE("test pipeline") {
coro_http_server server(1, 9001);
server.set_http_handler<GET, POST>(
"/test", [](coro_http_request &req, coro_http_response &res) {
if (req.get_content_type() == content_type::multipart) {
return;
}
res.set_status_and_content(status_type::ok, "hello world");
});
server.set_http_handler<GET, POST>(
Expand Down Expand Up @@ -864,6 +885,58 @@ TEST_CASE("test pipeline") {
}
#endif

TEST_CASE("test multipart and chunked return error") {
coro_http_server server(1, 8090);
server.set_http_handler<cinatra::PUT, cinatra::POST>(
"/multipart",
[](request &req, response &resp) -> async_simple::coro::Lazy<void> {
resp.set_status_and_content(status_type::bad_request,
"invalid headers");
co_return;
});
server.set_http_handler<cinatra::PUT, cinatra::POST>(
"/chunked",
[](request &req, response &resp) -> async_simple::coro::Lazy<void> {
resp.set_status_and_content(status_type::bad_request,
"invalid headers");
co_return;
});
server.async_start();

std::string filename = "small_test_file.txt";
create_file(filename, 10);
{
coro_http_client client{};
std::string uri1 = "http://127.0.0.1:8090/chunked";
auto result = async_simple::coro::syncAwait(
client.async_upload_chunked(uri1, http_method::PUT, filename));
CHECK(result.status != 200);
if (!result.resp_body.empty())
CHECK(result.resp_body == "invalid headers");
}

{
coro_http_client client{};
std::string uri2 = "http://127.0.0.1:8090/multipart";
client.add_str_part("test", "test value");
auto result =
async_simple::coro::syncAwait(client.async_upload_multipart(uri2));
CHECK(result.status != 200);
if (!result.resp_body.empty())
CHECK(result.resp_body == "invalid headers");
}

{
coro_http_client client{};
std::string uri1 = "http://127.0.0.1:8090/no_such";
auto result = async_simple::coro::syncAwait(
client.async_upload_chunked(uri1, http_method::PUT, filename));
CHECK(result.status != 200);
}
std::error_code ec;
fs::remove(filename, ec);
}

async_simple::coro::Lazy<void> send_data(auto &ch, size_t count) {
for (int i = 0; i < count; i++) {
co_await coro_io::async_send(ch, i);
Expand Down Expand Up @@ -3073,6 +3146,8 @@ TEST_CASE("test session") {
session_id_check_login = session->get_session_id();
bool login = session->get_data<bool>("login").value_or(false);
CHECK(login == true);
auto &all = session->get_all_data();
CHECK(all.size() > 0);
res.set_status(status_type::ok);
});
server.set_http_handler<GET>(
Expand Down
38 changes: 37 additions & 1 deletion src/coro_http/tests/test_cinatra_websocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,26 @@ TEST_CASE("test websocket") {
break;
}

auto ec = co_await req.get_conn()->write_websocket(result.data);
if (ec) {
break;
}
}
});
server.set_http_handler<cinatra::GET>(
"/test_client_timeout",
[](coro_http_request &req,
coro_http_response &resp) -> async_simple::coro::Lazy<void> {
CHECK(req.get_content_type() == content_type::websocket);
websocket_result result{};
while (true) {
result = co_await req.get_conn()->read_websocket();
if (result.ec) {
break;
}

std::this_thread::sleep_for(200ms);

auto ec = co_await req.get_conn()->write_websocket(result.data);
if (ec) {
break;
Expand All @@ -114,7 +134,23 @@ TEST_CASE("test websocket") {
});
server.async_start();

std::this_thread::sleep_for(std::chrono::milliseconds(100));
auto client_timeout = []() -> async_simple::coro::Lazy<void> {
coro_http_client client{};
client.set_req_timeout(50ms);
client.set_ws_sec_key("s//GYHa/XO7Hd2F2eOGfyA==");

auto r = co_await client.connect("ws://localhost:8090/test_client_timeout");
if (r.net_err) {
co_return;
}

co_await client.write_websocket("hello websocket");
auto data = co_await client.read_websocket();
std::cout << data.net_err.message() << std::endl;
CHECK(data.net_err == std::errc::timed_out);
};

async_simple::coro::syncAwait(client_timeout());

coro_http_client client{};
client.set_ws_sec_key("s//GYHa/XO7Hd2F2eOGfyA==");
Expand Down

0 comments on commit db90069

Please sign in to comment.