diff --git a/conanfile.py b/conanfile.py
index b707b7e7..d1659fea 100644
--- a/conanfile.py
+++ b/conanfile.py
@@ -10,7 +10,7 @@ class HomeObjectConan(ConanFile):
     name = "homeobject"
-    version = "3.0.19"
+    version = "3.0.20"
     homepage = "https://github.com/eBay/HomeObject"
     description = "Blob Store built on HomeStore"
diff --git a/src/lib/homestore_backend/gc_manager.cpp b/src/lib/homestore_backend/gc_manager.cpp
index 4b9ca70b..51beadd3 100644
--- a/src/lib/homestore_backend/gc_manager.cpp
+++ b/src/lib/homestore_backend/gc_manager.cpp
@@ -25,14 +25,14 @@ SISL_LOGGING_DECL(gcmgr)
 GCManager::GCManager(HSHomeObject* homeobject) :
         m_chunk_selector{homeobject->chunk_selector()}, m_hs_home_object{homeobject} {
     homestore::meta_service().register_handler(
-        GCManager::_gc_actor_meta_name,
+        _gc_actor_meta_name,
         [this](homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) {
             on_gc_actor_meta_blk_found(std::move(buf), voidptr_cast(mblk));
         },
         nullptr, true);

     homestore::meta_service().register_handler(
-        GCManager::_gc_reserved_chunk_meta_name,
+        _gc_reserved_chunk_meta_name,
         [this](homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) {
             on_reserved_chunk_meta_blk_found(std::move(buf), voidptr_cast(mblk));
         },
@@ -44,7 +44,7 @@ GCManager::GCManager(HSHomeObject* homeobject) :
         true);

     homestore::meta_service().register_handler(
-        GCManager::_gc_task_meta_name,
+        _gc_task_meta_name,
         [this](homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) {
             on_gc_task_meta_blk_found(std::move(buf), voidptr_cast(mblk));
         },
@@ -63,8 +63,8 @@ void GCManager::on_gc_task_meta_blk_found(sisl::byte_view const& buf, void* meta
     // here, we are under the protection of the lock of metaservice. however, we will also try to update pg and shard
     // metablk and then destroy the gc_task_sb, which will also try to acquire the lock of metaservice, as a result, a
-    // dead lock will happen. so here we will handle all the gc tasks after read all the their metablks
-    m_recovered_gc_tasks.emplace_back(GCManager::_gc_task_meta_name);
+    // dead lock will happen. so here we will handle all the gc tasks only after reading all their metablks
+    m_recovered_gc_tasks.emplace_back(_gc_task_meta_name);
     m_recovered_gc_tasks.back().load(buf, meta_cookie);
 }
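
// -----------------------------------------------------------------------------------------------
// A minimal, self-contained sketch (not part of the patch) of the two-phase recovery pattern the
// comment above describes. All names here (MetaService, scan_metablks, destroy_metablk) are
// hypothetical stand-ins: the real metaservice invokes recovery callbacks while holding its own
// lock, and handling a gc task eventually destroys its metablk, which re-acquires that same lock.
// Handling inside the callback would therefore self-deadlock, so the callback only stashes what it
// found, and a second pass (handle_all_recovered_gc_tasks) runs after the scan, outside the lock.

#include <functional>
#include <iostream>
#include <mutex>
#include <string>
#include <vector>

class MetaService {
public:
    // scan_metablks() holds m_lock while invoking each callback, mirroring the metaservice
    // behavior described in on_gc_task_meta_blk_found.
    void scan_metablks(const std::function< void(const std::string&) >& cb) {
        std::lock_guard< std::mutex > lg(m_lock);
        for (const auto& blk : m_blks) cb(blk);
    }
    // destroy_metablk() also takes m_lock; calling it from inside a scan callback would deadlock.
    void destroy_metablk(const std::string& blk) {
        std::lock_guard< std::mutex > lg(m_lock);
        std::erase(m_blks, blk);
    }
    void add_metablk(std::string blk) { m_blks.push_back(std::move(blk)); }

private:
    std::mutex m_lock;
    std::vector< std::string > m_blks;
};

int main() {
    MetaService svc;
    svc.add_metablk("gc_task_1");
    svc.add_metablk("gc_task_2");

    std::vector< std::string > recovered;
    // phase 1: under the metaservice lock, only record what was found
    svc.scan_metablks([&](const std::string& blk) { recovered.push_back(blk); });
    // phase 2: outside the lock, it is now safe to handle each task and destroy its metablk
    for (const auto& blk : recovered) {
        std::cout << "handling recovered gc task " << blk << "\n";
        svc.destroy_metablk(blk);
    }
    return 0;
}
// -----------------------------------------------------------------------------------------------
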
@@ -89,7 +89,7 @@ void GCManager::handle_all_recovered_gc_tasks() {
 }

 void GCManager::on_gc_actor_meta_blk_found(sisl::byte_view const& buf, void* meta_cookie) {
-    m_gc_actor_sbs.emplace_back(GCManager::_gc_actor_meta_name);
+    m_gc_actor_sbs.emplace_back(_gc_actor_meta_name);
     auto& gc_actor_sb = m_gc_actor_sbs.back();
     gc_actor_sb.load(buf, meta_cookie);
     auto pdev_id = gc_actor_sb->pdev_id;
@@ -100,8 +100,7 @@ void GCManager::on_gc_actor_meta_blk_found(sisl::byte_view const& buf, void* met
 }

 void GCManager::on_reserved_chunk_meta_blk_found(sisl::byte_view const& buf, void* meta_cookie) {
-    homestore::superblk< GCManager::gc_reserved_chunk_superblk > reserved_chunk_sb(
-        GCManager::_gc_reserved_chunk_meta_name);
+    homestore::superblk< gc_reserved_chunk_superblk > reserved_chunk_sb(_gc_reserved_chunk_meta_name);
     auto chunk_id = reserved_chunk_sb.load(buf, meta_cookie)->chunk_id;
     auto EXVchunk = m_chunk_selector->get_extend_vchunk(chunk_id);
     if (EXVchunk == nullptr) {
@@ -184,8 +183,7 @@ folly::SemiFuture< bool > GCManager::submit_gc_task(task_priority priority, chun
 }

 std::shared_ptr< GCManager::pdev_gc_actor >
-GCManager::try_create_pdev_gc_actor(uint32_t pdev_id,
-                                    const homestore::superblk< GCManager::gc_actor_superblk >& gc_actor_sb) {
+GCManager::try_create_pdev_gc_actor(uint32_t pdev_id, const homestore::superblk< gc_actor_superblk >& gc_actor_sb) {
     auto const [it, happened] = m_pdev_gc_actors.try_emplace(
         pdev_id, std::make_shared< pdev_gc_actor >(gc_actor_sb, m_chunk_selector, m_hs_home_object));
     RELEASE_ASSERT((it != m_pdev_gc_actors.end()), "Unexpected error in m_pdev_gc_actors!!!");
@@ -776,7 +774,7 @@ bool GCManager::pdev_gc_actor::copy_valid_data(
 #endif
     // TODO::involve ratelimiter in the following code, where read/write are scheduled. or do we need a central
     // ratelimter shared by all components except client io?
-    auto succeed_copying_shard =
+    const auto succeed_copying_shard =
         // 1 write the shard header to move_to_chunk
         data_service.async_alloc_write(header_sgs, hints, out_blkids)
             .thenValue([this, &hints, &move_to_chunk, &move_from_chunk, &is_last_shard, &shard_id, &blk_size,
@@ -968,11 +966,9 @@ bool GCManager::pdev_gc_actor::copy_valid_data(
                        move_from_chunk, move_to_chunk);
                 return false;
             }
-
             GCLOGD(task_id, pg_id, shard_id, "successfully copy blobs from move_from_chunk={} to move_to_chunk={}",
                    move_from_chunk, move_to_chunk);
         }
-
         GCLOGD(task_id, pg_id, NO_SHARD_ID, "all valid blobs are copied from move_from_chunk={} to move_to_chunk={}",
                move_from_chunk, move_to_chunk);
@@ -1132,17 +1128,14 @@ bool GCManager::pdev_gc_actor::compare_blob_indexes(
             GCLOGW(task_id, pg_id, shard_id, "copied blob: move_to_chunk={}, blob_id={}, pba={}", k.chunk, k.blob,
                    v.pbas().to_string());
         }
-
         GCLOGW(task_id, pg_id, NO_SHARD_ID, "start printing valid blobs from gc index table:");
         for (const auto& [k, v] : valid_blob_indexes) {
             const auto shard_id = k.key().shard;
             GCLOGW(task_id, pg_id, shard_id, "valid blob: move_to_chunk={}, blob_id={}, pba={}", k.key().chunk,
                    k.key().blob, v.pbas().to_string());
         }
-
         RELEASE_ASSERT(false, "copied blobs are not the same as the valid blobs got from gc index table");
     }
-
     return ret;
 }
@@ -1325,6 +1318,7 @@ bool GCManager::pdev_gc_actor::process_after_gc_metablk_persisted(
         }
     }

+    // now that all the blob indexes have been replaced successfully, we can destroy the gc task superblk
     gc_task_sb.destroy();

     const auto reclaimed_blk_count =
         m_chunk_selector->get_extend_vchunk(move_from_chunk)->get_used_blks() -
diff --git a/src/lib/homestore_backend/hs_homeobject.cpp b/src/lib/homestore_backend/hs_homeobject.cpp
index 42585113..14efaa58 100644
--- a/src/lib/homestore_backend/hs_homeobject.cpp
+++ b/src/lib/homestore_backend/hs_homeobject.cpp
@@ -349,6 +349,24 @@ void HSHomeObject::on_replica_restart() {
         homestore::meta_service().read_sub_sb(GCManager::_gc_reserved_chunk_meta_name);
         homestore::meta_service().read_sub_sb(GCManager::_gc_task_meta_name);

+        // 1 it is safe to handle recovered gc tasks before log replay is done. a gc task superblk is not persisted
+        // until all the valid blobs have been copied from move_from_chunk to move_to_chunk and all the new pba
+        // indexes have been flushed into the gc indextable by cp (cp also flushes the pg index table) before the
+        // crash. moreover, a gc task superblk means there is an ongoing gc task for move_from_chunk and
+        // move_to_chunk, and there should be no new put_blob to these two chunks until this task is done. when
+        // replaying the log for put_blob_commit, if the blob index is already in the index table (all the index
+        // items have been flushed by cp before persisting the gc task superblk), it will not overwrite the existing item.
+
+        // 2 we need to handle all the recovered gc tasks before replaying the log. when log replay is done, in
+        // ReplicationStateMachine::on_log_replay_done, we need to select_specific_chunk for all the chunks with an
+        // open shard to mark the states of these chunks as inuse. if a crash happens after the shard metablk has
+        // been updated (the pchunk of this shard is changed to move_to_chunk) but before the reserved_chunk_superblk
+        // has been persisted (move_to_chunk is still a reserved chunk), then when log replay is done and we try to
+        // select_specific_chunk for the chunk with the open shard, the state of move_to_chunk is GC (reserved), so
+        // it can not be selected and on_log_replay_done will be stuck.
+
+        // after handling all the recovered gc tasks, move_to_chunk will be marked as inuse, and thus can be selected
+        // in on_log_replay_done, and the log replay can be completed successfully.
         gc_mgr_->handle_all_recovered_gc_tasks();

         if (HS_BACKEND_DYNAMIC_CONFIG(enable_gc)) {
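
// -----------------------------------------------------------------------------------------------
// A minimal sketch (not part of the patch) of the ordering constraint in the comment above, with
// hypothetical names (ChunkSelector, ChunkState): a chunk left in GC/reserved state by a
// half-finished gc task cannot be selected for an open shard, so recovered gc tasks must be
// handled (marking move_to_chunk as inuse) before on_log_replay_done tries to select it.

#include <cassert>
#include <cstdint>
#include <unordered_map>

enum class ChunkState : uint8_t { AVAILABLE, INUSE, GC };

struct ChunkSelector {
    std::unordered_map< uint32_t, ChunkState > states;
    // mirrors select_specific_chunk: a chunk in GC state cannot be handed out
    bool select_specific_chunk(uint32_t chunk_id) {
        auto it = states.find(chunk_id);
        if (it == states.end() || it->second == ChunkState::GC) return false;
        it->second = ChunkState::INUSE;
        return true;
    }
};

int main() {
    constexpr uint32_t move_to_chunk = 42;
    ChunkSelector cs;
    // crash happened after the shard metablk pointed at move_to_chunk, but before the
    // reserved-chunk superblk was rewritten, so the chunk is still in GC state on restart
    cs.states[move_to_chunk] = ChunkState::GC;

    // without handling the recovered gc task first, log replay would be stuck here
    assert(!cs.select_specific_chunk(move_to_chunk));

    // handle_all_recovered_gc_tasks() re-applies the task's effects, releasing the chunk
    cs.states[move_to_chunk] = ChunkState::INUSE;

    // now on_log_replay_done can select the chunk that hosts the open shard
    assert(cs.select_specific_chunk(move_to_chunk));
    return 0;
}
// -----------------------------------------------------------------------------------------------
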
@@ -411,6 +429,7 @@ void HSHomeObject::shutdown() {
     }

     LOGI("start shutting down HomeObject");
+
 #if 0
     if (ho_timer_thread_handle_.first) {
         iomanager.cancel_timer(ho_timer_thread_handle_, true);
@@ -432,7 +451,6 @@ void HSHomeObject::shutdown() {
     // to persist gc task metablk if there is any ongoing gc task. after stopping gc manager, there is no gc task
    // anymore, and thus now new gc task will be written to metaservice during homestore shutdown.
     gc_mgr_->stop();
-
     LOGI("start shutting down HomeStore");
     homestore::HomeStore::instance()->shutdown();
     homestore::HomeStore::reset_instance();
diff --git a/src/lib/homestore_backend/hs_homeobject.hpp b/src/lib/homestore_backend/hs_homeobject.hpp
index f2f14d93..1668ec82 100644
--- a/src/lib/homestore_backend/hs_homeobject.hpp
+++ b/src/lib/homestore_backend/hs_homeobject.hpp
@@ -332,6 +332,9 @@ class HSHomeObject : public HomeObjectImpl {
         shared< homestore::ReplDev > repl_dev_;
         std::shared_ptr< BlobIndexTable > index_table_;
         PGMetrics metrics_;
+        int64_t gc_ready_lsn_; // the chunks in this pg can only be gc-ed after commit_lsn >= gc_ready_lsn_
+        bool ready_for_gc{false};
+
         mutable pg_state pg_state_{0};

         // Snapshot receiver progress info, used as a checkpoint for recovery
@@ -981,6 +984,8 @@ class HSHomeObject : public HomeObjectImpl {
     const HS_PG* _get_hs_pg_unlocked(pg_id_t pg_id) const;
     bool verify_blob(const void* blob, const shard_id_t shard_id, const blob_id_t blob_id,
                      bool allow_delete_marker = false) const;
+    void on_pg_lsn_commit(pg_id_t pg_id, int64_t lsn);
+    void on_pg_lsn_rollback(pg_id_t pg_id, int64_t lsn);

     BlobManager::Result< std::vector< BlobInfo > > get_shard_blobs(shard_id_t shard_id);
diff --git a/src/lib/homestore_backend/hs_pg_manager.cpp b/src/lib/homestore_backend/hs_pg_manager.cpp
index 548af166..f3af406b 100644
--- a/src/lib/homestore_backend/hs_pg_manager.cpp
+++ b/src/lib/homestore_backend/hs_pg_manager.cpp
@@ -398,10 +398,10 @@ void HSHomeObject::on_pg_complete_replace_member(group_id_t group_id, const std:
          boost::uuids::to_string(member_out.id), boost::uuids::to_string(member_in.id), tid);
 }

-//This function actually perform rollback for replace member task: Remove in_member, and ensure out_member exists
+// This function actually performs rollback for the replace member task: remove in_member, and ensure out_member exists
 void HSHomeObject::on_pg_clean_replace_member_task(group_id_t group_id, const std::string& task_id,
-                                                    const replica_member_info& member_out,
-                                                    const replica_member_info& member_in, trace_id_t tid) {
+                                                   const replica_member_info& member_out,
+                                                   const replica_member_info& member_in, trace_id_t tid) {
     std::unique_lock lck(_pg_lock);
     for (const auto& iter : _pg_map) {
         auto& pg = iter.second;
@@ -421,13 +421,13 @@ void HSHomeObject::on_pg_clean_replace_member_task(group_id_t group_id, const st

             LOGI("PG clean replace member task done (rollback), task_id={}, removed in_member={} (removed={}), "
                  "ensured out_member={} (inserted={}), member_nums={}, trace_id={}",
-                 task_id, boost::uuids::to_string(member_in.id), removed_count,
-                 boost::uuids::to_string(member_out.id), inserted, pg->pg_info_.members.size(), tid);
+                 task_id, boost::uuids::to_string(member_in.id), removed_count, boost::uuids::to_string(member_out.id),
+                 inserted, pg->pg_info_.members.size(), tid);
             return;
         }
     }
-    LOGE("PG clean replace member task failed, pg not found, task_id={}, member_out={}, member_in={}, trace_id={}", task_id,
-         boost::uuids::to_string(member_out.id), boost::uuids::to_string(member_in.id), tid);
+    LOGE("PG clean replace member task failed, pg not found, task_id={}, member_out={}, member_in={}, trace_id={}",
+         task_id, boost::uuids::to_string(member_out.id), boost::uuids::to_string(member_in.id), tid);
 }

 bool HSHomeObject::reconcile_membership(pg_id_t pg_id) {
@@ -457,7 +457,8 @@ bool HSHomeObject::reconcile_membership(pg_id_t pg_id) {
             // New member not in our records, add with default name
             PGMember new_member(member_id);
             new_members.insert(std::move(new_member));
-            LOGE("Adding new member {} to pg={} membership, should not happen!", boost::uuids::to_string(member_id), hs_pg->pg_info_.id);
+            LOGE("Adding new member {} to pg={} membership, should not happen!", boost::uuids::to_string(member_id),
+                 hs_pg->pg_info_.id);
         }
     }
     // Check if membership changed
@@ -471,9 +472,7 @@ bool HSHomeObject::reconcile_membership(pg_id_t pg_id) {
     std::vector< peer_id_t > added_members;

     for (auto& old_member : hs_pg->pg_info_.members) {
-        if (new_members.find(old_member) == new_members.end()) {
-            removed_members.push_back(old_member.id);
-        }
+        if (new_members.find(old_member) == new_members.end()) { removed_members.push_back(old_member.id); }
     }

     for (auto& new_member : new_members) {
@@ -482,7 +481,8 @@ bool HSHomeObject::reconcile_membership(pg_id_t pg_id) {
         }
     }

-    LOGI("Reconciling PG {} membership: removing {} members, adding {} members, old membership count {}, new membership count: {}",
+    LOGI("Reconciling PG {} membership: removing {} members, adding {} members, old membership count {}, new "
+         "membership count: {}",
          pg_id, removed_members.size(), added_members.size(), hs_pg->pg_info_.members.size(), new_members.size());

     for (auto& member_id : removed_members) {
@@ -808,7 +808,29 @@ bool HSHomeObject::can_chunks_in_pg_be_gc(pg_id_t pg_id) const {
         return false;
     }

-    return hs_pg->pg_sb_->state == PGState::ALIVE;
+    // we can gc the chunks of a pg only after commit_lsn >= gc_ready_lsn_. otherwise, the log replay of put_blob
+    // might lead to a data loss issue as follows:
+
+    /*
+    let's say cp_lsn and dc_lsn are 10, lsn 11 is put_blob (blob -> pba-chunk-1), and lsn 12 is seal_shard(shard-1, chunk-1).
+
+    1 before the crash, lsn 11 and lsn 12 are both committed. as a result, we have a "blob -> pba-chunk-1" in the
+    wbcache of the indextable and a persisted superblk of shard-1 with a sealed state.
+
+    2 the crash happens. after restart, "blob -> pba-chunk-1" is lost since it only exists in the wbcache and was
+    not flushed to disk. but shard-1 has a state of sealed since the shard superblk was persisted before the crash.
+    now, since there is no open shard in chunk-1, chunk-1 is selected for gc, all the blobs of shard-1 are moved to
+    chunk-2, and chunk-1 becomes a reserved chunk.
+
+    3 since dc_lsn is 10, after log replay (which only commits logs with lsn <= dc_lsn), we start committing lsn 11.
+    since "blob -> pba-chunk-1" does not exist in the pg-index-table, on_blob_put_commit will insert a new item
+    "blob -> pba-chunk-1" into the pg-index-table. this is where the issue happens: the blob belongs to shard-1,
+    which has been moved to chunk-2, but on_blob_put_commit adds the blob to the indextable with a stale pba
+    belonging to chunk-1, which is now a reserved chunk and will be purged later, leaving stale blocks that no
+    longer contain the correct shard's data.
+    */
+
+    return hs_pg->pg_sb_->state == PGState::ALIVE && hs_pg->ready_for_gc;
 }

 void HSHomeObject::destroy_hs_resources(pg_id_t pg_id) { chunk_selector_->reset_pg_chunks(pg_id); }
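
// -----------------------------------------------------------------------------------------------
// A minimal sketch (not part of the patch) of the gating predicate above, with a hypothetical
// PgGcGate type. gc_ready_lsn is pinned to the repl dev's last appended lsn at restart; until the
// committed lsn catches up to it (i.e. every entry appended before the crash has been
// re-committed), can_gc() stays false, which closes the stale-pba window described in the comment.

#include <cstdint>
#include <iostream>

struct PgGcGate {
    int64_t gc_ready_lsn;  // set from the repl dev's last append lsn at restart
    bool ready_for_gc{false};
    bool alive{true};

    void on_commit(int64_t lsn) {
        if (lsn >= gc_ready_lsn) ready_for_gc = true;
    }
    bool can_gc() const { return alive && ready_for_gc; }
};

int main() {
    // restart: lsns up to 12 were appended before the crash, but replay re-commits from 11
    PgGcGate gate{.gc_ready_lsn = 12};

    gate.on_commit(11);                  // put_blob replayed, replay not yet caught up
    std::cout << gate.can_gc() << "\n";  // 0: gc would race with replay, keep it off
    gate.on_commit(12);                  // seal_shard replayed, replay caught up
    std::cout << gate.can_gc() << "\n";  // 1: now safe to gc this pg's chunks
    return 0;
}
// -----------------------------------------------------------------------------------------------
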
@@ -977,6 +999,7 @@ HSHomeObject::HS_PG::HS_PG(PGInfo info, shared< homestore::ReplDev > rdev, share
         repl_dev_{std::move(rdev)},
         index_table_{std::move(index_table)},
         metrics_{*this},
+        gc_ready_lsn_{repl_dev_->get_last_append_lsn()},
         snp_rcvr_info_sb_{_snp_rcvr_meta_name},
         snp_rcvr_shard_list_sb_{_snp_rcvr_shard_list_meta_name} {
     RELEASE_ASSERT(pg_chunk_ids != nullptr, "PG chunks null, pg={}", pg_info_.id);
@@ -1014,7 +1037,11 @@ HSHomeObject::HS_PG::HS_PG(PGInfo info, shared< homestore::ReplDev > rdev, share
 }

 HSHomeObject::HS_PG::HS_PG(superblk< pg_info_superblk >&& sb, shared< ReplDev > rdev) :
-        PG{pg_info_from_sb(sb)}, pg_sb_{std::move(sb)}, repl_dev_{std::move(rdev)}, metrics_{*this} {
+        PG{pg_info_from_sb(sb)},
+        pg_sb_{std::move(sb)},
+        repl_dev_{std::move(rdev)},
+        metrics_{*this},
+        gc_ready_lsn_{repl_dev_->get_last_append_lsn()} {
     durable_entities_.blob_sequence_num = pg_sb_->blob_sequence_num;
     durable_entities_.active_blob_count = pg_sb_->active_blob_count;
     durable_entities_.tombstone_blob_count = pg_sb_->tombstone_blob_count;
@@ -1320,4 +1347,42 @@ void HSHomeObject::update_pg_meta_after_gc(const pg_id_t pg_id, const homestore:
     }
 }

+void HSHomeObject::on_pg_lsn_commit(pg_id_t pg_id, int64_t lsn) {
+    std::lock_guard lck(_pg_lock);
+    auto iter = _pg_map.find(pg_id);
+    if (iter == _pg_map.end()) {
+        LOGW("pg_id={} is not found in pg_map when on_pg_lsn_commit is called", pg_id);
+        return;
+    }
+
+    auto hs_pg = dynamic_cast< HS_PG* >(iter->second.get());
+    if (hs_pg == nullptr) {
+        LOGW("pg_id={} in pg_map is not an HS_PG when on_pg_lsn_commit is called", pg_id);
+        return;
+    }
+
+    if (hs_pg->gc_ready_lsn_ <= lsn && !(hs_pg->ready_for_gc)) {
+        LOGD("set ready_for_gc to true for pg_id={}, gc_ready_lsn_={}, committed_lsn={}", pg_id, hs_pg->gc_ready_lsn_,
+             lsn);
+        hs_pg->ready_for_gc = true;
+    }
+}
+
+void HSHomeObject::on_pg_lsn_rollback(pg_id_t pg_id, int64_t lsn) {
+    std::lock_guard lck(_pg_lock);
+    auto iter = _pg_map.find(pg_id);
+    if (iter == _pg_map.end()) {
+        LOGW("pg_id={} is not found in pg_map when on_pg_lsn_rollback is called", pg_id);
+        return;
+    }
+
+    auto hs_pg = dynamic_cast< HS_PG* >(iter->second.get());
+    if (hs_pg == nullptr) {
+        LOGW("pg_id={} in pg_map is not an HS_PG when on_pg_lsn_rollback is called", pg_id);
+        return;
+    }
+
+    hs_pg->gc_ready_lsn_ = std::min(lsn - 1, hs_pg->gc_ready_lsn_);
+}
+
 } // namespace homeobject
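
// -----------------------------------------------------------------------------------------------
// A minimal sketch (not part of the patch) of how rollback interacts with the same hypothetical
// PgGcGate from the previous sketch: when the log is truncated at lsn, everything from lsn onward
// becomes un-committed again, so the gate lowers gc_ready_lsn to at most lsn - 1 and must see that
// lsn (re)committed before gc can resume. This mirrors on_pg_lsn_rollback's
// std::min(lsn - 1, gc_ready_lsn_), which only ever lowers the bar, never raises it.

#include <algorithm>
#include <cassert>
#include <cstdint>

struct PgGcGate {
    int64_t gc_ready_lsn;
    bool ready_for_gc{false};

    void on_commit(int64_t lsn) {
        if (lsn >= gc_ready_lsn) ready_for_gc = true;
    }
    void on_rollback(int64_t lsn) {
        // mirror of on_pg_lsn_rollback: never raise the bar, only lower it
        gc_ready_lsn = std::min(lsn - 1, gc_ready_lsn);
    }
};

int main() {
    PgGcGate gate{.gc_ready_lsn = 12};
    gate.on_rollback(10);            // entries 10..12 are truncated from the log
    assert(gate.gc_ready_lsn == 9);  // gc may resume once lsn 9 is (re)committed
    gate.on_commit(9);
    assert(gate.ready_for_gc);
    return 0;
}
// -----------------------------------------------------------------------------------------------
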
diff --git a/src/lib/homestore_backend/replication_state_machine.cpp b/src/lib/homestore_backend/replication_state_machine.cpp
index a508bad9..654387fc 100644
--- a/src/lib/homestore_backend/replication_state_machine.cpp
+++ b/src/lib/homestore_backend/replication_state_machine.cpp
@@ -70,6 +70,11 @@ void ReplicationStateMachine::notify_committed_lsn(int64_t lsn) {
         handle_no_space_left(lsn, chunk_id);
         reset_no_space_left_error_info();
     }
+
+    const auto group_id = repl_dev()->group_id();
+    auto pg_id_opt = home_object_->get_pg_id_with_group_id(group_id);
+    RELEASE_ASSERT(pg_id_opt.has_value(), "pg not found for group={} in notify_committed_lsn", group_id);
+    home_object_->on_pg_lsn_commit(pg_id_opt.value(), lsn);
 }

 bool ReplicationStateMachine::on_pre_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
@@ -106,12 +111,16 @@ bool ReplicationStateMachine::on_pre_commit(int64_t lsn, sisl::blob const& heade

 void ReplicationStateMachine::on_rollback(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
                                           cintrusive< homestore::repl_req_ctx >& ctx) {
-    LOGI("on_rollback with lsn={}", lsn);
+
     const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.cbytes());
     if (msg_header->corrupted()) {
-        LOGE("corrupted message in rollback, lsn={}", lsn);
+        RELEASE_ASSERT(false, "corrupted message in rollback, lsn={}", lsn);
         return;
     }
+
+    const auto& pg_id = msg_header->pg_id;
+    LOGI("on_rollback with lsn={}, pg={}", lsn, pg_id);
+
     switch (msg_header->msg_type) {
     case ReplicationMessageType::CREATE_SHARD_MSG:
     case ReplicationMessageType::SEAL_SHARD_MSG: {
@@ -144,6 +153,8 @@ void ReplicationStateMachine::on_rollback(int64_t lsn, sisl::blob const& header,

     // if target_lsn is int64_max, it`s is also ok to reset_no_space_left_error_info
     reset_no_space_left_error_info();
+
+    home_object_->on_pg_lsn_rollback(pg_id, lsn);
 }

 void ReplicationStateMachine::on_config_rollback(int64_t lsn) {
@@ -158,6 +169,11 @@ void ReplicationStateMachine::on_config_rollback(int64_t lsn) {

     // if target_lsn is int64_max, it`s is also ok to reset_no_space_left_error_info
     reset_no_space_left_error_info();
+
+    const auto group_id = repl_dev()->group_id();
+    auto pg_id_opt = home_object_->get_pg_id_with_group_id(group_id);
+    RELEASE_ASSERT(pg_id_opt.has_value(), "pg not found for group={} in on_config_rollback", group_id);
+    home_object_->on_pg_lsn_rollback(pg_id_opt.value(), lsn);
 }

 void ReplicationStateMachine::on_restart() { LOGD("ReplicationStateMachine::on_restart"); }
@@ -1039,6 +1055,7 @@ void ReplicationStateMachine::on_log_replay_done(const homestore::group_id_t& gr
     const auto pg_id = pg_id_opt.value();
     RELEASE_ASSERT(home_object_->pg_exists(pg_id), "pg={} should exist, but not! fatal error!", pg_id);

+    LOGI("log replay done for pg={}, group_id={}", pg_id, group_id);
     const auto& shards_in_pg = (const_cast< HSHomeObject::HS_PG* >(home_object_->_get_hs_pg_unlocked(pg_id)))->shards_;
     auto chunk_selector = home_object_->chunk_selector();
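
// -----------------------------------------------------------------------------------------------
// A minimal sketch (not part of the patch) of the wiring added in this file, with hypothetical
// types (HomeObjectLike, StateMachineLike): the replication state machine resolves its raft group
// to a pg and forwards every committed or rolled-back lsn to the per-pg hooks, so the gc gating
// needs no extra hooks in the blob data path itself.

#include <cstdint>
#include <iostream>
#include <optional>
#include <string>
#include <unordered_map>

struct HomeObjectLike {
    std::unordered_map< std::string, uint16_t > group_to_pg;  // group_id -> pg_id
    std::optional< uint16_t > get_pg_id_with_group_id(const std::string& group_id) const {
        auto it = group_to_pg.find(group_id);
        if (it == group_to_pg.end()) return std::nullopt;
        return it->second;
    }
    void on_pg_lsn_commit(uint16_t pg_id, int64_t lsn) {
        std::cout << "pg " << pg_id << " committed lsn " << lsn << "\n";
    }
    void on_pg_lsn_rollback(uint16_t pg_id, int64_t lsn) {
        std::cout << "pg " << pg_id << " rolled back lsn " << lsn << "\n";
    }
};

struct StateMachineLike {
    HomeObjectLike* ho;
    std::string group_id;
    void notify_committed_lsn(int64_t lsn) {
        if (auto pg = ho->get_pg_id_with_group_id(group_id)) ho->on_pg_lsn_commit(*pg, lsn);
    }
    void on_rollback(int64_t lsn) {
        if (auto pg = ho->get_pg_id_with_group_id(group_id)) ho->on_pg_lsn_rollback(*pg, lsn);
    }
};

int main() {
    HomeObjectLike ho;
    ho.group_to_pg["raft-group-1"] = 7;
    StateMachineLike sm{&ho, "raft-group-1"};
    sm.notify_committed_lsn(11);
    sm.on_rollback(12);
    return 0;
}
// -----------------------------------------------------------------------------------------------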