Skip to content

Commit

Permalink
Finalize monitoring page with websockets streaming system info
Browse files Browse the repository at this point in the history
  • Loading branch information
dormant-user committed Sep 22, 2024
1 parent 576cba7 commit 8c2871c
Show file tree
Hide file tree
Showing 9 changed files with 185 additions and 93 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ rustdoc-args = ["--document-private-items"]
[dependencies]
actix-rt = "2.10.0"
actix-web = { version = "4.9.0", features = ["openssl"] }
actix-ws = "0.3.0"
actix-files = "0.6.6"
actix-cors = "0.7.0"
actix-multipart = "0.7.2"
Expand All @@ -49,5 +50,6 @@ Inflector = "0.11"
openssl = "0.10.64"
dotenv = "0.15.0"
futures-util = "0.3.30"
futures = "0.3.30"
[target.'cfg(target_os = "linux")'.dependencies]
openssl = { version = "0.10.64", features = ["vendored"] }
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ pub async fn start() -> io::Result<()> {
.service(routes::monitor::monitor)
.service(routes::auth::logout)
.service(routes::auth::error)
.configure(routes::configure_websocket)
};
let server = HttpServer::new(application)
.workers(config.workers)
Expand Down
12 changes: 8 additions & 4 deletions src/resources/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ pub fn get_sys_info() -> HashMap<&'static str, HashMap<&'static str, String>> {
let uptime = squire::util::convert_seconds(uptime_duration);

let total_memory = squire::util::size_converter(sys.total_memory()); // in bytes
let total_swap = squire::util::size_converter(sys.total_swap()); // in bytes
let total_storage = squire::util::size_converter(sys.disks().iter().map(|disk| disk.total_space()).sum::<u64>());

// Basic and Memory/Storage Info
Expand All @@ -32,11 +31,16 @@ pub fn get_sys_info() -> HashMap<&'static str, HashMap<&'static str, String>> {
("uptime", uptime),
("CPU_cores_raw", sys.cpus().len().to_string()
)]);
let mem_storage = HashMap::from_iter(vec![
let mut hash_vec = vec![
("memory", total_memory),
("swap", total_swap),
("storage", total_storage)
]);
];

