2 changes: 1 addition & 1 deletion conanfile.py
@@ -9,7 +9,7 @@
 
 class HomestoreConan(ConanFile):
     name = "homestore"
-    version = "7.3.3"
+    version = "7.3.4"
 
     homepage = "https://github.com/eBay/Homestore"
     description = "HomeStore Storage Engine"
9 changes: 6 additions & 3 deletions src/lib/logstore/log_store.cpp
@@ -360,18 +360,21 @@ bool HomeLogStore::rollback(logstore_seq_num_t to_lsn) {
     incr_pending_request_num();
     // Fast path
     if (to_lsn == m_tail_lsn.load()) {
+        THIS_LOGSTORE_LOG(INFO, "Rollback to the current tail_lsn {}, no-op", to_lsn);
         decr_pending_request_num();
         return true;
     }
 
-    if (to_lsn > m_tail_lsn.load() || to_lsn < m_start_lsn.load()) {
+    // Special case: allow rollback to exactly start_lsn - 1.
+    // This handles the scenario where all logs were truncated and a leader switch happens before new logs commit.
+    if (to_lsn > m_tail_lsn.load() || to_lsn < m_start_lsn.load() - 1) {
        HS_LOG_ASSERT(false, "Attempted to rollback to {} which is not in the range of [{}, {}]", to_lsn,
-                      m_start_lsn.load(), m_tail_lsn.load());
+                      m_start_lsn.load() - 1, m_tail_lsn.load());
         decr_pending_request_num();
         return false;
     }
 
-    THIS_LOGSTORE_LOG(INFO, "Rolling back to {}, tail {}", to_lsn, m_tail_lsn.load());
+    THIS_LOGSTORE_LOG(INFO, "Rolling back to {}, start {} tail {}", to_lsn, m_start_lsn.load(), m_tail_lsn.load());
     bool do_flush{false};
     do {
         {
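
The net effect of the hunk above: the lowest legal rollback target drops from start_lsn to start_lsn - 1, so uncommitted entries appended after a full truncation can still be rolled away when the leader switches. A minimal sketch of the new bounds check, as a hypothetical standalone helper rather than the actual HomeLogStore method:

```cpp
#include <cstdint>

using lsn_t = int64_t;  // stand-in for logstore_seq_num_t

// Hypothetical helper mirroring the patched check: a rollback target is
// valid iff it lies in [start_lsn - 1, tail_lsn]. Before this change the
// lower bound was start_lsn, so rolling the store back to "empty" asserted.
bool is_valid_rollback_target(lsn_t to_lsn, lsn_t start_lsn, lsn_t tail_lsn) {
    return to_lsn <= tail_lsn && to_lsn >= start_lsn - 1;
}
```

For example, with start_lsn = 100 and tail_lsn = 105 (logs truncated up to 99, then a few uncommitted entries appended), a new leader discarding everything calls rollback(99): previously that tripped the assert because 99 < 100, now it is accepted as exactly start_lsn - 1.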
2 changes: 1 addition & 1 deletion src/lib/logstore/log_store_service.cpp
@@ -309,7 +309,6 @@ void LogStoreService::remove_log_store(logdev_id_t logdev_id, logstore_id_t store_id) {
     HS_LOG(INFO, logstore, "Removing logstore {} from logdev {}", store_id, logdev_id);
     incr_pending_request_num();
     folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx);
-    COUNTER_INCREMENT(m_metrics, logstores_count, 1);
     const auto it = m_id_logdev_map.find(logdev_id);
     if (it == m_id_logdev_map.end()) {
         HS_LOG(WARN, logstore, "logdev id {} doesnt exist", logdev_id);
@@ -325,6 +324,7 @@ void LogStoreService::device_truncate() {
     // TODO: make device_truncate_under_lock return future and do collectAllFutures;
     if (is_stopping()) return;
     incr_pending_request_num();
+    folly::SharedMutexWritePriority::ReadHolder holder(m_logdev_map_mtx);
     for (auto& [id, logdev] : m_id_logdev_map) {
         HS_LOG(DEBUG, logstore, "Truncating logdev {}", id);
         logdev->truncate();
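
Two separate fixes here: the logstores_count metric is no longer bumped on removal (it was incorrectly incremented, not decremented, in remove_log_store), and device_truncate() now holds the read side of the mutex guarding m_id_logdev_map while iterating, so truncation cannot race with a concurrent create/remove mutating the map. A sketch of that reader/writer pattern, with illustrative names in place of the HomeStore types:

```cpp
#include <folly/SharedMutex.h>
#include <map>

// Illustrative stand-ins for m_logdev_map_mtx and m_id_logdev_map.
folly::SharedMutexWritePriority map_mtx;
std::map< int, int > id_map;

// Writers that mutate the map (e.g. remove_log_store) take the exclusive side.
void remove_entry(int id) {
    folly::SharedMutexWritePriority::WriteHolder holder(map_mtx);
    id_map.erase(id);
}

// Iterating readers (e.g. device_truncate) take the shared side, which is
// what the added ReadHolder line provides.
void for_each_entry() {
    folly::SharedMutexWritePriority::ReadHolder holder(map_mtx);
    for (auto& [id, value] : id_map) {
        (void)id;
        (void)value;  // per-logdev work (truncate()) would go here
    }
}
```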
7 changes: 5 additions & 2 deletions src/lib/replication/log_store/home_raft_log_store.cpp
@@ -187,8 +187,11 @@ void HomeRaftLogStore::write_at(ulong index, nuraft::ptr< nuraft::log_entry >& entry) {
     // calls, but it is dangerous to set higher number.
     m_last_durable_lsn = -1;
 
-    m_log_store->append_async(sisl::io_blob{buf->data_begin(), uint32_cast(buf->size()), false /* is_aligned */},
-                              nullptr /* cookie */, [buf](int64_t, sisl::io_blob&, logdev_key, void*) {});
+    auto const appended_seq =
+        m_log_store->append_async(sisl::io_blob{buf->data_begin(), uint32_cast(buf->size()), false /* is_aligned */},
+                                  nullptr /* cookie */, [buf](int64_t, sisl::io_blob&, logdev_key, void*) {});
+    HS_REL_ASSERT_EQ(to_repl_lsn(appended_seq), static_cast< repl_lsn_t >(index),
+                     "write_at appended lsn mismatch: expected {} actual {}", index, to_repl_lsn(appended_seq));
 
     auto position_in_cache = index % m_log_entry_cache.size();
     {
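
The append is no longer fire-and-forget: the sequence number returned by append_async is checked against the raft index write_at() was asked to overwrite, so a slot mismatch fails fast with a release assert instead of silently diverging the logs. A sketch of the invariant, under the assumption (consistent with HomeRaftLogStore's convention) that to_repl_lsn is a +1 shift from the 0-based logstore sequence to the 1-based raft index:

```cpp
#include <cassert>
#include <cstdint>

using store_seq_t = int64_t;  // logstore sequence number (0-based)
using repl_lsn_t = int64_t;   // raft log index (1-based)

// Assumed conversion for illustration; the real one lives in HomeRaftLogStore.
repl_lsn_t to_repl_lsn(store_seq_t seq) { return seq + 1; }

// The invariant the new HS_REL_ASSERT_EQ enforces: the slot the underlying
// log store actually appended to must be exactly the slot NuRaft targeted.
void check_appended_slot(store_seq_t appended_seq, uint64_t index) {
    assert(to_repl_lsn(appended_seq) == static_cast< repl_lsn_t >(index));
}
```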
34 changes: 34 additions & 0 deletions src/tests/test_home_raft_logstore.cpp
@@ -66,6 +66,15 @@ class RaftLogStoreClient {
         ASSERT_EQ(m_rls->start_index(), m_start_lsn) << "Start Index not expected to be updated after insertion";
     }
 
+    void write_at_start_boundary_test(nuraft::ptr< nuraft::log_entry >& le) {
+        m_rls->write_at(m_start_lsn, le);
+        ASSERT_EQ(m_rls->entry_at(m_start_lsn)->get_term(), le->get_term());
+        m_next_lsn = m_start_lsn + 1;
+        ASSERT_EQ(m_rls->start_index(), m_start_lsn);
+        ASSERT_EQ(m_rls->last_index(), m_start_lsn);
+        ASSERT_EQ(m_rls->next_slot(), m_next_lsn);
+    }
+
     void rollback_test() {
         m_next_lsn = (m_next_lsn - m_start_lsn) / 2; // Rollback half of the current logs
         ++m_cur_term;
@@ -92,6 +101,13 @@ class RaftLogStoreClient {
         validate_all_logs();
     }
 
+    void compact_all_test() {
+        m_start_lsn = m_next_lsn;
+        m_rls->compact(m_next_lsn - 1);
+        ASSERT_EQ(m_rls->start_index(), m_next_lsn) << "Post compaction, start_index is invalid";
+        validate_all_logs();
+    }
+
     void pack_test(uint64_t from, int32_t cnt, pack_result_t& out_pack) {
         out_pack.actual_data = m_rls->pack(from, cnt);
         ASSERT_NE(out_pack.actual_data.get(), nullptr);
@@ -258,6 +274,24 @@ TEST_F(TestRaftLogStore, lifecycle_test) {
     this->m_follower_store.append_read_test(nrecords); // total_records in follower = 4000
 }
 
+TEST_F(TestRaftLogStore, write_at_start_boundary_test) {
+    LOGINFO("Step 1: Append some records and then compact all");
+    this->m_leader_store.append_read_test(2);
+    this->m_leader_store.compact_all_test();
+
+    LOGINFO("Step 2: Write at start boundary when no logs exist, expect allowed");
+    // No more logs exist; write_at at the start boundary should be allowed
+    auto le = nuraft::cs_new< nuraft::log_entry >(5 /*term*/, nuraft::buffer::alloc(8));
+    le->get_buf().put("ALLOW");
+    this->m_leader_store.write_at_start_boundary_test(le);
+
+    LOGINFO("Step 3: Write at start boundary again, expect allowed");
+    // write_at the start boundary again to make sure the rollback is allowed
+    le = nuraft::cs_new< nuraft::log_entry >(6 /*term*/, nuraft::buffer::alloc(8));
+    le->get_buf().put("ALLOW2");
+    this->m_leader_store.write_at_start_boundary_test(le);
+}
+
 SISL_OPTIONS_ENABLE(logging, test_home_raft_log_store, iomgr, test_common_setup)
 SISL_OPTION_GROUP(test_home_raft_log_store,
                   (num_records, "", "num_records", "number of records to test",
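
The new test ties the two code changes together: write_at(start_index) on an empty store must first discard everything from start_index onward, i.e. roll back to start_index - 1, which is exactly the boundary the log_store.cpp change legalizes. A compressed model of that flow, with a hypothetical FakeStore standing in for the NuRaft/HomeStore plumbing:

```cpp
#include <cstdint>

using lsn_t = int64_t;

// Hypothetical toy model; tail_lsn < start_lsn means the store is empty.
struct FakeStore {
    lsn_t start_lsn{1};
    lsn_t tail_lsn{0};

    // Patched bounds: a rollback target is valid in [start_lsn - 1, tail_lsn].
    bool rollback(lsn_t to_lsn) {
        if (to_lsn > tail_lsn || to_lsn < start_lsn - 1) return false;
        tail_lsn = to_lsn;
        return true;
    }

    // write_at(index) drops index..tail (rollback to index - 1), then appends
    // the new entry at index -- mirroring NuRaft's overwrite semantics.
    bool write_at(lsn_t index) {
        if (!rollback(index - 1)) return false;
        tail_lsn = index;
        return true;
    }
};

int main() {
    FakeStore s;
    s.start_lsn = 3;  // two records appended, then everything compacted away
    s.tail_lsn = 2;   // nothing appended since: store is empty
    return s.write_at(3) ? 0 : 1;  // rollback(2) == rollback(start_lsn - 1): now legal
}
```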