diff --git a/conanfile.py b/conanfile.py
index 637a67aed..65ceccda4 100644
--- a/conanfile.py
+++ b/conanfile.py
@@ -9,7 +9,7 @@ class HomestoreConan(ConanFile):
     name = "homestore"
-    version = "7.3.3"
+    version = "7.3.4"
     homepage = "https://github.com/eBay/Homestore"
     description = "HomeStore Storage Engine"
diff --git a/src/lib/logstore/log_store.cpp b/src/lib/logstore/log_store.cpp
index 7c1b8ec36..deeda2847 100644
--- a/src/lib/logstore/log_store.cpp
+++ b/src/lib/logstore/log_store.cpp
@@ -360,18 +360,21 @@ bool HomeLogStore::rollback(logstore_seq_num_t to_lsn) {
     incr_pending_request_num();
     // Fast path
     if (to_lsn == m_tail_lsn.load()) {
+        THIS_LOGSTORE_LOG(INFO, "Rollback to the current tail_lsn {}, no-op", to_lsn);
         decr_pending_request_num();
         return true;
     }
 
-    if (to_lsn > m_tail_lsn.load() || to_lsn < m_start_lsn.load()) {
+    // Special case: allow rollback to exactly start_lsn - 1.
+    // This handles the scenario where all logs were truncated and a leader switch happens before new logs commit.
+    if (to_lsn > m_tail_lsn.load() || to_lsn < m_start_lsn.load() - 1) {
         HS_LOG_ASSERT(false, "Attempted to rollback to {} which is not in the range of [{}, {}]", to_lsn,
-                      m_start_lsn.load(), m_tail_lsn.load());
+                      m_start_lsn.load() - 1, m_tail_lsn.load());
         decr_pending_request_num();
         return false;
     }
 
-    THIS_LOGSTORE_LOG(INFO, "Rolling back to {}, tail {}", to_lsn, m_tail_lsn.load());
+    THIS_LOGSTORE_LOG(INFO, "Rolling back to {}, start {} tail {}", to_lsn, m_start_lsn.load(), m_tail_lsn.load());
     bool do_flush{false};
     do {
         {
diff --git a/src/lib/logstore/log_store_service.cpp b/src/lib/logstore/log_store_service.cpp
index c1ddda6e7..7affe71b9 100644
--- a/src/lib/logstore/log_store_service.cpp
+++ b/src/lib/logstore/log_store_service.cpp
@@ -309,7 +309,6 @@ void LogStoreService::remove_log_store(logdev_id_t logdev_id, logstore_id_t stor
     HS_LOG(INFO, logstore, "Removing logstore {} from logdev {}", store_id, logdev_id);
     incr_pending_request_num();
     folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx);
-    COUNTER_INCREMENT(m_metrics, logstores_count, 1);
     const auto it = m_id_logdev_map.find(logdev_id);
     if (it == m_id_logdev_map.end()) {
         HS_LOG(WARN, logstore, "logdev id {} doesnt exist", logdev_id);
@@ -325,6 +324,7 @@ void LogStoreService::device_truncate() {
     // TODO: make device_truncate_under_lock return future and do collectAllFutures;
     if (is_stopping()) return;
     incr_pending_request_num();
+    folly::SharedMutexWritePriority::ReadHolder holder(m_logdev_map_mtx);
     for (auto& [id, logdev] : m_id_logdev_map) {
         HS_LOG(DEBUG, logstore, "Truncating logdev {}", id);
         logdev->truncate();
diff --git a/src/lib/replication/log_store/home_raft_log_store.cpp b/src/lib/replication/log_store/home_raft_log_store.cpp
index ad909ec42..d4e80eb30 100644
--- a/src/lib/replication/log_store/home_raft_log_store.cpp
+++ b/src/lib/replication/log_store/home_raft_log_store.cpp
@@ -187,8 +187,11 @@ void HomeRaftLogStore::write_at(ulong index, nuraft::ptr< nuraft::log_entry >& e
     // calls, but it is dangerous to set higher number.
     m_last_durable_lsn = -1;
 
-    m_log_store->append_async(sisl::io_blob{buf->data_begin(), uint32_cast(buf->size()), false /* is_aligned */},
-                              nullptr /* cookie */, [buf](int64_t, sisl::io_blob&, logdev_key, void*) {});
+    auto const appended_seq =
+        m_log_store->append_async(sisl::io_blob{buf->data_begin(), uint32_cast(buf->size()), false /* is_aligned */},
+                                  nullptr /* cookie */, [buf](int64_t, sisl::io_blob&, logdev_key, void*) {});
+    HS_REL_ASSERT_EQ(to_repl_lsn(appended_seq), static_cast< repl_lsn_t >(index),
+                     "write_at appended lsn mismatch: expected {} actual {}", index, to_repl_lsn(appended_seq));
 
     auto position_in_cache = index % m_log_entry_cache.size();
     {
diff --git a/src/tests/test_home_raft_logstore.cpp b/src/tests/test_home_raft_logstore.cpp
index b4349e6b8..4c8cc3ebf 100644
--- a/src/tests/test_home_raft_logstore.cpp
+++ b/src/tests/test_home_raft_logstore.cpp
@@ -66,6 +66,15 @@ class RaftLogStoreClient {
         ASSERT_EQ(m_rls->start_index(), m_start_lsn) << "Start Index not expected to be updated after insertion";
     }
 
+    void write_at_start_boundary_test(nuraft::ptr< nuraft::log_entry >& le) {
+        m_rls->write_at(m_start_lsn, le);
+        ASSERT_EQ(m_rls->entry_at(m_start_lsn)->get_term(), le->get_term());
+        m_next_lsn = m_start_lsn + 1;
+        ASSERT_EQ(m_rls->start_index(), m_start_lsn);
+        ASSERT_EQ(m_rls->last_index(), m_start_lsn);
+        ASSERT_EQ(m_rls->next_slot(), m_next_lsn);
+    }
+
     void rollback_test() {
         m_next_lsn = (m_next_lsn - m_start_lsn) / 2; // Rollback half of the current logs
         ++m_cur_term;
@@ -92,6 +101,13 @@ class RaftLogStoreClient {
         validate_all_logs();
     }
 
+    void compact_all_test() {
+        m_start_lsn = m_next_lsn;
+        m_rls->compact(m_next_lsn - 1);
+        ASSERT_EQ(m_rls->start_index(), m_next_lsn) << "Post compaction, start_index is invalid";
+        validate_all_logs();
+    }
+
     void pack_test(uint64_t from, int32_t cnt, pack_result_t& out_pack) {
         out_pack.actual_data = m_rls->pack(from, cnt);
         ASSERT_NE(out_pack.actual_data.get(), nullptr);
@@ -258,6 +274,24 @@ TEST_F(TestRaftLogStore, lifecycle_test) {
     this->m_follower_store.append_read_test(nrecords); // total_records in follower = 4000
 }
 
+TEST_F(TestRaftLogStore, write_at_start_boundary_test) {
+    LOGINFO("Step 1: Append some records and then compact all");
+    this->m_leader_store.append_read_test(2);
+    this->m_leader_store.compact_all_test();
+
+    LOGINFO("Step 2: Write at start boundary when no logs exist, expect allowed");
+    // no more logs exist, write_at at start boundary should be allowed
+    auto le = nuraft::cs_new< nuraft::log_entry >(5 /*term*/, nuraft::buffer::alloc(8));
+    le->get_buf().put("ALLOW");
+    this->m_leader_store.write_at_start_boundary_test(le);
+
+    LOGINFO("Step 3: Write at start boundary again, expect allowed");
+    // write_at start boundary again to make sure rollback is allowed
+    le = nuraft::cs_new< nuraft::log_entry >(6 /*term*/, nuraft::buffer::alloc(8));
+    le->get_buf().put("ALLOW2");
+    this->m_leader_store.write_at_start_boundary_test(le);
+}
+
 SISL_OPTIONS_ENABLE(logging, test_home_raft_log_store, iomgr, test_common_setup)
 SISL_OPTION_GROUP(test_home_raft_log_store,
                   (num_records, "", "num_records", "number of record to test",
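
Note on the `log_store.cpp` change above: the accepted rollback window widens from `[start_lsn, tail_lsn]` to `[start_lsn - 1, tail_lsn]`, where `to_lsn == start_lsn - 1` means "discard every remaining entry". The standalone sketch below (illustration only, not HomeStore code; `old_check`/`new_check` are invented names and the LSN values are illustrative) models the before/after semantics of that range check:

```cpp
// Illustration only -- models the rollback range check changed in
// log_store.cpp, not the real HomeLogStore::rollback() machinery.
#include <cstdint>
#include <iostream>

using logstore_seq_num_t = int64_t;

// Old check: to_lsn had to lie inside [start_lsn, tail_lsn].
bool old_check(logstore_seq_num_t to_lsn, logstore_seq_num_t start, logstore_seq_num_t tail) {
    return !(to_lsn > tail || to_lsn < start);
}

// New check: to_lsn == start_lsn - 1 is additionally allowed, i.e. a
// rollback that discards every live entry (needed when a new leader
// overwrites at the start boundary after a full truncation).
bool new_check(logstore_seq_num_t to_lsn, logstore_seq_num_t start, logstore_seq_num_t tail) {
    return !(to_lsn > tail || to_lsn < start - 1);
}

int main() {
    // One live entry sits at the start boundary, so start == tail == 3.
    // Overwriting that slot first rolls back to 2 == start - 1, which is
    // not the to_lsn == tail fast path, so the range check is exercised.
    logstore_seq_num_t start = 3, tail = 3, to_lsn = 2;
    std::cout << "old: " << old_check(to_lsn, start, tail)   // prints 0 (rejected)
              << " new: " << new_check(to_lsn, start, tail)  // prints 1 (allowed)
              << '\n';
    return 0;
}
```

For the fully truncated case (`start_lsn == tail_lsn + 1`), `to_lsn = start_lsn - 1` equals the tail and is already absorbed by the no-op fast path at the top of `rollback()`; the widened check only comes into play once an entry exists at the boundary and is overwritten again, which is what Step 3 of `write_at_start_boundary_test` verifies.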