Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Make tiflash compatible with the cse branch #9765

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
12 changes: 12 additions & 0 deletions dbms/src/Flash/BatchCoprocessorHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,18 @@ grpc::Status BatchCoprocessorHandler::execute()
tables_regions_info.tableCount(),
dag_request.DebugString());

#if SERVERLESS_PROXY != 0
if (cop_context.db_context.isKeyspaceInBlocklist(cop_request->context().keyspace_id())
|| cop_context.db_context.isRegionsContainsInBlocklist(tables_regions_info.getAllRegionID()))
{
LOG_DEBUG(
log,
"cop request disabled for keyspace or regions in keyspace {}",
cop_request->context().keyspace_id());
return recordError(grpc::StatusCode::INTERNAL, "cop request disabled");
}
#endif

DAGContext dag_context(
dag_request,
std::move(tables_regions_info),
Expand Down
18 changes: 18 additions & 0 deletions dbms/src/Flash/Coprocessor/TablesRegionsInfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,4 +141,22 @@ TablesRegionsInfo TablesRegionsInfo::create(
return tables_regions_info;
}

std::vector<RegionID> TablesRegionsInfo::getAllRegionID() const
{
std::vector<RegionID> all_regions;
for (const auto & ele : table_regions_info_map)
{
const auto & single_table_region = ele.second;
for (const auto & region : single_table_region.local_regions)
{
all_regions.push_back(region.first);
}
for (const auto & region : single_table_region.remote_regions)
{
all_regions.push_back(region.region_id);
}
}
return all_regions;
}

} // namespace DB
2 changes: 2 additions & 0 deletions dbms/src/Flash/Coprocessor/TablesRegionsInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ class TablesRegionsInfo
}
UInt64 tableCount() const { return table_regions_info_map.size(); }

std::vector<RegionID> getAllRegionID() const;

private:
bool is_single_table;
std::unordered_map<TableID, SingleTableRegions> table_regions_info_map;
Expand Down
19 changes: 18 additions & 1 deletion dbms/src/Flash/CoprocessorHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,18 @@ grpc::Status CoprocessorHandler<is_stream>::execute()
TablesRegionsInfo tables_regions_info(true);
auto & table_regions_info = tables_regions_info.getSingleTableRegions();

#if SERVERLESS_PROXY != 0
if (cop_context.db_context.isKeyspaceInBlocklist(cop_request->context().keyspace_id())
|| cop_context.db_context.isRegionInBlocklist(cop_context.kv_context.region_id()))
{
LOG_DEBUG(
log,
"cop request disabled for keyspace or regions in keyspace {}",
cop_request->context().keyspace_id());
return recordError(grpc::StatusCode::INTERNAL, "cop request disabled");
}
#endif

