Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#541: qualifier range scan support #544

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 122 additions & 28 deletions src/io/tablet_io.cc
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,21 @@ bool TabletIO::Read(const leveldb::Slice& key, std::string* value,
return true;
}

void TabletIO::SeekIterator(const std::string& row, const std::string& col,
const std::string& qual, int64_t ts,
leveldb::Iterator* scan_it) {
std::string seek_key;
m_key_operator->EncodeTeraKey(row, col, qual, ts, leveldb::TKT_FORSEEK,
&seek_key);

VLOG(10) << "ll-scan: " << "seek to " << DebugString(row) << ":"
<< DebugString(col) << ":" << DebugString(qual)
<< std::hex << ts;

scan_it->Seek(seek_key);
VLOG(10) << "ll-scan: seek done";
}

StatusCode TabletIO::InitedScanInterator(const std::string& start_tera_key,
const ScanOptions& scan_options,
leveldb::Iterator** scan_it) {
Expand All @@ -586,6 +601,7 @@ StatusCode TabletIO::InitedScanInterator(const std::string& start_tera_key,
VLOG(10) << "ll-scan: " << "startkey=[" << DebugString(start_key.ToString()) << ":"
<< DebugString(start_col.ToString()) << ":" << DebugString(start_qual.ToString());
std::string start_seek_key;

m_key_operator->EncodeTeraKey(start_key.ToString(), "", "", kLatestTs,
leveldb::TKT_FORSEEK, &start_seek_key);
(*scan_it)->Seek(start_seek_key);
Expand Down Expand Up @@ -638,8 +654,9 @@ inline bool TabletIO::LowLevelScan(const std::string& start_tera_key,
*read_bytes = 0;
int64_t now_time = GetTimeStampInMs();
int64_t time_out = now_time + scan_options.timeout;
bool has_filter = scan_options.filter_list.filter_size() > 0;
KeyValuePair next_start_kv_pair;
VLOG(9) << "ll-scan timeout set to be " << scan_options.timeout;
VLOG(9) << "ll-scan timeout set to be " << scan_options.timeout << ", has filter " << has_filter;

for (; it->Valid();) {
bool has_merged = false;
Expand All @@ -658,11 +675,6 @@ inline bool TabletIO::LowLevelScan(const std::string& start_tera_key,
it->Next();
continue;
}
if (now_time > time_out) {
VLOG(9) << "ll-scan timeout. Mark next start key: " << DebugString(tera_key.ToString());
MakeKvPair(key, col, qual, ts, "", &next_start_kv_pair);
break;
}

VLOG(10) << "ll-scan: " << "tablet=[" << m_tablet_path
<< "] key=[" << DebugString(key.ToString())
Expand All @@ -675,6 +687,62 @@ inline bool TabletIO::LowLevelScan(const std::string& start_tera_key,
break;
}

if (now_time > time_out) {
VLOG(9) << "ll-scan timeout. Mark next start key: " << DebugString(tera_key.ToString());
MakeKvPair(key, col, qual, ts, "", &next_start_kv_pair);
break;
}

// qualifier range seek
if (scan_options.qu_range.size()) {
VLOG(10) << "filter by qualifier, size " << scan_options.qu_range.size();
QualifierRange::const_iterator qu_it;
qu_it = scan_options.qu_range.lower_bound(col.ToString());
if (qu_it == scan_options.qu_range.end()) {
// seek to next key
std::string next_key;
m_key_operator->FindSuccessor(key.ToString(), &next_key);
SeekIterator(next_key, "", "", kLatestTs, it);
VLOG(10) << "seek to next_key " << DebugString(next_key);
continue;
} else {
leveldb::Slice scan_cf = qu_it->first;
if (scan_cf.compare(col) > 0) {
// seek to next cf
SeekIterator(key.ToString(), scan_cf.ToString(), "", kLatestTs, it);
VLOG(10) << "seek to next cf " << DebugString(scan_cf.ToString());
continue;
}
// check qualifier range wether match or not
leveldb::Slice scan_qu_start, scan_qu_end;
const std::pair<std::string, std::string>& scan_qu_range = qu_it->second;
scan_qu_start = scan_qu_range.first;
scan_qu_end = scan_qu_range.second;
// qualifier range not match
if (scan_qu_start.compare(qual) > 0) {
// seek to start qual
SeekIterator(key.ToString(), scan_cf.ToString(), scan_qu_start.ToString(), kLatestTs, it);
continue;
} else if (scan_qu_end.compare(qual) < 0) {
// seek to next cf
++qu_it;
if (qu_it == scan_options.qu_range.end()) {
// seek to next key
std::string next_key;
m_key_operator->FindSuccessor(key.ToString(), &next_key);
SeekIterator(next_key, "", "", kLatestTs, it);
continue;
} else {
scan_cf = qu_it->first;
// seek to next cf
SeekIterator(key.ToString(), scan_cf.ToString(), "", kLatestTs, it);
continue;
}
}
// qualifier range match
}
}

const std::set<std::string>& cf_set = scan_options.iter_cf_set;
if (cf_set.size() > 0 &&
cf_set.find(col.ToString()) == cf_set.end() &&
Expand Down Expand Up @@ -709,8 +777,10 @@ inline bool TabletIO::LowLevelScan(const std::string& start_tera_key,

if (key.compare(last_key) != 0) {
*read_row_count += 1;
ProcessRowBuffer(row_buf, scan_options, value_list, &buffer_size);
row_buf.clear();
if (has_filter) {
ProcessRowBuffer(row_buf, scan_options, value_list, &buffer_size);
row_buf.clear();
}
}

// max version filter
Expand Down Expand Up @@ -744,7 +814,14 @@ inline bool TabletIO::LowLevelScan(const std::string& start_tera_key,

KeyValuePair kv;
MakeKvPair(key, col, qual, ts, value, &kv);
row_buf.push_back(kv);
if (!has_filter) {
if (!FilterCell(scan_options, col.ToString(), qual.ToString(), ts)) {
value_list->add_key_values()->CopyFrom(kv);
buffer_size += key.size() + col.size() + qual.size() + sizeof(ts) + value.size();
}
} else {
row_buf.push_back(kv);
}

// check scan buffer
if (buffer_size >= scan_options.max_size) {
Expand All @@ -757,8 +834,9 @@ inline bool TabletIO::LowLevelScan(const std::string& start_tera_key,
}

// process the last row of tablet
ProcessRowBuffer(row_buf, scan_options, value_list, &buffer_size);

if (has_filter) {
ProcessRowBuffer(row_buf, scan_options, value_list, &buffer_size);
}
leveldb::Status it_status;
if (!it->Valid()) {
it_status = it->status();
Expand Down Expand Up @@ -1425,6 +1503,15 @@ void TabletIO::SetupScanRowOptions(const ScanTabletRequest* request,
if (request->timeout()) {
scan_options->timeout = request->timeout();
}
// setup qualifier range
if (request->qu_range_size()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个判断没用吧

for (uint32_t i = 0; i < (uint32_t)request->qu_range_size(); i++) {
const ScanQualifierRange& range = request->qu_range(i);
scan_options->qu_range.insert(
std::pair<std::string, std::pair<std::string, std::string> >(range.cf(),
std::pair<std::string, std::string>(range.qu_start(), range.qu_end())));
}
}

scan_options->snapshot_id = request->snapshot_id();
}
Expand Down Expand Up @@ -1677,31 +1764,38 @@ void TabletIO::ProcessRowBuffer(std::list<KeyValuePair>& row_buf,
const std::string& value = it->value();
int64_t ts = it->timestamp();

// skip unnecessary columns and qualifiers
if (scan_options.column_family_list.size() > 0) {
ColumnFamilyMap::const_iterator it =
scan_options.column_family_list.find(col);
if (it != scan_options.column_family_list.end()) {
const std::set<std::string>& qual_list = it->second;
if (qual_list.size() > 0 && qual_list.end() == qual_list.find(qual)) {
continue;
}
} else {
continue;
}
}
// time range filter
if (ts < scan_options.ts_start || ts > scan_options.ts_end) {
if (FilterCell(scan_options, col, qual, ts)) {
continue;
}

value_list->add_key_values()->CopyFrom(*it);

*buffer_size += key.size() + col.size() + qual.size()
+ sizeof(ts) + value.size();
}
}

bool TabletIO::FilterCell(const ScanOptions& scan_options,
const std::string& col,
const std::string& qual, int64_t ts) {
// skip unnecessary columns and qualifiers
if (scan_options.column_family_list.size() > 0) {
ColumnFamilyMap::const_iterator it =
scan_options.column_family_list.find(col);
if (it != scan_options.column_family_list.end()) {
const std::set<std::string>& qual_list = it->second;
if (qual_list.size() > 0 && qual_list.end() == qual_list.find(qual)) {
return true;
}
} else {
return true;
}
}
// time range filter
if (ts < scan_options.ts_start || ts > scan_options.ts_end) {
return true;
}
return false;
}

uint64_t TabletIO::GetSnapshot(uint64_t id, uint64_t snapshot_sequence,
StatusCode* status) {
{
Expand Down
10 changes: 10 additions & 0 deletions src/io/tablet_io.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,13 @@ class TabletIO {
kUnLoading2 = kTabletUnLoading2
};
typedef std::map< std::string, std::set<std::string> > ColumnFamilyMap;
typedef std::map< std::string, std::pair<std::string, std::string> > QualifierRange;
struct ScanOptions {
uint32_t max_versions;
uint32_t max_size;
int64_t ts_start;
int64_t ts_end;
QualifierRange qu_range;// {cf, <qu_start, qu_end>}
uint64_t snapshot_id;
FilterList filter_list;
ColumnFamilyMap column_family_list;
Expand Down Expand Up @@ -188,6 +190,9 @@ class TabletIO {
const ScanOptions& scan_options,
RowResult* value_list,
uint32_t* buffer_size);
bool FilterCell(const ScanOptions& scan_options,
const std::string& col,
const std::string& qual, int64_t ts);

StatusCode InitedScanInterator(const std::string& start_tera_key,
const ScanOptions& scan_options,
Expand Down Expand Up @@ -220,6 +225,11 @@ class TabletIO {
void MakeKvPair(leveldb::Slice key, leveldb::Slice col, leveldb::Slice qual,
int64_t ts, leveldb::Slice value, KeyValuePair* kv);


void SeekIterator(const std::string& row, const std::string& col,
const std::string& qual, int64_t ts,
leveldb::Iterator* scan_it);

private:
mutable Mutex m_mutex;
TabletWriter* m_async_writer;
Expand Down
3 changes: 3 additions & 0 deletions src/leveldb/include/leveldb/raw_key_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ class RawKeyOperator {
TeraKeyType* type) const = 0;
virtual int Compare(const Slice& key1,
const Slice& key2) const = 0;

virtual void FindSuccessor(const std::string& row_key,
std::string* successor_key) const = 0;
};

const RawKeyOperator* ReadableRawKeyOperator();
Expand Down
18 changes: 18 additions & 0 deletions src/leveldb/util/raw_key_operator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@ class ReadableRawKeyOperatorImpl : public RawKeyOperator {
virtual int Compare(const Slice& key1, const Slice& key2) const {
return key1.compare(key2);
}

virtual void FindSuccessor(const std::string& row_key,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个函数是不是qualifier也可以调用

std::string* successor_key) const {
*successor_key = row_key;
successor_key->push_back('\x1');
}
};

/**
Expand Down Expand Up @@ -228,6 +234,12 @@ class BinaryRawKeyOperatorImpl : public RawKeyOperator {
Slice ts_type2(data2 + size2 - 12, 8);
return ts_type1.compare(ts_type2);
}

virtual void FindSuccessor(const std::string& row_key,
std::string* successor_key) const {
*successor_key = row_key;
successor_key->push_back('\x0');
}
};

// support KV-pair with TTL, Key's format :
Expand Down Expand Up @@ -275,6 +287,12 @@ class KvRawKeyOperatorImpl : public RawKeyOperator {
}
return r;
}

virtual void FindSuccessor(const std::string& row_key,
std::string* successor_key) const {
*successor_key = row_key;
successor_key->push_back('\x0');
}
};

static pthread_once_t once = PTHREAD_ONCE_INIT;
Expand Down
6 changes: 6 additions & 0 deletions src/proto/tabletnode_rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,11 @@ message ResultCell {
optional bytes value = 5;
};

message ScanQualifierRange {
optional bytes cf = 1;
optional bytes qu_start = 2;
optional bytes qu_end = 3;
};

message ScanTabletRequest {
optional uint64 sequence_id = 1;
Expand All @@ -232,6 +237,7 @@ message ScanTabletRequest {
optional bool part_of_session = 17;
optional int64 timestamp = 18 [default = 0];
optional int64 timeout = 19;
repeated ScanQualifierRange qu_range = 20;
}

message ScanTabletResponse {
Expand Down
6 changes: 6 additions & 0 deletions src/sdk/scan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ void ScanDescriptor::SetPackInterval(int64_t interval) {
_impl->SetPackInterval(interval);
}

void ScanDescriptor::AddQualifierRange(const std::string& cf,
const std::string& qu_start,
const std::string& qu_end) {
_impl->AddQualifierRange(cf, qu_start, qu_end);
}

void ScanDescriptor::SetTimeRange(int64_t ts_end, int64_t ts_start) {
_impl->SetTimeRange(ts_end, ts_start);
}
Expand Down
23 changes: 23 additions & 0 deletions src/sdk/scan_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,7 @@ ScanDescImpl::ScanDescImpl(const ScanDescImpl& impl)
for (int32_t i = 0; i < impl.GetSizeofColumnFamilyList(); ++i) {
_cf_list.push_back(new tera::ColumnFamily(*(impl.GetColumnFamily(i))));
}
_qu_range = impl._qu_range;
}

ScanDescImpl::~ScanDescImpl() {
Expand Down Expand Up @@ -589,6 +590,28 @@ void ScanDescImpl::SetPackInterval(int64_t interval) {
_pack_interval = interval;
}

void ScanDescImpl::AddQualifierRange(const std::string& cf,
const std::string& qu_start,
const std::string& qu_end) {
if (cf.size()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

代码里好几处这样的写法,拿0当false不好吧,empty不就是干这个的么

VLOG(12) << "add qual, " << cf << ":" << qu_start << ":" << qu_end;
_qu_range.insert(std::pair<std::string,
std::pair<std::string, std::string> >(cf, std::pair<std::string, std::string>(qu_start, qu_end)));
}
}

void ScanDescImpl::SetQualifierRange(ScanTabletRequest* request) {
std::map<std::string, std::pair<std::string, std::string> >::iterator it = _qu_range.begin();
for (; it != _qu_range.end(); ++it) {
ScanQualifierRange* qu_range = request->add_qu_range();
qu_range->set_cf(it->first);
const std::pair<std::string, std::string>& range = it->second;
qu_range->set_qu_start(range.first);
qu_range->set_qu_end(range.second);
VLOG(12) << "set qual " << qu_range->DebugString();
}
}

void ScanDescImpl::SetTimeRange(int64_t ts_end, int64_t ts_start) {
if (_timer_range == NULL) {
_timer_range = new tera::TimeRange;
Expand Down
Loading