let total_swap = sys.total_swap(); // in bytes
if total_swap != 0 {
hash_vec.push(("swap", squire::util::size_converter(total_swap)));
}
let mem_storage = HashMap::from_iter(hash_vec);
HashMap::from_iter(vec![
("basic", basic),
("mem_storage", mem_storage)
Expand Down
1 change: 1 addition & 0 deletions src/resources/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ pub mod network;
pub mod processor;
pub mod disks;
pub mod system;
pub mod stream;
84 changes: 84 additions & 0 deletions src/resources/stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
use std::collections::HashMap;
use sysinfo::{CpuExt, System, SystemExt};


use serde_json;
use std::process::{Command, Stdio};

fn get_docker_stats() -> Result<Vec<serde_json::Value>, Box<dyn std::error::Error>> {
let process = Command::new("docker")
.args(&["stats", "--no-stream", "--format", "{{json .}}"])
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()?;
let output = process.wait_with_output().unwrap();
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
eprintln!("Error: {}", stderr);
return Ok(vec![]);
}
let stdout = String::from_utf8_lossy(&output.stdout);
let stats: Vec<serde_json::Value> = stdout
.lines()
.filter_map(|line| serde_json::from_str(line).ok())
.collect();
Ok(stats)
}

fn get_cpu_percent() -> Vec<String> {
let mut system = System::new_all();
system.refresh_all();
let mut cpu_usage = Vec::new();
for core in system.cpus() {
cpu_usage.push(format!("{:.2}", core.cpu_usage()));
}
cpu_usage
}

fn get_system_metrics() -> HashMap<String, serde_json::Value> {
let mut system = System::new_all();
system.refresh_all();

let load_avg = system.load_average();

let mut hash_vec = vec![
(
"memory_info".to_string(),
serde_json::json!({
"total": system.total_memory(),
"used": system.used_memory(), // todo: wildly inaccurate (always 99%) on macOS
}),
),
(
"load_averages".to_string(),
serde_json::json!({
"m1": load_avg.one,
"m5": load_avg.five,
"m15": load_avg.fifteen,
}),
),
];

let total_swap = system.total_swap();
if total_swap != 0 {
hash_vec.push((
"swap_info".to_string(),
serde_json::json!({
"total": total_swap,
"used": system.used_swap(),
}),
));
}
let info = HashMap::from_iter(hash_vec);
info
}


pub fn system_resources() -> HashMap<String, serde_json::Value> {
let mut system_metrics = get_system_metrics();
let cpu_percent = get_cpu_percent();
let docker_stats = get_docker_stats().unwrap();
system_metrics.insert("cpu_usage".to_string(), serde_json::json!(cpu_percent));
system_metrics.insert("docker_stats".to_string(), serde_json::json!(docker_stats));
system_metrics
}
8 changes: 8 additions & 0 deletions src/routes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,11 @@ pub mod basics;
pub mod auth;
/// Module for `/monitor` entrypoint.
pub mod monitor;
/// Module for `/ws/system` entrypoint.
pub mod websocket;

use actix_web::web;

pub fn configure_websocket(cfg: &mut web::ServiceConfig) {
cfg.service(websocket::echo);
}
17 changes: 5 additions & 12 deletions src/routes/monitor.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use std::collections::HashMap;
use crate::{constant, routes, squire, resources};
use actix_web::cookie::{Cookie, SameSite};
use crate::{constant, resources, routes, squire};
use actix_web::http::StatusCode;
use actix_web::{web, HttpRequest, HttpResponse};
use fernet::Fernet;
use std::collections::HashMap;
use std::sync::Arc;

/// Handles the monitor endpoint and rendering the appropriate HTML page.
Expand All @@ -27,14 +26,13 @@ pub async fn monitor(request: HttpRequest,
metadata: web::Data<Arc<constant::MetaData>>,
config: web::Data<Arc<squire::settings::Config>>,
template: web::Data<Arc<minijinja::Environment<'static>>>) -> HttpResponse {
let monitor_template = template.get_template("monitor").unwrap();
let mut response = HttpResponse::build(StatusCode::OK);
response.content_type("text/html; charset=utf-8");

let auth_response = squire::authenticator::verify_token(&request, &config, &fernet, &session);
if !auth_response.ok {
return routes::auth::failed_auth(auth_response);
}
let monitor_template = template.get_template("monitor").unwrap();
let mut response = HttpResponse::build(StatusCode::OK);
response.content_type("text/html; charset=utf-8");
log::debug!("Session Validation Response: {}", auth_response.detail);

let sys_info_map = resources::info::get_sys_info();
Expand All @@ -58,10 +56,5 @@ pub async fn monitor(request: HttpRequest,
sys_info_network => sys_info_network,
sys_info_disks => sys_info_disks
)).unwrap();

let mut cookie = Cookie::new("session_token", "");
cookie.set_same_site(SameSite::Strict);
cookie.make_removal();
response.cookie(cookie);
response.body(rendered)
}
63 changes: 63 additions & 0 deletions src/routes/websocket.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use crate::{constant, resources, routes, squire};
use actix_web::rt::time::sleep;
use actix_web::{rt, web, Error, HttpRequest, HttpResponse};
use actix_ws::AggregatedMessage;
use fernet::Fernet;
use std::sync::Arc;
use std::time::Duration;
use futures::future;
use futures::stream::StreamExt;


async fn send_system_resources(mut session: actix_ws::Session) {
loop {
let system_resources = resources::stream::system_resources();
let strigified = serde_json::to_string(&system_resources).unwrap();
session.text(strigified).await.unwrap();
sleep(Duration::from_secs(1)).await;
}
}

async fn receive_messages(
mut session: actix_ws::Session,
mut stream: impl futures::Stream<Item=Result<AggregatedMessage, actix_ws::ProtocolError>> + Unpin
) {
while let Some(msg) = stream.next().await {
match msg {
Ok(AggregatedMessage::Text(text)) => {
println!("Text: {:?}", &text);
session.text(text).await.unwrap();
}
_ => {}
}
}
}


#[route("/ws/system", method = "GET")]
async fn echo(
request: HttpRequest,
fernet: web::Data<Arc<Fernet>>,
session_info: web::Data<Arc<constant::Session>>,
config: web::Data<Arc<squire::settings::Config>>,
stream: web::Payload,
) -> Result<HttpResponse, Error> {
let auth_response = squire::authenticator::verify_token(&request, &config, &fernet, &session_info);
if !auth_response.ok {
return Ok(routes::auth::failed_auth(auth_response));
}
let (response, session, stream) = actix_ws::handle(&request, stream)?;
// todo: implement a session timeout here
let stream = stream
.aggregate_continuations()
// aggregate continuation frames up to 1MiB
// todo: check and remove limit if necessary
.max_continuation_size(2_usize.pow(20));
rt::spawn(async move {
let send_task = send_system_resources(session.clone());
let receive_task = receive_messages(session, stream);
future::join(send_task, receive_task).await;
});
// respond immediately with response connected to WS session
Ok(response)
}
Loading

0 comments on commit 8c2871c

Please sign in to comment.