Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix: counter cannot compact in mem leveldb #1283

Merged
merged 7 commits into from
Jun 7, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ INCPATH += -I./src -I./include -I./src/leveldb/include -I./src/leveldb \
CFLAGS += $(OPT) $(INCPATH) -fPIC -fvisibility=hidden # hide internal symbol of tera
CXXFLAGS += -std=gnu++11 $(CFLAGS)
LDFLAGS += -rdynamic $(DEPS_LDPATH) $(DEPS_LDFLAGS) -lpthread -lrt -lz -ldl \
-lreadline -lncurses
-lreadline -lncurses -fPIC
SO_LDFLAGS += -rdynamic $(DEPS_LDPATH) $(SO_DEPS_LDFLAGS) -lpthread -lrt -lz -ldl \
-shared -Wl,--version-script,so-version-script # hide symbol of thirdparty libs
-shared -fPIC -Wl,--version-script,so-version-script # hide symbol of thirdparty libs

PROTO_FILES := $(wildcard src/proto/*.proto)
PROTO_OUT_CC := $(PROTO_FILES:.proto=.pb.cc)
Expand Down
9 changes: 8 additions & 1 deletion src/io/default_compact_strategy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ namespace io {
DefaultCompactStrategy::DefaultCompactStrategy(const TableSchema& schema)
: schema_(schema),
raw_key_operator_(GetRawKeyOperatorFromSchema(schema_)),
cmp_(NewRowKeyComparator(raw_key_operator_)),
last_ts_(-1), last_type_(leveldb::TKT_FORSEEK), cur_type_(leveldb::TKT_FORSEEK),
del_row_ts_(-1), del_col_ts_(-1), del_qual_ts_(-1), cur_ts_(-1),
del_row_seq_(0), del_col_seq_(0), del_qual_seq_(0), version_num_(0),
Expand All @@ -26,7 +27,13 @@ DefaultCompactStrategy::DefaultCompactStrategy(const TableSchema& schema)
VLOG(11) << "DefaultCompactStrategy construct";
}

DefaultCompactStrategy::~DefaultCompactStrategy() {}
DefaultCompactStrategy::~DefaultCompactStrategy() {
delete cmp_;
}

const leveldb::Comparator* DefaultCompactStrategy::RowKeyComparator() {
return cmp_;
}

const char* DefaultCompactStrategy::Name() const {
return "tera.DefaultCompactStrategy";
Expand Down
4 changes: 4 additions & 0 deletions src/io/default_compact_strategy.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#define TERA_IO_DEFAULT_COMPACT_STRATEGY_H_

#include "leveldb/compact_strategy.h"
#include "leveldb/comparator.h"
#include "leveldb/slice.h"

#include "common/mutex.h"
Expand All @@ -25,6 +26,8 @@ class DefaultCompactStrategy : public leveldb::CompactStrategy {
virtual bool Drop(const Slice& k, uint64_t n,
const std::string& lower_bound);

virtual const leveldb::Comparator* RowKeyComparator();

// tera-specific, based on all-level iterators.
// used in LowLevelScan
virtual bool ScanDrop(const Slice& k, uint64_t n);
Expand Down Expand Up @@ -58,6 +61,7 @@ class DefaultCompactStrategy : public leveldb::CompactStrategy {
std::map<std::string, int32_t> cf_indexs_;
TableSchema schema_;
const leveldb::RawKeyOperator* raw_key_operator_;
leveldb::Comparator* cmp_;

std::string last_key_;
std::string last_col_;
Expand Down
6 changes: 6 additions & 0 deletions src/io/ttlkv_compact_strategy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,17 @@ using namespace leveldb;
KvCompactStrategy::KvCompactStrategy(const TableSchema& schema)
: schema_(schema),
raw_key_operator_(GetRawKeyOperatorFromSchema(schema_)),
cmp_(NewRowKeyComparator(raw_key_operator_)),
snapshot_(leveldb::kMaxSequenceNumber) {
VLOG(11) << "KvCompactStrategy construct";
}

KvCompactStrategy::~KvCompactStrategy() {
delete cmp_;
}

const leveldb::Comparator* KvCompactStrategy::RowKeyComparator() {
return cmp_;
}

const char* KvCompactStrategy::Name() const {
Expand Down
3 changes: 3 additions & 0 deletions src/io/ttlkv_compact_strategy.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include "common/mutex.h"
#include "leveldb/compact_strategy.h"
#include "leveldb/comparator.h"
#include "leveldb/raw_key_operator.h"
#include "leveldb/slice.h"
#include "proto/table_schema.pb.h"
Expand All @@ -19,6 +20,7 @@ class KvCompactStrategy : public leveldb::CompactStrategy {
KvCompactStrategy(const TableSchema& schema);
virtual ~KvCompactStrategy();

virtual const leveldb::Comparator* RowKeyComparator();
virtual bool CheckTag(const leveldb::Slice& tera_key, bool* del_tag, int64_t* ttl_tag);
virtual bool Drop(const leveldb::Slice& k, uint64_t n,
const std::string& lower_bound);
Expand All @@ -40,6 +42,7 @@ class KvCompactStrategy : public leveldb::CompactStrategy {
private:
TableSchema schema_;
const leveldb::RawKeyOperator* raw_key_operator_;
leveldb::Comparator* cmp_;
uint64_t snapshot_;
};

Expand Down
4 changes: 2 additions & 2 deletions src/leveldb/db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
if (s.ok() && meta.file_size > 0) {
const Slice min_user_key = meta.smallest.user_key();
const Slice max_user_key = meta.largest.user_key();
if (base != NULL) {
if (base != NULL && options_.drop_base_level_del_in_compaction) {
level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
}
edit->AddFile(level, meta);
Expand Down Expand Up @@ -1906,7 +1906,7 @@ MemTable* DBImpl::NewMemTable() const {
} else {
Logger* info_log = NULL;
// Logger* info_log = options_.info_log;
MemTableOnLevelDB* new_mem = new MemTableOnLevelDB(internal_comparator_,
MemTableOnLevelDB* new_mem = new MemTableOnLevelDB(dbname_, internal_comparator_,
options_.compact_strategy_factory,
options_.memtable_ldb_write_buffer_size,
options_.memtable_ldb_block_size,
Expand Down
5 changes: 3 additions & 2 deletions src/leveldb/db/memtable_on_leveldb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@

namespace leveldb {

MemTableOnLevelDB::MemTableOnLevelDB(const InternalKeyComparator& comparator,
MemTableOnLevelDB::MemTableOnLevelDB(const std::string& dbname,
const InternalKeyComparator& comparator,
CompactStrategyFactory* compact_strategy_factory,
size_t write_buffer_size,
size_t block_size,
Logger* info_log)
: MemTable(comparator, compact_strategy_factory) {
char memdb_name[1024] = { '\0' };
snprintf(memdb_name, sizeof(memdb_name), "/%d/%llu", getpid(),
snprintf(memdb_name, sizeof(memdb_name), "/%d/%s/%llu", getpid(), dbname.c_str(),
(unsigned long long)this);
leveldb::Options opts;
opts.env = memenv_ = leveldb::NewMemEnv(GetBaseEnv());
Expand Down
3 changes: 2 additions & 1 deletion src/leveldb/db/memtable_on_leveldb.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ class MemTableOnLevelDB : public MemTable{

public:

MemTableOnLevelDB (const InternalKeyComparator& comparator,
MemTableOnLevelDB (const std::string& dbname,
const InternalKeyComparator& comparator,
CompactStrategyFactory* compact_strategy_factory,
size_t write_buffer_size,
size_t block_size,
Expand Down
14 changes: 13 additions & 1 deletion src/leveldb/db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,15 @@ void Version::GetOverlappingInputs(
if (end != NULL) {
user_end = end->user_key();
}
const Comparator* user_cmp = vset_->icmp_.user_comparator();
const Comparator* user_cmp = NULL;
CompactStrategy* strategy = NULL;
if (!vset_->options_->drop_base_level_del_in_compaction) { // use row key comparator
strategy = vset_->options_->compact_strategy_factory->NewInstance();
user_cmp = strategy->RowKeyComparator();
}
if (user_cmp == NULL) {
user_cmp = vset_->icmp_.user_comparator();
}
for (size_t i = 0; i < files_[level].size(); ) {
FileMetaData* f = files_[level][i++];
const Slice file_start = f->smallest.user_key();
Expand All @@ -542,6 +550,10 @@ void Version::GetOverlappingInputs(
}
}
}
if (strategy != NULL) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(可以不用判断NULL,直接delete;加上判断也行,随你。)

delete strategy;
}
return;
}

void Version::GetApproximateSizes(uint64_t* size, uint64_t* size_under_level1) {
Expand Down
5 changes: 5 additions & 0 deletions src/leveldb/include/leveldb/compact_strategy.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <stdint.h>
#include <string>
#include "leveldb/iterator.h"
#include "leveldb/comparator.h"

namespace leveldb {

Expand All @@ -22,6 +23,8 @@ class CompactStrategy {
public:
virtual ~CompactStrategy() {}

virtual const Comparator* RowKeyComparator() = 0;

virtual bool Drop(const Slice& k, uint64_t n,
const std::string& lower_bound = "") = 0;

Expand Down Expand Up @@ -49,6 +52,8 @@ class DummyCompactStrategy : public CompactStrategy {
public:
virtual ~DummyCompactStrategy() {}

virtual const Comparator* RowKeyComparator() { return NULL;}

virtual bool Drop(const Slice& k, uint64_t n, const std::string& lower_bound) {
return false;
}
Expand Down
3 changes: 3 additions & 0 deletions src/leveldb/include/leveldb/comparator.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#define STORAGE_LEVELDB_INCLUDE_COMPARATOR_H_

#include <string>
#include "leveldb/raw_key_operator.h"

namespace leveldb {

Expand Down Expand Up @@ -66,6 +67,8 @@ extern const Comparator* BytewiseComparator();
// ordering.
extern const Comparator* TeraBinaryComparator();

extern Comparator* NewRowKeyComparator(const RawKeyOperator* key_operator);

// Return a comparator that compare TTL-Key with row_key only.
const Comparator* TeraTTLKvComparator();
} // namespace leveldb
Expand Down
44 changes: 44 additions & 0 deletions src/leveldb/util/comparator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,46 @@ namespace leveldb {
Comparator::~Comparator() { }

namespace {
class RowKeyComparator : public Comparator {
public:
RowKeyComparator(const RawKeyOperator* key_operator)
: key_operator_(key_operator) {}

virtual const char* Name() const {
return "leveldb.RowKeyComparator";
}

virtual int Compare(const Slice& a, const Slice& b) const {
Slice a_key, a_col, a_qual;
Slice b_key, b_col, b_qual;
int64_t a_ts = -1;
int64_t b_ts = -1;
leveldb::TeraKeyType a_type;
leveldb::TeraKeyType b_type;

if (!key_operator_->ExtractTeraKey(a, &a_key, &a_col, &a_qual, &a_ts, &a_type)) {
return key_operator_->Compare(a, b);
}
if (!key_operator_->ExtractTeraKey(b, &b_key, &b_col, &b_qual, &b_ts, &b_type)) {
return key_operator_->Compare(a, b);
}
return a_key.compare(b_key);
}

virtual void FindShortestSeparator(
std::string* start,
const Slice& limit) const {
return;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

直接 {} 就行了,可以不用再写return;
和现有代码的风格保持一致

}

virtual void FindShortSuccessor(std::string* key) const {
return;
}

private:
const RawKeyOperator* key_operator_;
};

class BytewiseComparatorImpl : public Comparator {
public:
BytewiseComparatorImpl() { }
Expand Down Expand Up @@ -156,4 +196,8 @@ const Comparator* TeraTTLKvComparator() {
return terakv;
}

Comparator* NewRowKeyComparator(const RawKeyOperator* key_operator) {
Comparator* cmp = new RowKeyComparator(key_operator);
return cmp;
}
} // namespace leveldb