Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
ClemensElflein committed Sep 11, 2024
1 parent b643e04 commit a2c4330
Showing 2 changed files with 65 additions and 55 deletions.
109 changes: 59 additions & 50 deletions libxbot-service-interface/src/ServiceIO.cpp
Original file line number Diff line number Diff line change
@@ -21,7 +21,7 @@ using namespace xbot::serviceif;
* Maps endpoint to state. Pointers are used so that we can move the state for
* cheap once the endpoint changes.
*/
std::map<uint64_t, std::unique_ptr<ServiceState>> endpoint_map_{};
std::map<uint64_t, std::unique_ptr<ServiceState> > endpoint_map_{};
std::map<std::string, uint64_t> inverse_endpoint_map_{};

// Protects inverse_endpoint_map_, endpoint_map_ and instance_
@@ -34,11 +34,12 @@ std::mutex stopped_mtx_{};
bool stopped_{false};
// track when we last checked for claims and timeouts
std::chrono::time_point<std::chrono::steady_clock> last_check_{
std::chrono::seconds(0)};
std::chrono::seconds(0)
};

// keep a list of callbacks for each service
std::map<std::string, std::vector<ServiceIOCallbacks *>>
registered_callbacks_{};
std::map<std::string, std::vector<ServiceIOCallbacks *> >
registered_callbacks_{};

