Skip to content

Commit

Permalink
[feature](meta-service) Support dynamic configuration for meta service
Browse files Browse the repository at this point in the history
  • Loading branch information
TangSiyang2001 committed Dec 13, 2024
1 parent 1ffecfd commit b4ab2b7
Show file tree
Hide file tree
Showing 6 changed files with 213 additions and 3 deletions.
3 changes: 3 additions & 0 deletions cloud/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#pragma once

#include <string_view>

#include "configbase.h"

namespace doris::cloud::config {
Expand Down Expand Up @@ -225,4 +227,5 @@ CONF_Int32(max_tablet_index_num_per_batch, "1000");
CONF_mInt64(max_num_aborted_txn, "100");

CONF_Bool(enable_check_instance_id, "true");

} // namespace doris::cloud::config
106 changes: 106 additions & 0 deletions cloud/src/common/configbase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
#include <algorithm>
#include <cerrno>
#include <cstring>
#include <filesystem>
#include <fstream>
#include <iostream>
#include <list>
#include <map>
#include <mutex>
#include <sstream>

#define __IN_CONFIGBASE_CPP__
Expand All @@ -30,9 +32,13 @@

namespace doris::cloud::config {

std::string_view g_conf_path {};

std::map<std::string, Register::Field>* Register::_s_field_map = nullptr;
std::map<std::string, std::function<bool()>>* RegisterConfValidator::_s_field_validator = nullptr;
std::map<std::string, std::string>* full_conf_map = nullptr;
std::mutex mutable_string_config_lock;
std::mutex conf_persist_lock;

// trim string
std::string& trim(std::string& s) {
Expand Down Expand Up @@ -224,6 +230,38 @@ bool Properties::load(const char* conf_file, bool must_exist) {
return true;
}

// dump props to conf file
bool Properties::dump(const std::string& conffile) {
std::string conffile_tmp = conffile + ".tmp";

if (std::filesystem::exists(conffile)) {
// copy for modify
std::ifstream in(conffile, std::ios::binary);
std::ofstream out(conffile_tmp, std::ios::binary);
out << in.rdbuf();
in.close();
out.close();
}

std::ofstream file_writer;

file_writer.open(conffile_tmp, std::ios::out | std::ios::app);

file_writer << std::endl;

for (auto const& iter : file_conf_map) {
file_writer << iter.first << " = " << iter.second << std::endl;
}

file_writer.close();
if (!file_writer.good()) {
return false;
}

std::filesystem::rename(conffile_tmp, conffile);
return std::filesystem::exists(conffile);
}

template <typename T>
bool Properties::get_or_default(const char* key, const char* defstr, T& retval,
bool* is_retval_set) const {
Expand Down Expand Up @@ -297,6 +335,37 @@ std::ostream& operator<<(std::ostream& out, const std::vector<T>& v) {
continue; \
}

#define UPDATE_FIELD(FIELD, VALUE, TYPE, PERSIST) \
if (strcmp((FIELD).type, #TYPE) == 0) { \
TYPE new_value; \
if (!convert((VALUE), new_value)) { \
std::cerr << "convert " << VALUE << "as" << #TYPE << "failed"; \
return false; \
} \
TYPE& ref_conf_value = *reinterpret_cast<TYPE*>((FIELD).storage); \
TYPE old_value = ref_conf_value; \
if (RegisterConfValidator::_s_field_validator != nullptr) { \
auto validator = RegisterConfValidator::_s_field_validator->find((FIELD).name); \
if (validator != RegisterConfValidator::_s_field_validator->end() && \
!(validator->second)()) { \
ref_conf_value = old_value; \
std::cerr << "validate " << (FIELD).name << "=" << new_value << " failed" \
<< std::endl; \
return false; \
} \
} \
ref_conf_value = new_value; \
if (full_conf_map != nullptr) { \
std::ostringstream oss; \
oss << new_value; \
(*full_conf_map)[(FIELD).name] = oss.str(); \
} \
if (PERSIST) { \
props.set_force(std::string((FIELD).name), VALUE); \
} \
return true; \
}

// init conf fields
bool init(const char* conf_file, bool fill_conf_map, bool must_exist, bool set_to_default) {
Properties props;
Expand Down Expand Up @@ -328,4 +397,41 @@ bool init(const char* conf_file, bool fill_conf_map, bool must_exist, bool set_t
return true;
}

bool do_set_config(const Register::Field& feild, const std::string& value, bool need_persist,
Properties& props) {
UPDATE_FIELD(feild, value, bool, need_persist);
UPDATE_FIELD(feild, value, int16_t, need_persist);
UPDATE_FIELD(feild, value, int32_t, need_persist);
UPDATE_FIELD(feild, value, int64_t, need_persist);
UPDATE_FIELD(feild, value, double, need_persist);
{
// add lock to ensure thread safe
std::lock_guard<std::mutex> lock(mutable_string_config_lock);
UPDATE_FIELD(feild, value, std::string, need_persist);
}
return false;
}

bool set_config(const std::string& field, const std::string& value, bool need_persist) {
auto it = Register::_s_field_map->find(field);
if (it == Register::_s_field_map->end()) {
return false;
}
if (!it->second.valmutable) {
return false;
}
Properties props;
if (!do_set_config(it->second, value, need_persist, props)) {
std::cerr << "not supported to modify: " << field << "=" << value << std::endl;
return false;
}

if (need_persist) {
// lock to make sure only one thread can modify the be_custom.conf
std::lock_guard<std::mutex> l(conf_persist_lock);
return props.dump(config::g_conf_path.data());
}
return true;
}

} // namespace doris::cloud::config
3 changes: 3 additions & 0 deletions cloud/src/common/configbase.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

namespace doris::cloud::config {

extern std::string_view g_conf_path;

class Register {
public:
struct Field {
Expand Down Expand Up @@ -170,4 +172,5 @@ extern std::map<std::string, std::string>* full_conf_map;
bool init(const char* conf_file, bool fill_conf_map = false, bool must_exist = true,
bool set_to_default = true);

bool set_config(const std::string& field, const std::string& value, bool need_persist = false);
} // namespace doris::cloud::config
7 changes: 4 additions & 3 deletions cloud/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

#include "common/arg_parser.h"
#include "common/config.h"
#include "common/configbase.h"
#include "common/encryption_util.h"
#include "common/logging.h"
#include "meta-service/mem_txn_kv.h"
Expand Down Expand Up @@ -193,9 +194,9 @@ int main(int argc, char** argv) {
return -1;
}

auto conf_file = args.get<std::string>(ARG_CONF);
if (!config::init(conf_file.c_str(), true)) {
std::cerr << "failed to init config file, conf=" << conf_file << std::endl;
config::g_conf_path = args.get<std::string>(ARG_CONF);
if (!config::init(config::g_conf_path.data(), true)) {
std::cerr << "failed to init config file, conf=" << config::g_conf_path << std::endl;
return -1;
}

Expand Down
30 changes: 30 additions & 0 deletions cloud/src/meta-service/meta_service_http.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <brpc/controller.h>
#include <brpc/http_status_code.h>
#include <brpc/uri.h>
#include <fmt/core.h>
#include <fmt/format.h>
#include <gen_cpp/cloud.pb.h>
#include <glog/logging.h>
Expand All @@ -44,7 +45,9 @@
#include <vector>

#include "common/config.h"
#include "common/configbase.h"
#include "common/logging.h"
#include "common/string_util.h"
#include "meta-service/keys.h"
#include "meta-service/txn_kv.h"
#include "meta-service/txn_kv_error.h"
Expand Down Expand Up @@ -448,6 +451,31 @@ static HttpResponse process_query_rate_limit(MetaServiceImpl* service, brpc::Con
return http_json_reply(MetaServiceCode::OK, "", sb.GetString());
}

static HttpResponse process_update_config(MetaServiceImpl* service, brpc::Controller* cntl) {
const auto& uri = cntl->http_request().uri();
bool persist = (http_query(uri, "persist") == "true");
auto configs = std::string {http_query(uri, "configs")};
if (configs.empty()) [[unlikely]] {
return http_json_reply(MetaServiceCode::INVALID_ARGUMENT,
"query param `config` should not be empty");
}
auto conf_list = split(configs, ',');
for (const auto& conf : conf_list) {
auto conf_pair = split(conf, '=');
if (conf_pair.size() != 2) [[unlikely]] {
return http_json_reply(MetaServiceCode::INVALID_ARGUMENT,
std::format("config {} is invalid", configs));
}
trim(conf_pair[0]);
trim(conf_pair[1]);
if (!config::set_config(conf_pair[0], conf_pair[1], persist)) {
return http_json_reply(MetaServiceCode::INVALID_ARGUMENT,
std::format("set {}={} failed", conf_pair[0], conf_pair[1]));
}
}
return http_json_reply(MetaServiceCode::OK, "");
}

static HttpResponse process_decode_key(MetaServiceImpl*, brpc::Controller* ctrl) {
auto& uri = ctrl->http_request().uri();
std::string_view key = http_query(uri, "key");
Expand Down Expand Up @@ -732,12 +760,14 @@ void MetaServiceImpl::http(::google::protobuf::RpcController* controller,
{"alter_iam", process_alter_iam},
{"adjust_rate_limit", process_adjust_rate_limit},
{"list_rate_limit", process_query_rate_limit},
{"update_config", process_update_config},
{"v1/abort_txn", process_abort_txn},
{"v1/abort_tablet_job", process_abort_tablet_job},
{"v1/alter_ram_user", process_alter_ram_user},
{"v1/alter_iam", process_alter_iam},
{"v1/adjust_rate_limit", process_adjust_rate_limit},
{"v1/list_rate_limit", process_query_rate_limit},
{"v1/update_config", process_update_config},
};

auto* cntl = static_cast<brpc::Controller*>(controller);
Expand Down
67 changes: 67 additions & 0 deletions cloud/test/meta_service_http_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,13 @@
#include <rapidjson/stringbuffer.h>

#include <cstddef>
#include <cstdint>
#include <filesystem>
#include <optional>
#include <string>

#include "common/config.h"
#include "common/configbase.h"
#include "common/logging.h"
#include "common/util.h"
#include "cpp/sync_point.h"
Expand Down Expand Up @@ -1625,4 +1629,67 @@ TEST(MetaServiceHttpTest, QueryRateLimit) {
}
}

TEST(MetaServiceHttpTest, UpdateConfig) {
HttpContext ctx;
{
auto [status_code, content] = ctx.query<std::string>("update_config", "");
ASSERT_EQ(status_code, 400);
std::string msg = "query param `config` should not be empty";
ASSERT_NE(content.find(msg), std::string::npos);
}
{
auto [status_code, content] = ctx.query<std::string>("update_config", "configs=aaa");
ASSERT_EQ(status_code, 400);
std::string msg = "config aaa is invalid";
ASSERT_NE(content.find(msg), std::string::npos);
}
{
auto [status_code, content] = ctx.query<std::string>("update_config", "configs=aaa=bbb");
ASSERT_EQ(status_code, 400);
std::string msg = "set aaa=bbb failed";
ASSERT_NE(content.find(msg), std::string::npos);
}
{
auto [status_code, content] =
ctx.query<std::string>("update_config", "configs=recycle_interval_seconds=3599");
ASSERT_EQ(status_code, 200);
ASSERT_EQ(config::recycle_interval_seconds, 3599);
}
{
auto [status_code, content] = ctx.query<std::string>(
"update_config", "configs=recycle_interval_seconds=3601,retention_seconds=259201");
ASSERT_EQ(status_code, 200);
ASSERT_EQ(config::retention_seconds, 259201);
ASSERT_EQ(config::recycle_interval_seconds, 3601);
}
{
config::g_conf_path = "./doris_cloud.conf";
auto [status_code, content] = ctx.query<std::string>(
"update_config",
"configs=recycle_interval_seconds=3659,retention_seconds=259219&persist=true");
ASSERT_EQ(status_code, 200);
ASSERT_EQ(config::recycle_interval_seconds, 3659);
ASSERT_EQ(config::retention_seconds, 259219);
config::Properties props;
ASSERT_TRUE(props.load(config::g_conf_path.data(), true));
{
bool new_val_set = false;
int64_t recycle_interval_s = 0;
ASSERT_TRUE(props.get_or_default("recycle_interval_seconds", nullptr,
recycle_interval_s, &new_val_set));
ASSERT_TRUE(new_val_set);
ASSERT_EQ(recycle_interval_s, 3659);
}
{
bool new_val_set = false;
int64_t retention_s = 0;
ASSERT_TRUE(
props.get_or_default("retention_seconds", nullptr, retention_s, &new_val_set));
ASSERT_TRUE(new_val_set);
ASSERT_EQ(retention_s, 259219);
}
std::filesystem::remove(config::g_conf_path);
}
}

} // namespace doris::cloud

0 comments on commit b4ab2b7

Please sign in to comment.