Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions programs/server/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1087,6 +1087,7 @@ cluster_settings:

# Metastore server config
metastore_server:
enable_ipv6: true
http_port: 9444
server_id: 1

Expand Down
100 changes: 55 additions & 45 deletions src/Coordination/MetaStoreServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include "config.h"

#include <Common/getMultipleKeysFromConfig.h>
#include <Common/setThreadName.h>

#include <Poco/Util/Application.h>
Expand All @@ -20,11 +21,11 @@ namespace DB

namespace ErrorCodes
{
extern const int ACCESS_DENIED;
extern const int RAFT_ERROR;
extern const int NO_ELEMENTS_IN_CONFIG;
extern const int SUPPORT_IS_DISABLED;
extern const int LOGICAL_ERROR;
extern const int ACCESS_DENIED;
extern const int RAFT_ERROR;
extern const int NO_ELEMENTS_IN_CONFIG;
extern const int SUPPORT_IS_DISABLED;
extern const int LOGICAL_ERROR;
}

namespace
Expand Down Expand Up @@ -125,16 +126,19 @@ MetaStoreServer::MetaStoreServer(
: server_id(server_id_)
, coordination_settings(coordination_settings_)
, state_machine(nuraft::cs_new<MetaStateMachine>(
snapshots_queue,
getSnapshotsPathFromConfig(config, standalone_metastore),
getStoragePathFromConfig(config, standalone_metastore),
coordination_settings))
snapshots_queue,
getSnapshotsPathFromConfig(config, standalone_metastore),
getStoragePathFromConfig(config, standalone_metastore),
coordination_settings))
, state_manager(nuraft::cs_new<MetaStateManager>(server_id, "metastore_server", config, coordination_settings, standalone_metastore))
, log(&Poco::Logger::get("MetaStoreServer"))
, namespace_whitelist(getNamespaceWhitelistFromConfig(config))
{
if (coordination_settings->quorum_reads)
LOG_WARNING(log, "Quorum reads enabled, MetaStoreServer will work slower.");

enable_ipv6 = config.getBool("metastore_server.enable_ipv6", true);
listen_hosts = DB::getMultipleValuesFromConfig(config, "", "listen_host");
}

void MetaStoreServer::startup()
Expand All @@ -158,8 +162,10 @@ void MetaStoreServer::startup()
else
{
params.heart_beat_interval_ = static_cast<nuraft::int32>(coordination_settings->heart_beat_interval_ms.totalMilliseconds());
params.election_timeout_lower_bound_ = static_cast<nuraft::int32>(coordination_settings->election_timeout_lower_bound_ms.totalMilliseconds());
params.election_timeout_upper_bound_ = static_cast<nuraft::int32>(coordination_settings->election_timeout_upper_bound_ms.totalMilliseconds());
params.election_timeout_lower_bound_
= static_cast<nuraft::int32>(coordination_settings->election_timeout_lower_bound_ms.totalMilliseconds());
params.election_timeout_upper_bound_
= static_cast<nuraft::int32>(coordination_settings->election_timeout_upper_bound_ms.totalMilliseconds());
}

params.reserved_log_items_ = static_cast<nuraft::int32>(coordination_settings->reserved_log_items);
Expand All @@ -179,8 +185,8 @@ void MetaStoreServer::startup()
#if USE_SSL
setSSLParams(asio_opts);
#else
throw Exception{"SSL support for NuRaft is disabled because proton was built without SSL support.",
ErrorCodes::SUPPORT_IS_DISABLED};
throw Exception{
"SSL support for NuRaft is disabled because proton was built without SSL support.", ErrorCodes::SUPPORT_IS_DISABLED};
#endif
}

Expand All @@ -190,27 +196,34 @@ void MetaStoreServer::startup()
throw Exception(ErrorCodes::RAFT_ERROR, "Cannot allocate RAFT instance");
}

