Skip to content

Commit

Permalink
Fix a bug on docker stats blocking websocket
Browse files Browse the repository at this point in the history
Reduce redundancy on running shell commands
  • Loading branch information
vsivanandharao committed Sep 23, 2024
1 parent 122d237 commit b796440
Show file tree
Hide file tree
Showing 10 changed files with 143 additions and 106 deletions.
58 changes: 42 additions & 16 deletions src/resources/disks.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::collections::HashMap;
use std::process::Command;
use std::str;
use regex::Regex;
use serde_json::Value;
use crate::squire;

// todo: tested only on macOS

Expand All @@ -16,7 +16,7 @@ fn format_nos(input: f64) -> f64 {

fn size_converter(byte_size: f64) -> String {
let size_name = ["B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB"];
let index = (byte_size.log(1024.0)).floor() as usize;
let index = byte_size.log(1024.0).floor() as usize;
format!("{:.2} {}", format_nos(byte_size / 1024.0_f64.powi(index as i32)), size_name[index])
}

Expand All @@ -42,17 +42,15 @@ fn parse_size(size_str: &str) -> String {
.replace("P", " PB")
}

fn run_command(command: &str, args: &[&str]) -> String {
let output = Command::new(command)
.args(args)
.output()
.expect("Failed to execute command");
String::from_utf8_lossy(&output.stdout).to_string()
}

fn is_physical_disk(lib_path: &str, device_id: &str) -> bool {
let output = run_command(lib_path, &["info", device_id]);
// !output.contains("Virtual:Yes")
let result = squire::util::run_command(lib_path, &["info", device_id]);
let output = match result {
Ok(output) => output,
Err(_) => {
log::error!("Failed to get disk info");
return false;
},
};
for line in output.split("\n") {
if line.contains("Virtual:") && line.contains("Yes") {
return false;
Expand All @@ -62,7 +60,14 @@ fn is_physical_disk(lib_path: &str, device_id: &str) -> bool {
}

fn linux_disks(lib_path: &str) -> Vec<HashMap<String, String>> {
let output = run_command(lib_path, &["-o", "NAME,SIZE,TYPE,MODEL", "-d"]);
let result = squire::util::run_command(lib_path, &["-o", "NAME,SIZE,TYPE,MODEL", "-d"]);
let output = match result {
Ok(output) => output,
Err(_) => {
log::error!("Failed to get disk info");
return Vec::new();
},
};
let disks: Vec<&str> = output.lines().collect();
let filtered_disks: Vec<&str> = disks.into_iter().filter(|&disk| !disk.contains("loop")).collect();
let keys_raw = filtered_disks[0].to_lowercase()
Expand All @@ -85,7 +90,14 @@ fn linux_disks(lib_path: &str) -> Vec<HashMap<String, String>> {
}

fn darwin_disks(lib_path: &str) -> Vec<HashMap<String, String>> {
let output = run_command(lib_path, &["list"]);
let result = squire::util::run_command(lib_path, &["list"]);
let output = match result {
Ok(output) => output,
Err(_) => {
log::error!("Failed to get disk info");
return Vec::new();
},
};
let disks: Vec<&str> = output.lines().collect();
let disk_lines: Vec<&str> = disks.into_iter().filter(|&line| line.starts_with("/dev/disk")).collect();
let mut disk_info = Vec::new();
Expand All @@ -94,7 +106,14 @@ fn darwin_disks(lib_path: &str) -> Vec<HashMap<String, String>> {
if !is_physical_disk(lib_path, device_id) {
continue;
}
let disk_info_output = run_command(lib_path, &["info", device_id]);
let result = squire::util::run_command(lib_path, &["info", device_id]);
let disk_info_output = match result {
Ok(output) => output,
Err(_) => {
log::error!("Failed to get disk info");
return Vec::new();
},
};
let info_lines: Vec<&str> = disk_info_output.lines().collect();
let mut disk_data = HashMap::new();
for info_line in info_lines {
Expand Down Expand Up @@ -124,7 +143,14 @@ fn reformat_windows(data: &mut HashMap<String, Value>) -> HashMap<String, String

fn windows_disks(lib_path: &str) -> Vec<HashMap<String, String>> {
let ps_command = "Get-CimInstance Win32_DiskDrive | Select-Object Caption, DeviceID, Model, Partitions, Size | ConvertTo-Json";
let output = run_command(lib_path, &["-Command", ps_command]);
let result = squire::util::run_command(lib_path, &["-Command", ps_command]);
let output = match result {
Ok(output) => output,
Err(_) => {
log::error!("Failed to get disk info");
return Vec::new();
},
};
let disks_info: Value = serde_json::from_str(&output).unwrap();
let mut disk_info = Vec::new();
if let Some(disks) = disks_info.as_array() {
Expand Down
3 changes: 2 additions & 1 deletion src/resources/info.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::collections::HashMap;
use chrono::Utc;
use inflector::cases::titlecase::to_title_case;
use sysinfo::{DiskExt, System, SystemExt};
use crate::{squire, resources};

Expand All @@ -26,7 +27,7 @@ pub fn get_sys_info() -> HashMap<&'static str, HashMap<&'static str, String>> {
let os_arch = resources::system::os_arch();
let basic = HashMap::from_iter(vec![
("node", sys.host_name().unwrap_or("Unknown".to_string())),
("system", os_arch.name),
("system", to_title_case(&os_arch.name)),
("architecture", os_arch.architecture),
("uptime", uptime),
("CPU_cores_raw", sys.cpus().len().to_string()
Expand Down
32 changes: 11 additions & 21 deletions src/resources/processor.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,15 @@
use crate::resources;
use crate::{resources, squire};
use std::fs::File;
use std::io::{self, BufRead};
use std::process::Command;

fn get_processor_info_darwin(lib_path: &str) -> Result<String, &'static str> {
let output = match Command::new(lib_path)
.arg("-n")
.arg("machdep.cpu.brand_string")
.output()
{
Ok(output) => output,
Err(_) => return Err("Failed to execute command"),
};
let result = String::from_utf8_lossy(&output.stdout).trim().to_string();
Ok(result)
let result = squire::util::run_command(lib_path, &["-n", "machdep.cpu.brand_string"]);
if result.is_err() {
return Err("Failed to get processor info");
}
Ok(result.unwrap())
}

fn get_processor_info_linux(lib_path: &str) -> Result<String, &'static str> {
let file = match File::open(lib_path) {
Ok(file) => file,
Expand All @@ -36,17 +31,12 @@ fn get_processor_info_linux(lib_path: &str) -> Result<String, &'static str> {
Err("Model name not found")
}
fn get_processor_info_windows(lib_path: &str) -> Result<String, &'static str> {
let output = match Command::new(lib_path)
.arg("cpu")
.arg("get")
.arg("name")
.output()
{
let result = squire::util::run_command(lib_path, &["cpu", "get", "name"]);
let output = match result {
Ok(output) => output,
Err(_) => return Err("Failed to execute command"),
Err(_) => return Err("Failed to get processor info"),
};
let output_str = String::from_utf8_lossy(&output.stdout);
let lines: Vec<&str> = output_str.trim().split('\n').collect();
let lines: Vec<&str> = output.trim().split('\n').collect();
if lines.len() > 1 {
Ok(lines[1].trim().to_string())
} else {
Expand Down
43 changes: 25 additions & 18 deletions src/resources/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,41 @@ use sysinfo::{CpuExt, System, SystemExt};


use serde_json;
use std::process::{Command, Stdio};
use crate::squire;

/// Function to get docker stats via commandline.
///
/// # Returns
///
/// A `Result` containing a `Vec` of `serde_json::Value` if successful, otherwise an empty `Vec`.
fn get_docker_stats() -> Result<Vec<serde_json::Value>, Box<dyn std::error::Error>> {
let process = match Command::new("docker")
.args(["stats", "--no-stream", "--format", "{{json .}}"])
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
{
Ok(process) => process,
Err(_) => {
log::error!("Failed to execute command");
// Check if there are any docker containers running
// `docker -a` will show all containers including stopped, which will block `docker stats`
let ps_result = squire::util::run_command("docker", &["ps", "-q"]);
let stats_result = match ps_result {
Ok(output) if !output.is_empty() => {
let stats_result = squire::util::run_command(
"docker",
&["stats", "--no-stream", "--format", "{{json .}}"]
);
match stats_result {
Ok(stats) => stats,
Err(err) => {
log::error!("Error running docker stats: {}", err);
return Ok(vec![]);
},
}
}
Ok(_) => {
log::debug!("No running containers");
return Ok(vec![]);
},
Err(err) => {
log::error!("Error checking containers: {}", err);
return Ok(vec![]);
},
};
let output = process.wait_with_output().unwrap();
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
log::error!("Error: {}", stderr);
return Ok(vec![]);
}
let stdout = String::from_utf8_lossy(&output.stdout);
let stats: Vec<serde_json::Value> = stdout
let stats: Vec<serde_json::Value> = stats_result
.lines()
.filter_map(|line| serde_json::from_str(line).ok())
.collect();
Expand Down
39 changes: 11 additions & 28 deletions src/resources/system.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::process::Command;
use std::str;
use serde::{Deserialize, Serialize};
use crate::squire;

#[derive(Deserialize, Serialize, Debug)]
pub struct OperatingSystem {
Expand All @@ -10,43 +10,26 @@ pub struct OperatingSystem {

fn unamem() -> String {
// Get architecture using `uname -m` with fallback
let uname_m_output = match Command::new("uname")
.arg("-m")
.output() {
Ok(output) => output,
let result = squire::util::run_command("uname", &["-m"]);
match result {
Ok(output) => output.to_lowercase(),
Err(_) => {
log::error!("Failed to read uname output");
return "".to_string();
},
};
let unamem = match str::from_utf8(&uname_m_output.stdout) {
Ok(output) => output.trim().to_string(),
Err(_) => {
log::error!("Failed to read uname output");
log::error!("Failed to execute command");
"".to_string()
},
};
unamem.to_string()
}
}

fn unameu() -> String {
// Get OS using `uname`
let uname_output = match Command::new("uname")
.output() {
Ok(output) => output,
let result = squire::util::run_command("uname", &[]);
match result {
Ok(output) => output.to_uppercase(),
Err(_) => {
log::error!("Failed to read uname output");
return std::env::consts::OS.to_uppercase();
},
};
let unameu = match str::from_utf8(&uname_output.stdout) {
Ok(output) => output.trim().to_string(),
Err(_) => {
log::error!("Failed to read uname output");
log::error!("Failed to execute command");
std::env::consts::OS.to_uppercase()
},
};
unameu.to_uppercase()
}
}

pub fn os_arch() -> OperatingSystem {
Expand Down
9 changes: 4 additions & 5 deletions src/routes/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async fn send_system_resources(mut session: actix_ws::Session) {
}
}
// 500ms / 0.5s delay
sleep(Duration::from_millis(500)).await;
sleep(Duration::from_secs(1)).await;
}
}

Expand Down Expand Up @@ -91,6 +91,7 @@ async fn echo(
config: web::Data<Arc<squire::settings::Config>>,
stream: web::Payload,
) -> Result<HttpResponse, Error> {
log::info!("Websocket connection initiated");
let auth_response = squire::authenticator::verify_token(&request, &config, &fernet, &session_info);
if !auth_response.ok {
return Ok(routes::auth::failed_auth(auth_response));
Expand All @@ -103,11 +104,9 @@ async fn echo(
};
// todo: implement a session timeout here
let stream = stream
.aggregate_continuations()
// aggregate continuation frames up to 1MiB
// todo: check and remove limit if necessary
.max_continuation_size(2_usize.pow(20));
.aggregate_continuations();
rt::spawn(async move {
log::warn!("Connection established");
let send_task = send_system_resources(session.clone());
let receive_task = receive_messages(session, stream);
future::join(send_task, receive_task).await;
Expand Down
2 changes: 1 addition & 1 deletion src/squire/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub struct Config {
pub fn default_debug() -> bool { false }

/// Returns the default value for UTC logging.
pub fn default_utc_logging() -> bool { true }
pub fn default_utc_logging() -> bool { false }

/// Returns the default host based on the local machine's IP address.
pub fn default_host() -> String {
Expand Down
36 changes: 21 additions & 15 deletions src/squire/startup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use std::io::Write;

use chrono::{DateTime, Local};

use crate::{constant, squire};
use crate::squire::settings;
use crate::{constant, squire};

/// Initializes the logger based on the provided debug flag and cargo information.
///
Expand All @@ -25,23 +25,29 @@ pub fn init_logger(debug: bool, utc: bool, crate_name: &String) {
));
std::env::set_var("RUST_BACKTRACE", "0");
}
let timestamp;
if utc {
env_logger::init();
timestamp = DateTime::<chrono::Utc>::from(Local::now())
.format("%Y-%m-%dT%H:%M:%SZ")
.to_string();
} else {
env_logger::Builder::from_default_env()
.format(|buf, record| {
let local_time: DateTime<Local> = Local::now();
writeln!(
buf,
"[{} {} {}] - {}",
local_time.format("%Y-%m-%dT%H:%M:%SZ"),
record.level(),
record.target(),
record.args()
)
})
.init();
timestamp = Local::now()
.format("%Y-%m-%dT%H:%M:%SZ")
.to_string();
}
env_logger::Builder::from_default_env()
.format(move |buf, record| {
writeln!(
buf,
"[{} {} {}:{}] - {}",
timestamp,
record.level(),
record.target(),
record.line().unwrap_or(0),
record.args()
)
})
.init();
}

/// Extracts the mandatory env vars by key and parses it as `HashMap<String, String>` and `PathBuf`
Expand Down
Loading

0 comments on commit b796440

Please sign in to comment.