bool ServiceIOImpl::OnServiceDiscovered(std::string uid) {
std::unique_lock lk{state_mutex_};
@@ -52,8 +53,8 @@ bool ServiceIOImpl::OnServiceDiscovered(std::string uid) {
uint64_t key = BuildKey(service_ip, service_port);
if (endpoint_map_.contains(key)) {
spdlog::warn(
"Service state already exists, overwriting with new state. This "
"might have unforseen consequences.");
"Service state already exists, overwriting with new state. This "
"might have unforseen consequences.");
endpoint_map_.erase(key);
}
std::unique_ptr<ServiceState> state = std::make_unique<ServiceState>();
@@ -80,15 +81,15 @@ bool ServiceIOImpl::OnEndpointChanged(std::string uid, uint32_t old_ip, uint16_t

if (endpoint_map_.contains(new_key)) {
spdlog::warn(
"Service state already exists, overwriting with new state. This might "
"have unforseen consequences.");
"Service state already exists, overwriting with new state. This might "
"have unforseen consequences.");
endpoint_map_.erase(new_key);
}

if (!endpoint_map_.contains(old_key)) {
spdlog::warn(
"Service state did not exist, so we cannot update it. Creating a new "
"one instead.");
"Service state did not exist, so we cannot update it. Creating a new "
"one instead.");
endpoint_map_.emplace(new_key, std::make_unique<ServiceState>());
} else {
endpoint_map_.emplace(new_key, std::move(endpoint_map_.at(old_key)));
@@ -99,6 +100,7 @@ bool ServiceIOImpl::OnEndpointChanged(std::string uid, uint32_t old_ip, uint16_t

return true;
}

void ServiceIOImpl::SetBindAddress(std::string bind_address) {
io_socket_.SetBindAddress(bind_address);
}
@@ -111,15 +113,15 @@ ServiceIOImpl *ServiceIOImpl::GetInstance() {
return instance_;
}

bool ServiceIOImpl::Start() {
{
bool ServiceIOImpl::Start() { {
std::unique_lock lk{stopped_mtx_};
stopped_ = false;
}
io_thread_ = std::thread{&ServiceIOImpl::RunIo, this};

return true;
}

void ServiceIOImpl::RegisterCallbacks(const std::string &uid,
ServiceIOCallbacks *callbacks) {
std::unique_lock lk{state_mutex_};
@@ -133,9 +135,10 @@ void ServiceIOImpl::RegisterCallbacks(const std::string &uid,
// add the callbacks
vector.push_back(callbacks);
}

void ServiceIOImpl::UnregisterCallbacks(ServiceIOCallbacks *callbacks) {
std::unique_lock lk{state_mutex_};
for (auto [service_id, callback_list] : registered_callbacks_) {
for (auto [service_id, callback_list]: registered_callbacks_) {
for (auto cb_it = callback_list.begin(); cb_it != callback_list.end();) {
if (*cb_it == callbacks) {
// Erase
@@ -150,8 +153,7 @@ void ServiceIOImpl::UnregisterCallbacks(ServiceIOCallbacks *callbacks) {

bool ServiceIOImpl::SendData(const std::string &uid,
const std::vector<uint8_t> &data) {
uint64_t endpoint;
{
uint64_t endpoint; {
std::unique_lock lk{state_mutex_};
if (!inverse_endpoint_map_.contains(uid)) {
spdlog::warn("no endpoint for service {}", uid);
@@ -177,8 +179,9 @@ void ServiceIOImpl::RunIo() {
if (stopped_) break;
}
if (std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now() - last_check_) >
std::chrono::steady_clock::now() - last_check_) >
std::chrono::microseconds(1000000)) {
last_check_ = std::chrono::steady_clock::now();
spdlog::debug("running checks");
std::unique_lock lk{state_mutex_};
// Claim all unclaimed services and check for timeouts.
@@ -190,8 +193,8 @@ void ServiceIOImpl::RunIo() {
} else {
// Check for timeout
if (std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now() -
it->second->last_heartbeat_received_) >
std::chrono::steady_clock::now() -
it->second->last_heartbeat_received_) >
std::chrono::microseconds(config::default_heartbeat_micros +
config::heartbeat_jitter)) {
spdlog::warn("Service timed out, removing service.");
@@ -204,8 +207,8 @@ void ServiceIOImpl::RunIo() {

// Notify callbacks for that service
if (const auto cb_it = registered_callbacks_.find(uid);
cb_it != registered_callbacks_.end()) {
for (const auto &cb : cb_it->second) {
cb_it != registered_callbacks_.end()) {
for (const auto &cb: cb_it->second) {
cb->OnServiceDisconnected(uid);
}
}
@@ -282,7 +285,7 @@ void ServiceIOImpl::ClaimService(uint64_t key) {
auto now = std::chrono::steady_clock::now();

auto diff = std::chrono::duration_cast<std::chrono::microseconds>(
now - state->last_claim_sent_);
now - state->last_claim_sent_);
if (diff < std::chrono::microseconds(1000)) {
return;
}
@@ -310,11 +313,11 @@ void ServiceIOImpl::ClaimService(uint64_t key) {
header->sequence_no = 0;
header->flags = 0;
header->timestamp = std::chrono::duration_cast<std::chrono::nanoseconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();
std::chrono::steady_clock::now().time_since_epoch())
.count();
header->payload_size = sizeof(datatypes::ClaimPayload);
auto payload_ptr = reinterpret_cast<datatypes::ClaimPayload *>(
packet.data() + sizeof(datatypes::XbotHeader));
packet.data() + sizeof(datatypes::XbotHeader));
payload_ptr->target_ip = IpStringToInt(my_ip);
payload_ptr->target_port = my_port;
payload_ptr->heartbeat_micros = config::default_heartbeat_micros;
@@ -332,6 +335,7 @@ bool ServiceIOImpl::TransmitPacket(uint64_t key,
}
return io_socket_.TransmitPacket(service_ip, service_port, data);
}

void ServiceIOImpl::HandleClaimMessage(uint64_t key,
xbot::datatypes::XbotHeader *header,
const uint8_t *payload,
@@ -360,17 +364,17 @@ void ServiceIOImpl::HandleClaimMessage(uint64_t key,

// Notify callbacks for that service
if (const auto it = registered_callbacks_.find(ptr->uid);
it != registered_callbacks_.end()) {
for (const auto &cb : it->second) {
it != registered_callbacks_.end()) {
for (const auto &cb: it->second) {
cb->OnServiceConnected(ptr->uid);
}
}
}

void ServiceIOImpl::HandleDataMessage(uint64_t key,
xbot::datatypes::XbotHeader *header,
const uint8_t *payload,
size_t payload_len) {
{
size_t payload_len) { {
std::unique_lock lk{state_mutex_};
if (!endpoint_map_.contains(key)) {
spdlog::debug("got data from wrong service");
@@ -385,18 +389,18 @@ void ServiceIOImpl::HandleDataMessage(uint64_t key,

// Notify callbacks for that service
if (const auto it = registered_callbacks_.find(ptr->uid);
it != registered_callbacks_.end()) {
for (const auto &cb : it->second) {
it != registered_callbacks_.end()) {
for (const auto &cb: it->second) {
cb->OnData(ptr->uid, header->timestamp, header->arg2, payload,
header->payload_size);
}
}
}

void ServiceIOImpl::HandleDataTransaction(uint64_t key,
xbot::datatypes::XbotHeader *header,
const uint8_t *payload,
size_t payload_len) {
{
size_t payload_len) { {
std::unique_lock lk{state_mutex_};
if (!endpoint_map_.contains(key)) {
// This happens if we restart the interface and an unknown service sends
@@ -415,8 +419,8 @@ void ServiceIOImpl::HandleDataTransaction(uint64_t key,

// Notify callbacks for that service
if (const auto it = registered_callbacks_.find(state_ptr->uid);
it != registered_callbacks_.end()) {
for (const auto &cb : it->second) {
it != registered_callbacks_.end()) {
for (const auto &cb: it->second) {
cb->OnTransactionStart(header->timestamp);
// Go through all data packets in the transaction
size_t processed_len = 0;
@@ -431,13 +435,13 @@ void ServiceIOImpl::HandleDataTransaction(uint64_t key,
header->payload_size) {
// we can safely read the data
cb->OnData(
state_ptr->uid, header->timestamp, descriptor->target_id,
payload + processed_len + sizeof(datatypes::DataDescriptor),
data_size);
state_ptr->uid, header->timestamp, descriptor->target_id,
payload + processed_len + sizeof(datatypes::DataDescriptor),
data_size);
} else {
spdlog::error(
"Error parsing transaction, header payload size does not "
"match transaction size!");
"Error parsing transaction, header payload size does not "
"match transaction size!");
break;
}
processed_len += data_size + sizeof(datatypes::DataDescriptor);
@@ -451,6 +455,7 @@ void ServiceIOImpl::HandleDataTransaction(uint64_t key,
}
}
}

void ServiceIOImpl::HandleHeartbeatMessage(uint64_t key,
xbot::datatypes::XbotHeader *header,
const uint8_t *payload,
@@ -463,9 +468,10 @@ void ServiceIOImpl::HandleHeartbeatMessage(uint64_t key,
endpoint_map_.at(key)->last_heartbeat_received_ =
std::chrono::steady_clock::now();
}

void ServiceIOImpl::HandleConfigurationRequest(
uint64_t key, xbot::datatypes::XbotHeader *header, const uint8_t *payload,
size_t payload_len) {
uint64_t key, xbot::datatypes::XbotHeader *header, const uint8_t *payload,
size_t payload_len) {
std::unique_lock lk{state_mutex_};
if (!endpoint_map_.contains(key)) {
spdlog::debug("got config request from wrong service");
@@ -481,8 +487,8 @@ void ServiceIOImpl::HandleConfigurationRequest(
// Notify callbacks for that service
bool configuration_handled = false;
if (const auto it = registered_callbacks_.find(ptr->uid);
it != registered_callbacks_.end()) {
for (const auto &cb : it->second) {
it != registered_callbacks_.end()) {
for (const auto &cb: it->second) {
if (cb->OnConfigurationRequested(ptr->uid)) {
configuration_handled = true;
break;
@@ -491,21 +497,24 @@ void ServiceIOImpl::HandleConfigurationRequest(
}
if (!configuration_handled) {
spdlog::warn(
"service {} requires configuration, but no handler provided any "
"configuration. "
"The service won't start.",
ptr->uid);
"service {} requires configuration, but no handler provided any "
"configuration. "
"The service won't start.",
ptr->uid);
}
}

ServiceIOImpl::ServiceIOImpl(ServiceDiscoveryImpl *serviceDiscovery)
: service_discovery(serviceDiscovery) {}
: service_discovery(serviceDiscovery) {
}

bool ServiceIOImpl::OK() {
std::unique_lock lk{stopped_mtx_};
return !stopped_;
}

bool ServiceIOImpl::Stop() {
spdlog::info("Shutting down ServiceIO");
{
spdlog::info("Shutting down ServiceIO"); {
std::unique_lock lk{stopped_mtx_};
stopped_ = true;
}
11 changes: 6 additions & 5 deletions libxbot-service-interface/src/XbotServiceInterface.cpp
Original file line number Diff line number Diff line change
@@ -68,13 +68,13 @@ xbot::serviceif::Context xbot::serviceif::Start(bool register_handlers, std::str
crow::SimpleApp &app = *crow_app;

CROW_ROUTE(app, "/")
([]() { return "OK"; });
([]() { return "OK"; });
CROW_ROUTE(app, "/services")
([sdImpl]() {
nlohmann::json result = nlohmann::detail::value_t::object;
const auto services = sdImpl->GetAllServices();

for (const auto &s : *services) {
for (const auto &s: *services) {
result[s.first] = s.second;
}
return result.dump(2);
@@ -85,9 +85,9 @@ xbot::serviceif::Context xbot::serviceif::Start(bool register_handlers, std::str
CROW_LOG_INFO << "New Websocket Connection";
})
.onclose(
[&](crow::websocket::connection &conn, const std::string &reason) {
CROW_LOG_INFO << "Closed Connection. Reason: " << reason;
})
[&](crow::websocket::connection &conn, const std::string &reason) {
CROW_LOG_INFO << "Closed Connection. Reason: " << reason;
})
.onmessage([&](crow::websocket::connection & /*conn*/,
const std::string &data, bool is_binary) {
CROW_LOG_INFO << "New Websocket Message: " << data;
@@ -105,6 +105,7 @@ xbot::serviceif::Context xbot::serviceif::Start(bool register_handlers, std::str
}
return ctx;
}

void xbot::serviceif::Stop() {
spdlog::info("Shutting Down");
std::unique_lock lk{mtx};

0 comments on commit a2c4330

Please sign in to comment.