Skip to content

Commit

Permalink
Introduce CancelAwaitingJobs() API in CompactionService (#13286)
Browse files Browse the repository at this point in the history
Summary:
Currently, when the primary instance shuts down, remote compaction continues to run and `CompactionService::Wait()` does not get aborted. This slows down `DB::Close()` as it waits for the completion of `CompactionService::Wait()`. Moreover, since shutdown has already begun, the compaction is unnecessary and will be wasted.

This PR introduces `CancelAwaitingJobs()` to the CompactionService interface. This allows users to implement cancellation of running remote compactions from the primary instance. When `CancelAllBackgroundWork()` is called on the primary instance, `CancelAwaitingJobs()` will be invoked, enabling a more efficient shutdown process.

Pull Request resolved: #13286

Test Plan:
Unit Test added
```
./compaction_service_test --gtest_filter="*CancelCompactionOnPrimarySide*"
```

Reviewed By: anand1976, cbi42

Differential Revision: D68035191

Pulled By: jaykorean

fbshipit-source-id: 47da641f7cbed1267f0a1f16924f57efde46216d
  • Loading branch information
jaykorean authored and facebook-github-bot committed Jan 15, 2025
1 parent 5938ad2 commit f9791d4
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 1 deletion.
45 changes: 44 additions & 1 deletion db/compaction/compaction_service_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ class MyTestCompactionService : public CompactionService {
}
}

void CancelAwaitingJobs() override { canceled_ = true; }

void OnInstallation(const std::string& /*scheduled_job_id*/,
CompactionServiceJobStatus status) override {
final_updated_status_ = status;
Expand Down Expand Up @@ -146,6 +148,7 @@ class MyTestCompactionService : public CompactionService {
}

void SetCanceled(bool canceled) { canceled_ = canceled; }
bool GetCanceled() { return canceled_; }

void GetResult(CompactionServiceResult* deserialized) {
CompactionServiceResult::Read(result_, deserialized).PermitUncheckedError();
Expand Down Expand Up @@ -194,12 +197,15 @@ class CompactionServiceTest : public DBTestBase {
options->statistics = primary_statistics_;
compactor_statistics_ = CreateDBStatistics();

compaction_service_ = std::make_shared<MyTestCompactionService>(
auto my_cs = std::make_shared<MyTestCompactionService>(
dbname_, *options, compactor_statistics_, remote_listeners,
remote_table_properties_collector_factories);

compaction_service_ = my_cs;
options->compaction_service = compaction_service_;
DestroyAndReopen(*options);
CreateAndReopenWithCF({"cf_1", "cf_2", "cf_3"}, *options);
my_cs->SetCanceled(false);
}

Statistics* GetCompactorStatistics() { return compactor_statistics_.get(); }
Expand Down Expand Up @@ -339,6 +345,8 @@ TEST_F(CompactionServiceTest, BasicCompactions) {
assert(*id != kNullUniqueId64x2);
verify_passed++;
});
Close();
my_cs->SetCanceled(false);
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "cf_1", "cf_2", "cf_3"},
options);
ASSERT_GT(verify_passed, 0);
Expand Down Expand Up @@ -996,6 +1004,41 @@ TEST_F(CompactionServiceTest, CancelCompactionOnRemoteSide) {
VerifyTestData();
}

TEST_F(CompactionServiceTest, CancelCompactionOnPrimarySide) {
Options options = CurrentOptions();
options.disable_auto_compactions = true;
ReopenWithCompactionService(&options);
GenerateTestData();

auto my_cs = GetCompactionService();

std::string start_str = Key(15);
std::string end_str = Key(45);
Slice start(start_str);
Slice end(end_str);
uint64_t comp_num = my_cs->GetCompactionNum();

ReopenWithCompactionService(&options);
GenerateTestData();
my_cs = GetCompactionService();

// Primary DB calls CancelAllBackgroundWork() while the compaction is running
SyncPoint::GetInstance()->SetCallBack(
"CompactionJob::Run():Inprogress",
[&](void* /*arg*/) { CancelAllBackgroundWork(db_, false /*wait*/); });

SyncPoint::GetInstance()->EnableProcessing();

Status s = db_->CompactRange(CompactRangeOptions(), &start, &end);
ASSERT_TRUE(s.IsIncomplete());

// Check canceled_ was set to true by CancelAwaitingJobs()
ASSERT_TRUE(my_cs->GetCanceled());

// compaction number is not increased
ASSERT_GE(my_cs->GetCompactionNum(), comp_num);
}

TEST_F(CompactionServiceTest, FailedToStart) {
Options options = CurrentOptions();
options.disable_auto_compactions = true;
Expand Down
5 changes: 5 additions & 0 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,11 @@ void DBImpl::CancelAllBackgroundWork(bool wait) {
s.PermitUncheckedError(); //**TODO: What to do on error?
}

// Cancel awaiting remote compactions
if (immutable_db_options_.compaction_service) {
immutable_db_options_.compaction_service->CancelAwaitingJobs();
}

shutting_down_.store(true, std::memory_order_release);
bg_cv_.SignalAll();
if (!wait) {
Expand Down
3 changes: 3 additions & 0 deletions include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,9 @@ class CompactionService : public Customizable {
return CompactionServiceJobStatus::kUseLocal;
}

// Cancel awaiting jobs. Called by CancelAllBackgroundWork()
virtual void CancelAwaitingJobs() {}

// Optional callback function upon Installation.
virtual void OnInstallation(const std::string& /*scheduled_job_id*/,
CompactionServiceJobStatus /*status*/) {}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Introduce CancelAwaitingJobs() in CompactionService interface which will allow users to implement cancellation of running remote compactions from the primary instance

0 comments on commit f9791d4

Please sign in to comment.