Skip to content

Commit

Permalink
add timeoracle implement.
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzongjia committed Jun 8, 2017
1 parent b33e170 commit e03a492
Show file tree
Hide file tree
Showing 16 changed files with 939 additions and 3 deletions.
19 changes: 16 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ MARK_SRC := src/benchmark/mark.cc src/benchmark/mark_main.cc
TEST_SRC := src/utils/test/prop_tree_test.cc src/utils/test/tprinter_test.cc \
src/io/test/tablet_io_test.cc src/io/test/tablet_scanner_test.cc \
src/master/test/master_impl_test.cc src/io/test/load_test.cc \
src/common/test/thread_pool_test.cc
src/common/test/thread_pool_test.cc

TIMEORACLE_SRC := $(wildcard src/timeoracle/*.cc) src/tera_entry.cc
TIMEORACLE_BENCH_SRC := src/timeoracle/bench/timeoracle_bench.cc

TEST_OUTPUT := test_output
UNITTEST_OUTPUT := $(TEST_OUTPUT)/unittest
Expand All @@ -69,14 +72,17 @@ MONITOR_OBJ := $(MONITOR_SRC:.cc=.o)
MARK_OBJ := $(MARK_SRC:.cc=.o)
HTTP_OBJ := $(HTTP_SRC:.cc=.o)
TEST_OBJ := $(TEST_SRC:.cc=.o)
TIMEORACLE_OBJ := $(TIMEORACLE_SRC:.cc=.o)
TIMEORACLE_BENCH_OBJ := $(TIMEORACLE_BENCH_SRC:.cc=.o)

ALL_OBJ := $(MASTER_OBJ) $(TABLETNODE_OBJ) $(IO_OBJ) $(SDK_OBJ) $(PROTO_OBJ) \
$(JNI_TERA_OBJ) $(OTHER_OBJ) $(COMMON_OBJ) $(SERVER_OBJ) $(CLIENT_OBJ) \
$(TEST_CLIENT_OBJ) $(TERA_C_OBJ) $(MONITOR_OBJ) $(MARK_OBJ) $(TEST_OBJ) \
$(SERVER_WRAPPER_OBJ)
$(SERVER_WRAPPER_OBJ) $(TIMEORACLE_OBJ)
LEVELDB_LIB := src/leveldb/libleveldb.a
LEVELDB_UTIL := src/leveldb/util/histogram.o src/leveldb/port/port_posix.o

PROGRAM = tera_main tera_master tabletserver teracli teramo tera_test
PROGRAM = tera_main tera_master tabletserver teracli teramo tera_test timeoracle timeoracle_bench
LIBRARY = libtera.a
SOLIBRARY = libtera.so
TERA_C_SO = libtera_c.so
Expand Down Expand Up @@ -151,6 +157,12 @@ tera_mark: $(MARK_OBJ) $(LIBRARY) $(LEVELDB_LIB)
tera_test: $(TEST_CLIENT_OBJ) $(LIBRARY)
$(CXX) -o $@ $(TEST_CLIENT_OBJ) $(LIBRARY) $(LDFLAGS)

timeoracle: $(TIMEORACLE_OBJ) $(PROTO_OBJ) $(COMMON_OBJ) $(OTHER_OBJ)
$(CXX) -o $@ $^ $(LDFLAGS)

timeoracle_bench : $(TIMEORACLE_BENCH_OBJ) $(LIBRARY)
$(CXX) -o $@ $^ $(LDFLAGS)

terahttp: $(HTTP_OBJ) $(PROTO_OBJ) $(LIBRARY)
$(CXX) -o $@ $^ $(LDFLAGS)

Expand All @@ -163,6 +175,7 @@ src/leveldb/libleveldb.a: FORCE
tera_bench:

# unit test
timeoracle_bench_test: src/timeoracle/test/timeoracle_bench_test.o $(LIBRARY)
thread_pool_test: src/common/test/thread_pool_test.o $(LIBRARY)
$(CXX) -o $@ $^ $(LDFLAGS)

Expand Down
3 changes: 3 additions & 0 deletions src/proto/status_code.proto
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ enum StatusCode {

kTableStatusEnable = 1000;
kTableStatusDisable = 1001;

// Timeoracle
kTimeoracleBusy = 2000;
}

enum TabletStatus {
Expand Down
38 changes: 38 additions & 0 deletions src/proto/timeoracle_client.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright (c) 2015, Baidu.com, Inc. All Rights Reserved
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "proto/timeoracle_client.h"

namespace tera {
namespace timeoracle {

ThreadPool* TimeoracleClient::thread_pool_ = NULL;

void TimeoracleClient::SetThreadPool(ThreadPool* thread_pool) {
thread_pool_ = thread_pool;
}

void TimeoracleClient::SetRpcOption(int32_t max_inflow, int32_t max_outflow,
int32_t pending_buffer_size, int32_t thread_num) {
RpcClientBase::SetOption(max_inflow, max_outflow,
pending_buffer_size, thread_num);
}

TimeoracleClient::TimeoracleClient(const std::string& server_addr,
int32_t rpc_timeout)
: RpcClient<TimeoracleServer::Stub>(server_addr),
rpc_timeout_(rpc_timeout) {}

TimeoracleClient::~TimeoracleClient() {}

bool TimeoracleClient::GetTimestamp(const GetTimestampRequest* request,
GetTimestampResponse* response,
std::function<void (GetTimestampRequest*, GetTimestampResponse*, bool, int)> done) {
return SendMessageWithRetry(&TimeoracleServer::Stub::GetTimestamp,
request, response, done, "GetTimestamp",
rpc_timeout_, thread_pool_);
}

} // namespace timeoracle
} // namespace tera
44 changes: 44 additions & 0 deletions src/proto/timeoracle_client.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright (c) 2015, Baidu.com, Inc. All Rights Reserved
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef TERA_TIMEORACLE_TIMEORACLE_CLIENT_H
#define TERA_TIMEORACLE_TIMEORACLE_CLIENT_H

#include <gflags/gflags.h>
#include <sofa/pbrpc/pbrpc.h>

#include "proto/timeoracle_rpc.pb.h"
#include "proto/rpc_client.h"

DECLARE_int32(tera_rpc_timeout_period);

class ThreadPool;

namespace tera {
namespace timeoracle {

class TimeoracleClient : public RpcClient<TimeoracleServer::Stub> {
public:
static void SetThreadPool(ThreadPool* thread_pool);

static void SetRpcOption(int32_t max_inflow = -1, int32_t max_outflow = -1,
int32_t pending_buffer_size = -1,
int32_t thread_num = -1);

TimeoracleClient(const std::string& addr = "",
int32_t rpc_timeout = FLAGS_tera_rpc_timeout_period);

~TimeoracleClient();

bool GetTimestamp(const GetTimestampRequest* request, GetTimestampResponse* response,
std::function<void (GetTimestampRequest*, GetTimestampResponse*, bool, int)> done);
private:
int32_t rpc_timeout_;
static ThreadPool* thread_pool_;
};

} // namespace timeoracle
} // namespace tera

#endif // TERA_TIMEORACLE_TIMEORACLE_CLIENT_H
20 changes: 20 additions & 0 deletions src/proto/timeoracle_rpc.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import "sofa/pbrpc/rpc_option.proto";
import "status_code.proto";

package tera;

message GetTimestampRequest {
optional uint64 number = 1;
}

message GetTimestampResponse {
optional StatusCode status = 1;
optional uint64 start_timestamp = 2;
optional uint64 number = 3;
}

service TimeoracleServer {
rpc GetTimestamp(GetTimestampRequest) returns(GetTimestampResponse);
}

option cc_generic_services = true;
6 changes: 6 additions & 0 deletions src/tera_flags.cc
Original file line number Diff line number Diff line change
Expand Up @@ -281,3 +281,9 @@ DEFINE_int64(tera_sdk_status_timeout, 600, "(s) check tablet/tabletnode status t
DEFINE_string(tera_http_port, "8657", "the http proxy port of tera");
DEFINE_int32(tera_http_request_thread_num, 30, "the http proxy thread num for handle client request");
DEFINE_int32(tera_http_ctrl_thread_num, 10, "the http proxy thread num for it self");


///////// http /////////
DEFINE_string(tera_timeoracle_port, "30000", "the timeoracle port of tera");
DEFINE_int32(tera_timeoracle_max_lease_second, 30, "timeoracle work this seconds for a lease");
DEFINE_int32(tera_timeoracle_refresh_lease_second, 10, "timeoracle refresh lease before this seconds");
46 changes: 46 additions & 0 deletions src/timeoracle/bench/timeoracle_bench.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#include <iostream>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "common/mutex.h"
#include "common/thread_pool.h"
#include "common/this_thread.h"

#include "proto/timeoracle_client.h"

using namespace tera;
using namespace tera::timeoracle;

int main(int argc, char** argv) {
::google::ParseCommandLineFlags(&argc, &argv, true);

std::shared_ptr<common::ThreadPool> thread_pool(new common::ThreadPool(10));
timeoracle::TimeoracleClient::SetThreadPool(thread_pool.get());

GetTimestampRequest request;
GetTimestampResponse response;

request.set_number(10);
timeoracle::TimeoracleClient timeoracle_client("127.0.0.1:8881");

uint64_t x = 10000000000000ULL;
uint64_t i = 0;
while (++i) {
if ((i % 10) == 0) {
request.set_number(x);
} else {
request.set_number(10);
}

if (timeoracle_client.GetTimestamp(&request, &response, nullptr)) {
if (!response.has_status()) {
std::cout << "success ,timestamp=" << response.start_timestamp()
<< ",number=" << response.number() << std::endl;
continue;
}
}
std::cout << "failed,,code=" << (int)response.status() << std::endl;
ThisThread::Sleep(200);
}

return 0;
}
73 changes: 73 additions & 0 deletions src/timeoracle/remote_timeoracle.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright (c) 2015, Baidu.com, Inc. All Rights Reserved
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef TERA_TIMEORACLE_REMOTE_TIMEORACLE_H
#define TERA_TIMEORACLE_REMOTE_TIMEORACLE_H

#include <sofa/pbrpc/pbrpc.h>
#include "common/base/scoped_ptr.h"
#include "common/thread_pool.h"

#include "proto/timeoracle_rpc.pb.h"
#include "timeoracle/timeoracle.h"

namespace tera {
namespace timeoracle {

class ClosureGuard {
public:
ClosureGuard(::google::protobuf::Closure* done) : done_(done) {
}

~ClosureGuard() {
if (done_) {
done_->Run();
}
}

::google::protobuf::Closure* release() {
auto done = done_;
done_ = nullptr;
return done;
}

private:
ClosureGuard(const ClosureGuard&) = delete;
private:
::google::protobuf::Closure* done_;
};

class RemoteTimeoracle : public TimeoracleServer {
public:
RemoteTimeoracle(uint64_t start_timestamp) : timeoracle_(start_timestamp) {
}

virtual void GetTimestamp(::google::protobuf::RpcController* controller,
const ::tera::GetTimestampRequest* request,
::tera::GetTimestampResponse* response,
::google::protobuf::Closure* done) {
ClosureGuard closure_guard(done);

uint64_t number = request->number();
uint64_t start_timestamp = timeoracle_.GetTimestamp(number);
if (start_timestamp) {
response->set_start_timestamp(start_timestamp);
response->set_number(number);
} else {
response->set_status(kTimeoracleBusy);
}
}

Timeoracle* GetTimeoracle() {
return &timeoracle_;
}

private:
Timeoracle timeoracle_;
};

} // namespace timeoracle
} // namespace tera

#endif // TERA_TIMEORACLE_REMOTE_TIMEORACLE_H
13 changes: 13 additions & 0 deletions src/timeoracle/timeoracle.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Copyright (c) 2015, Baidu.com, Inc. All Rights Reserved
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "timeoracle/timeoracle.h"

namespace tera {
namespace timeoracle {

std::atomic<uint64_t> Timeoracle::s_last_timestamp_ns;

} // namespace timeoracle
} // namespace tera
Loading

0 comments on commit e03a492

Please sign in to comment.