Skip to content

Commit

Permalink
issue=1114: leveldb support version snapshot
Browse files Browse the repository at this point in the history
step 1: Prototype implementation
  • Loading branch information
caijieming committed Dec 14, 2016
1 parent fc42786 commit 3bc5f86
Show file tree
Hide file tree
Showing 7 changed files with 534 additions and 18 deletions.
70 changes: 68 additions & 2 deletions src/leveldb/db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,8 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
return s;
}

Status DBImpl::CompactMemTable() {
// dump memtable and snapshot
Status DBImpl::CompactMemTable(Snapshot* snapshot) {
mutex_.AssertHeld();
assert(imm_ != NULL);

Expand All @@ -601,6 +602,9 @@ Status DBImpl::CompactMemTable() {
if (imm_->GetLastSequence()) {
edit.SetLastSequence(imm_->GetLastSequence());
}
if (snapshot) {
edit.SetPrepareCreateSnapshot(snapshot->name, snapshot->timestamp);
}
Log(options_.info_log, "[%s] CompactMemTable SetLastSequence %lu",
dbname_.c_str(), edit.GetLastSequence());
s = versions_->LogAndApply(&edit, &mutex_);
Expand Down Expand Up @@ -689,6 +693,69 @@ Status DBImpl::TEST_CompactMemTable() {
return s;
}

// impl snapshot
Status DBImpl::ShowSanpshot(std::vector<Snapshot*>* snapshot_list = NULL);
Status DBImpl::ShowAllSanpshot(std::vector<Snapshot*>* snapshot_list = NULL);

Status DBImpl::PrepareCheckoutSanpshot(Snapshot* dest_snapshot, Snapshot* src_snapshot);
Status DBImpl::CommitCheckoutSanpshot(Snapshot* dest_snapshot, Snapshot* src_snapshot);
Status DBImpl::RollbackCheckoutSanpshot(Snapshot* dest_snapshot, Snapshot* src_snapshot);

Status DBImpl::PrepareCreateSanpshot(Snapshot* snapshot) {
assert(snapshot);
MutexLock l(&mutex_);

Status s;
Log(options_.info_log, "[%s] prepare create snapshot, dump mem table", dbname_.c_str());
if (!snapshot->dump_mem_on_snapshot) {
VersionEdit edit;
edit.SetPrepareCreateSnapshot(snapshot->name, snapshot->timestamp);
s = versions_->LogAndApply(&edit, &mutex_);
return s;
}

if (imm_ != NULL) {
s = CompactMemTable();
}
if (s.ok()) {
assert(imm_ == NULL);
while (is_writting_mem_) {
writting_mem_cv_.Wait();
}
imm_ = mem_;
has_imm_.Release_Store(imm_);
mem_ = NewMemTable();
mem_->Ref();
bound_log_size_ = 0;
s = CompactMemTable(snapshot);
}
return s;
}
Status DBImpl::CommitCreateSanpshot(Snapshot* snapshot) {
assert(snapshot);
MutexLock l(&mutex_);
Status s;

VersionEdit edit;
edit.SetCommitCreateSnapshot(snapshot->name, snapshot->timestamp);
s = versions_->LogAndApply(&edit, &mutex_);
return s;
}
Status DBImpl::RollbackCreateSanpshot(Snapshot* snapshot) {
assert(snapshot);
MutexLock l(&mutex_);
Status s;

VersionEdit edit;
edit.SetRollbackCreateSnapshot(snapshot->name, snapshot->timestamp);
s = versions_->LogAndApply(&edit, &mutex_);
return s;
}

Status DBImpl::PrepareDeleteSanpshot(Snapshot* snapshot);
Status DBImpl::CommitDeleteSanpshot(Snapshot* snapshot);
Status DBImpl::RollbackDeleteSanpshot(Snapshot* snapshot);

// tera-specific

bool DBImpl::FindSplitKey(double ratio, std::string* split_key) {
Expand Down Expand Up @@ -1483,7 +1550,6 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
return w.status;
}


// May temporarily unlock and wait.
Status status = MakeRoomForWrite(my_batch == NULL);

Expand Down
18 changes: 17 additions & 1 deletion src/leveldb/db/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,22 @@ class DBImpl : public DB {

void AddBoundLogSize(uint64_t size);

// impl snapshot
virtual Status ShowSanpshot(std::vector<Snapshot*>* snapshot_list = NULL);
virtual Status ShowAllSanpshot(std::vector<Snapshot*>* snapshot_list = NULL);

virtual Status PrepareCheckoutSanpshot(Snapshot* dest_snapshot, Snapshot* src_snapshot);
virtual Status CommitCheckoutSanpshot(Snapshot* dest_snapshot, Snapshot* src_snapshot);
virtual Status RollbackCheckoutSanpshot(Snapshot* dest_snapshot, Snapshot* src_snapshot);

virtual Status PrepareCreateSanpshot(Snapshot* parent_snapshot, Snapshot* snapshot);
virtual Status CommitCreateSanpshot(Snapshot* snapshot);
virtual Status RollbackCreateSanpshot(Snapshot* snapshot);

virtual Status PrepareDeleteSanpshot(Snapshot* snapshot);
virtual Status CommitDeleteSanpshot(Snapshot* snapshot);
virtual Status RollbackDeleteSanpshot(Snapshot* snapshot);

// tera-specific
virtual bool BusyWrite();
virtual void Workload(double* write_workload);
Expand Down Expand Up @@ -110,7 +126,7 @@ class DBImpl : public DB {

// Compact the in-memory write buffer to disk. Switches to a new
// log-file/memtable and writes a new descriptor iff successful.
Status CompactMemTable()
Status CompactMemTable(Snapshot* snapshot = NULL)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);

Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base)
Expand Down
92 changes: 91 additions & 1 deletion src/leveldb/db/version_edit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,13 @@ enum Tag {
kPrevLogNumber = 9,
kNewFile = 10,
kDeletedFile = 11,
// no more than 4096

kPrepareCreateSnapshot = 12,
kCommitCreateSnapshot = 13,
kRollbackCreateSnapshot = 14,
kBaseSnapshot = 15,

// no more than 100w
kMaxTag = 1 << 20,
};

Expand All @@ -43,10 +49,28 @@ void VersionEdit::Clear() {
has_prev_log_number_ = false;
has_next_file_number_ = false;
has_last_sequence_ = false;

has_prepare_create_snapshot_ = false;
has_commit_create_snapshot_ = false;
has_rollback_create_snapshot_ = false;
has_base_snapshot_ = false;

deleted_files_.clear();
new_files_.clear();
}
void VersionEdit::ClearFile() {
new_files_.clear();
deleted_files_.clear();
}

bool VersionEdit::HasCompactPointer(int level) {
for (size_t i = 0; i < compact_pointers_.size(); i++) {
if (compact_pointers_[i].first == level) {
return true;
}
}
return false;
}
void VersionEdit::EncodeTo(std::string* dst) const {
if (has_comparator_) {
std::string str;
Expand Down Expand Up @@ -136,6 +160,44 @@ void VersionEdit::EncodeTo(std::string* dst) const {
PutVarint32(dst, kNewFile);
dst->append(str.data(), str.size());
}

// use 2pc impl snapshot create
if (has_prepare_create_snapshot_) {
std::string str;
PutLengthPrefixedSlice(&str, prepare_create_snapshot_.name);
PutVarint64(&str, prepare_create_snapshot_.timestamp);

PutVarint32(dst, str.size() + kMaxTag);
PutVarint32(dst, kPrepareCreateSnapshot);
dst->append(str.data(), str.size());
}
if (has_commit_create_snapshot_) {
std::string str;
PutLengthPrefixedSlice(&str, commit_create_snapshot_.name);
PutVarint64(&str, commit_create_snapshot_.timestamp);

PutVarint32(dst, str.size() + kMaxTag);
PutVarint32(dst, kCommitCreateSnapshot);
dst->append(str.data(), str.size());
} else if (has_rollback_create_snapshot_) {
std::string str;
PutLengthPrefixedSlice(&str, rollback_create_snapshot_.name);
PutVarint64(&str, rollback_create_snapshot_.timestamp);

PutVarint32(dst, str.size() + kMaxTag);
PutVarint32(dst, kRollbackCreateSnapshot);
dst->append(str.data(), str.size());
}

if (has_base_snapshot_) {
std::string str;
PutLengthPrefixedSlice(&str, base_snapshot_.name);
PutVarint64(&str, base_snapshot_.timestamp);

PutVarint32(dst, str.size() + kMaxTag);
PutVarint32(dst, kBaseSnapshot);
dst->append(str.data(), str.size());
}
}

static bool GetInternalKey(Slice* input, InternalKey* dst) {
Expand Down Expand Up @@ -291,6 +353,34 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
}
break;

case kPrepareCreateSnapshot:
has_prepare_create_snapshot_ = true;
GetLengthPrefixedSlice(&input, &str);
prepare_create_snapshot_.name = str.ToString();
GetVarint64(&input, &prepare_create_snapshot_.timestamp);
break;

case kCommitCreateSnapshot:
has_commit_create_snapshot_ = true;
GetLengthPrefixedSlice(&input, &str);
commit_create_snapshot_.name = str.ToString();
GetVarint64(&input, &commit_create_snapshot_.timestamp);
break;

case kRollbackCreateSnapshot:
has_rollback_create_snapshot_ = true;
GetLengthPrefixedSlice(&input, &str);
rollback_create_snapshot_.name = str.ToString();
GetVarint64(&input, &rollback_create_snapshot_.timestamp);
break;

case kBaseSnapshot:
has_base_snapshot_ = true;
GetLengthPrefixedSlice(&input, &str);
base_snapshot_.name = str.ToString();
GetVarint64(&input, &base_snapshot_.timestamp);
break;

default: // tag not know, skip it.
input.remove_prefix(len);
fprintf(stderr, "VersionEdit, skip unknow tag %d, len %d\n", tag, len);
Expand Down
66 changes: 66 additions & 0 deletions src/leveldb/db/version_edit.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,39 @@ class VersionEdit {
~VersionEdit() { }

void Clear();
void ClearFile();

void SetBaseSnapshot() {
if (has_base_snapshot_) {
return;
}
assert(has_prepare_create_snapshot_);
has_prepare_create_snapshot_ = false;
has_base_snapshot_ = true;
base_snapshot_.name = prepare_create_snapshot_.name;
base_snapshot_.timestamp = prepare_create_snapshot_.timestamp;
}
void SetBaseSnapshot(const Slice& name, int64_t timestamp) {
has_prepare_create_snapshot_ = false;
has_base_snapshot_ = true;
base_snapshot_.name = name.ToString();
base_snapshot_.timestamp = timestamp;
}
void SetPrepareCreateSnapshot(const Slice& name, int64_t timestamp) {
has_prepare_create_snapshot_ = true;
prepare_create_snapshot_.name = name.ToString();
prepare_create_snapshot_.timestamp = timestamp;
}
void SetCommitCreateSnapshot(const Slice& name, int64_t timestamp) {
has_commit_create_snapshot_ = true;
commit_create_snapshot_.name = name.ToString();
commit_create_snapshot_.timestamp = timestamp;
}
void SetRollbackCreateSnapshot(const Slice& name, int64_t timestamp) {
has_rollback_create_snapshot_ = true;
rollback_create_snapshot_.name = name.ToString();
rollback_create_snapshot_.timestamp = timestamp;
}
void SetComparatorName(const Slice& name) {
has_comparator_ = true;
comparator_ = name.ToString();
Expand Down Expand Up @@ -77,6 +109,9 @@ class VersionEdit {
uint64_t GetLogNumber() const {
return log_number_;
}
uint64_t GetPrevLogNumber() const {
return prev_log_number_;
}
uint64_t GetNextFileNumber() const {
return next_file_number_;
}
Expand All @@ -93,9 +128,13 @@ class VersionEdit {
bool HasLogNumber() const {
return has_log_number_;
}
bool HasPrevLogNumber() const {
return has_prev_log_number_;
}
bool HasComparator() const {
return has_comparator_;
}
bool HasCompactPointer(int level);
bool HasFiles(std::vector<uint64_t>* deleted_files,
std::vector<uint64_t>* added_files) {
bool has_files = deleted_files_.size() > 0
Expand All @@ -115,6 +154,18 @@ class VersionEdit {
// }
return has_files;
}
bool HasPrepareCreateSnapshot() {
return has_prepare_create_snapshot_;
}
bool HasCommitCreateSnapshot() {
return has_commit_create_snapshot_;
}
bool HasRollbackCreateSnapshot() {
return has_rollback_create_snapshot_;
}
bool HasBaseSnapshot() {
return has_base_snapshot_;
}

void ModifyForMerge(std::map<uint64_t, uint64_t> num_map) {
//if (num_map.size() == 0) {
Expand Down Expand Up @@ -192,6 +243,21 @@ class VersionEdit {
bool has_next_file_number_;
bool has_last_sequence_;

// version snapshot impl
bool has_prepare_create_snapshot_;
bool has_commit_create_snapshot_;
bool has_rollback_create_snapshot_;
bool has_base_snapshot_;

struct SnapshotEdit {
std::string name;
int64_t timestamp;
};
SnapshotEdit prepare_create_snapshot_;
SnapshotEdit commit_create_snapshot_;
SnapshotEdit rollback_create_snapshot_;
SnapshotEdit base_snapshot_;

std::vector< std::pair<int, InternalKey> > compact_pointers_;

// Files in Version could be deleted by file number or file meta.
Expand Down
Loading

0 comments on commit 3bc5f86

Please sign in to comment.