From 95b34b951dd3555588c0bade0280aaa334445c4b Mon Sep 17 00:00:00 2001 From: caijieming Date: Wed, 14 Dec 2016 15:07:05 +0800 Subject: [PATCH 1/2] issue=1114: leveldb support version snapshot step 1: Prototype implementation --- src/leveldb/db/db_impl.cc | 70 +++++++- src/leveldb/db/db_impl.h | 18 +- src/leveldb/db/version_edit.cc | 92 ++++++++++- src/leveldb/db/version_edit.h | 66 ++++++++ src/leveldb/db/version_set.cc | 271 +++++++++++++++++++++++++++++-- src/leveldb/db/version_set.h | 10 ++ src/leveldb/include/leveldb/db.h | 22 +++ 7 files changed, 531 insertions(+), 18 deletions(-) diff --git a/src/leveldb/db/db_impl.cc b/src/leveldb/db/db_impl.cc index 7ae24a5c6..6ea2405ef 100644 --- a/src/leveldb/db/db_impl.cc +++ b/src/leveldb/db/db_impl.cc @@ -579,7 +579,8 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, return s; } -Status DBImpl::CompactMemTable() { +// dump memtable and snapshot +Status DBImpl::CompactMemTable(Snapshot* snapshot) { mutex_.AssertHeld(); assert(imm_ != NULL); @@ -601,6 +602,9 @@ Status DBImpl::CompactMemTable() { if (imm_->GetLastSequence()) { edit.SetLastSequence(imm_->GetLastSequence()); } + if (snapshot) { + edit.SetPrepareCreateSnapshot(snapshot->name, snapshot->timestamp); + } Log(options_.info_log, "[%s] CompactMemTable SetLastSequence %lu", dbname_.c_str(), edit.GetLastSequence()); s = versions_->LogAndApply(&edit, &mutex_); @@ -689,6 +693,69 @@ Status DBImpl::TEST_CompactMemTable() { return s; } +// impl snapshot +Status DBImpl::ShowSanpshot(std::vector* snapshot_list = NULL); +Status DBImpl::ShowAllSanpshot(std::vector* snapshot_list = NULL); + +Status DBImpl::PrepareCheckoutSanpshot(Snapshot* dest_snapshot, Snapshot* src_snapshot); +Status DBImpl::CommitCheckoutSanpshot(Snapshot* dest_snapshot, Snapshot* src_snapshot); +Status DBImpl::RollbackCheckoutSanpshot(Snapshot* dest_snapshot, Snapshot* src_snapshot); + +Status DBImpl::PrepareCreateSanpshot(Snapshot* snapshot) { + assert(snapshot); + MutexLock l(&mutex_); + + Status s; + Log(options_.info_log, "[%s] prepare create snapshot, dump mem table", dbname_.c_str()); + if (!snapshot->dump_mem_on_snapshot) { + VersionEdit edit; + edit.SetPrepareCreateSnapshot(snapshot->name, snapshot->timestamp); + s = versions_->LogAndApply(&edit, &mutex_); + return s; + } + + if (imm_ != NULL) { + s = CompactMemTable(); + } + if (s.ok()) { + assert(imm_ == NULL); + while (is_writting_mem_) { + writting_mem_cv_.Wait(); + } + imm_ = mem_; + has_imm_.Release_Store(imm_); + mem_ = NewMemTable(); + mem_->Ref(); + bound_log_size_ = 0; + s = CompactMemTable(snapshot); + } + return s; +} +Status DBImpl::CommitCreateSanpshot(Snapshot* snapshot) { + assert(snapshot); + MutexLock l(&mutex_); + Status s; + + VersionEdit edit; + edit.SetCommitCreateSnapshot(snapshot->name, snapshot->timestamp); + s = versions_->LogAndApply(&edit, &mutex_); + return s; +} +Status DBImpl::RollbackCreateSanpshot(Snapshot* snapshot) { + assert(snapshot); + MutexLock l(&mutex_); + Status s; + + VersionEdit edit; + edit.SetRollbackCreateSnapshot(snapshot->name, snapshot->timestamp); + s = versions_->LogAndApply(&edit, &mutex_); + return s; +} + +Status DBImpl::PrepareDeleteSanpshot(Snapshot* snapshot); +Status DBImpl::CommitDeleteSanpshot(Snapshot* snapshot); +Status DBImpl::RollbackDeleteSanpshot(Snapshot* snapshot); + // tera-specific bool DBImpl::FindSplitKey(double ratio, std::string* split_key) { @@ -1483,7 +1550,6 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { return w.status; } - // May temporarily unlock and wait. Status status = MakeRoomForWrite(my_batch == NULL); diff --git a/src/leveldb/db/db_impl.h b/src/leveldb/db/db_impl.h index ec6d62100..95bb3c211 100644 --- a/src/leveldb/db/db_impl.h +++ b/src/leveldb/db/db_impl.h @@ -54,6 +54,22 @@ class DBImpl : public DB { void AddBoundLogSize(uint64_t size); + // impl snapshot + virtual Status ShowSanpshot(std::vector* snapshot_list = NULL); + virtual Status ShowAllSanpshot(std::vector* snapshot_list = NULL); + + virtual Status PrepareCheckoutSanpshot(Snapshot* dest_snapshot, Snapshot* src_snapshot); + virtual Status CommitCheckoutSanpshot(Snapshot* dest_snapshot, Snapshot* src_snapshot); + virtual Status RollbackCheckoutSanpshot(Snapshot* dest_snapshot, Snapshot* src_snapshot); + + virtual Status PrepareCreateSanpshot(Snapshot* parent_snapshot, Snapshot* snapshot); + virtual Status CommitCreateSanpshot(Snapshot* snapshot); + virtual Status RollbackCreateSanpshot(Snapshot* snapshot); + + virtual Status PrepareDeleteSanpshot(Snapshot* snapshot); + virtual Status CommitDeleteSanpshot(Snapshot* snapshot); + virtual Status RollbackDeleteSanpshot(Snapshot* snapshot); + // tera-specific virtual bool BusyWrite(); virtual void Workload(double* write_workload); @@ -110,7 +126,7 @@ class DBImpl : public DB { // Compact the in-memory write buffer to disk. Switches to a new // log-file/memtable and writes a new descriptor iff successful. - Status CompactMemTable() + Status CompactMemTable(Snapshot* snapshot = NULL) EXCLUSIVE_LOCKS_REQUIRED(mutex_); Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base) diff --git a/src/leveldb/db/version_edit.cc b/src/leveldb/db/version_edit.cc index cc564e36b..3617407d9 100644 --- a/src/leveldb/db/version_edit.cc +++ b/src/leveldb/db/version_edit.cc @@ -28,7 +28,13 @@ enum Tag { kPrevLogNumber = 9, kNewFile = 10, kDeletedFile = 11, - // no more than 4096 + + kPrepareCreateSnapshot = 12, + kCommitCreateSnapshot = 13, + kRollbackCreateSnapshot = 14, + kBaseSnapshot = 15, + + // no more than 100w kMaxTag = 1 << 20, }; @@ -43,10 +49,28 @@ void VersionEdit::Clear() { has_prev_log_number_ = false; has_next_file_number_ = false; has_last_sequence_ = false; + + has_prepare_create_snapshot_ = false; + has_commit_create_snapshot_ = false; + has_rollback_create_snapshot_ = false; + has_base_snapshot_ = false; + deleted_files_.clear(); new_files_.clear(); } +void VersionEdit::ClearFile() { + new_files_.clear(); + deleted_files_.clear(); +} +bool VersionEdit::HasCompactPointer(int level) { + for (size_t i = 0; i < compact_pointers_.size(); i++) { + if (compact_pointers_[i].first == level) { + return true; + } + } + return false; +} void VersionEdit::EncodeTo(std::string* dst) const { if (has_comparator_) { std::string str; @@ -136,6 +160,44 @@ void VersionEdit::EncodeTo(std::string* dst) const { PutVarint32(dst, kNewFile); dst->append(str.data(), str.size()); } + + // use 2pc impl snapshot create + if (has_prepare_create_snapshot_) { + std::string str; + PutLengthPrefixedSlice(&str, prepare_create_snapshot_.name); + PutVarint64(&str, prepare_create_snapshot_.timestamp); + + PutVarint32(dst, str.size() + kMaxTag); + PutVarint32(dst, kPrepareCreateSnapshot); + dst->append(str.data(), str.size()); + } + if (has_commit_create_snapshot_) { + std::string str; + PutLengthPrefixedSlice(&str, commit_create_snapshot_.name); + PutVarint64(&str, commit_create_snapshot_.timestamp); + + PutVarint32(dst, str.size() + kMaxTag); + PutVarint32(dst, kCommitCreateSnapshot); + dst->append(str.data(), str.size()); + } else if (has_rollback_create_snapshot_) { + std::string str; + PutLengthPrefixedSlice(&str, rollback_create_snapshot_.name); + PutVarint64(&str, rollback_create_snapshot_.timestamp); + + PutVarint32(dst, str.size() + kMaxTag); + PutVarint32(dst, kRollbackCreateSnapshot); + dst->append(str.data(), str.size()); + } + + if (has_base_snapshot_) { + std::string str; + PutLengthPrefixedSlice(&str, base_snapshot_.name); + PutVarint64(&str, base_snapshot_.timestamp); + + PutVarint32(dst, str.size() + kMaxTag); + PutVarint32(dst, kBaseSnapshot); + dst->append(str.data(), str.size()); + } } static bool GetInternalKey(Slice* input, InternalKey* dst) { @@ -291,6 +353,34 @@ Status VersionEdit::DecodeFrom(const Slice& src) { } break; + case kPrepareCreateSnapshot: + has_prepare_create_snapshot_ = true; + GetLengthPrefixedSlice(&input, &str); + prepare_create_snapshot_.name = str.ToString(); + GetVarint64(&input, &prepare_create_snapshot_.timestamp); + break; + + case kCommitCreateSnapshot: + has_commit_create_snapshot_ = true; + GetLengthPrefixedSlice(&input, &str); + commit_create_snapshot_.name = str.ToString(); + GetVarint64(&input, &commit_create_snapshot_.timestamp); + break; + + case kRollbackCreateSnapshot: + has_rollback_create_snapshot_ = true; + GetLengthPrefixedSlice(&input, &str); + rollback_create_snapshot_.name = str.ToString(); + GetVarint64(&input, &rollback_create_snapshot_.timestamp); + break; + + case kBaseSnapshot: + has_base_snapshot_ = true; + GetLengthPrefixedSlice(&input, &str); + base_snapshot_.name = str.ToString(); + GetVarint64(&input, &base_snapshot_.timestamp); + break; + default: // tag not know, skip it. input.remove_prefix(len); fprintf(stderr, "VersionEdit, skip unknow tag %d, len %d\n", tag, len); diff --git a/src/leveldb/db/version_edit.h b/src/leveldb/db/version_edit.h index c01d11ff8..6347e4fa2 100644 --- a/src/leveldb/db/version_edit.h +++ b/src/leveldb/db/version_edit.h @@ -46,7 +46,39 @@ class VersionEdit { ~VersionEdit() { } void Clear(); + void ClearFile(); + void SetBaseSnapshot() { + if (has_base_snapshot_) { + return; + } + assert(has_prepare_create_snapshot_); + has_prepare_create_snapshot_ = false; + has_base_snapshot_ = true; + base_snapshot_.name = prepare_create_snapshot_.name; + base_snapshot_.timestamp = prepare_create_snapshot_.timestamp; + } + void SetBaseSnapshot(const Slice& name, int64_t timestamp) { + has_prepare_create_snapshot_ = false; + has_base_snapshot_ = true; + base_snapshot_.name = name.ToString(); + base_snapshot_.timestamp = timestamp; + } + void SetPrepareCreateSnapshot(const Slice& name, int64_t timestamp) { + has_prepare_create_snapshot_ = true; + prepare_create_snapshot_.name = name.ToString(); + prepare_create_snapshot_.timestamp = timestamp; + } + void SetCommitCreateSnapshot(const Slice& name, int64_t timestamp) { + has_commit_create_snapshot_ = true; + commit_create_snapshot_.name = name.ToString(); + commit_create_snapshot_.timestamp = timestamp; + } + void SetRollbackCreateSnapshot(const Slice& name, int64_t timestamp) { + has_rollback_create_snapshot_ = true; + rollback_create_snapshot_.name = name.ToString(); + rollback_create_snapshot_.timestamp = timestamp; + } void SetComparatorName(const Slice& name) { has_comparator_ = true; comparator_ = name.ToString(); @@ -77,6 +109,9 @@ class VersionEdit { uint64_t GetLogNumber() const { return log_number_; } + uint64_t GetPrevLogNumber() const { + return prev_log_number_; + } uint64_t GetNextFileNumber() const { return next_file_number_; } @@ -93,9 +128,13 @@ class VersionEdit { bool HasLogNumber() const { return has_log_number_; } + bool HasPrevLogNumber() const { + return has_prev_log_number_; + } bool HasComparator() const { return has_comparator_; } + bool HasCompactPointer(int level); bool HasFiles(std::vector* deleted_files, std::vector* added_files) { bool has_files = deleted_files_.size() > 0 @@ -115,6 +154,18 @@ class VersionEdit { // } return has_files; } + bool HasPrepareCreateSnapshot() { + return has_prepare_create_snapshot_; + } + bool HasCommitCreateSnapshot() { + return has_commit_create_snapshot_; + } + bool HasRollbackCreateSnapshot() { + return has_rollback_create_snapshot_; + } + bool HasBaseSnapshot() { + return has_base_snapshot_; + } void ModifyForMerge(std::map num_map) { //if (num_map.size() == 0) { @@ -192,6 +243,21 @@ class VersionEdit { bool has_next_file_number_; bool has_last_sequence_; + // version snapshot impl + bool has_prepare_create_snapshot_; + bool has_commit_create_snapshot_; + bool has_rollback_create_snapshot_; + bool has_base_snapshot_; + + struct SnapshotEdit { + std::string name; + int64_t timestamp; + }; + SnapshotEdit prepare_create_snapshot_; + SnapshotEdit commit_create_snapshot_; + SnapshotEdit rollback_create_snapshot_; + SnapshotEdit base_snapshot_; + std::vector< std::pair > compact_pointers_; // Files in Version could be deleted by file number or file meta. diff --git a/src/leveldb/db/version_set.cc b/src/leveldb/db/version_set.cc index 0de718c11..871ed16e1 100644 --- a/src/leveldb/db/version_set.cc +++ b/src/leveldb/db/version_set.cc @@ -964,6 +964,7 @@ VersionSet::~VersionSet() { } } current_->Unref(); + ReleaseVersionSnapshot(); assert(dummy_versions_.next_ == &dummy_versions_); // List must be empty delete descriptor_log_; delete descriptor_file_; @@ -985,7 +986,72 @@ void VersionSet::AppendVersion(Version* v) { v->prev_->next_ = v; v->next_->prev_ = v; } +void VersionSet::ReleaseVersionSnapshot() { + std::map >::iterator it; + it = version_snapshot_prepare_.begin(); + for (it != version_snapshot_prepare_.end(); ++it) { + Version* v = it->second.first; + v.Unref(); + } + it = version_snapshot_.begin(); + for (it != version_snapshot_.end(); ++it) { + Version* v = it->second.first; + v.Unref(); + } +} +void VersionSet::InstallVersionSnapshot(Version* v, VersionEdit* edit) { + if (edit->has_prepare_create_snapshot_) { + std::string vkey; + PutVarint64(&vkey, edit->prepare_create_snapshot_.timestamp); + vkey.append(edit->prepare_create_snapshot_.name.data(), + edit->prepare_create_snapshot_.name.size()); + assert(version_snapshot_prepare_.find(vkey) == version_snapshot_prepare_.end()); + + std::string record; + edit->EncodeTo(&record); + // insert into prepare list + version_snapshot_prepare_[vkey] = std::pair(v, record); + v->Ref(); // ref vesion + } else if (edit->has_base_snapshot_) { + std::string vkey; + PutVarint64(&vkey, edit->base_snapshot_.timestamp); + vkey.append(edit->base_snapshot_.name.data(), + edit->base_snapshot_.name.size()); + assert(version_snapshot_.find(vkey) == version_snapshot_.end()); + + std::string record; + edit->EncodeTo(&record); + // insert into commit list + version_snapshot_[vkey] = std::pair(v, record); + v->Ref(); // ref vesion + } else if (edit->has_commit_create_snapshot_) { + std::string vkey; + PutVarint64(&vkey, edit->commit_create_snapshot_.timestamp); + vkey.append(edit->commit_create_snapshot_.name.data(), + edit->commit_create_snapshot_.name.size()); + std::map > it = version_snapshot_prepare_.find(vkey); + assert(it != version_snapshot_prepare_.end()); + + assert(version_snapshot_.find(vkey) == version_snapshot_.end()); + // move sv from prepare to commit list + version_snapshot_[vkey] = it->second; + version_snapshot_prepare_.erase(it); + } else if (edit->has_rollback_create_snapshot_) { + std::string vkey; + PutVarint64(&vkey, edit->rollback_create_snapshot_.timestamp); + vkey.append(edit->rollback_create_snapshot_.name.data(), + edit->rollback_create_snapshot_.name.size()); + std::map > it = version_snapshot_prepare_.find(vkey); + assert(it != version_snapshot_prepare_.end()); + + Version* sv = it->second.first; + // delete from prepare list + version_snapshot_prepare_.erase(it); + sv->Unref(); // unref version + } +} +// dump version and snapshot Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) { if (edit->has_log_number_) { assert(edit->log_number_ >= log_number_); @@ -1060,6 +1126,9 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) { // Write new record to MANIFEST log if (s.ok()) { + if (edit->has_prepare_create_snapshot_) { + CreateVersionSnapshot(edit, v); // create a new version + } std::string record; edit->EncodeTo(&record); s = descriptor_log_->AddRecord(record); @@ -1095,14 +1164,12 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) { } // write manifest error, cause switch - if (!s.ok()) { - if (!new_manifest_file.empty()) { - delete descriptor_log_; - delete descriptor_file_; - descriptor_log_ = NULL; - descriptor_file_ = NULL; - env_->DeleteFile(new_manifest_file); - } + if (!s.ok() && !new_manifest_file.empty()) { + delete descriptor_log_; + delete descriptor_file_; + descriptor_log_ = NULL; + descriptor_file_ = NULL; + env_->DeleteFile(new_manifest_file); } mu->Lock(); @@ -1111,6 +1178,8 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) { // Install the new version if (s.ok()) { AppendVersion(v); + InstallVersionSnapshot(v, edit); + log_number_ = edit->log_number_; prev_log_number_ = edit->prev_log_number_; last_sequence_ = edit->GetLastSequence(); @@ -1256,8 +1325,14 @@ Status VersionSet::Recover() { uint64_t log_number = 0; uint64_t prev_log_number = 0; + std::string current_name = "Tera.current_version"; + std::map > > builder_map; + std::vector status_vec; for (size_t i = 0; i < files.size(); ++i) { - VersionSetBuilder builder(this, current_); + // new an empty version + AppendVersion(new Version(this)); + VersionSetBuilder* builder = new VersionSetBuilder(this, current_); + LogReporter reporter; reporter.status = &s; log::Reader reader(files[i], &reporter, true/*checksum*/, 0/*initial_offset*/); @@ -1286,7 +1361,33 @@ Status VersionSet::Recover() { } if (s.ok()) { - builder.Apply(&edit); + if (edit.has_prepare_create_snapshot_ || edit.has_base_snapshot_) { + // delete all previous edit, and evict base_version with an empty version + delete builder; + AppendVersion(new Version(this)); + builder = new VersionSetBuilder(this, current_); // current_ = empty version + builder->Apply(&edit); + Version* sv = new Version(this); + builder.SaveTo(sv); + Finalize(sv); + AppendVersion(sv); + + std::string name = edit.has_prepare_create_snapshot_? + edit.prepare_snapshot.name: + edit.base_snapshot_.name; + int64_t ts = edit.has_prepare_create_snapshot_? + edit.prepare_snapshot.timestamp: + edit.base_snapshot.timestamp; + std::string vkey; + PutVarint64(&vkey, ts); + vkey.append(name.data(), name.size()); + builder_map[vkey].push_back( + std::pair(record.ToString(), builder)); + + builder = new VersionSetBuilder(this, current_); // current_ = sv + } else { + builder->Apply(&edit); + } } if (edit.has_log_number_) { @@ -1320,22 +1421,70 @@ Status VersionSet::Recover() { delete files[i]; if (record_num == 0) { // empty manifest, delete it - ArchiveFile(env_, dscname[i]); - return Status::Corruption("empty manifest:" + s.ToString()); + s = Status::Corruption("empty manifest:" + s.ToString()); + status_vec.push_back(s); } if (s.ok()) { Version* v = new Version(this); builder.SaveTo(v); Finalize(v); AppendVersion(v); + + std::string e; + builder_map[current_name].push_back( + std::pair(e, builder)); Log(options_->info_log, "[%s] recover manifest finish: %s\n", dbname_.c_str(), dscname[i].c_str()); - } else { + } + //delete builder; + + if (!s.ok()) { Log(options_->info_log, "[%s] recover manifest fail %s, %s\n", dbname_.c_str(), dscname[i].c_str(), s.ToString().c_str()); ArchiveFile(env_, dscname[i]); - return Status::Corruption("recover manifest fail:" + s.ToString()); + status_vec.push_back(Status::Corruption("recover manifest fail:" + s.ToString())); + } + } + + // merge version snapshot and current_ + std::map > >::iterator it = builder_map.begin(); + for (; it != builder_map.end(); ++it) { + if (it->first != current_name) { // merge version snapshot + std::vector >::iterator it_vec = it->second.begin(); + Version* sv = new Version(this); + VersionEdit e; + for (; it_vec != it->second.end(); ++it_vec) { + VersionSetBuilder* builder = it_vec->second; + builder.SaveTo(sv); + VersoinEdit part_edit; + part_edit.DecodeFrom(it_vec->first); + MergeVersionEdit(&e, &part_edit); + delete builder; + } + Finalize(sv); + AppendVersion(sv); + + CreateVersionSnapshot(&e, sv); + InstallVersionSnapshot(sv, &e); + } + } + it = builder_map.find(current_name); + assert(it != builder_map.end()); + if (it->first == current_name) { // merge version snapshot + std::vector >::iterator it_vec = it->second.begin(); + Version* sv = new Version(this); + for (; it_vec != it->second.end(); ++it_vec) { + VersionSetBuilder* builder = it_vec->second; + builder.SaveTo(sv); + delete builder; } + Finalize(sv); + AppendVersion(sv); + } + + if (status_vec.size() > 0) { + // recover fail + return status_vec[0]; } if (s.ok()) { @@ -1468,9 +1617,103 @@ void VersionSet::Finalize(Version* v) { v->compaction_score_ = best_score; } +void VersionSet::MergeVersionEdit(VersionEdit* dest, VersionEdit* src) { + if (!dest->has_base_snapshot_) { + if (src->has_prepare_create_snapshot_) { + dest->SetPrepareCreateSnapshot(src->prepare_create_snapshot_.name, + src->prepare_create_snapshot_.timestamp); + } + if (src->has_base_snapshot_) { + dest->SetBaseSnapshot(src->base_snapshot_.name, + src->base_snapshot_.timestamp); + } + } + // Save metadata + assert(src->GetComparatorName()); + dest->SetComparatorName(src->GetComparatorName()); + assert(src->HasNextFile()) + if (!dest->HasNextFile() || dest->GetNextFileNumber() < src->GetNextFileNumber()) { + dest->SetNextFile(src->GetNextFileNumber()); + } + assert(src->HasLastSequence()); + if (!dest->HasLastSequence() || dest->GetLastSequence() < src->GetLastSequence()) { + dest->SetLastSequence(src->GetLastSequence()); + } + assert(src->HasLogNumber()); + if (!dest->HasLogNumber() || dest->GetLogNumber() < src->GetLogNumber()) { + dest->SetLogNumber(src->GetLogNumber()); + } + assert(src->HasPrevLogNumber()); + if (!dest->HasPrevLogNumber() || dest->GetPrevLogNumber() < src->GetPrevLogNumber()) { + dest->SetPrevLogNumber(src->GetPrevLogNumber()); + } +} +void VersionSet::CreateVersionSnapshot(VersionEdit* edit, Version* v) { + // Save metadata + if (!edit->HasComparator()) { + edit->SetComparatorName(icmp_.user_comparator()->Name()); + } + if (!edit->HasNextFile()) { + edit->SetNextFile(next_file_number_); + } + if (!edit->HasLastSequence()) { + edit->SetLastSequence(last_sequence_); + } + if (!edit->HasLogNumber()) { + edit->SetLogNumber(log_number_); + } + if (!edit->HasPrevLogNumber()) { + edit->SetPrevLogNumber(prev_log_number_); + } + + // Save compaction pointers + for (int level = 0; level < config::kNumLevels; level++) { + if (!compact_pointer_[level].empty()) { + InternalKey key; + key.DecodeFrom(compact_pointer_[level]); + if (!edit->HasCompactPointer(level)) { + edit->SetCompactPointer(level, key); + } + } + } + + // Save files + for (int level = 0; level < config::kNumLevels; level++) { + const std::vector& files = v->files_[level]; + for (size_t i = 0; i < files.size(); i++) { + edit->ClearFile(); + edit->AddFile(level, *files[i]); + } + } + return; +} Status VersionSet::WriteSnapshot(log::Writer* log) { // TODO: Break up into multiple records to reduce memory usage on recovery? + // dump version snapshot + std::map > it; + it = version_snapshot_prepare_.begin(); + for (; it != version_snapshot_prepare_.end(); ++it) { + Status s = log->AddRecord(it->second.second); + if (!s.ok()) { + return s; + } + } + it = version_snapshot_.begin(); + for (; it != version_snapshot_.end(); ++it) { + Status s; + VersionEdit e; + // set prepare into commit edit + s = e.DecodeFrom(it->second.second); + e.SetBaseSnapshot(); + std::string r; + e.EncodeTo(&r); + s = log->AddRecord(r); + if (!s.ok()) { + return s; + } + } + // Save metadata VersionEdit edit; edit.SetComparatorName(icmp_.user_comparator()->Name()); diff --git a/src/leveldb/db/version_set.h b/src/leveldb/db/version_set.h index 676cc64ee..1f4766867 100644 --- a/src/leveldb/db/version_set.h +++ b/src/leveldb/db/version_set.h @@ -305,6 +305,12 @@ class VersionSet { bool ModifyFileSize(FileMetaData* f); + // version snapshot impl + void CreateVersionSnapshot(VersionEdit* edit, Version* v); + void MergeVersionEdit(VersionEdit* dest, VersionEdit* src); + void InstallVersionSnapshot(Version* v, VersionEdit* edit); + void ReleaseVersionSnapshot(); + Env* const env_; const std::string dbname_; const Options* const options_; @@ -326,6 +332,10 @@ class VersionSet { Version dummy_versions_; // Head of circular doubly-linked list of versions. Version* current_; // == dummy_versions_.prev_ + // version snapshot impl + std::map > version_snapshot_prepare_; + std::map > version_snapshot_; + // Per-level key at which the next compaction at that level should start. // Either an empty string, or a valid InternalKey. std::string compact_pointer_[config::kNumLevels]; diff --git a/src/leveldb/include/leveldb/db.h b/src/leveldb/include/leveldb/db.h index 1d235801a..3374aada9 100644 --- a/src/leveldb/include/leveldb/db.h +++ b/src/leveldb/include/leveldb/db.h @@ -35,6 +35,12 @@ struct Range { Range(const Slice& s, const Slice& l) : start(s), limit(l) { } }; +struct Snapshot { + Slice name; + int64_t timestamp; + bool dump_mem_on_snapshot; +}; + // A DB is a persistent ordered map from keys to values. // A DB is safe for concurrent access from multiple threads without // any external synchronization. @@ -102,6 +108,22 @@ class DB { // The returned iterator should be deleted before this db is deleted. virtual Iterator* NewIterator(const ReadOptions& options) = 0; + // Database support snapshot + virtual Status ShowSanpshot(std::vector* snapshot_list = NULL) = 0; + virtual Status ShowAllSanpshot(std::vector* snapshot_list = NULL) = 0; + + virtual Status PrepareCheckoutSanpshot(Snapshot* dest_snapshot, Snapshot* src_snapshot) = 0; + virtual Status CommitCheckoutSanpshot(Snapshot* dest_snapshot, Snapshot* src_snapshot) = 0; + virtual Status RollbackCheckoutSanpshot(Snapshot* dest_snapshot, Snapshot* src_snapshot) = 0; + + virtual Status PrepareCreateSanpshot(Snapshot* parent_snapshot, Snapshot* snapshot) = 0; + virtual Status CommitCreateSanpshot(Snapshot* snapshot) = 0; + virtual Status RollbackCreateSanpshot(Snapshot* snapshot) = 0; + + virtual Status PrepareDeleteSanpshot(Snapshot* snapshot) = 0; + virtual Status CommitDeleteSanpshot(Snapshot* snapshot) = 0; + virtual Status RollbackDeleteSanpshot(Snapshot* snapshot) = 0; + // Return a handle to the current DB state. Iterators created with // this handle will all observe a stable snapshot of the current DB // state. The caller must call ReleaseSnapshot(result) when the From a1a2e59708851870c1dc6025f2a6f3062270839c Mon Sep 17 00:00:00 2001 From: caijieming Date: Wed, 14 Dec 2016 16:34:33 +0800 Subject: [PATCH 2/2] issue=1114: LogAndApply multi-thread safe --- src/leveldb/db/db_impl.cc | 2 +- src/leveldb/db/version_set.cc | 27 ++++++++++++++++++++++++++- src/leveldb/db/version_set.h | 3 +++ 3 files changed, 30 insertions(+), 2 deletions(-) diff --git a/src/leveldb/db/db_impl.cc b/src/leveldb/db/db_impl.cc index 6ea2405ef..cb02de967 100644 --- a/src/leveldb/db/db_impl.cc +++ b/src/leveldb/db/db_impl.cc @@ -710,7 +710,7 @@ Status DBImpl::PrepareCreateSanpshot(Snapshot* snapshot) { if (!snapshot->dump_mem_on_snapshot) { VersionEdit edit; edit.SetPrepareCreateSnapshot(snapshot->name, snapshot->timestamp); - s = versions_->LogAndApply(&edit, &mutex_); + s = versions_->LogAndApply(&edit, &mutex_); // multi-thread safe return s; } diff --git a/src/leveldb/db/version_set.cc b/src/leveldb/db/version_set.cc index 871ed16e1..be04413d3 100644 --- a/src/leveldb/db/version_set.cc +++ b/src/leveldb/db/version_set.cc @@ -1051,7 +1051,16 @@ void VersionSet::InstallVersionSnapshot(Version* v, VersionEdit* edit) { } } -// dump version and snapshot +// multi thread safe +// Information kept for every waiting manifest writer +struct VersionSet::ManifestWriter { + Status status; + VersionEdit* edit; + bool done; + port::CondVar cv; + + explicit ManifestWriter(port::Mutex* mu) : done(false), cv(mu) { } +}; Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) { if (edit->has_log_number_) { assert(edit->log_number_ >= log_number_); @@ -1074,6 +1083,17 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) { edit->SetLastSequence(last_sequence_); } + mu->AssertHeld(); + // multi write control, do not batch edit write, but multi thread safety + ManifestWriter w(mu); + w.edit = edit; + manifest_writers_.push_back(&w); + while (!w.done && &w != manifest_writers_.front()) { + w.cv.Wait(); + } + assert(manifest_writers_.front() == &w); + + // first manifest writer, batch edit Version* v = new Version(this); { VersionSetBuilder builder(this, current_); @@ -1189,6 +1209,11 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) { Log(options_->info_log, "[%s][dfs error] set force_switch_manifest", dbname_.c_str()); } + // TODO: batch write manifest finish, no need + manifest_writers_.pop_front(); + if (!manifest_writers_.empty()) { + manifest_writers_.front()->cv.Signal(); + } return s; } diff --git a/src/leveldb/db/version_set.h b/src/leveldb/db/version_set.h index 1f4766867..a7a01d9ff 100644 --- a/src/leveldb/db/version_set.h +++ b/src/leveldb/db/version_set.h @@ -279,6 +279,7 @@ class VersionSet { friend class Compaction; friend class Version; friend class VersionSetBuilder; + struct ManifestWriter; void Finalize(Version* v); @@ -326,6 +327,8 @@ class VersionSet { uint64_t log_number_; uint64_t prev_log_number_; // 0 or backing store for memtable being compacted + std::deque manifest_writers_; + // Opened lazily WritableFile* descriptor_file_; log::Writer* descriptor_log_;