diff --git a/src/leveldb/db/version_set.cc b/src/leveldb/db/version_set.cc index 871ed16e1..04d12a1a6 100644 --- a/src/leveldb/db/version_set.cc +++ b/src/leveldb/db/version_set.cc @@ -1051,7 +1051,16 @@ void VersionSet::InstallVersionSnapshot(Version* v, VersionEdit* edit) { } } -// dump version and snapshot +// multi thread safe +// Information kept for every waiting manifest writer +struct VersionSet::ManifestWriter { + Status status; + VersionEdit* edit; + bool done; + port::CondVar cv; + + explicit ManifestWriter(port::Mutex* mu) : done(false), cv(mu) { } +}; Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) { if (edit->has_log_number_) { assert(edit->log_number_ >= log_number_); @@ -1074,6 +1083,17 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) { edit->SetLastSequence(last_sequence_); } + mu->AssertHeld(); + // multi write control, do not batch edit write, but multi thread safety + ManifestWriter w(mu); + w.edit = edit; + manifest_writers_.push_back(&w); + while (!w.done && &w != manifest_writers_.front()) { + w.cv.Wait(); + } + assert(manifest_writers_.front() == &w); + + // first manifest writer, batch edit Version* v = new Version(this); { VersionSetBuilder builder(this, current_); @@ -1189,6 +1209,11 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) { Log(options_->info_log, "[%s][dfs error] set force_switch_manifest", dbname_.c_str()); } + // TODO: batch write manifest finish + manifest_writers_.pop_front(); + if (!manifest_writers_.empty()) { + manifest_writers_.front()->cv.Signal(); + } return s; } diff --git a/src/leveldb/db/version_set.h b/src/leveldb/db/version_set.h index 1f4766867..a7a01d9ff 100644 --- a/src/leveldb/db/version_set.h +++ b/src/leveldb/db/version_set.h @@ -279,6 +279,7 @@ class VersionSet { friend class Compaction; friend class Version; friend class VersionSetBuilder; + struct ManifestWriter; void Finalize(Version* v); @@ -326,6 +327,8 @@ class VersionSet { uint64_t log_number_; uint64_t prev_log_number_; // 0 or backing store for memtable being compacted + std::deque manifest_writers_; + // Opened lazily WritableFile* descriptor_file_; log::Writer* descriptor_log_;