Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

issue=1114: leveldb support version snapshot #1115

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 68 additions & 2 deletions src/leveldb/db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,8 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
return s;
}

Status DBImpl::CompactMemTable() {
// dump memtable and snapshot
Status DBImpl::CompactMemTable(Snapshot* snapshot) {
mutex_.AssertHeld();
assert(imm_ != NULL);

Expand All @@ -601,6 +602,9 @@ Status DBImpl::CompactMemTable() {
if (imm_->GetLastSequence()) {
edit.SetLastSequence(imm_->GetLastSequence());
}
if (snapshot) {
edit.SetPrepareCreateSnapshot(snapshot->name, snapshot->timestamp);
}
Log(options_.info_log, "[%s] CompactMemTable SetLastSequence %lu",
dbname_.c_str(), edit.GetLastSequence());
s = versions_->LogAndApply(&edit, &mutex_);
Expand Down Expand Up @@ -689,6 +693,69 @@ Status DBImpl::TEST_CompactMemTable() {
return s;
}

// impl snapshot
Status DBImpl::ShowSanpshot(std::vector<Snapshot*>* snapshot_list = NULL);
Status DBImpl::ShowAllSanpshot(std::vector<Snapshot*>* snapshot_list = NULL);

Status DBImpl::PrepareCheckoutSanpshot(Snapshot* dest_snapshot, Snapshot* src_snapshot);
Status DBImpl::CommitCheckoutSanpshot(Snapshot* dest_snapshot, Snapshot* src_snapshot);
Status DBImpl::RollbackCheckoutSanpshot(Snapshot* dest_snapshot, Snapshot* src_snapshot);

Status DBImpl::PrepareCreateSanpshot(Snapshot* snapshot) {
assert(snapshot);
MutexLock l(&mutex_);

Status s;
Log(options_.info_log, "[%s] prepare create snapshot, dump mem table", dbname_.c_str());
if (!snapshot->dump_mem_on_snapshot) {
VersionEdit edit;
edit.SetPrepareCreateSnapshot(snapshot->name, snapshot->timestamp);
s = versions_->LogAndApply(&edit, &mutex_); // multi-thread safe
return s;
}

if (imm_ != NULL) {
s = CompactMemTable();
}
if (s.ok()) {
assert(imm_ == NULL);
while (is_writting_mem_) {
writting_mem_cv_.Wait();
}
imm_ = mem_;
has_imm_.Release_Store(imm_);
mem_ = NewMemTable();
mem_->Ref();
bound_log_size_ = 0;
s = CompactMemTable(snapshot);
}
return s;
}
Status DBImpl::CommitCreateSanpshot(Snapshot* snapshot) {
assert(snapshot);
MutexLock l(&mutex_);
Status s;

VersionEdit edit;
edit.SetCommitCreateSnapshot(snapshot->name, snapshot->timestamp);
s = versions_->LogAndApply(&edit, &mutex_);
return s;
}
Status DBImpl::RollbackCreateSanpshot(Snapshot* snapshot) {
assert(snapshot);
MutexLock l(&mutex_);
Status s;

VersionEdit edit;
edit.SetRollbackCreateSnapshot(snapshot->name, snapshot->timestamp);
s = versions_->LogAndApply(&edit, &mutex_);
return s;
}

Status DBImpl::PrepareDeleteSanpshot(Snapshot* snapshot);
Status DBImpl::CommitDeleteSanpshot(Snapshot* snapshot);
Status DBImpl::RollbackDeleteSanpshot(Snapshot* snapshot);

// tera-specific

bool DBImpl::FindSplitKey(double ratio, std::string* split_key) {
Expand Down Expand Up @@ -1483,7 +1550,6 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
return w.status;
}


// May temporarily unlock and wait.
Status status = MakeRoomForWrite(my_batch == NULL);

Expand Down
18 changes: 17 additions & 1 deletion src/leveldb/db/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,22 @@ class DBImpl : public DB {

void AddBoundLogSize(uint64_t size);

// impl snapshot
virtual Status ShowSanpshot(std::vector<Snapshot*>* snapshot_list = NULL);
virtual Status ShowAllSanpshot(std::vector<Snapshot*>* snapshot_list = NULL);

virtual Status PrepareCheckoutSanpshot(Snapshot* dest_snapshot, Snapshot* src_snapshot);
virtual Status CommitCheckoutSanpshot(Snapshot* dest_snapshot, Snapshot* src_snapshot);
virtual Status RollbackCheckoutSanpshot(Snapshot* dest_snapshot, Snapshot* src_snapshot);

virtual Status PrepareCreateSanpshot(Snapshot* parent_snapshot, Snapshot* snapshot);
virtual Status CommitCreateSanpshot(Snapshot* snapshot);
virtual Status RollbackCreateSanpshot(Snapshot* snapshot);

virtual Status PrepareDeleteSanpshot(Snapshot* snapshot);
virtual Status CommitDeleteSanpshot(Snapshot* snapshot);
virtual Status RollbackDeleteSanpshot(Snapshot* snapshot);

// tera-specific
virtual bool BusyWrite();
virtual void Workload(double* write_workload);
Expand Down Expand Up @@ -110,7 +126,7 @@ class DBImpl : public DB {

// Compact the in-memory write buffer to disk. Switches to a new
// log-file/memtable and writes a new descriptor iff successful.
Status CompactMemTable()
Status CompactMemTable(Snapshot* snapshot = NULL)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);

Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base)
Expand Down
92 changes: 91 additions & 1 deletion src/leveldb/db/version_edit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,13 @@ enum Tag {
kPrevLogNumber = 9,
kNewFile = 10,
kDeletedFile = 11,
// no more than 4096

kPrepareCreateSnapshot = 12,
kCommitCreateSnapshot = 13,
kRollbackCreateSnapshot = 14,
kBaseSnapshot = 15,

// no more than 100w
kMaxTag = 1 << 20,
};

Expand All @@ -43,10 +49,28 @@ void VersionEdit::Clear() {
has_prev_log_number_ = false;
has_next_file_number_ = false;
has_last_sequence_ = false;

has_prepare_create_snapshot_ = false;
has_commit_create_snapshot_ = false;
has_rollback_create_snapshot_ = false;
has_base_snapshot_ = false;

deleted_files_.clear();
new_files_.clear();
}
void VersionEdit::ClearFile() {
new_files_.clear();
deleted_files_.clear();
}

bool VersionEdit::HasCompactPointer(int level) {
for (size_t i = 0; i < compact_pointers_.size(); i++) {
if (compact_pointers_[i].first == level) {
return true;
}
}
return false;
}
void VersionEdit::EncodeTo(std::string* dst) const {
if (has_comparator_) {
std::string str;
Expand Down Expand Up @@ -136,6 +160,44 @@ void VersionEdit::EncodeTo(std::string* dst) const {
PutVarint32(dst, kNewFile);
dst->append(str.data(), str.size());
}

// use 2pc impl snapshot create
if (has_prepare_create_snapshot_) {
std::string str;
PutLengthPrefixedSlice(&str, prepare_create_snapshot_.name);
PutVarint64(&str, prepare_create_snapshot_.timestamp);

PutVarint32(dst, str.size() + kMaxTag);
PutVarint32(dst, kPrepareCreateSnapshot);
dst->append(str.data(), str.size());
}
if (has_commit_create_snapshot_) {
std::string str;
PutLengthPrefixedSlice(&str, commit_create_snapshot_.name);
PutVarint64(&str, commit_create_snapshot_.timestamp);

PutVarint32(dst, str.size() + kMaxTag);
PutVarint32(dst, kCommitCreateSnapshot);
dst->append(str.data(), str.size());
} else if (has_rollback_create_snapshot_) {
std::string str;
PutLengthPrefixedSlice(&str, rollback_create_snapshot_.name);
PutVarint64(&str, rollback_create_snapshot_.timestamp);

PutVarint32(dst, str.size() + kMaxTag);
PutVarint32(dst, kRollbackCreateSnapshot);
dst->append(str.data(), str.size());
}

if (has_base_snapshot_) {
std::string str;
PutLengthPrefixedSlice(&str, base_snapshot_.name);
PutVarint64(&str, base_snapshot_.timestamp);

PutVarint32(dst, str.size() + kMaxTag);
PutVarint32(dst, kBaseSnapshot);
dst->append(str.data(), str.size());
}
}

static bool GetInternalKey(Slice* input, InternalKey* dst) {
Expand Down Expand Up @@ -291,6 +353,34 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
}
break;

case kPrepareCreateSnapshot:
has_prepare_create_snapshot_ = true;
GetLengthPrefixedSlice(&input, &str);
prepare_create_snapshot_.name = str.ToString();
GetVarint64(&input, &prepare_create_snapshot_.timestamp);
break;

case kCommitCreateSnapshot:
has_commit_create_snapshot_ = true;
GetLengthPrefixedSlice(&input, &str);
commit_create_snapshot_.name = str.ToString();
GetVarint64(&input, &commit_create_snapshot_.timestamp);
break;

case kRollbackCreateSnapshot:
has_rollback_create_snapshot_ = true;
GetLengthPrefixedSlice(&input, &str);
rollback_create_snapshot_.name = str.ToString();
GetVarint64(&input, &rollback_create_snapshot_.timestamp);
break;

case kBaseSnapshot:
has_base_snapshot_ = true;
GetLengthPrefixedSlice(&input, &str);
base_snapshot_.name = str.ToString();
GetVarint64(&input, &base_snapshot_.timestamp);
break;

default: // tag not know, skip it.
input.remove_prefix(len);
fprintf(stderr, "VersionEdit, skip unknow tag %d, len %d\n", tag, len);
Expand Down
66 changes: 66 additions & 0 deletions src/leveldb/db/version_edit.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,39 @@ class VersionEdit {
~VersionEdit() { }

void Clear();
void ClearFile();

void SetBaseSnapshot() {
if (has_base_snapshot_) {
return;
}
assert(has_prepare_create_snapshot_);
has_prepare_create_snapshot_ = false;
has_base_snapshot_ = true;
base_snapshot_.name = prepare_create_snapshot_.name;
base_snapshot_.timestamp = prepare_create_snapshot_.timestamp;
}
void SetBaseSnapshot(const Slice& name, int64_t timestamp) {
has_prepare_create_snapshot_ = false;
has_base_snapshot_ = true;
base_snapshot_.name = name.ToString();
base_snapshot_.timestamp = timestamp;
}
void SetPrepareCreateSnapshot(const Slice& name, int64_t timestamp) {
has_prepare_create_snapshot_ = true;
prepare_create_snapshot_.name = name.ToString();
prepare_create_snapshot_.timestamp = timestamp;
}
void SetCommitCreateSnapshot(const Slice& name, int64_t timestamp) {
has_commit_create_snapshot_ = true;
commit_create_snapshot_.name = name.ToString();
commit_create_snapshot_.timestamp = timestamp;
}
void SetRollbackCreateSnapshot(const Slice& name, int64_t timestamp) {
has_rollback_create_snapshot_ = true;
rollback_create_snapshot_.name = name.ToString();
rollback_create_snapshot_.timestamp = timestamp;
}
void SetComparatorName(const Slice& name) {
has_comparator_ = true;
comparator_ = name.ToString();
Expand Down Expand Up @@ -77,6 +109,9 @@ class VersionEdit {
uint64_t GetLogNumber() const {
return log_number_;
}
uint64_t GetPrevLogNumber() const {
return prev_log_number_;
}
uint64_t GetNextFileNumber() const {
return next_file_number_;
}
Expand All @@ -93,9 +128,13 @@ class VersionEdit {
bool HasLogNumber() const {
return has_log_number_;
}
bool HasPrevLogNumber() const {
return has_prev_log_number_;
}
bool HasComparator() const {
return has_comparator_;
}
bool HasCompactPointer(int level);
bool HasFiles(std::vector<uint64_t>* deleted_files,
std::vector<uint64_t>* added_files) {
bool has_files = deleted_files_.size() > 0
Expand All @@ -115,6 +154,18 @@ class VersionEdit {
// }
return has_files;
}
bool HasPrepareCreateSnapshot() {
return has_prepare_create_snapshot_;
}
bool HasCommitCreateSnapshot() {
return has_commit_create_snapshot_;
}
bool HasRollbackCreateSnapshot() {
return has_rollback_create_snapshot_;
}
bool HasBaseSnapshot() {
return has_base_snapshot_;
}

void ModifyForMerge(std::map<uint64_t, uint64_t> num_map) {
//if (num_map.size() == 0) {
Expand Down Expand Up @@ -192,6 +243,21 @@ class VersionEdit {
bool has_next_file_number_;
bool has_last_sequence_;

// version snapshot impl
bool has_prepare_create_snapshot_;
bool has_commit_create_snapshot_;
bool has_rollback_create_snapshot_;
bool has_base_snapshot_;

struct SnapshotEdit {
std::string name;
int64_t timestamp;
};
SnapshotEdit prepare_create_snapshot_;
SnapshotEdit commit_create_snapshot_;
SnapshotEdit rollback_create_snapshot_;
SnapshotEdit base_snapshot_;

std::vector< std::pair<int, InternalKey> > compact_pointers_;

// Files in Version could be deleted by file number or file meta.
Expand Down
Loading