Skip to content

Commit

Permalink
bugfix: counter cannot compact in mem leveldb (#1283)
Browse files Browse the repository at this point in the history
cherry-pick from bb9be4f

bugfix:counter cannot compact in mem leveldb (
* add code review and merge code rule

* add code review and merge code rule

* add code review and merge code rule

* add code review and merge code rule

* bugfix: counter cannot compact in mem leveldb
  • Loading branch information
ajie committed Jun 23, 2017
1 parent 6475fe6 commit e2ea249
Show file tree
Hide file tree
Showing 12 changed files with 100 additions and 11 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ INCPATH += -I./src -I./include -I./src/leveldb/include -I./src/leveldb \
CFLAGS += $(OPT) $(INCPATH) -fPIC -fvisibility=hidden # hide internal symbol of tera
CXXFLAGS += $(CFLAGS)
LDFLAGS += -rdynamic $(DEPS_LDPATH) $(DEPS_LDFLAGS) -lpthread -lrt -lz -ldl \
-lreadline -lncurses
-lreadline -lncurses -fPIC
SO_LDFLAGS += -rdynamic $(DEPS_LDPATH) $(SO_DEPS_LDFLAGS) -lpthread -lrt -lz -ldl \
-shared -Wl,--version-script,so-version-script # hide symbol of thirdparty libs
-shared -fPIC -Wl,--version-script,so-version-script # hide symbol of thirdparty libs

PROTO_FILES := $(wildcard src/proto/*.proto)
PROTO_OUT_CC := $(PROTO_FILES:.proto=.pb.cc)
Expand Down
12 changes: 10 additions & 2 deletions src/io/default_compact_strategy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ namespace io {
DefaultCompactStrategy::DefaultCompactStrategy(const TableSchema& schema)
: schema_(schema),
raw_key_operator_(GetRawKeyOperatorFromSchema(schema_)),
last_ts_(-1), del_row_ts_(-1), del_col_ts_(-1), del_qual_ts_(-1), cur_ts_(-1),
cmp_(NewRowKeyComparator(raw_key_operator_)),
last_ts_(-1), last_type_(leveldb::TKT_FORSEEK), cur_type_(leveldb::TKT_FORSEEK),
del_row_ts_(-1), del_col_ts_(-1), del_qual_ts_(-1), cur_ts_(-1),
del_row_seq_(0), del_col_seq_(0), del_qual_seq_(0), version_num_(0),
snapshot_(leveldb::kMaxSequenceNumber) {
// build index
Expand All @@ -25,7 +27,13 @@ DefaultCompactStrategy::DefaultCompactStrategy(const TableSchema& schema)
VLOG(11) << "DefaultCompactStrategy construct";
}

DefaultCompactStrategy::~DefaultCompactStrategy() {}
DefaultCompactStrategy::~DefaultCompactStrategy() {
delete cmp_;
}

const leveldb::Comparator* DefaultCompactStrategy::RowKeyComparator() {
return cmp_;
}

const char* DefaultCompactStrategy::Name() const {
return "tera.DefaultCompactStrategy";
Expand Down
5 changes: 5 additions & 0 deletions src/io/default_compact_strategy.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#define TERA_IO_DEFAULT_COMPACT_STRATEGY_H_

#include "leveldb/compact_strategy.h"
#include "leveldb/comparator.h"
#include "leveldb/slice.h"

#include "common/mutex.h"
#include "io/io_utils.h"
Expand All @@ -24,6 +26,8 @@ class DefaultCompactStrategy : public leveldb::CompactStrategy {
virtual bool Drop(const Slice& k, uint64_t n,
const std::string& lower_bound);

virtual const leveldb::Comparator* RowKeyComparator();

// tera-specific, based on all-level iterators.
// used in LowLevelScan
virtual bool ScanDrop(const Slice& k, uint64_t n);
Expand Down Expand Up @@ -56,6 +60,7 @@ class DefaultCompactStrategy : public leveldb::CompactStrategy {
std::map<std::string, int32_t> cf_indexs_;
TableSchema schema_;
const leveldb::RawKeyOperator* raw_key_operator_;
leveldb::Comparator* cmp_;

std::string last_key_;
std::string last_col_;
Expand Down
9 changes: 8 additions & 1 deletion src/io/ttlkv_compact_strategy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,18 @@ using namespace leveldb;

KvCompactStrategy::KvCompactStrategy(const TableSchema& schema)
: schema_(schema),
raw_key_operator_(GetRawKeyOperatorFromSchema(schema_)) {
raw_key_operator_(GetRawKeyOperatorFromSchema(schema_)),
cmp_(NewRowKeyComparator(raw_key_operator_)),
snapshot_(leveldb::kMaxSequenceNumber) {
VLOG(11) << "KvCompactStrategy construct";
}

KvCompactStrategy::~KvCompactStrategy() {
delete cmp_;
}

const leveldb::Comparator* KvCompactStrategy::RowKeyComparator() {
return cmp_;
}

const char* KvCompactStrategy::Name() const {
Expand Down
3 changes: 3 additions & 0 deletions src/io/ttlkv_compact_strategy.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include "common/mutex.h"
#include "leveldb/compact_strategy.h"
#include "leveldb/comparator.h"
#include "leveldb/raw_key_operator.h"
#include "proto/table_schema.pb.h"

Expand All @@ -18,6 +19,7 @@ class KvCompactStrategy : public leveldb::CompactStrategy {
KvCompactStrategy(const TableSchema& schema);
virtual ~KvCompactStrategy();

virtual const leveldb::Comparator* RowKeyComparator();
virtual bool Drop(const leveldb::Slice& k, uint64_t n,
const std::string& lower_bound);

Expand All @@ -38,6 +40,7 @@ class KvCompactStrategy : public leveldb::CompactStrategy {
private:
TableSchema schema_;
const leveldb::RawKeyOperator* raw_key_operator_;
leveldb::Comparator* cmp_;
uint64_t snapshot_;
};

Expand Down
4 changes: 2 additions & 2 deletions src/leveldb/db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
if (s.ok() && meta.file_size > 0) {
const Slice min_user_key = meta.smallest.user_key();
const Slice max_user_key = meta.largest.user_key();
if (base != NULL) {
if (base != NULL && options_.drop_base_level_del_in_compaction) {
level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
}
edit->AddFile(level, meta.number, meta.file_size, meta.smallest, meta.largest);
Expand Down Expand Up @@ -1922,7 +1922,7 @@ MemTable* DBImpl::NewMemTable() const {
} else {
Logger* info_log = NULL;
// Logger* info_log = options_.info_log;
MemTableOnLevelDB* new_mem = new MemTableOnLevelDB(internal_comparator_,
MemTableOnLevelDB* new_mem = new MemTableOnLevelDB(dbname_, internal_comparator_,
options_.compact_strategy_factory,
options_.memtable_ldb_write_buffer_size,
options_.memtable_ldb_block_size,
Expand Down
5 changes: 3 additions & 2 deletions src/leveldb/db/memtable_on_leveldb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@

namespace leveldb {

MemTableOnLevelDB::MemTableOnLevelDB(const InternalKeyComparator& comparator,
MemTableOnLevelDB::MemTableOnLevelDB(const std::string& dbname,
const InternalKeyComparator& comparator,
CompactStrategyFactory* compact_strategy_factory,
size_t write_buffer_size,
size_t block_size,
Logger* info_log)
: MemTable(comparator, compact_strategy_factory) {
char memdb_name[1024] = { '\0' };
snprintf(memdb_name, sizeof(memdb_name), "/%d/%llu", getpid(),
snprintf(memdb_name, sizeof(memdb_name), "/%d/%s/%llu", getpid(), dbname.c_str(),
(unsigned long long)this);
leveldb::Options opts;
opts.env = memenv_ = leveldb::NewMemEnv(GetBaseEnv());
Expand Down
3 changes: 2 additions & 1 deletion src/leveldb/db/memtable_on_leveldb.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ class MemTableOnLevelDB : public MemTable{

public:

MemTableOnLevelDB (const InternalKeyComparator& comparator,
MemTableOnLevelDB (const std::string& dbname,
const InternalKeyComparator& comparator,
CompactStrategyFactory* compact_strategy_factory,
size_t write_buffer_size,
size_t block_size,
Expand Down
14 changes: 13 additions & 1 deletion src/leveldb/db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,15 @@ void Version::GetOverlappingInputs(
if (end != NULL) {
user_end = end->user_key();
}
const Comparator* user_cmp = vset_->icmp_.user_comparator();
const Comparator* user_cmp = NULL;
CompactStrategy* strategy = NULL;
if (!vset_->options_->drop_base_level_del_in_compaction) { // use row key comparator
strategy = vset_->options_->compact_strategy_factory->NewInstance();
user_cmp = strategy->RowKeyComparator();
}
if (user_cmp == NULL) {
user_cmp = vset_->icmp_.user_comparator();
}
for (size_t i = 0; i < files_[level].size(); ) {
FileMetaData* f = files_[level][i++];
const Slice file_start = f->smallest.user_key();
Expand All @@ -526,6 +534,10 @@ void Version::GetOverlappingInputs(
}
}
}
if (strategy != NULL) {
delete strategy;
}
return;
}

void Version::GetApproximateSizes(uint64_t* size, uint64_t* size_under_level1) {
Expand Down
5 changes: 5 additions & 0 deletions src/leveldb/include/leveldb/compact_strategy.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <stdint.h>
#include <string>
#include "leveldb/iterator.h"
#include "leveldb/comparator.h"

namespace leveldb {

Expand All @@ -22,6 +23,8 @@ class CompactStrategy {
public:
virtual ~CompactStrategy() {}

virtual const Comparator* RowKeyComparator() = 0;

virtual bool Drop(const Slice& k, uint64_t n,
const std::string& lower_bound = "") = 0;

Expand All @@ -47,6 +50,8 @@ class DummyCompactStrategy : public CompactStrategy {
public:
virtual ~DummyCompactStrategy() {}

virtual const Comparator* RowKeyComparator() { return NULL;}

virtual bool Drop(const Slice& k, uint64_t n, const std::string& lower_bound) {
return false;
}
Expand Down
3 changes: 3 additions & 0 deletions src/leveldb/include/leveldb/comparator.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#define STORAGE_LEVELDB_INCLUDE_COMPARATOR_H_

#include <string>
#include "leveldb/raw_key_operator.h"

namespace leveldb {

Expand Down Expand Up @@ -66,6 +67,8 @@ extern const Comparator* BytewiseComparator();
// ordering.
extern const Comparator* TeraBinaryComparator();

extern Comparator* NewRowKeyComparator(const RawKeyOperator* key_operator);

// Return a comparator that compare TTL-Key with row_key only.
const Comparator* TeraTTLKvComparator();
} // namespace leveldb
Expand Down
44 changes: 44 additions & 0 deletions src/leveldb/util/comparator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,46 @@ namespace leveldb {
Comparator::~Comparator() { }

namespace {
class RowKeyComparator : public Comparator {
public:
RowKeyComparator(const RawKeyOperator* key_operator)
: key_operator_(key_operator) {}

virtual const char* Name() const {
return "leveldb.RowKeyComparator";
}

virtual int Compare(const Slice& a, const Slice& b) const {
Slice a_key, a_col, a_qual;
Slice b_key, b_col, b_qual;
int64_t a_ts = -1;
int64_t b_ts = -1;
leveldb::TeraKeyType a_type;
leveldb::TeraKeyType b_type;

if (!key_operator_->ExtractTeraKey(a, &a_key, &a_col, &a_qual, &a_ts, &a_type)) {
return key_operator_->Compare(a, b);
}
if (!key_operator_->ExtractTeraKey(b, &b_key, &b_col, &b_qual, &b_ts, &b_type)) {
return key_operator_->Compare(a, b);
}
return a_key.compare(b_key);
}

virtual void FindShortestSeparator(
std::string* start,
const Slice& limit) const {
return;
}

virtual void FindShortSuccessor(std::string* key) const {
return;
}

private:
const RawKeyOperator* key_operator_;
};

class BytewiseComparatorImpl : public Comparator {
public:
BytewiseComparatorImpl() { }
Expand Down Expand Up @@ -156,4 +196,8 @@ const Comparator* TeraTTLKvComparator() {
return terakv;
}

Comparator* NewRowKeyComparator(const RawKeyOperator* key_operator) {
Comparator* cmp = new RowKeyComparator(key_operator);
return cmp;
}
} // namespace leveldb

0 comments on commit e2ea249

Please sign in to comment.