2 changes: 1 addition & 1 deletion conanfile.py
@@ -10,7 +10,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "3.0.19"
version = "3.0.20"

homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeStore"
26 changes: 10 additions & 16 deletions src/lib/homestore_backend/gc_manager.cpp
@@ -25,14 +25,14 @@ SISL_LOGGING_DECL(gcmgr)
GCManager::GCManager(HSHomeObject* homeobject) :
m_chunk_selector{homeobject->chunk_selector()}, m_hs_home_object{homeobject} {
homestore::meta_service().register_handler(
GCManager::_gc_actor_meta_name,
_gc_actor_meta_name,
[this](homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) {
on_gc_actor_meta_blk_found(std::move(buf), voidptr_cast(mblk));
},
nullptr, true);

homestore::meta_service().register_handler(
GCManager::_gc_reserved_chunk_meta_name,
_gc_reserved_chunk_meta_name,
[this](homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) {
on_reserved_chunk_meta_blk_found(std::move(buf), voidptr_cast(mblk));
},
@@ -44,7 +44,7 @@ GCManager::GCManager(HSHomeObject* homeobject) :
true);

homestore::meta_service().register_handler(
GCManager::_gc_task_meta_name,
_gc_task_meta_name,
[this](homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) {
on_gc_task_meta_blk_found(std::move(buf), voidptr_cast(mblk));
},
@@ -63,8 +63,8 @@ void GCManager::on_gc_task_meta_blk_found(sisl::byte_view const& buf, void* meta

// here, we are under the protection of the lock of metaservice. however, we will also try to update pg and shard
// metablk and then destroy the gc_task_sb, which will also try to acquire the lock of metaservice, as a result, a
// dead lock will happen. so here we will handle all the gc tasks after read all the their metablks
m_recovered_gc_tasks.emplace_back(GCManager::_gc_task_meta_name);
// deadlock will happen. so here we will handle all the gc tasks after reading all the metablks
m_recovered_gc_tasks.emplace_back(_gc_task_meta_name);
m_recovered_gc_tasks.back().load(buf, meta_cookie);
}
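
The comment above boils down to a collect-now, process-later pattern: while the metaservice lock is held inside the callback, the task is only recorded, and the work that re-acquires that lock is deferred. A minimal sketch of the idea, with placeholder types that are not the actual HomeObject code:

#include <mutex>
#include <vector>

struct recovered_task { int id; };              // placeholder for a recovered gc task superblk

std::mutex meta_lock;                           // stands in for the metaservice lock
std::vector< recovered_task > recovered_tasks;  // stands in for m_recovered_gc_tasks

// invoked while meta_lock is already held by the metaservice scan: only record the task
void on_task_meta_blk_found(const recovered_task& t) { recovered_tasks.push_back(t); }

// invoked later, after the scan has finished and meta_lock has been released; doing this
// work inside the callback would re-acquire meta_lock and deadlock
void handle_all_recovered_tasks() {
    for (auto& t : recovered_tasks) {
        std::lock_guard< std::mutex > lg{meta_lock};  // safe to take the lock here
        (void)t;  // update pg/shard metablks and destroy the task superblk, etc.
    }
    recovered_tasks.clear();
}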

@@ -89,7 +89,7 @@ void GCManager::handle_all_recovered_gc_tasks() {
}

void GCManager::on_gc_actor_meta_blk_found(sisl::byte_view const& buf, void* meta_cookie) {
m_gc_actor_sbs.emplace_back(GCManager::_gc_actor_meta_name);
m_gc_actor_sbs.emplace_back(_gc_actor_meta_name);
auto& gc_actor_sb = m_gc_actor_sbs.back();
gc_actor_sb.load(buf, meta_cookie);
auto pdev_id = gc_actor_sb->pdev_id;
@@ -100,8 +100,7 @@ }
}

void GCManager::on_reserved_chunk_meta_blk_found(sisl::byte_view const& buf, void* meta_cookie) {
homestore::superblk< GCManager::gc_reserved_chunk_superblk > reserved_chunk_sb(
GCManager::_gc_reserved_chunk_meta_name);
homestore::superblk< gc_reserved_chunk_superblk > reserved_chunk_sb(_gc_reserved_chunk_meta_name);
auto chunk_id = reserved_chunk_sb.load(buf, meta_cookie)->chunk_id;
auto EXVchunk = m_chunk_selector->get_extend_vchunk(chunk_id);
if (EXVchunk == nullptr) {
@@ -184,8 +183,7 @@ folly::SemiFuture< bool > GCManager::submit_gc_task(task_priority priority, chun
}

std::shared_ptr< GCManager::pdev_gc_actor >
GCManager::try_create_pdev_gc_actor(uint32_t pdev_id,
const homestore::superblk< GCManager::gc_actor_superblk >& gc_actor_sb) {
GCManager::try_create_pdev_gc_actor(uint32_t pdev_id, const homestore::superblk< gc_actor_superblk >& gc_actor_sb) {
auto const [it, happened] = m_pdev_gc_actors.try_emplace(
pdev_id, std::make_shared< pdev_gc_actor >(gc_actor_sb, m_chunk_selector, m_hs_home_object));
RELEASE_ASSERT((it != m_pdev_gc_actors.end()), "Unexpected error in m_pdev_gc_actors!!!");
@@ -776,7 +774,7 @@ bool GCManager::pdev_gc_actor::copy_valid_data(
#endif
// TODO::involve ratelimiter in the following code, where read/write are scheduled. or do we need a central
// ratelimiter shared by all components except client io?
auto succeed_copying_shard =
const auto succeed_copying_shard =
// 1 write the shard header to move_to_chunk
data_service.async_alloc_write(header_sgs, hints, out_blkids)
.thenValue([this, &hints, &move_to_chunk, &move_from_chunk, &is_last_shard, &shard_id, &blk_size,
@@ -968,11 +966,9 @@ bool GCManager::pdev_gc_actor::copy_valid_data(
move_from_chunk, move_to_chunk);
return false;
}

GCLOGD(task_id, pg_id, shard_id, "successfully copy blobs from move_from_chunk={} to move_to_chunk={}",
move_from_chunk, move_to_chunk);
}

GCLOGD(task_id, pg_id, NO_SHARD_ID, "all valid blobs are copied from move_from_chunk={} to move_to_chunk={}",
move_from_chunk, move_to_chunk);

@@ -1132,17 +1128,14 @@ bool GCManager::pdev_gc_actor::compare_blob_indexes(
GCLOGW(task_id, pg_id, shard_id, "copied blob: move_to_chunk={}, blob_id={}, pba={}", k.chunk, k.blob,
v.pbas().to_string());
}

GCLOGW(task_id, pg_id, NO_SHARD_ID, "start printing valid blobs from gc index table:");
for (const auto& [k, v] : valid_blob_indexes) {
const auto shard_id = k.key().shard;
GCLOGW(task_id, pg_id, shard_id, "valid blob: move_to_chunk={}, blob_id={}, pba={}", k.key().chunk,
k.key().blob, v.pbas().to_string());
}

RELEASE_ASSERT(false, "copied blobs are not the same as the valid blobs got from gc index table");
}

return ret;
}

@@ -1325,6 +1318,7 @@ bool GCManager::pdev_gc_actor::process_after_gc_metablk_persisted(
}
}

// now that all the blob indexes have been replaced successfully, we can destroy the gc task superblk
gc_task_sb.destroy();

const auto reclaimed_blk_count = m_chunk_selector->get_extend_vchunk(move_from_chunk)->get_used_blks() -
20 changes: 19 additions & 1 deletion src/lib/homestore_backend/hs_homeobject.cpp
@@ -349,6 +349,24 @@ void HSHomeObject::on_replica_restart() {
homestore::meta_service().read_sub_sb(GCManager::_gc_reserved_chunk_meta_name);
homestore::meta_service().read_sub_sb(GCManager::_gc_task_meta_name);

// 1 it is safe to handle recovered gc tasks before log replay is done. a gc task superblk is only persisted after
// all the valid blobs have been copied from move_from_chunk to move_to_chunk and all the new pba indexes have been
// flushed into the gc index table by cp (cp also flushes the pg index table). moreover, a gc task superblk means
// there is an ongoing gc task for move_from_chunk and move_to_chunk, so there should be no new put_blob to these
// two chunks until the task is done. when replaying the log for put_blob_commit, if the blob index is already in
// the index table (all the index items were flushed by cp before the gc task superblk was persisted), it will not
// override the existing item.

// 2 we need to handle all the recovered gc tasks before replaying the log. when log replay is done, in
// ReplicationStateMachine::on_log_replay_done, we need to select_specific_chunk for every chunk with an open shard
// to mark those chunks as inuse. if a crash happens after the shard metablk has been updated (the pchunk of the
// shard is changed to move_to_chunk) but before the reserved_chunk_superblk has been persisted (move_to_chunk is
// still a reserved chunk), then when log replay is done and we try to select_specific_chunk for the chunk with the
// open shard, the state of move_to_chunk is still GC (reserved), so it cannot be selected and on_log_replay_done
// gets stuck.

// after handling all the recovered gc tasks, move_to_chunk will be marked as inuse and can therefore be selected
// in on_log_replay_done, so the log replay can complete successfully.
gc_mgr_->handle_all_recovered_gc_tasks();
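
The restart ordering that the comments above rely on can be sketched as follows; the free functions below are placeholders for illustration only, not the actual HomeObject/HomeStore APIs:

void read_gc_superblocks() {}            // read gc task / gc actor / reserved-chunk superblks
void handle_all_recovered_gc_tasks() {}  // finish recovered tasks, mark each move_to_chunk inuse
void start_log_replay() {}               // raft/journal log replay

void replica_restart_sketch() {
    read_gc_superblocks();            // 1. discover every persisted gc task
    handle_all_recovered_gc_tasks();  // 2. finish them BEFORE log replay, otherwise
                                      //    on_log_replay_done can block on a chunk that
                                      //    is still marked reserved
    start_log_replay();               // 3. only now replay the log; a put_blob whose
                                      //    index item already exists will not override it
}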

if (HS_BACKEND_DYNAMIC_CONFIG(enable_gc)) {
@@ -411,6 +429,7 @@ void HSHomeObject::shutdown() {
}

LOGI("start shutting down HomeObject");

#if 0
if (ho_timer_thread_handle_.first) {
iomanager.cancel_timer(ho_timer_thread_handle_, true);
@@ -432,7 +451,6 @@
// to persist gc task metablk if there is any ongoing gc task. after stopping gc manager, there is no gc task
// anymore, and thus no new gc task will be written to metaservice during homestore shutdown.
gc_mgr_->stop();

LOGI("start shutting down HomeStore");
homestore::HomeStore::instance()->shutdown();
homestore::HomeStore::reset_instance();
5 changes: 5 additions & 0 deletions src/lib/homestore_backend/hs_homeobject.hpp
@@ -332,6 +332,9 @@ class HSHomeObject : public HomeObjectImpl {
shared< homestore::ReplDev > repl_dev_;
std::shared_ptr< BlobIndexTable > index_table_;
PGMetrics metrics_;
int64_t gc_ready_lsn_; // the chunks in this pg can only be gc'ed after commit_lsn >= gc_ready_lsn_
bool ready_for_gc{false};

mutable pg_state pg_state_{0};

// Snapshot receiver progress info, used as a checkpoint for recovery
@@ -981,6 +984,8 @@ class HSHomeObject : public HomeObjectImpl {
const HS_PG* _get_hs_pg_unlocked(pg_id_t pg_id) const;
bool verify_blob(const void* blob, const shard_id_t shard_id, const blob_id_t blob_id,
bool allow_delete_marker = false) const;
void on_pg_lsn_commit(pg_id_t pg_id, int64_t lsn);
void on_pg_lsn_rollback(pg_id_t pg_id, int64_t lsn);

BlobManager::Result< std::vector< BlobInfo > > get_shard_blobs(shard_id_t shard_id);

93 changes: 79 additions & 14 deletions src/lib/homestore_backend/hs_pg_manager.cpp
@@ -398,10 +398,10 @@ void HSHomeObject::on_pg_complete_replace_member(group_id_t group_id, const std:
boost::uuids::to_string(member_out.id), boost::uuids::to_string(member_in.id), tid);
}

//This function actually perform rollback for replace member task: Remove in_member, and ensure out_member exists
// This function actually performs rollback for the replace member task: Remove in_member, and ensure out_member exists
void HSHomeObject::on_pg_clean_replace_member_task(group_id_t group_id, const std::string& task_id,
const replica_member_info& member_out,
const replica_member_info& member_in, trace_id_t tid) {
const replica_member_info& member_out,
const replica_member_info& member_in, trace_id_t tid) {
std::unique_lock lck(_pg_lock);
for (const auto& iter : _pg_map) {
auto& pg = iter.second;
@@ -421,13 +421,13 @@ void HSHomeObject::on_pg_clean_replace_member_task(group_id_t group_id, const st

LOGI("PG clean replace member task done (rollback), task_id={}, removed in_member={} (removed={}), "
"ensured out_member={} (inserted={}), member_nums={}, trace_id={}",
task_id, boost::uuids::to_string(member_in.id), removed_count,
boost::uuids::to_string(member_out.id), inserted, pg->pg_info_.members.size(), tid);
task_id, boost::uuids::to_string(member_in.id), removed_count, boost::uuids::to_string(member_out.id),
inserted, pg->pg_info_.members.size(), tid);
return;
}
}
LOGE("PG clean replace member task failed, pg not found, task_id={}, member_out={}, member_in={}, trace_id={}", task_id,
boost::uuids::to_string(member_out.id), boost::uuids::to_string(member_in.id), tid);
LOGE("PG clean replace member task failed, pg not found, task_id={}, member_out={}, member_in={}, trace_id={}",
task_id, boost::uuids::to_string(member_out.id), boost::uuids::to_string(member_in.id), tid);
}

bool HSHomeObject::reconcile_membership(pg_id_t pg_id) {
@@ -457,7 +457,8 @@ bool HSHomeObject::reconcile_membership(pg_id_t pg_id) {
// New member not in our records, add with default name
PGMember new_member(member_id);
new_members.insert(std::move(new_member));
LOGE("Adding new member {} to pg={} membership, should not happen!", boost::uuids::to_string(member_id), hs_pg->pg_info_.id);
LOGE("Adding new member {} to pg={} membership, should not happen!", boost::uuids::to_string(member_id),
hs_pg->pg_info_.id);
}
}
// Check if membership changed
@@ -471,9 +472,7 @@ bool HSHomeObject::reconcile_membership(pg_id_t pg_id) {
std::vector< peer_id_t > added_members;

for (auto& old_member : hs_pg->pg_info_.members) {
if (new_members.find(old_member) == new_members.end()) {
removed_members.push_back(old_member.id);
}
if (new_members.find(old_member) == new_members.end()) { removed_members.push_back(old_member.id); }
}

for (auto& new_member : new_members) {
@@ -482,7 +481,8 @@
}
}

LOGI("Reconciling PG {} membership: removing {} members, adding {} members, old membership count {}, new membership count: {}",
LOGI("Reconciling PG {} membership: removing {} members, adding {} members, old membership count {}, new "
"membership count: {}",
pg_id, removed_members.size(), added_members.size(), hs_pg->pg_info_.members.size(), new_members.size());

for (auto& member_id : removed_members) {
@@ -808,7 +808,29 @@ bool HSHomeObject::can_chunks_in_pg_be_gc(pg_id_t pg_id) const {
return false;
}

return hs_pg->pg_sb_->state == PGState::ALIVE;
// we can gc the chunks of a pg only after commit_lsn >= gc_ready_lsn_. otherwise, the log replay of put_blob
// might lead to a data loss issue like the following:

/*
let's say cp_lsn and dc_lsn are both 10, lsn 11 is put_blob (blob -> pba-chunk-1), and lsn 12 is
seal_shard(shard-1, chunk-1).

1 before the crash, lsn 11 and lsn 12 are both committed. as a result, we have "blob -> pba-chunk-1" in the
wbcache of the index table and a persisted superblk of shard-1 with state sealed.

2 a crash happens. after restart, "blob -> pba-chunk-1" is lost since it only existed in the wbcache and was
never flushed to disk, but shard-1 still has state sealed since the shard superblk was persisted before the
crash. now, since there is no open shard in chunk-1, chunk-1 is selected for gc, all the blobs of shard-1 are
moved to chunk-2, and chunk-1 becomes a reserved chunk.

3 since dc_lsn is 10, log replay only commits logs with lsn <= dc_lsn; afterwards we start committing lsn 11.
since "blob -> pba-chunk-1" does not exist in the pg index table, on_blob_put_commit will insert a new item
"blob -> pba-chunk-1" into the pg index table. this is where the issue happens: the blob belongs to shard-1,
which has been moved to chunk-2, but on_blob_put_commit adds the blob to the index table with a stale pba in
chunk-1, which is now a reserved chunk and will be purged later, leaving the index pointing at stale blocks
that no longer contain the correct shard's data.
*/

return hs_pg->pg_sb_->state == PGState::ALIVE && hs_pg->ready_for_gc;
}
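
The new gate can be walked through with the lsn numbers from the comment above. A self-contained sketch, using a placeholder struct rather than the real HS_PG:

#include <cstdint>
#include <iostream>

struct pg_sketch {                 // placeholder, not the real HS_PG
    int64_t gc_ready_lsn;          // last appended lsn at restart (12 in the example above)
    bool ready_for_gc{false};

    void on_lsn_commit(int64_t lsn) {
        // gc is allowed only once every lsn appended before the crash has been
        // re-committed; until then a replayed put_blob could insert a stale pba
        // pointing into a chunk that gc has already emptied
        if (lsn >= gc_ready_lsn) { ready_for_gc = true; }
    }
};

int main() {
    pg_sketch pg{12};                                        // lsn 12 (seal_shard) was appended before the crash
    pg.on_lsn_commit(11);                                    // replaying lsn 11 (put_blob): still not ready
    std::cout << std::boolalpha << pg.ready_for_gc << "\n";  // false
    pg.on_lsn_commit(12);                                    // every pre-crash lsn is now committed
    std::cout << std::boolalpha << pg.ready_for_gc << "\n";  // true
    return 0;
}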

void HSHomeObject::destroy_hs_resources(pg_id_t pg_id) { chunk_selector_->reset_pg_chunks(pg_id); }
@@ -977,6 +999,7 @@ HSHomeObject::HS_PG::HS_PG(PGInfo info, shared< homestore::ReplDev > rdev, share
repl_dev_{std::move(rdev)},
index_table_{std::move(index_table)},
metrics_{*this},
gc_ready_lsn_{repl_dev_->get_last_append_lsn()},
snp_rcvr_info_sb_{_snp_rcvr_meta_name},
snp_rcvr_shard_list_sb_{_snp_rcvr_shard_list_meta_name} {
RELEASE_ASSERT(pg_chunk_ids != nullptr, "PG chunks null, pg={}", pg_info_.id);
@@ -1014,7 +1037,11 @@ HSHomeObject::HS_PG::HS_PG(PGInfo info, shared< homestore::ReplDev > rdev, share
}

HSHomeObject::HS_PG::HS_PG(superblk< pg_info_superblk >&& sb, shared< ReplDev > rdev) :
PG{pg_info_from_sb(sb)}, pg_sb_{std::move(sb)}, repl_dev_{std::move(rdev)}, metrics_{*this} {
PG{pg_info_from_sb(sb)},
pg_sb_{std::move(sb)},
repl_dev_{std::move(rdev)},
metrics_{*this},
gc_ready_lsn_{repl_dev_->get_last_append_lsn()} {
durable_entities_.blob_sequence_num = pg_sb_->blob_sequence_num;
durable_entities_.active_blob_count = pg_sb_->active_blob_count;
durable_entities_.tombstone_blob_count = pg_sb_->tombstone_blob_count;
@@ -1320,4 +1347,42 @@ void HSHomeObject::update_pg_meta_after_gc(const pg_id_t pg_id, const homestore:
}
}

void HSHomeObject::on_pg_lsn_commit(pg_id_t pg_id, int64_t lsn) {
std::lock_guard lck(_pg_lock);
auto iter = _pg_map.find(pg_id);
if (iter == _pg_map.end()) {
LOGW("pg_id={} is not found in pg_map when on_pg_lsn_commit is called", pg_id);
return;
}

auto hs_pg = dynamic_cast< HS_PG* >(iter->second.get());
if (hs_pg == nullptr) {
LOGW("pg_id={} is not found in pg_map when on_pg_lsn_commit is called", pg_id);
return;
}

if (hs_pg->gc_ready_lsn_ <= lsn && !(hs_pg->ready_for_gc)) {
LOGD("set ready_for_gc to true for pg_id={}, gc_ready_lsn_={}, committed_lsn={}", pg_id, hs_pg->gc_ready_lsn_,
lsn);
hs_pg->ready_for_gc = true;
}
}

void HSHomeObject::on_pg_lsn_rollback(pg_id_t pg_id, int64_t lsn) {
std::lock_guard lck(_pg_lock);
auto iter = _pg_map.find(pg_id);
if (iter == _pg_map.end()) {
LOGW("pg_id={} is not found in pg_map when on_pg_lsn_commit is called", pg_id);
return;
}

auto hs_pg = dynamic_cast< HS_PG* >(iter->second.get());
if (hs_pg == nullptr) {
LOGW("pg_id={} is not found in pg_map when on_pg_lsn_commit is called", pg_id);
return;
}

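// entries at or above the rolled-back lsn are discarded, so gc readiness only needs to wait for the lsns below
// the rollback point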
hs_pg->gc_ready_lsn_ = std::min(lsn - 1, hs_pg->gc_ready_lsn_);
}

} // namespace homeobject