diff --git a/db/compaction/compaction_service_test.cc b/db/compaction/compaction_service_test.cc index d3ee76aab8a..7be7d0cef88 100644 --- a/db/compaction/compaction_service_test.cc +++ b/db/compaction/compaction_service_test.cc @@ -114,6 +114,8 @@ class MyTestCompactionService : public CompactionService { } } + void CancelAwaitingJobs() override { canceled_ = true; } + void OnInstallation(const std::string& /*scheduled_job_id*/, CompactionServiceJobStatus status) override { final_updated_status_ = status; @@ -146,6 +148,7 @@ class MyTestCompactionService : public CompactionService { } void SetCanceled(bool canceled) { canceled_ = canceled; } + bool GetCanceled() { return canceled_; } void GetResult(CompactionServiceResult* deserialized) { CompactionServiceResult::Read(result_, deserialized).PermitUncheckedError(); @@ -194,12 +197,15 @@ class CompactionServiceTest : public DBTestBase { options->statistics = primary_statistics_; compactor_statistics_ = CreateDBStatistics(); - compaction_service_ = std::make_shared( + auto my_cs = std::make_shared( dbname_, *options, compactor_statistics_, remote_listeners, remote_table_properties_collector_factories); + + compaction_service_ = my_cs; options->compaction_service = compaction_service_; DestroyAndReopen(*options); CreateAndReopenWithCF({"cf_1", "cf_2", "cf_3"}, *options); + my_cs->SetCanceled(false); } Statistics* GetCompactorStatistics() { return compactor_statistics_.get(); } @@ -339,6 +345,8 @@ TEST_F(CompactionServiceTest, BasicCompactions) { assert(*id != kNullUniqueId64x2); verify_passed++; }); + Close(); + my_cs->SetCanceled(false); ReopenWithColumnFamilies({kDefaultColumnFamilyName, "cf_1", "cf_2", "cf_3"}, options); ASSERT_GT(verify_passed, 0); @@ -996,6 +1004,41 @@ TEST_F(CompactionServiceTest, CancelCompactionOnRemoteSide) { VerifyTestData(); } +TEST_F(CompactionServiceTest, CancelCompactionOnPrimarySide) { + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + ReopenWithCompactionService(&options); + GenerateTestData(); + + auto my_cs = GetCompactionService(); + + std::string start_str = Key(15); + std::string end_str = Key(45); + Slice start(start_str); + Slice end(end_str); + uint64_t comp_num = my_cs->GetCompactionNum(); + + ReopenWithCompactionService(&options); + GenerateTestData(); + my_cs = GetCompactionService(); + + // Primary DB calls CancelAllBackgroundWork() while the compaction is running + SyncPoint::GetInstance()->SetCallBack( + "CompactionJob::Run():Inprogress", + [&](void* /*arg*/) { CancelAllBackgroundWork(db_, false /*wait*/); }); + + SyncPoint::GetInstance()->EnableProcessing(); + + Status s = db_->CompactRange(CompactRangeOptions(), &start, &end); + ASSERT_TRUE(s.IsIncomplete()); + + // Check canceled_ was set to true by CancelAwaitingJobs() + ASSERT_TRUE(my_cs->GetCanceled()); + + // compaction number is not increased + ASSERT_GE(my_cs->GetCompactionNum(), comp_num); +} + TEST_F(CompactionServiceTest, FailedToStart) { Options options = CurrentOptions(); options.disable_auto_compactions = true; diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 140d7535916..01a89d92e7c 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -508,6 +508,11 @@ void DBImpl::CancelAllBackgroundWork(bool wait) { s.PermitUncheckedError(); //**TODO: What to do on error? } + // Cancel awaiting remote compactions + if (immutable_db_options_.compaction_service) { + immutable_db_options_.compaction_service->CancelAwaitingJobs(); + } + shutting_down_.store(true, std::memory_order_release); bg_cv_.SignalAll(); if (!wait) { diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 2a4ec9831d7..6dfe12a7f93 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -527,6 +527,9 @@ class CompactionService : public Customizable { return CompactionServiceJobStatus::kUseLocal; } + // Cancel awaiting jobs. Called by CancelAllBackgroundWork() + virtual void CancelAwaitingJobs() {} + // Optional callback function upon Installation. virtual void OnInstallation(const std::string& /*scheduled_job_id*/, CompactionServiceJobStatus /*status*/) {} diff --git a/unreleased_history/new_features/remote_compaction_cancel_awaiting_jobs_api.md b/unreleased_history/new_features/remote_compaction_cancel_awaiting_jobs_api.md new file mode 100644 index 00000000000..5386c586c71 --- /dev/null +++ b/unreleased_history/new_features/remote_compaction_cancel_awaiting_jobs_api.md @@ -0,0 +1 @@ +Introduce CancelAwaitingJobs() in CompactionService interface which will allow users to implement cancellation of running remote compactions from the primary instance