Skip to content

Commit

Permalink
[feature](meta-service) Support querying and adjusting rpc qps limit …
Browse files Browse the repository at this point in the history
…on meta service
  • Loading branch information
TangSiyang2001 committed Oct 25, 2024
1 parent 6dcc221 commit cb4d256
Show file tree
Hide file tree
Showing 5 changed files with 353 additions and 41 deletions.
70 changes: 69 additions & 1 deletion cloud/src/meta-service/meta_service_http.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,11 @@
#include <rapidjson/prettywriter.h>
#include <rapidjson/stringbuffer.h>

#include <cstdint>
#include <memory>
#include <optional>
#include <string>
#include <string_view>
#include <type_traits>
#include <variant>
#include <vector>
Expand All @@ -42,6 +45,7 @@
#include "meta-service/txn_kv.h"
#include "meta-service/txn_kv_error.h"
#include "meta_service.h"
#include "rate-limiter/rate_limiter.h"

namespace doris::cloud {

Expand Down Expand Up @@ -331,6 +335,66 @@ static HttpResponse process_alter_iam(MetaServiceImpl* service, brpc::Controller
return http_json_reply(resp.status());
}

static HttpResponse process_adjust_rate_limit(MetaServiceImpl* service, brpc::Controller* cntl) {
const auto& uri = cntl->http_request().uri();
bool is_attr_set = false;
int64_t default_qps_limit = -1;
if (auto default_qps_limit_str = std::string(http_query(uri, "default_qps_limit"));
!default_qps_limit_str.empty()) {
try {
default_qps_limit = std::stoll(default_qps_limit_str);
} catch (const std::exception& ex) {
return http_json_reply(
MetaServiceCode::INVALID_ARGUMENT,
fmt::format("param `default_qps_limit` is not a legal int64 type:{}",
ex.what()));
}
if (default_qps_limit < 0) {
return http_json_reply(MetaServiceCode::INVALID_ARGUMENT,
"`default_qps_limit` should not be less than 0");
}
is_attr_set |= true;
}
std::string_view specific_max_qps_limit = http_query(uri, "specific_max_qps_limit");
is_attr_set |= (!specific_max_qps_limit.empty());
if (!is_attr_set) {
return http_json_reply(
MetaServiceCode::INVALID_ARGUMENT,
"default_qps_limit(int64) or "
"specific_max_qps_limit(list of[rpcname:qps(int64);]) is required as query param");
}
auto rate_limiter = service->rate_limiter();
rate_limiter->reset_rate_limit(service, default_qps_limit, specific_max_qps_limit.data());
return http_json_reply(MetaServiceCode::OK, "success to adjust rate limit");
}

static HttpResponse process_query_rate_limit(MetaServiceImpl* service, brpc::Controller* cntl) {
const auto& uri = cntl->http_request().uri();
auto rpc_name = std::string(http_query(uri, "rpc_name"));
auto rate_limiter = service->rate_limiter();
rapidjson::Document d;
if (rpc_name.empty()) {
auto get_qps_limit = [&d](std::string_view rpc_name,
std::shared_ptr<RpcRateLimiter> rpc_limiter) {
d.AddMember(rapidjson::StringRef(rpc_name.data(), rpc_name.size()),
std::to_string(rpc_limiter->max_qps_limit()), d.GetAllocator());
};
rate_limiter->for_each_rpc_limiter(std::move(get_qps_limit));
} else {
auto rpc_limiter = rate_limiter->get_rpc_rate_limiter(rpc_name);
if (rpc_limiter == nullptr) [[unlikely]] {
return http_json_reply(MetaServiceCode::INVALID_ARGUMENT,
fmt::format("rpc_name={} is not exists", rpc_name));
}
d.AddMember(rapidjson::StringRef(rpc_name.data(), rpc_name.size()),
std::to_string(rpc_limiter->max_qps_limit()), d.GetAllocator());
}
rapidjson::StringBuffer sb;
rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(sb);
d.Accept(writer);
return http_json_reply(MetaServiceCode::OK, sb.GetString());
}

static HttpResponse process_decode_key(MetaServiceImpl*, brpc::Controller* ctrl) {
auto& uri = ctrl->http_request().uri();
std::string_view key = http_query(uri, "key");
Expand Down Expand Up @@ -598,13 +662,17 @@ void MetaServiceImpl::http(::google::protobuf::RpcController* controller,
{"abort_tablet_job", process_abort_tablet_job},
{"alter_ram_user", process_alter_ram_user},
{"alter_iam", process_alter_iam},
{"adjust_rate_limit", process_adjust_rate_limit},
{"query_rate_limit", process_query_rate_limit},
{"v1/abort_txn", process_abort_txn},
{"v1/abort_tablet_job", process_abort_tablet_job},
{"v1/alter_ram_user", process_alter_ram_user},
{"v1/alter_iam", process_alter_iam},
{"v1/adjust_rate_limit", process_adjust_rate_limit},
{"v1/query_rate_limit", process_query_rate_limit},
};

auto cntl = static_cast<brpc::Controller*>(controller);
auto* cntl = static_cast<brpc::Controller*>(controller);
brpc::ClosureGuard closure_guard(done);

// Prepare input request info
Expand Down
82 changes: 72 additions & 10 deletions cloud/src/rate-limiter/rate_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,21 @@
#include <butil/strings/string_split.h>

#include <chrono>
#include <cstdint>
#include <memory>
#include <mutex>
#include <shared_mutex>

#include "common/bvars.h"
#include "common/config.h"
#include "common/configbase.h"

namespace doris::cloud {

void RateLimiter::init(google::protobuf::Service* service) {
std::map<std::string, int64_t> rpc_name_to_max_qps_limit;
std::unordered_map<std::string, int64_t> parse_specific_qps_limit(const std::string& list_str) {
std::unordered_map<std::string, int64_t> rpc_name_to_max_qps_limit;
std::vector<std::string> max_qps_limit_list;
butil::SplitString(config::specific_max_qps_limit, ';', &max_qps_limit_list);
butil::SplitString(list_str, ';', &max_qps_limit_list);
for (const auto& v : max_qps_limit_list) {
auto p = v.find(':');
if (p != std::string::npos && p != (v.size() - 1)) {
Expand All @@ -41,36 +43,96 @@ void RateLimiter::init(google::protobuf::Service* service) {
int64_t max_qps_limit = std::stoll(v.substr(p + 1));
if (max_qps_limit > 0) {
rpc_name_to_max_qps_limit[rpc_name] = max_qps_limit;
LOG(INFO) << "set rpc: " << rpc_name << " max_qps_limit: " << max_qps_limit;
}
} catch (...) {
LOG(WARNING) << "failed to set max_qps_limit to rpc: " << rpc_name
LOG(WARNING) << "failed to parse max_qps_limit to rpc: " << rpc_name
<< " config: " << v;
}
}
}
return rpc_name_to_max_qps_limit;
}

template <typename Callable>
void for_each_rpc_name(google::protobuf::Service* service, Callable cb) {
auto method_size = service->GetDescriptor()->method_count();
for (auto i = 0; i < method_size; ++i) {
std::string rpc_name = service->GetDescriptor()->method(i)->name();
int64_t max_qps_limit = config::default_max_qps_limit;
cb(rpc_name);
}
}

auto it = rpc_name_to_max_qps_limit.find(rpc_name);
if (it != rpc_name_to_max_qps_limit.end()) {
void RateLimiter::init(google::protobuf::Service* service) {
auto rpc_name_to_specific_limit = parse_specific_qps_limit(config::specific_max_qps_limit);
std::unique_lock write_lock(shared_mtx_);
for_each_rpc_name(service, [&](const std::string& rpc_name) {
auto it = rpc_name_to_specific_limit.find(rpc_name);
int64_t max_qps_limit = config::default_max_qps_limit;
if (it != rpc_name_to_specific_limit.end()) {
max_qps_limit = it->second;
}
limiters_[rpc_name] = std::make_shared<RpcRateLimiter>(rpc_name, max_qps_limit);
});
for (const auto& [k, _] : rpc_name_to_specific_limit) {
rpc_with_specific_limit_.insert(k);
}
}

std::shared_ptr<RpcRateLimiter> RateLimiter::get_rpc_rate_limiter(const std::string& rpc_name) {
// no need to be locked, because it is only modified during initialization
std::shared_lock read_lock(shared_mtx_);
auto it = limiters_.find(rpc_name);
if (it == limiters_.end()) {
return nullptr;
}
return it->second;
}

void RateLimiter::reset_rate_limit(google::protobuf::Service* service, int64_t default_qps_limit,
const std::string& specific_max_qps_limit) {
// TODO: merge specific_max_qps_limit
auto specific_limits = parse_specific_qps_limit(specific_max_qps_limit);

auto reset_specific_limit = [&](const std::string& rpc_name) -> bool {
if (auto it = specific_limits.find(rpc_name); it != specific_limits.end()) {
limiters_[rpc_name] = std::make_shared<RpcRateLimiter>(rpc_name, it->second);
return true;
}
return false;
};
auto reset_default_limit = [&](const std::string& rpc_name) {
if (rpc_with_specific_limit_.contains(rpc_name)) {
return;
}
limiters_[rpc_name] = std::make_shared<RpcRateLimiter>(rpc_name, default_qps_limit);
};

std::unique_lock write_lock(shared_mtx_);
for (const auto& [k, _] : specific_limits) {
rpc_with_specific_limit_.insert(k);
}
if (default_qps_limit < 0) {
for_each_rpc_name(service, std::move(reset_specific_limit));
return;
}
if (specific_limits.empty()) {
for_each_rpc_name(service, std::move(reset_default_limit));
return;
}
for_each_rpc_name(service, [&](const std::string& rpc_name) {
if (reset_specific_limit(rpc_name)) {
return;
}
reset_default_limit(rpc_name);
});
}

void RateLimiter::for_each_rpc_limiter(
std::function<void(std::string_view, std::shared_ptr<RpcRateLimiter>)> cb) {
for (const auto& [rpc_name, rpc_limiter] : limiters_) {
cb(rpc_name, rpc_limiter);
}
}

bool RpcRateLimiter::get_qps_token(const std::string& instance_id,
std::function<int()>& get_bvar_qps) {
if (!config::use_detailed_metrics || instance_id.empty()) {
Expand Down Expand Up @@ -110,4 +172,4 @@ bool RpcRateLimiter::QpsToken::get_token(std::function<int()>& get_bvar_qps) {
return current_qps_ < max_qps_limit_;
}

} // namespace doris::cloud
} // namespace doris::cloud
20 changes: 18 additions & 2 deletions cloud/src/rate-limiter/rate_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@

#include <brpc/server.h>
#include <bthread/mutex.h>
#include <google/protobuf/service.h>

#include <cstdint>
#include <memory>
#include <shared_mutex>
#include <string>
#include <string_view>
#include <unordered_map>

#include "common/config.h"
Expand All @@ -35,12 +38,22 @@ class RateLimiter {
public:
RateLimiter() = default;
~RateLimiter() = default;

void init(google::protobuf::Service* service);

std::shared_ptr<RpcRateLimiter> get_rpc_rate_limiter(const std::string& rpc_name);

void reset_rate_limit(google::protobuf::Service* service, int64_t default_qps_limit,
const std::string& specific_max_qps_limit);

void for_each_rpc_limiter(
std::function<void(std::string_view, std::shared_ptr<RpcRateLimiter>)> cb);

private:
// rpc_name -> RpcRateLimiter
std::unordered_map<std::string, std::shared_ptr<RpcRateLimiter>> limiters_;
std::unordered_set<std::string> rpc_with_specific_limit_;
std::shared_mutex shared_mtx_;
};

class RpcRateLimiter {
Expand All @@ -58,6 +71,10 @@ class RpcRateLimiter {
*/
bool get_qps_token(const std::string& instance_id, std::function<int()>& get_bvar_qps);

std::string_view rpc_name() const { return rpc_name_; }

int64_t max_qps_limit() const { return max_qps_limit_; }

// Todo: Recycle outdated instance_id

private:
Expand All @@ -75,12 +92,11 @@ class RpcRateLimiter {
int64_t max_qps_limit_;
};

private:
bthread::Mutex mutex_;
// instance_id -> QpsToken
std::unordered_map<std::string, std::shared_ptr<QpsToken>> qps_limiter_;
std::string rpc_name_;
int64_t max_qps_limit_;
};

} // namespace doris::cloud
} // namespace doris::cloud
83 changes: 83 additions & 0 deletions cloud/test/meta_service_http_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1454,4 +1454,87 @@ TEST(MetaServiceHttpTest, TxnLazyCommit) {
}
}

TEST(MetaServiceHttpTest, AdjustRateLimit) {
HttpContext ctx;
{
auto [status_code, content] = ctx.query<std::string>(
"adjust_rate_limit",
"default_qps_limit=10000&specific_max_qps_limit=get_cluster:10000");
ASSERT_EQ(status_code, 200);
}
{
auto [status_code, content] =
ctx.query<std::string>("adjust_rate_limit", "default_qps_limit=10000");
ASSERT_EQ(status_code, 200);
}
{
auto [status_code, content] = ctx.query<std::string>(
"adjust_rate_limit", "specific_max_qps_limit=get_cluster:10000");
ASSERT_EQ(status_code, 200);
}
{
auto [status_code, content] = ctx.query<std::string>("adjust_rate_limit", "");
ASSERT_EQ(status_code, 400);
std::string msg =
"default_qps_limit(int64) or specific_max_qps_limit(list of[rpcname:qps(int64);]) "
"is required as query param";
ASSERT_TRUE(content.find(msg) != std::string::npos);
}
{
auto [status_code, content] = ctx.query<std::string>("adjust_rate_limit", "key=abc");
ASSERT_EQ(status_code, 400);
std::string msg =
"default_qps_limit(int64) or specific_max_qps_limit(list of[rpcname:qps(int64);]) "
"is required as query param";
ASSERT_TRUE(content.find(msg) != std::string::npos);
}
{
auto [status_code, content] =
ctx.query<std::string>("adjust_rate_limit", "default_qps_limit=invalid");
ASSERT_EQ(status_code, 400);
std::string msg = "param `qps_limit` is not a legal int64 type:";
ASSERT_TRUE(content.find(msg) != std::string::npos);
}
{
auto [status_code, content] = ctx.query<std::string>(
"adjust_rate_limit",
"specific_max_qps_limit=get_cluster:10000&default_qps_limit=invalid");
ASSERT_EQ(status_code, 400);
std::string msg = "param `qps_limit` is not a legal int64 type:";
ASSERT_TRUE(content.find(msg) != std::string::npos);
}
{
auto [status_code, content] = ctx.query<std::string>(
"adjust_rate_limit",
"specific_max_qps_limit=get_cluster:invalid&default_qps_limit=10000");
// note: invalid so will not take effect, but return ok, by design
ASSERT_EQ(status_code, 200);
}
{
auto [status_code, content] = ctx.query<std::string>(
"adjust_rate_limit", "specific_max_qps_limit=xxx:10000&default_qps_limit=10000");
// note: invalid so will not take effect, but return ok, by design
ASSERT_EQ(status_code, 200);
}
}

TEST(MetaServiceHttpTest, QueryRateLimit) {
HttpContext ctx;
{
auto [status_code, content] = ctx.query<std::string>("query_rate_limit", "");
ASSERT_EQ(status_code, 200);
}
{
auto [status_code, content] =
ctx.query<std::string>("query_rate_limit", "rpc_name=get_cluster");
ASSERT_EQ(status_code, 200);
}
{
auto [status_code, content] = ctx.query<std::string>("query_rate_limit", "rpc_name=xxx");
ASSERT_EQ(status_code, 400);
std::string msg = "rpc_name=xxx is not exists";
ASSERT_TRUE(content.find(msg) != std::string::npos);
}
}

} // namespace doris::cloud
Loading

0 comments on commit cb4d256

Please sign in to comment.