diff --git a/dbms/src/Flash/BatchCoprocessorHandler.cpp b/dbms/src/Flash/BatchCoprocessorHandler.cpp index 7a432732815..0c0047c4031 100644 --- a/dbms/src/Flash/BatchCoprocessorHandler.cpp +++ b/dbms/src/Flash/BatchCoprocessorHandler.cpp @@ -76,6 +76,18 @@ grpc::Status BatchCoprocessorHandler::execute() tables_regions_info.tableCount(), dag_request.DebugString()); +#if SERVERLESS_PROXY != 0 + if (cop_context.db_context.isKeyspaceInBlocklist(cop_request->context().keyspace_id()) + || cop_context.db_context.isRegionsContainsInBlocklist(tables_regions_info.getAllRegionID())) + { + LOG_DEBUG( + log, + "cop request disabled for keyspace or regions in keyspace {}", + cop_request->context().keyspace_id()); + return recordError(grpc::StatusCode::INTERNAL, "cop request disabled"); + } +#endif + DAGContext dag_context( dag_request, std::move(tables_regions_info), diff --git a/dbms/src/Flash/Coprocessor/TablesRegionsInfo.cpp b/dbms/src/Flash/Coprocessor/TablesRegionsInfo.cpp index 15dbc303790..3bcdebd4917 100644 --- a/dbms/src/Flash/Coprocessor/TablesRegionsInfo.cpp +++ b/dbms/src/Flash/Coprocessor/TablesRegionsInfo.cpp @@ -141,4 +141,22 @@ TablesRegionsInfo TablesRegionsInfo::create( return tables_regions_info; } +std::vector TablesRegionsInfo::getAllRegionID() const +{ + std::vector all_regions; + for (const auto & ele : table_regions_info_map) + { + const auto & single_table_region = ele.second; + for (const auto & region : single_table_region.local_regions) + { + all_regions.push_back(region.first); + } + for (const auto & region : single_table_region.remote_regions) + { + all_regions.push_back(region.region_id); + } + } + return all_regions; +} + } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/TablesRegionsInfo.h b/dbms/src/Flash/Coprocessor/TablesRegionsInfo.h index ece7c6f7cc6..ed2956808ef 100644 --- a/dbms/src/Flash/Coprocessor/TablesRegionsInfo.h +++ b/dbms/src/Flash/Coprocessor/TablesRegionsInfo.h @@ -80,6 +80,8 @@ class TablesRegionsInfo } UInt64 
tableCount() const { return table_regions_info_map.size(); } + std::vector getAllRegionID() const; + private: bool is_single_table; std::unordered_map table_regions_info_map; diff --git a/dbms/src/Flash/CoprocessorHandler.cpp b/dbms/src/Flash/CoprocessorHandler.cpp index 4b72669941c..eef656d5f74 100644 --- a/dbms/src/Flash/CoprocessorHandler.cpp +++ b/dbms/src/Flash/CoprocessorHandler.cpp @@ -130,6 +130,18 @@ grpc::Status CoprocessorHandler::execute() TablesRegionsInfo tables_regions_info(true); auto & table_regions_info = tables_regions_info.getSingleTableRegions(); +#if SERVERLESS_PROXY != 0 + if (cop_context.db_context.isKeyspaceInBlocklist(cop_request->context().keyspace_id()) + || cop_context.db_context.isRegionInBlocklist(cop_context.kv_context.region_id())) + { + LOG_DEBUG( + log, + "cop request disabled for keyspace or regions in keyspace {}", + cop_request->context().keyspace_id()); + return recordError(grpc::StatusCode::INTERNAL, "cop request disabled"); + } +#endif + const std::unordered_set bypass_lock_ts( cop_context.kv_context.resolved_locks().begin(), cop_context.kv_context.resolved_locks().end()); @@ -200,7 +212,12 @@ grpc::Status CoprocessorHandler::execute() } catch (LockException & e) { - LOG_WARNING(log, "LockException: region_id={}, message: {}", cop_request->context().region_id(), e.message()); + LOG_WARNING( + log, + "LockException: region_id={}, message: {}, is_txn_file={}", + cop_request->context().region_id(), + e.message(), + e.locks[0].second->is_txn_file()); GET_METRIC(tiflash_coprocessor_request_error, reason_meet_lock).Increment(); if constexpr (is_stream) { diff --git a/dbms/src/Flash/Disaggregated/WNEstablishDisaggTaskHandler.cpp b/dbms/src/Flash/Disaggregated/WNEstablishDisaggTaskHandler.cpp index 4e6becfa937..3e403e23bc3 100644 --- a/dbms/src/Flash/Disaggregated/WNEstablishDisaggTaskHandler.cpp +++ b/dbms/src/Flash/Disaggregated/WNEstablishDisaggTaskHandler.cpp @@ -54,6 +54,17 @@ void WNEstablishDisaggTaskHandler::prepare(const 
disaggregated::EstablishDisaggT tables_regions_info.regionCount(), tables_regions_info.tableCount()); +#if SERVERLESS_PROXY != 0 + if (context->isKeyspaceInBlocklist(meta.keyspace_id()) + || context->isRegionsContainsInBlocklist(tables_regions_info.getAllRegionID())) + { + throw TiFlashException( + Errors::Coprocessor::BadRequest, + "disaggregated request disabled for keyspace or regions in keyspace {}", + meta.keyspace_id()); + } +#endif + // set schema ver and start ts auto schema_ver = request->schema_ver(); context->setSetting("schema_version", schema_ver); diff --git a/dbms/src/Flash/Mpp/MPPHandler.cpp b/dbms/src/Flash/Mpp/MPPHandler.cpp index 52fc2f0fe30..0226c658942 100644 --- a/dbms/src/Flash/Mpp/MPPHandler.cpp +++ b/dbms/src/Flash/Mpp/MPPHandler.cpp @@ -80,6 +80,21 @@ grpc::Status MPPHandler::execute(const ContextPtr & context, mpp::DispatchTaskRe task = MPPTask::newTask(task_request.meta(), context); task->prepare(task_request); +#if SERVERLESS_PROXY != 0 + if (context->isKeyspaceInBlocklist(task_request.meta().keyspace_id()) + || context->isRegionsContainsInBlocklist(context->getDAGContext()->tables_regions_info.getAllRegionID())) + { + LOG_DEBUG( + log, + "mpp request disabled for keyspace or regions in keyspace {}", + task_request.meta().keyspace_id()); + auto * err = response->mutable_error(); + err->set_msg("mpp request disabled"); + handleError(task, "mpp request disabled"); + return grpc::Status::OK; + } +#endif + addRetryRegion(context, response); #ifndef NDEBUG diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index 8e448d40c5a..e11be60adc5 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -188,6 +188,8 @@ struct ContextShared JointThreadInfoJeallocMapPtr joint_memory_allocation_map; /// Joint thread-wise alloc/dealloc map + std::unordered_set keyspace_blocklist; + std::unordered_set region_blocklist; std::unordered_set store_id_blocklist; /// Those store id are blocked from 
batch cop request. class SessionKeyHash @@ -2227,6 +2229,37 @@ void Context::setMockMPPServerInfo(MockMPPServerInfo & info) mpp_server_info = info; } +void Context::initKeyspaceBlocklist(const std::unordered_set & keyspace_ids) +{ + auto lock = getLock(); + shared->keyspace_blocklist = keyspace_ids; +} +bool Context::isKeyspaceInBlocklist(const KeyspaceID keyspace_id) +{ + auto lock = getLock(); + return shared->keyspace_blocklist.count(keyspace_id) > 0; +} +void Context::initRegionBlocklist(const std::unordered_set & region_ids) +{ + auto lock = getLock(); + shared->region_blocklist = region_ids; +} +bool Context::isRegionInBlocklist(const RegionID region_id) +{ + auto lock = getLock(); + return shared->region_blocklist.count(region_id) > 0; +} +bool Context::isRegionsContainsInBlocklist(const std::vector & regions) +{ + auto lock = getLock(); + for (const auto region : regions) + { + if (isRegionInBlocklist(region)) + return true; + } + return false; +} + const std::unordered_set * Context::getStoreIdBlockList() const { return &shared->store_id_blocklist; diff --git a/dbms/src/Interpreters/Context.h b/dbms/src/Interpreters/Context.h index 10b555a8a72..534b677c15e 100644 --- a/dbms/src/Interpreters/Context.h +++ b/dbms/src/Interpreters/Context.h @@ -27,6 +27,7 @@ #include #include #include +#include #include #include @@ -561,6 +562,12 @@ class Context void mockConfigLoaded() { is_config_loaded = true; } + void initKeyspaceBlocklist(const std::unordered_set & keyspace_ids); + bool isKeyspaceInBlocklist(KeyspaceID keyspace_id); + void initRegionBlocklist(const std::unordered_set & region_ids); + bool isRegionInBlocklist(RegionID region_id); + bool isRegionsContainsInBlocklist(const std::vector & regions); + bool initializeStoreIdBlockList(const String &); const std::unordered_set * getStoreIdBlockList() const; diff --git a/dbms/src/Interpreters/loadMetadata.cpp b/dbms/src/Interpreters/loadMetadata.cpp index dd1e2a86f1b..a3b8a3e36b8 100644 --- 
a/dbms/src/Interpreters/loadMetadata.cpp +++ b/dbms/src/Interpreters/loadMetadata.cpp @@ -121,6 +121,16 @@ void loadMetadata(Context & context) if (db_name == SYSTEM_DATABASE) continue; +#if SERVERLESS_PROXY == 1 + // Ignore database owned by keyspace in blacklist + auto keyspace_id = SchemaNameMapper::getMappedNameKeyspaceID(db_name); + if (context.isKeyspaceInBlocklist(keyspace_id)) + { + LOG_WARNING(log, "database {} ignored because keyspace in blacklist, keyspace={}", db_name, keyspace_id); + continue; + } +#endif + databases.emplace(db_name, path + file); } diff --git a/dbms/src/Server/MetricsPrometheus.cpp b/dbms/src/Server/MetricsPrometheus.cpp index 045ad4b4f14..8daf2b24702 100644 --- a/dbms/src/Server/MetricsPrometheus.cpp +++ b/dbms/src/Server/MetricsPrometheus.cpp @@ -164,7 +164,13 @@ std::shared_ptr getHTTPServer( key_path, cert_path, ca_path, - Poco::Net::Context::VerificationMode::VERIFY_STRICT); +#if SERVERLESS_PROXY == 0 + Poco::Net::Context::VerificationMode::VERIFY_STRICT +#else + // mtls: metrics server allows anonymous pullers @iosmanthus + Poco::Net::Context::VerificationMode::VERIFY_RELAXED +#endif + ); auto check_common_name = [&](const Poco::Crypto::X509Certificate & cert) { return global_context.getSecurityConfig()->checkCommonName(cert); diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 994a352686e..f5eb33577b6 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -324,6 +324,10 @@ struct TiFlashProxyConfig args_map["engine-label"] = getProxyLabelByDisaggregatedMode(disaggregated_mode); if (disaggregated_mode != DisaggregatedMode::Compute && has_s3_config) args_map["engine-role-label"] = DISAGGREGATED_MODE_WRITE_ENGINE_ROLE; +#if SERVERLESS_PROXY == 1 + if (config.has("blacklist_file")) + args_map["blacklist-file"] = config.getString("blacklist_file"); +#endif for (auto && [k, v] : args_map) val_map.emplace("--" + k, std::move(v)); @@ -961,6 +965,74 @@ void syncSchemaWithTiDB( 
global_context->initializeSchemaSyncService(); } +void loadBlockList( + [[maybe_unused]] const Poco::Util::LayeredConfiguration & config, + Context & global_context, + [[maybe_unused]] const LoggerPtr & log) +{ +#if SERVERLESS_PROXY != 1 + // We do not support blocking store by id in OP mode currently. + global_context.initializeStoreIdBlockList(""); +#else + global_context.initializeStoreIdBlockList(global_context.getSettingsRef().disagg_blocklist_wn_store_id); + + /// Load keyspace blacklist json file + LOG_INFO(log, "Loading blacklist file."); + auto blacklist_file_path = config.getString("blacklist_file", ""); + if (blacklist_file_path.length() == 0) + { + LOG_INFO(log, "blacklist file not enabled, ignore it."); + } + else + { + auto blacklist_file = Poco::File(blacklist_file_path); + if (blacklist_file.exists() && blacklist_file.isFile() && blacklist_file.canRead()) + { + // Read the json file + std::ifstream ifs(blacklist_file_path); + std::string json_content((std::istreambuf_iterator(ifs)), std::istreambuf_iterator()); + Poco::JSON::Parser parser; + Poco::Dynamic::Var json_var = parser.parse(json_content); + const auto & json_obj = json_var.extract(); + + // load keyspace list + auto keyspace_arr = json_obj->getArray("keyspace_ids"); + if (!keyspace_arr.isNull()) + { + std::unordered_set keyspace_blocklist; + for (size_t i = 0; i < keyspace_arr->size(); i++) + { + keyspace_blocklist.emplace(keyspace_arr->getElement(i)); + } + global_context.initKeyspaceBlocklist(keyspace_blocklist); + } + + // load region list + auto region_arr = json_obj->getArray("region_ids"); + if (!region_arr.isNull()) + { + std::unordered_set region_blocklist; + for (size_t i = 0; i < region_arr->size(); i++) + { + region_blocklist.emplace(region_arr->getElement(i)); + } + global_context.initRegionBlocklist(region_blocklist); + } + + LOG_INFO( + log, + "Load blacklist file done, total {} keyspaces and {} regions in blacklist.", + keyspace_arr.isNull() ? 
0 : keyspace_arr->size(), + region_arr.isNull() ? 0 : region_arr->size()); + } + else + { + LOG_INFO(log, "blacklist file not exists or non-readble, ignore it."); + } + } +#endif +} + int Server::main(const std::vector & /*args*/) { setThreadName("TiFlashMain"); @@ -1547,8 +1619,7 @@ int Server::main(const std::vector & /*args*/) global_context->setFormatSchemaPath(format_schema_path.path() + "/"); format_schema_path.createDirectories(); - // We do not support blocking store by id in OP mode currently. - global_context->initializeStoreIdBlockList(""); + loadBlockList(config(), *global_context, log); LOG_INFO(log, "Loading metadata."); loadMetadataSystem(*global_context); // Load "system" database. Its engine keeps as Ordinary. diff --git a/dbms/src/Storages/KVStore/Decode/RegionTable.cpp b/dbms/src/Storages/KVStore/Decode/RegionTable.cpp index 4425bd932c0..f09a77cc093 100644 --- a/dbms/src/Storages/KVStore/Decode/RegionTable.cpp +++ b/dbms/src/Storages/KVStore/Decode/RegionTable.cpp @@ -54,6 +54,7 @@ RegionTable::Table & RegionTable::getOrCreateTable(const KeyspaceID keyspace_id, { // Load persisted info. it = tables.emplace(ks_table_id, table_id).first; + addTableToIndex(keyspace_id, table_id); LOG_INFO(log, "get new table, keyspace={} table_id={}", keyspace_id, table_id); } return it->second; @@ -148,6 +149,7 @@ void RegionTable::removeTable(KeyspaceID keyspace_id, TableID table_id) // Remove from table map. 
tables.erase(it); + removeTableFromIndex(keyspace_id, table_id); LOG_INFO(log, "remove table from RegionTable success, keyspace={} table_id={}", keyspace_id, table_id); } @@ -333,6 +335,22 @@ void RegionTable::handleInternalRegionsByTable( } } +void RegionTable::handleInternalRegionsByKeyspace( + KeyspaceID keyspace_id, + std::function && callback) const +{ + std::lock_guard lock(mutex); + auto table_set = keyspace_index.find(keyspace_id); + if (table_set != keyspace_index.end()) + { + for (auto table_id : table_set->second) + { + if (auto it = tables.find(KeyspaceTableID{keyspace_id, table_id}); it != tables.end()) + callback(table_id, it->second.regions); + } + } +} + std::vector RegionTable::getRegionIdsByTable(KeyspaceID keyspace_id, TableID table_id) const { fiu_do_on(FailPoints::force_set_num_regions_for_table, { @@ -435,6 +453,29 @@ void RegionTable::extendRegionRange(const RegionID region_id, const RegionRangeK } } +void RegionTable::addTableToIndex(KeyspaceID keyspace_id, TableID table_id) +{ + auto it = keyspace_index.find(keyspace_id); + if (it == keyspace_index.end()) + { + keyspace_index.emplace(keyspace_id, std::unordered_set{table_id}); + } + else + { + it->second.insert(table_id); + } +} +void RegionTable::removeTableFromIndex(KeyspaceID keyspace_id, TableID table_id) +{ + auto it = keyspace_index.find(keyspace_id); + if (it != keyspace_index.end()) + { + it->second.erase(table_id); + if (it->second.empty()) + keyspace_index.erase(it); + } +} + RegionPtrWithSnapshotFiles::RegionPtrWithSnapshotFiles( const Base & base_, std::vector && external_files_) diff --git a/dbms/src/Storages/KVStore/Decode/RegionTable.h b/dbms/src/Storages/KVStore/Decode/RegionTable.h index d0eaf4f80ec..790e196c352 100644 --- a/dbms/src/Storages/KVStore/Decode/RegionTable.h +++ b/dbms/src/Storages/KVStore/Decode/RegionTable.h @@ -118,6 +118,9 @@ class RegionTable : private boost::noncopyable KeyspaceID keyspace_id, TableID table_id, std::function && callback) const; + void 
handleInternalRegionsByKeyspace( + KeyspaceID keyspace_id, + std::function && callback) const; std::vector getRegionIdsByTable(KeyspaceID keyspace_id, TableID table_id) const; std::vector> getRegionsByTable(KeyspaceID keyspace_id, TableID table_id) const; @@ -182,6 +185,8 @@ class RegionTable : private boost::noncopyable InternalRegion & insertRegion(Table & table, const RegionRangeKeys & region_range_keys, RegionID region_id); InternalRegion & insertRegion(Table & table, const Region & region); InternalRegion & doGetInternalRegion(KeyspaceTableID ks_table_id, RegionID region_id); + void addTableToIndex(KeyspaceID keyspace_id, TableID table_id); + void removeTableFromIndex(KeyspaceID keyspace_id, TableID table_id); private: using TableMap = std::unordered_map>; @@ -189,6 +194,10 @@ class RegionTable : private boost::noncopyable using RegionInfoMap = std::unordered_map; RegionInfoMap regions; + + using KeyspaceIndex = std::unordered_map, boost::hash>; + KeyspaceIndex keyspace_index; + SafeTsMap safe_ts_map; Context * const context; diff --git a/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp b/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp index f143b693775..f1afd5ab909 100644 --- a/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp +++ b/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp @@ -305,13 +305,14 @@ CppStrWithView HandleReadPage(const EngineStoreServerWrap * server, BaseBuffView { auto uni_ps = server->tmt->getContext().getWriteNodePageStorage(); RaftDataReader reader(*uni_ps); - auto * page = new Page(reader.read(UniversalPageId(page_id.data, page_id.len))); - if (page->isValid()) + auto p = reader.read(UniversalPageId(page_id.data, page_id.len)); + if (p.isValid()) { LOG_TRACE( &Poco::Logger::get("ProxyFFI"), "FFI read page {} success", UniversalPageId(page_id.data, page_id.len)); + auto * page = new Page(std::move(p)); return CppStrWithView{ .inner = GenRawCppPtr(page, RawCppPtrTypeImpl::UniversalPage), .view = BaseBuffView{page->data.begin(), page->data.size()}, @@ -1009,11 
+1010,7 @@ BaseBuffView GetLockByKey(const EngineStoreServerWrap * server, uint64_t region_ if (!value) { // key not exist - LOG_WARNING( - Logger::get(), - "Failed to get lock by key {}, region_id={}", - tikv_key.toDebugString(), - region_id); + LOG_DEBUG(Logger::get(), "Failed to get lock by key {}, region_id={}", tikv_key.toDebugString(), region_id); return BaseBuffView{}; } @@ -1072,4 +1069,4 @@ void ReportThreadAllocateBatch( } } -} // namespace DB \ No newline at end of file +} // namespace DB diff --git a/dbms/src/Storages/KVStore/FFI/ProxyFFIStatusService.cpp b/dbms/src/Storages/KVStore/FFI/ProxyFFIStatusService.cpp index 41b72968164..ba8900781f4 100644 --- a/dbms/src/Storages/KVStore/FFI/ProxyFFIStatusService.cpp +++ b/dbms/src/Storages/KVStore/FFI/ProxyFFIStatusService.cpp @@ -105,7 +105,6 @@ HttpRequestRes HandleHttpRequestSyncStatus( const size_t max_print_region = 30; static const std::chrono::minutes PRINT_LOG_INTERVAL = std::chrono::minutes{5}; static Timepoint last_print_log_time = Clock::now(); - // TODO(iosmanthus): TiDB should support tiflash replica. RegionTable & region_table = tmt.getRegionTable(); // Note that the IStorage instance could be not exist if there is only empty region for the table in this TiFlash instance. region_table.handleInternalRegionsByTable(keyspace_id, table_id, [&](const RegionTable::InternalRegions & regions) { @@ -287,6 +286,70 @@ HttpRequestRes HandleHttpRequestRemoteGC( }; } +// Acquiring the all the region ids created in this TiFlash node with given keyspace id. 
+HttpRequestRes HandleHttpRequestSyncRegion( + EngineStoreServerWrap * server, + std::string_view path, + const std::string & api_name, + std::string_view, + std::string_view) +{ + HttpRequestStatus status = HttpRequestStatus::Ok; + pingcap::pd::KeyspaceID keyspace_id = NullspaceID; + { + auto log = Logger::get("HandleHttpRequestSyncRegion"); + LOG_TRACE(log, "handling sync region request, path: {}, api_name: {}", path, api_name); + // schema: /keyspace/{keyspace_id} + auto query = path.substr(api_name.size()); + std::vector query_parts; + boost::split(query_parts, query, boost::is_any_of("/")); + if (query_parts.size() != 2 || query_parts[0] != "keyspace") + { + LOG_ERROR(log, "invalid SyncRegion request: {}", query); + status = HttpRequestStatus::ErrorParam; + return HttpRequestRes{ + .status = status, + .res = CppStrWithView{.inner = GenRawCppPtr(), .view = BaseBuffView{nullptr, 0}}}; + } + try + { + keyspace_id = std::stoll(query_parts[1]); + } + catch (...) + { + status = HttpRequestStatus::ErrorParam; + } + if (status != HttpRequestStatus::Ok) + return HttpRequestRes{ + .status = status, + .res = CppStrWithView{.inner = GenRawCppPtr(), .view = BaseBuffView{nullptr, 0}}}; + } + auto & tmt = *server->tmt; + std::stringstream ss; + Poco::JSON::Object::Ptr json = new Poco::JSON::Object(); + Poco::JSON::Array::Ptr list = new Poco::JSON::Array(); + RegionTable & region_table = tmt.getRegionTable(); + region_table.handleInternalRegionsByKeyspace( + keyspace_id, + [&](const TableID table_id, const RegionTable::InternalRegions & regions) { + if (!tmt.getStorages().get(keyspace_id, table_id)) + return; + for (const auto & region : regions) + { + list->add(region.first); + } + }); + json->set("count", list->size()); + json->set("regions", list); + json->stringify(ss); + auto * s = RawCppString::New(ss.str()); + return HttpRequestRes{ + .status = status, + .res = CppStrWithView{ + .inner = GenRawCppPtr(s, RawCppPtrTypeImpl::String), + .view = BaseBuffView{s->data(), 
s->size()}}}; +} + // Acquiring load schema to sync schema from TiKV in this TiFlash node with given keyspace id. HttpRequestRes HandleHttpRequestSyncSchema( EngineStoreServerWrap * server, @@ -391,6 +454,7 @@ using HANDLE_HTTP_URI_METHOD = HttpRequestRes (*)( static const std::map AVAILABLE_HTTP_URI = { {"/tiflash/sync-status/", HandleHttpRequestSyncStatus}, + {"/tiflash/sync-region/", HandleHttpRequestSyncRegion}, {"/tiflash/sync-schema/", HandleHttpRequestSyncSchema}, {"/tiflash/store-status", HandleHttpRequestStoreStatus}, {"/tiflash/remote/owner/info", HandleHttpRequestRemoteOwnerInfo}, diff --git a/dbms/src/Storages/KVStore/KVStore.h b/dbms/src/Storages/KVStore/KVStore.h index d18d2848613..1d8cbf37276 100644 --- a/dbms/src/Storages/KVStore/KVStore.h +++ b/dbms/src/Storages/KVStore/KVStore.h @@ -230,6 +230,7 @@ class KVStore final : private boost::noncopyable size_t getOngoingPrehandleSubtaskCount() const; EngineStoreApplyRes handleIngestSST(UInt64 region_id, SSTViewVec, UInt64 index, UInt64 term, TMTContext & tmt); size_t getMaxParallelPrehandleSize() const; + size_t getMaxPrehandleSubtaskSize() const; public: // Raft Read void addReadIndexEvent(Int64 f) { read_index_event_flag += f; } diff --git a/dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp b/dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp index e0b8ce26862..db6e0956560 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp @@ -302,6 +302,18 @@ PrehandleResult KVStore::preHandleSnapshotToFiles( } size_t KVStore::getMaxParallelPrehandleSize() const +{ +#if SERVERLESS_PROXY == 0 + return getMaxPrehandleSubtaskSize(); +#else + auto max_subtask_size = getMaxPrehandleSubtaskSize(); + // In serverless mode, IO takes more part in decoding stage, so we can increase parallel limit. + // In real test, the prehandling speed decreases if we use higher concurrency. 
+ return std::min(4, max_subtask_size); +#endif +} + +size_t KVStore::getMaxPrehandleSubtaskSize() const { const auto & proxy_config = getProxyConfigSummay(); size_t total_concurrency = 0; @@ -356,6 +368,11 @@ static inline std::pair, size_t> getSplitKey( // so we must add 1 here. auto ongoing_count = kvstore->getOngoingPrehandleSubtaskCount() + 1; uint64_t want_split_parts = 0; + // If total_concurrency is 4, and prehandle-pool is sized 8, + // and if there are 4 ongoing snapshots, then we will not parallel prehandling any new snapshot. + // This is because in serverless, too much parallelism causes performance reduction. + // So, if there is already enough parallelism that is used to prehandle, + // it is not necessary to manually split a snapshot. auto total_concurrency = kvstore->getMaxParallelPrehandleSize(); if (total_concurrency + 1 > ongoing_count) { @@ -718,7 +735,7 @@ PrehandleResult KVStore::preHandleSSTsToDTFiles( // `split_keys` do not begin with 'z'. auto [split_keys, approx_bytes] = getSplitKey(log, this, new_region, sst_stream); - prehandling_trace.waitForSubtaskResources(region_id, split_keys.size() + 1, getMaxParallelPrehandleSize()); + prehandling_trace.waitForSubtaskResources(region_id, split_keys.size() + 1, getMaxPrehandleSubtaskSize()); ReadFromStreamResult result; if (split_keys.empty()) { diff --git a/dbms/src/Storages/KVStore/MultiRaft/RegionData.cpp b/dbms/src/Storages/KVStore/MultiRaft/RegionData.cpp index 7831dee1ab0..10b92b3abe1 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/RegionData.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/RegionData.cpp @@ -246,8 +246,9 @@ std::shared_ptr RegionData::getLockByKey(const TiKVKey & key) c return tikv_val; } - // It is safe to ignore the missing lock key after restart, print a warning log and return nullptr - LOG_WARNING( + // It is safe to ignore the missing lock key after restart, print a debug log and return nullptr. 
+ // In txn file prewrite, proxy will try to get the lock to check the txn existence, the logs may be massive. + LOG_DEBUG( Logger::get(), "Failed to get lock by key in region data, key={} map_size={} count={}", key.toDebugString(), diff --git a/dbms/src/Storages/KVStore/MultiRaft/RegionPersister.cpp b/dbms/src/Storages/KVStore/MultiRaft/RegionPersister.cpp index b9823f4c560..ac88c351718 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/RegionPersister.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/RegionPersister.cpp @@ -398,6 +398,22 @@ RegionMap RegionPersister::restore( "region_id and page_id not match! region_id={} page_id={}", region->id(), page.page_id); +#if SERVERLESS_PROXY == 1 + if (global_context.isKeyspaceInBlocklist(region->getKeyspaceID())) + { + LOG_WARNING( + log, + "Region skip restore because keyspace in blacklist, region_id={} keyspace={}", + region->id(), + region->getKeyspaceID()); + return; + } + if (global_context.isRegionInBlocklist(region->id())) + { + LOG_WARNING(log, "Region skip restore because region_id in blacklist, region_id={}", region->id()); + return; + } +#endif regions.emplace(page.page_id, region); }; diff --git a/dbms/src/Storages/KVStore/Read/LockException.h b/dbms/src/Storages/KVStore/Read/LockException.h index fac190e9bc3..d0fb54660ec 100644 --- a/dbms/src/Storages/KVStore/Read/LockException.h +++ b/dbms/src/Storages/KVStore/Read/LockException.h @@ -16,6 +16,7 @@ #include #include +#include #include namespace DB @@ -34,10 +35,29 @@ class LockException : public Exception , locks(std::move(locks_)) { std::set locked_regions; +#if SERVERLESS_PROXY == 0 for (const auto & lock : locks) locked_regions.insert(lock.first); this->message(fmt::format("Key is locked ({} locks in regions {})", locks.size(), locked_regions)); +#else + std::set keys; + std::set primary_keys; + for (const auto & lock : locks) + { + locked_regions.insert(lock.first); + std::string key(lock.second->key()); + std::string 
primary_key(lock.second->primary_lock()); + keys.insert(TiKVKey(key.data(), key.size()).toDebugString()); + primary_keys.insert(TiKVKey(primary_key.data(), primary_key.size()).toDebugString()); + } + this->message(fmt::format( + "Key is locked ({} locks in regions {} key {} primary {})", + locks.size(), + locked_regions, + keys, + primary_keys)); +#endif } std::vector> locks; diff --git a/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp b/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp index fdb10662b65..309e4c3fce6 100644 --- a/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp +++ b/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp @@ -62,7 +62,11 @@ namespace RecordKVFormat { case SHORT_VALUE_PREFIX: { +#if SERVERLESS_PROXY != 0 + size_t str_len = readVarUInt(data, len); +#else size_t str_len = readUInt8(data, len); +#endif if (len < str_len) throw Exception("content len shorter than short value len", ErrorCodes::LOGICAL_ERROR); // no need short value diff --git a/dbms/src/Storages/KVStore/TiKVHelpers/TiKVRecordFormat.cpp b/dbms/src/Storages/KVStore/TiKVHelpers/TiKVRecordFormat.cpp index 508a6cb7fac..ffbdbe268ab 100644 --- a/dbms/src/Storages/KVStore/TiKVHelpers/TiKVRecordFormat.cpp +++ b/dbms/src/Storages/KVStore/TiKVHelpers/TiKVRecordFormat.cpp @@ -36,4 +36,130 @@ TiKVKey genKey(const TiDB::TableInfo & table_info, std::vector keys) } return encodeAsTiKVKey(key + ss.releaseStr()); } + +TiKVValue encodeLockCfValue( + UInt8 lock_type, + const String & primary, + Timestamp ts, + UInt64 ttl, + const String * short_value, + Timestamp min_commit_ts) +{ + WriteBufferFromOwnString res; + res.write(lock_type); + TiKV::writeVarInt(static_cast(primary.size()), res); + res.write(primary.data(), primary.size()); + TiKV::writeVarUInt(ts, res); + TiKV::writeVarUInt(ttl, res); + if (short_value) + { + res.write(SHORT_VALUE_PREFIX); // the flag byte is written in both modes; only the length encoding below is mode-specific +#if SERVERLESS_PROXY != 0 + TiKV::writeVarUInt(short_value->size(), res); +#else + 
res.write(static_cast(short_value->size())); +#endif + res.write(short_value->data(), short_value->size()); + } + if (min_commit_ts) + { + res.write(MIN_COMMIT_TS_PREFIX); + encodeUInt64(min_commit_ts, res); + } + return TiKVValue(res.releaseStr()); +} + +DecodedWriteCFValue decodeWriteCfValue(const TiKVValue & value) +{ + const char * data = value.data(); + size_t len = value.dataSize(); + + auto write_type = RecordKVFormat::readUInt8(data, len); //write type + + bool can_ignore = write_type != CFModifyFlag::DelFlag && write_type != CFModifyFlag::PutFlag; + if (can_ignore) + return std::nullopt; + + Timestamp prewrite_ts = RecordKVFormat::readVarUInt(data, len); // ts + + std::string_view short_value; + while (len) + { + auto flag = RecordKVFormat::readUInt8(data, len); + switch (flag) + { + case RecordKVFormat::SHORT_VALUE_PREFIX: + { +#if SERVERLESS_PROXY != 0 + size_t slen = RecordKVFormat::readVarUInt(data, len); +#else + size_t slen = RecordKVFormat::readUInt8(data, len); +#endif + if (slen > len) + throw Exception("content len not equal to short value len", ErrorCodes::LOGICAL_ERROR); + short_value = RecordKVFormat::readRawString(data, len, slen); + break; + } + case RecordKVFormat::FLAG_OVERLAPPED_ROLLBACK: + // ignore + break; + case RecordKVFormat::GC_FENCE_PREFIX: + /** + * according to https://github.com/tikv/tikv/pull/9207, when meet `GC fence` flag, it is definitely a + * rewriting record and there must be a complete row written to tikv, just ignore it in tiflash. + */ + return std::nullopt; + case RecordKVFormat::LAST_CHANGE_PREFIX: + { + // Used to accelerate TiKV MVCC scan, useless for TiFlash. + UInt64 last_change_ts = readUInt64(data, len); + UInt64 versions_to_last_change = readVarUInt(data, len); + UNUSED(last_change_ts); + UNUSED(versions_to_last_change); + break; + } + case RecordKVFormat::TXN_SOURCE_PREFIX_FOR_WRITE: + { + // Used for CDC, useless for TiFlash. 
+ UInt64 txn_source_prefic = readVarUInt(data, len); + UNUSED(txn_source_prefic); + break; + } + default: + throw Exception("invalid flag " + std::to_string(flag) + " in write cf", ErrorCodes::LOGICAL_ERROR); + } + } + + return InnerDecodedWriteCFValue{ + write_type, + prewrite_ts, + short_value.empty() ? nullptr : std::make_shared(short_value.data(), short_value.length()), + }; +} + +TiKVValue encodeWriteCfValue(UInt8 write_type, Timestamp ts, std::string_view short_value, bool gc_fence) +{ + WriteBufferFromOwnString res; + res.write(write_type); + TiKV::writeVarUInt(ts, res); + if (!short_value.empty()) + { + res.write(SHORT_VALUE_PREFIX); +#if SERVERLESS_PROXY != 0 + TiKV::writeVarUInt(short_value.size(), res); +#else + res.write(static_cast(short_value.size())); +#endif + res.write(short_value.data(), short_value.size()); + } + // just for test + res.write(FLAG_OVERLAPPED_ROLLBACK); + if (gc_fence) + { + res.write(GC_FENCE_PREFIX); + encodeUInt64(8888, res); + } + return TiKVValue(res.releaseStr()); +} + } // namespace DB::RecordKVFormat diff --git a/dbms/src/Storages/KVStore/TiKVHelpers/TiKVRecordFormat.h b/dbms/src/Storages/KVStore/TiKVHelpers/TiKVRecordFormat.h index 37ed4b65775..315d3490025 100644 --- a/dbms/src/Storages/KVStore/TiKVHelpers/TiKVRecordFormat.h +++ b/dbms/src/Storages/KVStore/TiKVHelpers/TiKVRecordFormat.h @@ -258,33 +258,13 @@ inline TiKVKey genKey(TableID tableId, HandleID handleId, Timestamp ts) return appendTs(key, ts); } -inline TiKVValue encodeLockCfValue( +TiKVValue encodeLockCfValue( UInt8 lock_type, const String & primary, Timestamp ts, UInt64 ttl, const String * short_value = nullptr, - Timestamp min_commit_ts = 0) -{ - WriteBufferFromOwnString res; - res.write(lock_type); - TiKV::writeVarInt(static_cast(primary.size()), res); - res.write(primary.data(), primary.size()); - TiKV::writeVarUInt(ts, res); - TiKV::writeVarUInt(ttl, res); - if (short_value) - { - res.write(SHORT_VALUE_PREFIX); - 
res.write(static_cast(short_value->size())); - res.write(short_value->data(), short_value->size()); - } - if (min_commit_ts) - { - res.write(MIN_COMMIT_TS_PREFIX); - encodeUInt64(min_commit_ts, res); - } - return TiKVValue(res.releaseStr()); -} + Timestamp min_commit_ts = 0); template inline R readVarInt(const char *& data, size_t & len) @@ -360,93 +340,9 @@ struct InnerDecodedWriteCFValue using DecodedWriteCFValue = std::optional; -inline DecodedWriteCFValue decodeWriteCfValue(const TiKVValue & value) -{ - const char * data = value.data(); - size_t len = value.dataSize(); - - auto write_type = RecordKVFormat::readUInt8(data, len); //write type - - bool can_ignore = write_type != CFModifyFlag::DelFlag && write_type != CFModifyFlag::PutFlag; - if (can_ignore) - return std::nullopt; +DecodedWriteCFValue decodeWriteCfValue(const TiKVValue & value); - Timestamp prewrite_ts = RecordKVFormat::readVarUInt(data, len); // ts - - std::string_view short_value; - while (len) - { - auto flag = RecordKVFormat::readUInt8(data, len); - switch (flag) - { - case RecordKVFormat::SHORT_VALUE_PREFIX: - { - size_t slen = RecordKVFormat::readUInt8(data, len); - if (slen > len) - throw Exception("content len not equal to short value len", ErrorCodes::LOGICAL_ERROR); - short_value = RecordKVFormat::readRawString(data, len, slen); - break; - } - case RecordKVFormat::FLAG_OVERLAPPED_ROLLBACK: - // ignore - break; - case RecordKVFormat::GC_FENCE_PREFIX: - /** - * according to https://github.com/tikv/tikv/pull/9207, when meet `GC fence` flag, it is definitely a - * rewriting record and there must be a complete row written to tikv, just ignore it in tiflash. - */ - return std::nullopt; - case RecordKVFormat::LAST_CHANGE_PREFIX: - { - // Used to accelerate TiKV MVCC scan, useless for TiFlash. 
- UInt64 last_change_ts = readUInt64(data, len); - UInt64 versions_to_last_change = readVarUInt(data, len); - UNUSED(last_change_ts); - UNUSED(versions_to_last_change); - break; - } - case RecordKVFormat::TXN_SOURCE_PREFIX_FOR_WRITE: - { - // Used for CDC, useless for TiFlash. - UInt64 txn_source_prefic = readVarUInt(data, len); - UNUSED(txn_source_prefic); - break; - } - default: - throw Exception("invalid flag " + std::to_string(flag) + " in write cf", ErrorCodes::LOGICAL_ERROR); - } - } - - return InnerDecodedWriteCFValue{ - write_type, - prewrite_ts, - short_value.empty() ? nullptr : std::make_shared(short_value.data(), short_value.length())}; -} - -inline TiKVValue encodeWriteCfValue( - UInt8 write_type, - Timestamp ts, - std::string_view short_value = {}, - bool gc_fence = false) -{ - WriteBufferFromOwnString res; - res.write(write_type); - TiKV::writeVarUInt(ts, res); - if (!short_value.empty()) - { - res.write(SHORT_VALUE_PREFIX); - res.write(static_cast(short_value.size())); - res.write(short_value.data(), short_value.size()); - } - // just for test - res.write(FLAG_OVERLAPPED_ROLLBACK); - if (gc_fence) - { - res.write(GC_FENCE_PREFIX); - encodeUInt64(8888, res); - } - return TiKVValue(res.releaseStr()); -} +TiKVValue encodeWriteCfValue(UInt8 write_type, Timestamp ts, std::string_view short_value = {}, bool gc_fence = false); template inline std::string DecodedTiKVKeyToDebugString(const DecodedTiKVKey & decoded_key) diff --git a/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp b/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp index e9b126df194..54eb5406a4a 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp @@ -1142,7 +1142,11 @@ CATCH TEST(ProxyMode, Normal) try { +#if SERVERLESS_PROXY == 0 ASSERT_EQ(SERVERLESS_PROXY, 0); +#else + ASSERT_EQ(SERVERLESS_PROXY, 1); +#endif } CATCH diff --git a/dbms/src/Storages/KVStore/tests/gtest_tikv_keyvalue.cpp 
b/dbms/src/Storages/KVStore/tests/gtest_tikv_keyvalue.cpp index c43dc3adf29..b9a6cf526dc 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_tikv_keyvalue.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_tikv_keyvalue.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -98,7 +99,9 @@ inline bool checkTableInvolveRange(const TableID table_id, const RangeRef & rang { const TiKVKey start_key = RecordKVFormat::genKey(table_id, std::numeric_limits::min()); const TiKVKey end_key = RecordKVFormat::genKey(table_id, std::numeric_limits::max()); - return !(end_key < range.first || (!range.second.empty() && start_key >= range.second)); + // clang-format off + return !(end_key < range.first|| (!range.second.empty() && start_key >= range.second)); // NOLINT(readability-simplify-boolean-expr) + // clang-format on } inline TiKVKey genIndex(const TableID tableId, const Int64 id) @@ -113,6 +116,42 @@ inline TiKVKey genIndex(const TableID tableId, const Int64 id) return RecordKVFormat::encodeAsTiKVKey(key); } +TEST(TiKVKeyValueTest, KeyFormat) +{ + Timestamp prewrite_ts = 5; + { + std::string short_value(128, 'F'); + auto v = RecordKVFormat::encodeWriteCfValue( + RecordKVFormat::CFModifyFlag::PutFlag, + prewrite_ts, + short_value, + false); + auto decoded = RecordKVFormat::decodeWriteCfValue(v); + ASSERT_TRUE(decoded.has_value()); + ASSERT_EQ(decoded->write_type, RecordKVFormat::CFModifyFlag::PutFlag); + ASSERT_EQ(decoded->prewrite_ts, prewrite_ts); + ASSERT_NE(decoded->short_value, nullptr); + ASSERT_EQ(*decoded->short_value, short_value); + } +#if SERVERLESS_PROXY != 0 + { + // For serverless branch, the short_value length use varUInt + std::string short_value(1025, 'F'); + auto v = RecordKVFormat::encodeWriteCfValue( + RecordKVFormat::CFModifyFlag::PutFlag, + prewrite_ts, + short_value, + false); + auto decoded = RecordKVFormat::decodeWriteCfValue(v); + ASSERT_TRUE(decoded.has_value()); + ASSERT_EQ(decoded->write_type, 
RecordKVFormat::CFModifyFlag::PutFlag); + ASSERT_EQ(decoded->prewrite_ts, prewrite_ts); + ASSERT_NE(decoded->short_value, nullptr); + ASSERT_EQ(*decoded->short_value, short_value); + } +#endif +} + TEST(TiKVKeyValueTest, PortedTests) { { diff --git a/dbms/src/Storages/Page/Page.h b/dbms/src/Storages/Page/Page.h index 7a5dc2977c3..61f5c01f4a4 100644 --- a/dbms/src/Storages/Page/Page.h +++ b/dbms/src/Storages/Page/Page.h @@ -47,7 +47,7 @@ struct FieldOffsetInsidePage bool operator<(const FieldOffsetInsidePage & rhs) const { return index < rhs.index; } }; -struct Page +class Page { public: static Page invalidPage() @@ -69,7 +69,7 @@ struct Page std::set field_offsets; private: - bool is_valid; + bool is_valid = false; public: inline bool isValid() const { return is_valid; } diff --git a/dbms/src/TiDB/Schema/TiDBSchemaSyncer.cpp b/dbms/src/TiDB/Schema/TiDBSchemaSyncer.cpp index c56cb6d51ea..7ff076a3e06 100644 --- a/dbms/src/TiDB/Schema/TiDBSchemaSyncer.cpp +++ b/dbms/src/TiDB/Schema/TiDBSchemaSyncer.cpp @@ -84,10 +84,17 @@ bool TiDBSchemaSyncer::syncSchemasByGetter(Context & c } else { +#if SERVERLESS_PROXY == 0 if (version <= cur_version) { return false; } +#else + if (version == cur_version) + { + return false; + } +#endif LOG_INFO( log, @@ -100,6 +107,19 @@ bool TiDBSchemaSyncer::syncSchemasByGetter(Context & c // first load all db and tables cur_version = syncAllSchemas(context, getter, version); } +#if SERVERLESS_PROXY == 1 + // if the `version` is less than `cur_version`, it means that the schema version in TiKV has been rolled back by restore. + // We should sync the schema again. 
+ else if (version < cur_version) + { + LOG_INFO( + log, + "The latest schema version is less than current version, sync all schema, version={} cur_version={}", + version, + cur_version); + cur_version = syncAllSchemas(context, getter, version); + } +#endif else { // After the feature concurrent DDL, TiDB does `update schema version` before `set schema diff`, and they are done in separate transactions. diff --git a/format-diff.py b/format-diff.py index 0ffb6d74c2a..0fad0f9b692 100755 --- a/format-diff.py +++ b/format-diff.py @@ -1,4 +1,4 @@ -#!/usr/bin/python3 +#!/usr/bin/env python3 # Copyright 2023 PingCAP, Inc. # # Licensed under the Apache License, Version 2.0 (the "License");