void MetaStoreServer::launchRaftServer(
const nuraft::raft_params & params,
const nuraft::asio_service::options & asio_opts)
void MetaStoreServer::launchRaftServer(const nuraft::raft_params & params, const nuraft::asio_service::options & asio_opts)
{
nuraft::raft_server::init_options init_options;

init_options.skip_initial_election_timeout_ = state_manager->shouldStartAsFollower();
init_options.start_server_in_constructor_ = false;
init_options.raft_callback_ = [this] (nuraft::cb_func::Type type, nuraft::cb_func::Param * param)
{
return callbackFunc(type, param);
};
init_options.raft_callback_ = [this](nuraft::cb_func::Type type, nuraft::cb_func::Param * param) { return callbackFunc(type, param); };

nuraft::ptr<nuraft::logger> logger = nuraft::cs_new<LoggerWrapper>("RaftInstance", coordination_settings->raft_logs_level);
asio_service = nuraft::cs_new<nuraft::asio_service>(asio_opts, logger);
asio_listener = asio_service->create_rpc_listener(state_manager->getPort(), logger);

if (!asio_listener)
return;
if (listen_hosts.empty())
{
auto asio_listener = asio_service->create_rpc_listener(state_manager->getPort(), logger, enable_ipv6);
if (!asio_listener)
throw Exception(ErrorCodes::RAFT_ERROR, "Cannot create MetaStore Raft listener on port {}", state_manager->getPort());

std::vector<nuraft::ptr<nuraft::rpc_listener>> asio_listeners{asio_listener};
asio_listeners.emplace_back(std::move(asio_listener));
}
else
{
for (const auto & listen_host : listen_hosts)
{
auto asio_listener = asio_service->create_rpc_listener(listen_host, state_manager->getPort(), logger);
if (asio_listener)
asio_listeners.emplace_back(std::move(asio_listener));
}
}

nuraft::ptr<nuraft::delayed_task_scheduler> scheduler = asio_service;
nuraft::ptr<nuraft::rpc_client_factory> rpc_cli_factory = asio_service;
Expand All @@ -219,14 +232,15 @@ void MetaStoreServer::launchRaftServer(
nuraft::ptr<nuraft::state_machine> casted_state_machine = state_machine;

/// raft_server creates unique_ptr from it
nuraft::context * ctx = new nuraft::context(
casted_state_manager, casted_state_machine,
asio_listeners, logger, rpc_cli_factory, scheduler, params);
nuraft::context * ctx
= new nuraft::context(casted_state_manager, casted_state_machine, asio_listeners, logger, rpc_cli_factory, scheduler, params);

raft_instance = nuraft::cs_new<nuraft::raft_server>(ctx, init_options);

raft_instance->start_server(init_options.skip_initial_election_timeout_);
asio_listener->listen(raft_instance);

for (const auto & asio_listener : asio_listeners)
asio_listener->listen(raft_instance);
}

void MetaStoreServer::shutdownRaftServer()
Expand All @@ -242,10 +256,13 @@ void MetaStoreServer::shutdownRaftServer()
raft_instance->shutdown();
raft_instance.reset();

if (asio_listener)
for (const auto & asio_listener : asio_listeners)
{
asio_listener->stop();
asio_listener->shutdown();
if (asio_listener)
{
asio_listener->stop();
asio_listener->shutdown();
}
}

if (asio_service)
Expand Down Expand Up @@ -317,7 +334,7 @@ std::vector<String> MetaStoreServer::localMultiGetByKeys(const std::vector<Strin
return values;
}

std::vector<std::pair<String, String> > MetaStoreServer::localRangeGetByNamespace(const String & prefix_, const String & namespace_) const
std::vector<std::pair<String, String>> MetaStoreServer::localRangeGetByNamespace(const String & prefix_, const String & namespace_) const
{
Coordination::KVListResponse resp;
Coordination::KVNamespaceAndPrefixHelper helper(checkNamespace(namespace_), prefix_);
Expand All @@ -331,9 +348,7 @@ Coordination::KVResponsePtr MetaStoreServer::putRequest(const Coordination::KVRe
Coordination::KVNamespaceAndPrefixHelper helper(checkNamespace(namespace_));
const auto & entry = getBufferFromKVRequest(helper.handle(request));
auto ret = raft_instance->append_entries({entry});
if (ret->get_accepted() &&
ret->get_result_code() == nuraft::cmd_result_code::OK &&
ret->get())
if (ret->get_accepted() && ret->get_result_code() == nuraft::cmd_result_code::OK && ret->get())
return helper.handle(parseKVResponse(ret->get()));

/// error response
Expand Down Expand Up @@ -378,25 +393,22 @@ nuraft::cb_func::ReturnCode MetaStoreServer::callbackFunc(nuraft::cb_func::Type
if (next_index < last_commited || next_index - last_commited <= 1)
commited_store = true;

auto set_initialized = [this] ()
{
auto set_initialized = [this]() {
std::unique_lock lock(initialized_mutex);
initialized_flag = true;
initialized_cv.notify_all();
};

switch (type)
{
case nuraft::cb_func::BecomeLeader:
{
case nuraft::cb_func::BecomeLeader: {
/// We become leader and store is empty or we already committed it
if (commited_store || initial_batch_committed)
set_initialized();
return nuraft::cb_func::ReturnCode::Ok;
}
case nuraft::cb_func::BecomeFollower:
case nuraft::cb_func::GotAppendEntryReqFromLeader:
{
case nuraft::cb_func::GotAppendEntryReqFromLeader: {
if (param->leaderId != -1)
{
auto leader_index = raft_instance->get_leader_committed_log_idx();
Expand All @@ -410,13 +422,11 @@ nuraft::cb_func::ReturnCode MetaStoreServer::callbackFunc(nuraft::cb_func::Type
}
return nuraft::cb_func::ReturnCode::Ok;
}
case nuraft::cb_func::BecomeFresh:
{
case nuraft::cb_func::BecomeFresh: {
set_initialized(); /// We are fresh follower, ready to serve requests.
return nuraft::cb_func::ReturnCode::Ok;
}
case nuraft::cb_func::InitialBatchCommited:
{
case nuraft::cb_func::InitialBatchCommited: {
if (param->myId == param->leaderId) /// We have committed our log store and we are leader, ready to serve requests.
set_initialized();
initial_batch_committed = true;
Expand Down
4 changes: 3 additions & 1 deletion src/Coordination/MetaStoreServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class MetaStoreServer : public IMetaStoreServer

nuraft::ptr<nuraft::raft_server> raft_instance;
nuraft::ptr<nuraft::asio_service> asio_service;
nuraft::ptr<nuraft::rpc_listener> asio_listener;
std::vector<nuraft::ptr<nuraft::rpc_listener>> asio_listeners;

std::mutex append_entries_mutex;

Expand All @@ -52,6 +52,8 @@ class MetaStoreServer : public IMetaStoreServer
Poco::Logger * log;

std::unordered_set<std::string> namespace_whitelist;
bool enable_ipv6 = false;
std::vector<std::string> listen_hosts;

nuraft::cb_func::ReturnCode callbackFunc(nuraft::cb_func::Type type, nuraft::cb_func::Param * param);

Expand Down
Loading