Skip to content

Commit

Permalink
Fix inconsistent memory information in the UI
Browse files Browse the repository at this point in the history
Add docstrings and remove blind unwraps
  • Loading branch information
dormant-user committed Sep 22, 2024
1 parent 8c2871c commit 122d237
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 19 deletions.
41 changes: 34 additions & 7 deletions src/resources/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,28 @@ use sysinfo::{CpuExt, System, SystemExt};
use serde_json;
use std::process::{Command, Stdio};

/// Function to get docker stats via commandline.
///
/// # Returns
///
/// A `Result` containing a `Vec` of `serde_json::Value` if successful, otherwise an empty `Vec`.
fn get_docker_stats() -> Result<Vec<serde_json::Value>, Box<dyn std::error::Error>> {
let process = Command::new("docker")
.args(&["stats", "--no-stream", "--format", "{{json .}}"])
let process = match Command::new("docker")
.args(["stats", "--no-stream", "--format", "{{json .}}"])
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()?;
.spawn()
{
Ok(process) => process,
Err(_) => {
log::error!("Failed to execute command");
return Ok(vec![]);
},
};
let output = process.wait_with_output().unwrap();
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
eprintln!("Error: {}", stderr);
log::error!("Error: {}", stderr);
return Ok(vec![]);
}
let stdout = String::from_utf8_lossy(&output.stdout);
Expand All @@ -25,6 +37,11 @@ fn get_docker_stats() -> Result<Vec<serde_json::Value>, Box<dyn std::error::Erro
Ok(stats)
}

/// Function to get CPU usage percentage.
///
/// # Returns
///
/// A `Vec` containing the CPU usage percentage of each core.
fn get_cpu_percent() -> Vec<String> {
let mut system = System::new_all();
system.refresh_all();
Expand All @@ -35,18 +52,24 @@ fn get_cpu_percent() -> Vec<String> {
cpu_usage
}

/// Function to get system metrics.
///
/// # Returns
///
/// A `HashMap` containing the system metrics with CPU load average, memory and swap usage.
fn get_system_metrics() -> HashMap<String, serde_json::Value> {
let mut system = System::new_all();
system.refresh_all();

let load_avg = system.load_average();

// used_memory uses "mem_total - mem_free" but memory is set to available instead of free in macOS
let mut hash_vec = vec![
(
"memory_info".to_string(),
serde_json::json!({
"total": system.total_memory(),
"used": system.used_memory(), // todo: wildly inaccurate (always 99%) on macOS
"used": system.total_memory() - system.available_memory(),
}),
),
(
Expand All @@ -69,11 +92,15 @@ fn get_system_metrics() -> HashMap<String, serde_json::Value> {
}),
));
}
let info = HashMap::from_iter(hash_vec);
info
HashMap::from_iter(hash_vec)
}


/// Function to get the system information.
///
/// # Returns
///
/// A `HashMap` containing the system information with basic system information and memory/storage information.
pub fn system_resources() -> HashMap<String, serde_json::Value> {
let mut system_metrics = get_system_metrics();
let cpu_percent = get_cpu_percent();
Expand Down
2 changes: 0 additions & 2 deletions src/routes/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ pub async fn login(request: HttpRequest,
}

let mapped = verified.unwrap();
squire::custom::log_connection(&request);

let payload = serde_json::to_string(&mapped).unwrap();
let encrypted_payload = fernet.encrypt(payload.as_bytes());

Expand Down
72 changes: 63 additions & 9 deletions src/routes/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,37 +3,86 @@ use actix_web::rt::time::sleep;
use actix_web::{rt, web, Error, HttpRequest, HttpResponse};
use actix_ws::AggregatedMessage;
use fernet::Fernet;
use std::sync::Arc;
use std::time::Duration;
use futures::future;
use futures::stream::StreamExt;
use std::sync::Arc;
use std::time::Duration;


/// Streams system resources via websocket through a loop.
///
/// # Arguments
///
/// * `request` - A reference to the Actix web `HttpRequest` object.
async fn send_system_resources(mut session: actix_ws::Session) {
loop {
let system_resources = resources::stream::system_resources();
let strigified = serde_json::to_string(&system_resources).unwrap();
session.text(strigified).await.unwrap();
sleep(Duration::from_secs(1)).await;
let serialized = serde_json::to_string(&system_resources).unwrap();
match session.text(serialized).await {
Ok(_) => (),
Err(err) => {
log::warn!("Connection {} by the client!", err);
break;
}
}
// 500ms / 0.5s delay
sleep(Duration::from_millis(500)).await;
}
}

/// Receives messages from the client and sends them back.
///
/// # Summary
///
/// Handles text, binary, and ping messages from the client.
///
/// # References
///
/// * [AggregatedMessage](https://docs.rs/actix-web/4.0.0-beta.8/actix_web/websocket/struct.AggregatedMessage.html)
/// * [ProtocolError](https://docs.rs/actix-web/4.0.0-beta.8/actix_web/websocket/enum.ProtocolError.html)
/// * [Session](https://docs.rs/actix-web/4.0.0-beta.8/actix_web/websocket/struct.Session.html)
/// * [Stream](https://docs.rs/futures/0.3.17/futures/stream/trait.Stream.html)
/// * [Unpin](https://doc.rust-lang.org/std/marker/trait.Unpin.html)
///
/// # Arguments
///
/// * `session` - A reference to the Actix web `Session` object.
/// * `stream` - A stream of `AggregatedMessage` objects.
async fn receive_messages(
mut session: actix_ws::Session,
mut stream: impl futures::Stream<Item=Result<AggregatedMessage, actix_ws::ProtocolError>> + Unpin
) {
while let Some(msg) = stream.next().await {
match msg {
Ok(AggregatedMessage::Text(text)) => {
println!("Text: {:?}", &text);
// echo text message
session.text(text).await.unwrap();
}
Ok(AggregatedMessage::Binary(bin)) => {
// echo binary message
session.binary(bin).await.unwrap();
}
Ok(AggregatedMessage::Ping(msg)) => {
// respond to PING frame with PONG frame
session.pong(&msg).await.unwrap();
}
_ => {}
}
}
}


/// Handles the WebSocket endpoint for system resources.
///
/// # Arguments
///
/// * `request` - A reference to the Actix web `HttpRequest` object.
/// * `fernet` - Fernet object to encrypt the auth payload that will be set as `session_token` cookie.
/// * `session_info` - Session struct that holds the `session_mapping` to handle sessions.
/// * `config` - Configuration data for the application.
/// * `stream` - A stream of `Payload` objects.
///
/// # Returns
///
/// Returns an `HttpResponse` with the appropriate status code.
#[route("/ws/system", method = "GET")]
async fn echo(
request: HttpRequest,
Expand All @@ -46,7 +95,12 @@ async fn echo(
if !auth_response.ok {
return Ok(routes::auth::failed_auth(auth_response));
}
let (response, session, stream) = actix_ws::handle(&request, stream)?;
let (response, session, stream) = match actix_ws::handle(&request, stream) {
Ok(result) => result,
Err(_) => {
return Ok(HttpResponse::ServiceUnavailable().finish());
},
};
// todo: implement a session timeout here
let stream = stream
.aggregate_continuations()
Expand Down
2 changes: 1 addition & 1 deletion src/templates/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ pub fn get_content() -> String {
.center-container {
width: 100%;
margin-left: 45%;
margin-left: 40%;
}
.center-container details {
Expand Down

0 comments on commit 122d237

Please sign in to comment.