Skip to content

Commit

Permalink
issue=1114: LogAndApply multi-thread safe
Browse files Browse the repository at this point in the history
  • Loading branch information
caijieming committed Dec 15, 2016
1 parent 95b34b9 commit a1a2e59
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 2 deletions.
2 changes: 1 addition & 1 deletion src/leveldb/db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -710,7 +710,7 @@ Status DBImpl::PrepareCreateSanpshot(Snapshot* snapshot) {
if (!snapshot->dump_mem_on_snapshot) {
VersionEdit edit;
edit.SetPrepareCreateSnapshot(snapshot->name, snapshot->timestamp);
s = versions_->LogAndApply(&edit, &mutex_);
s = versions_->LogAndApply(&edit, &mutex_); // multi-thread safe
return s;
}

Expand Down
27 changes: 26 additions & 1 deletion src/leveldb/db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1051,7 +1051,16 @@ void VersionSet::InstallVersionSnapshot(Version* v, VersionEdit* edit) {
}
}

// dump version and snapshot
// multi thread safe
// Information kept for every waiting manifest writer
struct VersionSet::ManifestWriter {
Status status;
VersionEdit* edit;
bool done;
port::CondVar cv;

explicit ManifestWriter(port::Mutex* mu) : done(false), cv(mu) { }
};
Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
if (edit->has_log_number_) {
assert(edit->log_number_ >= log_number_);
Expand All @@ -1074,6 +1083,17 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
edit->SetLastSequence(last_sequence_);
}

mu->AssertHeld();
// multi write control, do not batch edit write, but multi thread safety
ManifestWriter w(mu);
w.edit = edit;
manifest_writers_.push_back(&w);
while (!w.done && &w != manifest_writers_.front()) {
w.cv.Wait();
}
assert(manifest_writers_.front() == &w);

// first manifest writer, batch edit
Version* v = new Version(this);
{
VersionSetBuilder builder(this, current_);
Expand Down Expand Up @@ -1189,6 +1209,11 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
Log(options_->info_log, "[%s][dfs error] set force_switch_manifest", dbname_.c_str());
}

// TODO: batch write manifest finish, no need
manifest_writers_.pop_front();
if (!manifest_writers_.empty()) {
manifest_writers_.front()->cv.Signal();
}
return s;
}

Expand Down
3 changes: 3 additions & 0 deletions src/leveldb/db/version_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ class VersionSet {
friend class Compaction;
friend class Version;
friend class VersionSetBuilder;
struct ManifestWriter;

void Finalize(Version* v);

Expand Down Expand Up @@ -326,6 +327,8 @@ class VersionSet {
uint64_t log_number_;
uint64_t prev_log_number_; // 0 or backing store for memtable being compacted

std::deque<ManifestWriter*> manifest_writers_;

// Opened lazily
WritableFile* descriptor_file_;
log::Writer* descriptor_log_;
Expand Down

0 comments on commit a1a2e59

Please sign in to comment.