const std::unordered_set<UInt64> bypass_lock_ts(
cop_context.kv_context.resolved_locks().begin(),
cop_context.kv_context.resolved_locks().end());
Expand Down Expand Up @@ -200,7 +212,12 @@ grpc::Status CoprocessorHandler<is_stream>::execute()
}
catch (LockException & e)
{
LOG_WARNING(log, "LockException: region_id={}, message: {}", cop_request->context().region_id(), e.message());
LOG_WARNING(
log,
"LockException: region_id={}, message: {}, is_txn_file={}",
cop_request->context().region_id(),
e.message(),
e.locks[0].second->is_txn_file());
GET_METRIC(tiflash_coprocessor_request_error, reason_meet_lock).Increment();
if constexpr (is_stream)
{
Expand Down
11 changes: 11 additions & 0 deletions dbms/src/Flash/Disaggregated/WNEstablishDisaggTaskHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,17 @@ void WNEstablishDisaggTaskHandler::prepare(const disaggregated::EstablishDisaggT
tables_regions_info.regionCount(),
tables_regions_info.tableCount());

#if SERVERLESS_PROXY != 0
if (context->isKeyspaceInBlocklist(meta.keyspace_id())
|| context->isRegionsContainsInBlocklist(tables_regions_info.getAllRegionID()))
{
throw TiFlashException(
Errors::Coprocessor::BadRequest,
"disaggregated request disabled for keyspace or regions in keyspace",
meta.keyspace_id());
}
#endif

// set schema ver and start ts
auto schema_ver = request->schema_ver();
context->setSetting("schema_version", schema_ver);
Expand Down
15 changes: 15 additions & 0 deletions dbms/src/Flash/Mpp/MPPHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,21 @@ grpc::Status MPPHandler::execute(const ContextPtr & context, mpp::DispatchTaskRe
task = MPPTask::newTask(task_request.meta(), context);
task->prepare(task_request);

#if SERVERLESS_PROXY != 0
if (context->isKeyspaceInBlocklist(task_request.meta().keyspace_id())
|| context->isRegionsContainsInBlocklist(context->getDAGContext()->tables_regions_info.getAllRegionID()))
{
LOG_DEBUG(
log,
"mpp request disabled for keyspace or regions in keyspace {}",
task_request.meta().keyspace_id());
auto * err = response->mutable_error();
err->set_msg("mpp request disabled");
handleError(task, "mpp request disabled");
return grpc::Status::OK;
}
#endif

addRetryRegion(context, response);

#ifndef NDEBUG
Expand Down
33 changes: 33 additions & 0 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ struct ContextShared

JointThreadInfoJeallocMapPtr joint_memory_allocation_map; /// Joint thread-wise alloc/dealloc map

std::unordered_set<KeyspaceID> keyspace_blocklist;
std::unordered_set<RegionID> region_blocklist;
std::unordered_set<uint64_t> store_id_blocklist; /// Those store id are blocked from batch cop request.

class SessionKeyHash
Expand Down Expand Up @@ -2227,6 +2229,37 @@ void Context::setMockMPPServerInfo(MockMPPServerInfo & info)
mpp_server_info = info;
}

void Context::initKeyspaceBlocklist(const std::unordered_set<KeyspaceID> & keyspace_ids)
{
auto lock = getLock();
shared->keyspace_blocklist = keyspace_ids;
}
bool Context::isKeyspaceInBlocklist(const KeyspaceID keyspace_id)
{
auto lock = getLock();
return shared->keyspace_blocklist.count(keyspace_id) > 0;
}
void Context::initRegionBlocklist(const std::unordered_set<RegionID> & region_ids)
{
auto lock = getLock();
shared->region_blocklist = region_ids;
}
bool Context::isRegionInBlocklist(const RegionID region_id)
{
auto lock = getLock();
return shared->region_blocklist.count(region_id) > 0;
}
bool Context::isRegionsContainsInBlocklist(const std::vector<RegionID> & regions)
{
auto lock = getLock();
for (const auto region : regions)
{
if (isRegionInBlocklist(region))
return true;
}
return false;
}

const std::unordered_set<uint64_t> * Context::getStoreIdBlockList() const
{
return &shared->store_id_blocklist;
Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <Interpreters/TimezoneInfo.h>
#include <Server/ServerInfo.h>
#include <Storages/DeltaMerge/LocalIndexerScheduler_fwd.h>
#include <Storages/KVStore/Types.h>
#include <common/MultiVersion.h>

#include <chrono>
Expand Down Expand Up @@ -561,6 +562,12 @@ class Context

void mockConfigLoaded() { is_config_loaded = true; }

void initKeyspaceBlocklist(const std::unordered_set<KeyspaceID> & keyspace_ids);
bool isKeyspaceInBlocklist(KeyspaceID keyspace_id);
void initRegionBlocklist(const std::unordered_set<RegionID> & region_ids);
bool isRegionInBlocklist(RegionID region_id);
bool isRegionsContainsInBlocklist(const std::vector<RegionID> & regions);

bool initializeStoreIdBlockList(const String &);
const std::unordered_set<uint64_t> * getStoreIdBlockList() const;

Expand Down
10 changes: 10 additions & 0 deletions dbms/src/Interpreters/loadMetadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,16 @@ void loadMetadata(Context & context)
if (db_name == SYSTEM_DATABASE)
continue;

#if SERVERLESS_PROXY == 1
// Ignore database owned by keyspace in blacklist
auto keyspace_id = SchemaNameMapper::getMappedNameKeyspaceID(db_name);
if (context.isKeyspaceInBlocklist(keyspace_id))
{
LOG_WARNING(log, "database {} ignored because keyspace in blacklist, keyspace={}", db_name, keyspace_id);
continue;
}
#endif

databases.emplace(db_name, path + file);
}

Expand Down
8 changes: 7 additions & 1 deletion dbms/src/Server/MetricsPrometheus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,13 @@ std::shared_ptr<Poco::Net::HTTPServer> getHTTPServer(
key_path,
cert_path,
ca_path,
Poco::Net::Context::VerificationMode::VERIFY_STRICT);
#if SERVERLESS_PROXY == 0
Poco::Net::Context::VerificationMode::VERIFY_STRICT
#else
// mtls: metrics server allows anonymous pullers @iosmanthus
Poco::Net::Context::VerificationMode::VERIFY_RELAXED
#endif
);

auto check_common_name = [&](const Poco::Crypto::X509Certificate & cert) {
return global_context.getSecurityConfig()->checkCommonName(cert);
Expand Down
75 changes: 73 additions & 2 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,10 @@ struct TiFlashProxyConfig
args_map["engine-label"] = getProxyLabelByDisaggregatedMode(disaggregated_mode);
if (disaggregated_mode != DisaggregatedMode::Compute && has_s3_config)
args_map["engine-role-label"] = DISAGGREGATED_MODE_WRITE_ENGINE_ROLE;
#if SERVERLESS_PROXY == 1
if (config.has("blacklist_file"))
args_map["blacklist-ile"] = config.getString("blacklist_file");
#endif

for (auto && [k, v] : args_map)
val_map.emplace("--" + k, std::move(v));
Expand Down Expand Up @@ -961,6 +965,74 @@ void syncSchemaWithTiDB(
global_context->initializeSchemaSyncService();
}

void loadBlockList(
[[maybe_unused]] const Poco::Util::LayeredConfiguration & config,
Context & global_context,
[[maybe_unused]] const LoggerPtr & log)
{
#if SERVERLESS_PROXY != 1
// We do not support blocking store by id in OP mode currently.
global_context.initializeStoreIdBlockList("");
#else
global_context.initializeStoreIdBlockList(global_context.getSettingsRef().disagg_blocklist_wn_store_id);

/// Load keyspace blacklist json file
LOG_INFO(log, "Loading blacklist file.");
auto blacklist_file_path = config.getString("blacklist_file", "");
if (blacklist_file_path.length() == 0)
{
LOG_INFO(log, "blacklist file not enabled, ignore it.");
}
else
{
auto blacklist_file = Poco::File(blacklist_file_path);
if (blacklist_file.exists() && blacklist_file.isFile() && blacklist_file.canRead())
{
// Read the json file
std::ifstream ifs(blacklist_file_path);
std::string json_content((std::istreambuf_iterator<char>(ifs)), std::istreambuf_iterator<char>());
Poco::JSON::Parser parser;
Poco::Dynamic::Var json_var = parser.parse(json_content);
const auto & json_obj = json_var.extract<Poco::JSON::Object::Ptr>();

// load keyspace list
auto keyspace_arr = json_obj->getArray("keyspace_ids");
if (!keyspace_arr.isNull())
{
std::unordered_set<KeyspaceID> keyspace_blocklist;
for (size_t i = 0; i < keyspace_arr->size(); i++)
{
keyspace_blocklist.emplace(keyspace_arr->getElement<KeyspaceID>(i));
}
global_context.initKeyspaceBlocklist(keyspace_blocklist);
}

// load region list
auto region_arr = json_obj->getArray("region_ids");
if (!region_arr.isNull())
{
std::unordered_set<RegionID> region_blocklist;
for (size_t i = 0; i < region_arr->size(); i++)
{
region_blocklist.emplace(region_arr->getElement<RegionID>(i));
}
global_context.initRegionBlocklist(region_blocklist);
}

LOG_INFO(
log,
"Load blacklist file done, total {} keyspaces and {} regions in blacklist.",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"Load blacklist file done, total {} keyspaces and {} regions in blacklist.",
"Load blocklist file done, total {} keyspaces and {} regions in blacklist.",

keyspace_arr.isNull() ? 0 : keyspace_arr->size(),
region_arr.isNull() ? 0 : region_arr->size());
}
else
{
LOG_INFO(log, "blacklist file not exists or non-readble, ignore it.");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
LOG_INFO(log, "blacklist file not exists or non-readble, ignore it.");
LOG_INFO(log, "blocklist file not exists or non-readble, ignore it.");

}
}
#endif
}

int Server::main(const std::vector<std::string> & /*args*/)
{
setThreadName("TiFlashMain");
Expand Down Expand Up @@ -1547,8 +1619,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
global_context->setFormatSchemaPath(format_schema_path.path() + "/");
format_schema_path.createDirectories();

// We do not support blocking store by id in OP mode currently.
global_context->initializeStoreIdBlockList("");
loadBlockList(config(), *global_context, log);

LOG_INFO(log, "Loading metadata.");
loadMetadataSystem(*global_context); // Load "system" database. Its engine keeps as Ordinary.
Expand Down
41 changes: 41 additions & 0 deletions dbms/src/Storages/KVStore/Decode/RegionTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ RegionTable::Table & RegionTable::getOrCreateTable(const KeyspaceID keyspace_id,
{
// Load persisted info.
it = tables.emplace(ks_table_id, table_id).first;
addTableToIndex(keyspace_id, table_id);
LOG_INFO(log, "get new table, keyspace={} table_id={}", keyspace_id, table_id);
}
return it->second;
Expand Down Expand Up @@ -148,6 +149,7 @@ void RegionTable::removeTable(KeyspaceID keyspace_id, TableID table_id)

// Remove from table map.
tables.erase(it);
removeTableFromIndex(keyspace_id, table_id);

LOG_INFO(log, "remove table from RegionTable success, keyspace={} table_id={}", keyspace_id, table_id);
}
Expand Down Expand Up @@ -333,6 +335,22 @@ void RegionTable::handleInternalRegionsByTable(
}
}

void RegionTable::handleInternalRegionsByKeyspace(
KeyspaceID keyspace_id,
std::function<void(const TableID table_id, const InternalRegions &)> && callback) const
{
std::lock_guard lock(mutex);
auto table_set = keyspace_index.find(keyspace_id);
if (table_set != keyspace_index.end())
{
for (auto table_id : table_set->second)
{
if (auto it = tables.find(KeyspaceTableID{keyspace_id, table_id}); it != tables.end())
callback(table_id, it->second.regions);
}
}
}

std::vector<RegionID> RegionTable::getRegionIdsByTable(KeyspaceID keyspace_id, TableID table_id) const
{
fiu_do_on(FailPoints::force_set_num_regions_for_table, {
Expand Down Expand Up @@ -435,6 +453,29 @@ void RegionTable::extendRegionRange(const RegionID region_id, const RegionRangeK
}
}

void RegionTable::addTableToIndex(KeyspaceID keyspace_id, TableID table_id)
{
auto it = keyspace_index.find(keyspace_id);
if (it == keyspace_index.end())
{
keyspace_index.emplace(keyspace_id, std::unordered_set<TableID>{table_id});
}
else
{
it->second.insert(table_id);
}
}
void RegionTable::removeTableFromIndex(KeyspaceID keyspace_id, TableID table_id)
{
auto it = keyspace_index.find(keyspace_id);
if (it != keyspace_index.end())
{
it->second.erase(table_id);
if (it->second.empty())
keyspace_index.erase(it);
}
}

RegionPtrWithSnapshotFiles::RegionPtrWithSnapshotFiles(
const Base & base_,
std::vector<DM::ExternalDTFileInfo> && external_files_)
Expand Down
Loading