diff --git a/Makefile b/Makefile index 2644902ef..70bad27f5 100644 --- a/Makefile +++ b/Makefile @@ -47,7 +47,10 @@ MARK_SRC := src/benchmark/mark.cc src/benchmark/mark_main.cc TEST_SRC := src/utils/test/prop_tree_test.cc src/utils/test/tprinter_test.cc \ src/io/test/tablet_io_test.cc src/io/test/tablet_scanner_test.cc \ src/master/test/master_impl_test.cc src/io/test/load_test.cc \ - src/common/test/thread_pool_test.cc + src/common/test/thread_pool_test.cc + +TIMEORACLE_SRC := $(wildcard src/timeoracle/*.cc) src/tera_entry.cc +TIMEORACLE_BENCH_SRC := src/timeoracle/bench/timeoracle_bench.cc TEST_OUTPUT := test_output UNITTEST_OUTPUT := $(TEST_OUTPUT)/unittest @@ -69,14 +72,17 @@ MONITOR_OBJ := $(MONITOR_SRC:.cc=.o) MARK_OBJ := $(MARK_SRC:.cc=.o) HTTP_OBJ := $(HTTP_SRC:.cc=.o) TEST_OBJ := $(TEST_SRC:.cc=.o) +TIMEORACLE_OBJ := $(TIMEORACLE_SRC:.cc=.o) +TIMEORACLE_BENCH_OBJ := $(TIMEORACLE_BENCH_SRC:.cc=.o) + ALL_OBJ := $(MASTER_OBJ) $(TABLETNODE_OBJ) $(IO_OBJ) $(SDK_OBJ) $(PROTO_OBJ) \ $(JNI_TERA_OBJ) $(OTHER_OBJ) $(COMMON_OBJ) $(SERVER_OBJ) $(CLIENT_OBJ) \ $(TEST_CLIENT_OBJ) $(TERA_C_OBJ) $(MONITOR_OBJ) $(MARK_OBJ) $(TEST_OBJ) \ - $(SERVER_WRAPPER_OBJ) + $(SERVER_WRAPPER_OBJ) $(TIMEORACLE_OBJ) LEVELDB_LIB := src/leveldb/libleveldb.a LEVELDB_UTIL := src/leveldb/util/histogram.o src/leveldb/port/port_posix.o -PROGRAM = tera_main tera_master tabletserver teracli teramo tera_test +PROGRAM = tera_main tera_master tabletserver teracli teramo tera_test timeoracle timeoracle_bench LIBRARY = libtera.a SOLIBRARY = libtera.so TERA_C_SO = libtera_c.so @@ -151,6 +157,12 @@ tera_mark: $(MARK_OBJ) $(LIBRARY) $(LEVELDB_LIB) tera_test: $(TEST_CLIENT_OBJ) $(LIBRARY) $(CXX) -o $@ $(TEST_CLIENT_OBJ) $(LIBRARY) $(LDFLAGS) +timeoracle: $(TIMEORACLE_OBJ) $(PROTO_OBJ) $(COMMON_OBJ) $(OTHER_OBJ) + $(CXX) -o $@ $^ $(LDFLAGS) + +timeoracle_bench : $(TIMEORACLE_BENCH_OBJ) $(LIBRARY) + $(CXX) -o $@ $^ $(LDFLAGS) + terahttp: $(HTTP_OBJ) $(PROTO_OBJ) $(LIBRARY) $(CXX) -o $@ $^ $(LDFLAGS) @@ 
-163,6 +175,7 @@ src/leveldb/libleveldb.a: FORCE tera_bench: # unit test +timeoracle_bench_test: src/timeoracle/test/timeoracle_bench_test.o $(LIBRARY) thread_pool_test: src/common/test/thread_pool_test.o $(LIBRARY) $(CXX) -o $@ $^ $(LDFLAGS) diff --git a/src/proto/status_code.proto b/src/proto/status_code.proto index 24b0ff595..0d1c11062 100644 --- a/src/proto/status_code.proto +++ b/src/proto/status_code.proto @@ -96,6 +96,9 @@ enum StatusCode { kTableStatusEnable = 1000; kTableStatusDisable = 1001; + + // Timeoracle + kTimeoracleBusy = 2000; } enum TabletStatus { diff --git a/src/proto/timeoracle_client.cc b/src/proto/timeoracle_client.cc new file mode 100644 index 000000000..5315fe84e --- /dev/null +++ b/src/proto/timeoracle_client.cc @@ -0,0 +1,38 @@ +// Copyright (c) 2015, Baidu.com, Inc. All Rights Reserved +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "proto/timeoracle_client.h" + +namespace tera { +namespace timeoracle { + +ThreadPool* TimeoracleClient::thread_pool_ = NULL; + +void TimeoracleClient::SetThreadPool(ThreadPool* thread_pool) { + thread_pool_ = thread_pool; +} + +void TimeoracleClient::SetRpcOption(int32_t max_inflow, int32_t max_outflow, + int32_t pending_buffer_size, int32_t thread_num) { + RpcClientBase::SetOption(max_inflow, max_outflow, + pending_buffer_size, thread_num); +} + +TimeoracleClient::TimeoracleClient(const std::string& server_addr, + int32_t rpc_timeout) + : RpcClient(server_addr), + rpc_timeout_(rpc_timeout) {} + +TimeoracleClient::~TimeoracleClient() {} + +bool TimeoracleClient::GetTimestamp(const GetTimestampRequest* request, + GetTimestampResponse* response, + std::function done) { + return SendMessageWithRetry(&TimeoracleServer::Stub::GetTimestamp, + request, response, done, "GetTimestamp", + rpc_timeout_, thread_pool_); +} + +} // namespace timeoracle +} // namespace tera diff --git a/src/proto/timeoracle_client.h b/src/proto/timeoracle_client.h new 
file mode 100644 index 000000000..01d4743cc --- /dev/null +++ b/src/proto/timeoracle_client.h @@ -0,0 +1,44 @@ +// Copyright (c) 2015, Baidu.com, Inc. All Rights Reserved +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef TERA_TIMEORACLE_TIMEORACLE_CLIENT_H +#define TERA_TIMEORACLE_TIMEORACLE_CLIENT_H + +#include +#include + +#include "proto/timeoracle_rpc.pb.h" +#include "proto/rpc_client.h" + +DECLARE_int32(tera_rpc_timeout_period); + +class ThreadPool; + +namespace tera { +namespace timeoracle { + +class TimeoracleClient : public RpcClient { +public: + static void SetThreadPool(ThreadPool* thread_pool); + + static void SetRpcOption(int32_t max_inflow = -1, int32_t max_outflow = -1, + int32_t pending_buffer_size = -1, + int32_t thread_num = -1); + + TimeoracleClient(const std::string& addr = "", + int32_t rpc_timeout = FLAGS_tera_rpc_timeout_period); + + ~TimeoracleClient(); + + bool GetTimestamp(const GetTimestampRequest* request, GetTimestampResponse* response, + std::function done); +private: + int32_t rpc_timeout_; + static ThreadPool* thread_pool_; +}; + +} // namespace timeoracle +} // namespace tera + +#endif // TERA_TIMEORACLE_TIMEORACLE_CLIENT_H diff --git a/src/proto/timeoracle_rpc.proto b/src/proto/timeoracle_rpc.proto new file mode 100644 index 000000000..1f6c62759 --- /dev/null +++ b/src/proto/timeoracle_rpc.proto @@ -0,0 +1,20 @@ +import "sofa/pbrpc/rpc_option.proto"; +import "status_code.proto"; + +package tera; + +message GetTimestampRequest { + optional uint64 number = 1; +} + +message GetTimestampResponse { + optional StatusCode status = 1; + optional uint64 start_timestamp = 2; + optional uint64 number = 3; +} + +service TimeoracleServer { + rpc GetTimestamp(GetTimestampRequest) returns(GetTimestampResponse); +} + +option cc_generic_services = true; diff --git a/src/tera_flags.cc b/src/tera_flags.cc index efe0201e0..34d95ae04 100644 --- a/src/tera_flags.cc +++ b/src/tera_flags.cc 
///////// timeoracle /////////
// TCP port the timeoracle RPC server binds to (combined with the local
// address/host name to form the advertised server address).
DEFINE_string(tera_timeoracle_port, "30000", "the timeoracle port of tera");
// Length, in seconds, by which the lease thread extends the timestamp limit
// each time it renews the lease.
DEFINE_int32(tera_timeoracle_max_lease_second, 30, "timeoracle work this seconds for a lease");
// The lease is renewed once the current start timestamp is within this many
// seconds of the limit timestamp.
DEFINE_int32(tera_timeoracle_refresh_lease_second, 10, "timeoracle refresh lease before this seconds");
+ + return 0; +} diff --git a/src/timeoracle/remote_timeoracle.h b/src/timeoracle/remote_timeoracle.h new file mode 100644 index 000000000..f732581d4 --- /dev/null +++ b/src/timeoracle/remote_timeoracle.h @@ -0,0 +1,73 @@ +// Copyright (c) 2015, Baidu.com, Inc. All Rights Reserved +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef TERA_TIMEORACLE_REMOTE_TIMEORACLE_H +#define TERA_TIMEORACLE_REMOTE_TIMEORACLE_H + +#include +#include "common/base/scoped_ptr.h" +#include "common/thread_pool.h" + +#include "proto/timeoracle_rpc.pb.h" +#include "timeoracle/timeoracle.h" + +namespace tera { +namespace timeoracle { + +class ClosureGuard { +public: + ClosureGuard(::google::protobuf::Closure* done) : done_(done) { + } + + ~ClosureGuard() { + if (done_) { + done_->Run(); + } + } + + ::google::protobuf::Closure* release() { + auto done = done_; + done_ = nullptr; + return done; + } + +private: + ClosureGuard(const ClosureGuard&) = delete; +private: + ::google::protobuf::Closure* done_; +}; + +class RemoteTimeoracle : public TimeoracleServer { +public: + RemoteTimeoracle(uint64_t start_timestamp) : timeoracle_(start_timestamp) { + } + + virtual void GetTimestamp(::google::protobuf::RpcController* controller, + const ::tera::GetTimestampRequest* request, + ::tera::GetTimestampResponse* response, + ::google::protobuf::Closure* done) { + ClosureGuard closure_guard(done); + + uint64_t number = request->number(); + uint64_t start_timestamp = timeoracle_.GetTimestamp(number); + if (start_timestamp) { + response->set_start_timestamp(start_timestamp); + response->set_number(number); + } else { + response->set_status(kTimeoracleBusy); + } + } + + Timeoracle* GetTimeoracle() { + return &timeoracle_; + } + +private: + Timeoracle timeoracle_; +}; + +} // namespace timeoracle +} // namespace tera + +#endif // TERA_TIMEORACLE_REMOTE_TIMEORACLE_H diff --git a/src/timeoracle/timeoracle.cc b/src/timeoracle/timeoracle.cc new file 
namespace tera {
namespace timeoracle {

// Number of nanoseconds in one second.
constexpr uint64_t kNanoPerSecond = 1000000000ULL;

// Returns the CLOCK_REALTIME reading converted to nanoseconds.
inline uint64_t clock_realtime_ns() {
    struct timespec tp;
    ::clock_gettime(CLOCK_REALTIME, &tp);
    return tp.tv_sec * kNanoPerSecond + tp.tv_nsec;
}

// Hands out ranges of timestamps from the half-open window
// [start_timestamp_, limit_timestamp_). Both bounds are atomics; the limit
// starts at 0, so nothing can be allocated until UpdateLimitTimestamp() has
// been called.
class Timeoracle {
public:
    explicit Timeoracle(uint64_t start_timestamp) : start_timestamp_(start_timestamp),
        limit_timestamp_(0) {
    }

    // Reserves `num` consecutive timestamps and returns the first one, or 0
    // when start + num would reach or pass the current limit.
    // If num == 0 nothing is reserved and the next timestamp is returned.
    //
    // The range is claimed with a compare-and-swap only after the limit check
    // has passed, so a rejected request consumes nothing, and the check is
    // written without computing start + num so it cannot wrap around uint64_t.
    uint64_t GetTimestamp(uint64_t num) {
        uint64_t start_timestamp = start_timestamp_.load();
        while (true) {
            const uint64_t limit_timestamp = limit_timestamp_.load();
            // Equivalent to (start_timestamp + num) >= limit_timestamp, minus the overflow.
            if (start_timestamp >= limit_timestamp ||
                num >= limit_timestamp - start_timestamp) {
                return 0;
            }
            // On failure start_timestamp is reloaded with the current value and we retry.
            if (start_timestamp_.compare_exchange_weak(start_timestamp,
                                                       start_timestamp + num)) {
                return start_timestamp;
            }
        }
    }

    // Raises the limit to limit_timestamp and returns it. A value that is not
    // strictly greater than the current limit is rejected: an error is logged
    // and 0 is returned.
    uint64_t UpdateLimitTimestamp(uint64_t limit_timestamp) {
        if (limit_timestamp > limit_timestamp_) {
            limit_timestamp_ = limit_timestamp;
        } else {
            LOG(ERROR) << "update limit timestamp failed, limit_timestamp_=" << limit_timestamp_
                << ",update to " << limit_timestamp;
            return 0;
        }
        return limit_timestamp;
    }

    // Re-bases the start timestamp:
    //  - if it lags behind a freshly taken unique wall-clock timestamp, it is
    //    moved forward to that timestamp;
    //  - otherwise, if it has run past the limit, it is pulled back to the limit.
    // Each adjustment is a CAS that is retried when another thread changed
    // the start timestamp in between.
    void UpdateStartTimestamp() {
        const uint64_t cur_timestamp_ns = UniqueTimestampNs();

        while (true) {
            uint64_t start_timestamp = start_timestamp_;
            if (start_timestamp < cur_timestamp_ns) {
                if (start_timestamp_.compare_exchange_strong(start_timestamp, cur_timestamp_ns)) {
                    return;
                }
                continue;
            }
            uint64_t limit_timestamp = limit_timestamp_;
            if (start_timestamp > limit_timestamp) {
                if (start_timestamp_.compare_exchange_strong(start_timestamp, limit_timestamp)) {
                    return;
                }
                continue;
            }
            break;
        }
    }

    // Next timestamp that would be handed out.
    uint64_t GetStartTimestamp() const {
        return start_timestamp_;
    }

    // Exclusive upper bound of the timestamps that may currently be handed out.
    uint64_t GetLimitTimestamp() const {
        return limit_timestamp_;
    }

private:
    std::atomic<uint64_t> start_timestamp_;
    std::atomic<uint64_t> limit_timestamp_;

public:
    // Returns a nanosecond timestamp that is strictly greater than any value
    // previously returned by this function in the process: the wall clock
    // when it has advanced past the last value, otherwise last value + 1.
    static uint64_t UniqueTimestampNs() {
        while (true) {
            uint64_t ts = clock_realtime_ns();
            uint64_t last_timestamp_ns = s_last_timestamp_ns;

            if (ts <= last_timestamp_ns) {
                return s_last_timestamp_ns.fetch_add(1) + 1;
            }

            if (s_last_timestamp_ns.compare_exchange_strong(last_timestamp_ns, ts)) {
                return ts;
            }
        }
    }

private:
    // Last value returned by UniqueTimestampNs(); defined in timeoracle.cc.
    static std::atomic<uint64_t> s_last_timestamp_ns;
};

} // namespace timeoracle
} // namespace tera
DECLARE_string(tera_timeoracle_port);
DECLARE_string(tera_local_addr);
DECLARE_int32(tera_timeoracle_refresh_lease_second);
DECLARE_int32(tera_timeoracle_max_lease_second);

namespace tera {
namespace timeoracle {

// Creates the RPC server object (default options) and computes the address
// this instance advertises: <tera_local_addr or local host name>:<timeoracle port>.
TimeoracleEntry::TimeoracleEntry() : remote_timeoracle_(nullptr), need_quit_(false) {
    sofa::pbrpc::RpcServerOptions rpc_options;
    //rpc_options.max_throughput_in = FLAGS_tera_master_sofa_pbrpc_server_max_inflow;
    //rpc_options.max_throughput_out = FLAGS_tera_master_sofa_pbrpc_server_max_outflow;
    sofa_pbrpc_server_.reset(new sofa::pbrpc::RpcServer(rpc_options));

    if (FLAGS_tera_local_addr.empty()) {
        local_addr_ = utils::GetLocalHostName()+ ":" + FLAGS_tera_timeoracle_port;
    } else {
        local_addr_ = FLAGS_tera_local_addr + ":" + FLAGS_tera_timeoracle_port;
    }
}

// Brings the service up: zookeeper first (which yields the start-up
// timestamp), then the RPC server. Returns false as soon as either step fails.
bool TimeoracleEntry::Start() {
    if (!InitZKAdaptor()) {
        return false;
    }

    if (!StartServer()) {
        return false;
    }

    return true;
}

// Asks the lease thread to stop and waits for it, so the thread is never
// still running while the members it uses are destroyed.
TimeoracleEntry::~TimeoracleEntry() {
    need_quit_ = true;
    if (lease_thread_.joinable()) {
        lease_thread_.join();
    }
}

// Creates the zookeeper adapter and lets it initialise; on success
// startup_timestamp_ holds the value the adapter reported.
bool TimeoracleEntry::InitZKAdaptor() {
    startup_timestamp_ = 0;
    //return true;
    zk_adapter_.reset(new TimeoracleZkAdapter(local_addr_));
    return zk_adapter_->Init(&startup_timestamp_);
}

// Creates the service object, starts the lease thread, waits for the first
// lease, then registers the service and starts listening on 0.0.0.0:<port>.
bool TimeoracleEntry::StartServer() {
    IpAddress timeoracle_addr("0.0.0.0", FLAGS_tera_timeoracle_port);
    LOG(INFO) << "Start timeoracle RPC server at: " << timeoracle_addr.ToString();

    // NOTE(review): raw owning pointer that this class never deletes.
    // Presumably RegisterService() below takes ownership - confirm; if so the
    // object still leaks when this function returns before registration.
    remote_timeoracle_ = new RemoteTimeoracle(startup_timestamp_);
    // The lease thread dereferences remote_timeoracle_, so it must only be
    // started after the assignment above.
    std::thread lease_thread(&TimeoracleEntry::LeaseThread, this);
    lease_thread_ = std::move(lease_thread);


    auto timeoracle = remote_timeoracle_->GetTimeoracle();

    // The limit timestamp stays 0 until the lease thread has obtained the
    // first lease; do not serve requests before that. need_quit_ is set by
    // the lease thread when the lease update fails.
    while (!timeoracle->GetLimitTimestamp()) {
        if (need_quit_) {
            return false;
        }
        ThisThread::Sleep(100);
    }

    sofa_pbrpc_server_->RegisterService(remote_timeoracle_);
    if (!sofa_pbrpc_server_->Start(timeoracle_addr.ToString())) {
        LOG(ERROR) << "start timeoracle RPC server error";
        return false;
    }

    LOG(INFO) << "finish start timeoracle RPC server";
    return true;
}

// One iteration of the main loop: re-bases the start timestamp and sleeps.
// Returns false once a quit has been requested.
bool TimeoracleEntry::Run() {
    if (need_quit_) {
        return false;
    }

    remote_timeoracle_->GetTimeoracle()->UpdateStartTimestamp();

    // Presumably milliseconds (i.e. one re-base per second) - TODO confirm
    // the unit of ThisThread::Sleep.
    ThisThread::Sleep(1000);
    return true;
}

// Requests the lease thread to stop and stops the RPC server.
void TimeoracleEntry::ShutdownServer() {
    need_quit_ = true;
    sofa_pbrpc_server_->Stop();
}

// Body of the lease thread. Until a quit is requested it periodically checks
// whether the start timestamp is within the refresh window of the limit and,
// if so, persists a new limit through the zookeeper adapter before making it
// visible to the Timeoracle. A failed persist sets need_quit_ and ends the thread.
void TimeoracleEntry::LeaseThread() {
    auto timeoracle = remote_timeoracle_->GetTimeoracle();

    while (!need_quit_) {
        uint64_t start_timestamp = timeoracle->GetStartTimestamp();
        uint64_t limit_timestamp = timeoracle->GetLimitTimestamp();
        uint64_t refresh_lease_timestamp =
            FLAGS_tera_timeoracle_refresh_lease_second * kNanoPerSecond;

        if (start_timestamp + refresh_lease_timestamp >= limit_timestamp) {
            // need to require lease
            // Extend from whichever is later: the current limit or the
            // current start timestamp.
            if (limit_timestamp < start_timestamp) {
                limit_timestamp = start_timestamp;
            }

            uint64_t next_limit_timestamp =
                limit_timestamp + FLAGS_tera_timeoracle_max_lease_second * kNanoPerSecond;

            // Persist first, publish second: the in-memory limit is only
            // raised after the new value has been stored.
            if (!zk_adapter_->UpdateTimestamp(next_limit_timestamp)) {
                need_quit_ = true;
                return;
            }

            timeoracle->UpdateLimitTimestamp(next_limit_timestamp);
        }

        ThisThread::Sleep(1000);
    }
}

} // namespace timeoracle
} // namespace tera
+ +#ifndef TERA_TIMEORACLE_TIMEORACLE_ENTRY_H_ +#define TERA_TIMEORACLE_TIMEORACLE_ENTRY_H_ + +#include + +#include "common/base/scoped_ptr.h" +#include "tera_entry.h" +#include +#include + +namespace tera { +namespace timeoracle { + +class RemoteTimeoracle; +class TimeoracleZkAdapterBase; + +class TimeoracleEntry : public TeraEntry { +public: + TimeoracleEntry(); + ~TimeoracleEntry(); + + + virtual bool Start() override; + virtual bool Run() override; + virtual void ShutdownServer() override; + +private: + bool InitZKAdaptor(); + bool StartServer(); + void LeaseThread(); + +private: + std::string local_addr_; + RemoteTimeoracle* remote_timeoracle_; + scoped_ptr sofa_pbrpc_server_; + uint64_t startup_timestamp_; + scoped_ptr zk_adapter_; + std::thread lease_thread_; + std::atomic need_quit_; +}; + +} // namespace timeoracle +} // namespace tera + +#endif // TERA_TIMEORACLE_TIMEORACLE_ENTRY_H_ diff --git a/src/timeoracle/timeoracle_main.cc b/src/timeoracle/timeoracle_main.cc new file mode 100644 index 000000000..080fb3c20 --- /dev/null +++ b/src/timeoracle/timeoracle_main.cc @@ -0,0 +1,84 @@ +/*************************************************************************** + * + * Copyright (c) 2017 Baidu.com, Inc. 
All Rights Reserved + * $Id$ + * + **************************************************************************/ + + + +/** + * @file timeoracle/main.cpp + * @author chenzongjia(chenzongjia@baidu.com) + * @date 2017/05/19 14:02:55 + * @version $Revision + * @brief + * + **/ + +#include + +#include +#include + +#include "common/base/scoped_ptr.h" +#include "tera_entry.h" +#include "utils/utils_cmd.h" +#include "version.h" +#include "timeoracle/timeoracle_entry.h" +#include + +DECLARE_string(tera_log_prefix); + +volatile sig_atomic_t g_quit = 0; + +static void SignalIntHandler(int sig) { + g_quit = 1; +} + +int main(int argc, char* argv[]) { + ::google::ParseCommandLineFlags(&argc, &argv, true); + ::google::InitGoogleLogging(argv[0]); + if (!FLAGS_tera_log_prefix.empty()) { + tera::utils::SetupLog(FLAGS_tera_log_prefix); + } else { + tera::utils::SetupLog("timeoracle"); + } + + if (argc > 1) { + std::string ext_cmd = argv[1]; + if (ext_cmd == "version") { + PrintSystemVersion(); + return 0; + } + } + + signal(SIGINT, SignalIntHandler); + signal(SIGTERM, SignalIntHandler); + + scoped_ptr entry(new tera::timeoracle::TimeoracleEntry()); + + if (!entry->Start()) { + return -1; + } + + while (!g_quit) { + if (!entry->Run()) { + LOG(ERROR) << "Server run error ,and then exit now "; + break; + } + } + if (g_quit) { + LOG(INFO) << "received interrupt signal from user, will stop"; + } + + if (!entry->Shutdown()) { + return -1; + } + + return 0; +} + + + +/* vim: set ts=4 sw=4 sts=4 tw=100 */ diff --git a/src/timeoracle/timeoracle_zk_adapter.cc b/src/timeoracle/timeoracle_zk_adapter.cc new file mode 100644 index 000000000..f07b73566 --- /dev/null +++ b/src/timeoracle/timeoracle_zk_adapter.cc @@ -0,0 +1,197 @@ +// Copyright (c) 2015, Baidu.com, Inc. All Rights Reserved +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. 
+ +#include "timeoracle/timeoracle_zk_adapter.h" +#include "common/file/file_path.h" +#include "common/this_thread.h" +#include "types.h" +#include "zk/zk_util.h" + +DECLARE_string(tera_zk_addr_list); +DECLARE_string(tera_zk_root_path); +DECLARE_string(tera_fake_zk_path_prefix); +DECLARE_int32(tera_zk_timeout); +DECLARE_int64(tera_zk_retry_period); +DECLARE_int32(tera_zk_retry_max_times); + +namespace tera { +namespace timeoracle { + +void TimeoracleZkAdapterBase::OnNodeValueChanged(const std::string& path, + const std::string& value) { + LOG(INFO) << "zk OnNodeValueChanged, path=" << path; +} + +void TimeoracleZkAdapterBase::OnChildrenChanged(const std::string& path, + const std::vector& name_list, + const std::vector& data_list) { + LOG(INFO) << "zk OnChildrenChanged, path=" << path; +} + +void TimeoracleZkAdapterBase::OnNodeCreated(const std::string& path) { + LOG(INFO) << "zk OnNodeCreated, path=" << path; +} + +void TimeoracleZkAdapterBase::OnNodeDeleted(const std::string& path) { + LOG(INFO) << "zk OnNodeDeleted, path=" << path; +} + +void TimeoracleZkAdapterBase::OnWatchFailed(const std::string& path, int watch_type, + int err) { + LOG(INFO) << "zk OnWatchFailed, path=" << path; +} + +void TimeoracleZkAdapterBase::OnSessionTimeout() { + LOG(ERROR) << "zk session timeout!"; + _Exit(EXIT_FAILURE); +} + +bool TimeoracleZkAdapter::Init(uint64_t* last_timestamp) { + if (!InitZk()) { + return false; + } + + if (!LockTimeoracleLock()) { + return false; + } + + if (ReadTimestamp(last_timestamp)) { + LOG(INFO) << "read timestamp sucess,get start_timestamp=" << *last_timestamp; + return CreateTimeoracleNode(); + } + + return false; +} + +bool TimeoracleZkAdapter::CreateTimeoracleNode() { + LOG(INFO) << "try create timeoracle nod,path=" << kTimeoracleNodePath; + int32_t retry_count = 0; + int zk_errno = zk::ZE_OK; + while (!CreateEphemeralNode(kTimeoracleNodePath, server_addr_, &zk_errno)) { + if (retry_count++ >= FLAGS_tera_zk_retry_max_times) { + LOG(ERROR) << "fail 
to create timeoracle node"; + return false; + } + LOG(ERROR) << "retry create timeoracle node in " + << FLAGS_tera_zk_retry_period << " ms, retry=" << retry_count; + ThisThread::Sleep(FLAGS_tera_zk_retry_period); + zk_errno = zk::ZE_OK; + } + LOG(INFO) << "create timeoracle node success"; + return true; +} + +bool TimeoracleZkAdapter::InitZk() { + LOG(INFO) << "try to init zk,zk_addr_list=" << FLAGS_tera_zk_addr_list + << ",zk_root_path=" << FLAGS_tera_zk_root_path; + int zk_errno = zk::ZE_OK; + int32_t retry_count = 0; + while (!ZooKeeperAdapter::Init(FLAGS_tera_zk_addr_list, + FLAGS_tera_zk_root_path, + FLAGS_tera_zk_timeout, + server_addr_, &zk_errno)) { + if (retry_count++ >= FLAGS_tera_zk_retry_max_times) { + LOG(ERROR) << "fail to init zk: " << zk::ZkErrnoToString(zk_errno); + return false; + } + LOG(ERROR) << "init zk fail: " << zk::ZkErrnoToString(zk_errno) + << ". retry in " << FLAGS_tera_zk_retry_period << " ms, retry: " + << retry_count; + ThisThread::Sleep(FLAGS_tera_zk_retry_period); + zk_errno = zk::ZE_OK; + } + LOG(INFO) << "init zk success"; + return true; +} + +bool TimeoracleZkAdapter::LockTimeoracleLock() { + LOG(INFO) << "try to lock timeoracle lock,path=" << kTimeoracleLockPath; + int32_t retry_count = 0; + int zk_errno = zk::ZE_OK; + while (!SyncLock(kTimeoracleLockPath, &zk_errno, -1)) { + if (retry_count++ >= FLAGS_tera_zk_retry_max_times) { + LOG(ERROR) << "fail to acquire timeoracle lock"; + return false; + } + LOG(ERROR) << "retry lock timeoracle lock in " + << FLAGS_tera_zk_retry_period << " ms, retry=" << retry_count; + ThisThread::Sleep(FLAGS_tera_zk_retry_period); + zk_errno = zk::ZE_OK; + } + LOG(INFO) << "acquire timeoracle lock success"; + return true; +} + +bool TimeoracleZkAdapter::ReadTimestamp(uint64_t* timestamp) { + LOG(INFO) << "try to read timestamp, path=" << kTimeoracleTimestampPath; + + std::string timestamp_str; + int32_t retry_count = 0; + int zk_errno = zk::ZE_OK; + while (!ReadNode(kTimeoracleTimestampPath, 
×tamp_str, &zk_errno) + && zk_errno != zk::ZE_NOT_EXIST) { + if (retry_count++ >= FLAGS_tera_zk_retry_max_times) { + LOG(ERROR) << "fail to read timestamp node"; + return false; + } + LOG(ERROR) << "retry read timestamp node in " + << FLAGS_tera_zk_retry_period << " ms, retry=" << retry_count; + ThisThread::Sleep(FLAGS_tera_zk_retry_period); + zk_errno = zk::ZE_OK; + } + if (zk_errno == zk::ZE_NOT_EXIST) { + *timestamp = 0; + return true; + } + + char * pEnd = nullptr; + *timestamp = ::strtoull(timestamp_str.c_str(), &pEnd, 10); + if (*pEnd != '\0') { + // TODO (chenzongjia) + LOG(WARNING) << "read invalid timestamp value=" << timestamp_str; + return false; + } + + LOG(INFO) << "read timestamp value=" << timestamp_str; + + return true; +} + +bool TimeoracleZkAdapter::UpdateTimestamp(uint64_t timestamp) { + char timestamp_str[64]; + snprintf(timestamp_str, sizeof(timestamp_str), "%lu", timestamp); + LOG(INFO) << "try to update timestamp to " << timestamp; + int zk_errno = zk::ZE_OK; + while (!WriteNode(kTimeoracleTimestampPath, timestamp_str, &zk_errno) + && zk_errno != zk::ZE_NOT_EXIST) { + return false; + /* + if (retry_count++ >= FLAGS_tera_zk_retry_max_times) { + LOG(INFO) << "fail to update timestamp"; + return false; + } + LOG(ERROR) << "retry update timestamp in " + << FLAGS_tera_zk_retry_period << " ms, retry=" << retry_count; + ThisThread::Sleep(FLAGS_tera_zk_retry_period); + zk_errno = zk::ZE_OK; + */ + } + if (zk_errno == zk::ZE_OK) { + LOG(INFO) << "update zk path=" << kTimeoracleTimestampPath << " to " + << timestamp_str << " success."; + return true; + } + + LOG(INFO) << "timestamp node not exist, try create timestamp node"; + zk_errno = zk::ZE_OK; + while (!CreatePersistentNode(kTimeoracleTimestampPath, timestamp_str, &zk_errno)) { + return false; + } + LOG(INFO) << "create timestamp node success"; + return true; + +} + +} // namespace timeoracle +} // namespace tera diff --git a/src/timeoracle/timeoracle_zk_adapter.h 
b/src/timeoracle/timeoracle_zk_adapter.h new file mode 100644 index 000000000..a869ccf82 --- /dev/null +++ b/src/timeoracle/timeoracle_zk_adapter.h @@ -0,0 +1,93 @@ +// Copyright (c) 2015, Baidu.com, Inc. All Rights Reserved +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef TERA_TIMEORACLE_TIMEORACLE_ZK_ADAPTER_H +#define TERA_TIMEORACLE_TIMEORACLE_ZK_ADAPTER_H + +#include +#include +#include "zk/zk_adapter.h" + +namespace tera { +namespace timeoracle { + +class TimeoracleZkAdapterBase: public zk::ZooKeeperAdapter { +public: + virtual ~TimeoracleZkAdapterBase() {}; + + // not thread safe + virtual bool Init(uint64_t* last_timestamp) = 0; + + // not thread safe + virtual bool UpdateTimestamp(uint64_t new_timestamp) = 0; + + virtual void OnChildrenChanged(const std::string& path, + const std::vector& name_list, + const std::vector& data_list) override; + + virtual void OnNodeValueChanged(const std::string& path, + const std::string& value) override; + + virtual void OnNodeCreated(const std::string& path) override; + + virtual void OnNodeDeleted(const std::string& path) override; + + virtual void OnWatchFailed(const std::string& path, int watch_type, + int err) override; + + virtual void OnSessionTimeout() final; +}; + +class TimeoracleZkAdapter : public TimeoracleZkAdapterBase { +public: + TimeoracleZkAdapter(const std::string& server_addr) : server_addr_(server_addr) { + } + + virtual bool Init(uint64_t* last_timestamp) override; + + virtual bool UpdateTimestamp(uint64_t new_timestamp) override; + +private: + bool InitZk(); + + bool LockTimeoracleLock(); + + bool ReadTimestamp(uint64_t* timestamp); + + bool CreateTimeoracleNode(); + +private: + std::string server_addr_; +}; + +/* + * This is not zookeeper! + * Just used on onebox for tasting tera briefly. + * This is implemented through local file system. + * Not support watching. 
+ */ +class FakeTimeoracleZkAdapter: public TimeoracleZkAdapterBase { +public: + FakeTimeoracleZkAdapter(const std::string& server_addr) : server_addr_(server_addr) { + } + + // not thread safe + virtual bool Init(uint64_t* last_timestamp) override { + *last_timestamp = 0; + return true; + } + + // not thread safe + virtual bool UpdateTimestamp(uint64_t new_timestamp) override { + return true; + } + +private: + std::string server_addr_; +}; + +} // namespace timeoracle +} // namespace tera + +#endif // TERA_TIMEORACLE_TIMEORACLE_ZK_ADAPTER_H diff --git a/src/types.h b/src/types.h index bfad100da..5f6982714 100644 --- a/src/types.h +++ b/src/types.h @@ -27,6 +27,9 @@ const std::string kTsListPath = "/ts"; const std::string kKickPath = "/kick"; const std::string kRootTabletNodePath = "/root_table"; const std::string kSafeModeNodePath = "/safemode"; +const std::string kTimeoracleNodePath = "/timeoracle"; +const std::string kTimeoracleLockPath = "/timeoracle-lock"; +const std::string kTimeoracleTimestampPath = "/timeoracle-timestamp"; const std::string kSms = "[SMS] "; const std::string kMail = "[MAIL] "; const int64_t kLatestTs = INT64_MAX;