diff --git a/Cargo.lock b/Cargo.lock index fe338e8..6195c10 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1801,9 +1801,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.10.3" +version = "1.10.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b62dbe01f0b06f9d8dc7d49e05a0785f153b00b2c227856282f671e0318c9b15" +checksum = "c117dbdfde9c8308975b6a18d71f3f385c89461f7b3fb054288ecf2a2058ba4c" dependencies = [ "aho-corasick", "memchr", @@ -1860,6 +1860,7 @@ dependencies = [ "opentelemetry", "opentelemetry-otlp", "prometheus", + "regex", "schemars", "serde", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index c908bf4..1b6a988 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -59,6 +59,7 @@ tonic = { version = "0.9", optional = true } thiserror = "1.0.47" anyhow = "1.0.75" clap = { version = "4.1", features = ["derive", "env"] } +regex = "1.10.4" [dev-dependencies] assert-json-diff = "2.0.2" diff --git a/src/reconcilers/compute.rs b/src/reconcilers/compute.rs index cf4930b..0547105 100644 --- a/src/reconcilers/compute.rs +++ b/src/reconcilers/compute.rs @@ -29,6 +29,8 @@ use crate::securitygrouppolicies::{ }; use crate::{Context, Error, RestateClusterCompute, RestateClusterSpec, RestateClusterStorage}; +use super::quantity_parser::QuantityParser; + fn restate_service_account( base_metadata: &ObjectMeta, annotations: Option<&BTreeMap>, @@ -717,16 +719,20 @@ async fn resize_statefulset_storage( None => return Ok(()), }; - let existing_resources = existing + let existing_storage_request = existing .spec .as_ref() .and_then(|spec| spec.volume_claim_templates.as_ref()) .and_then(|templates| templates.first()) .and_then(|storage| storage.spec.as_ref()) - .and_then(|spec| spec.resources.as_ref()); - - if existing_resources == resources.as_ref() { - return Ok(()); // nothing to do + .and_then(|spec| spec.resources.as_ref()) + .and_then(|resources| resources.requests.as_ref()) + .and_then(|requests| requests.get("storage").map(|storage| storage.to_bytes())); + + match existing_storage_request { + // check if we can interpret the statefulset as having the same storage request + Some(Ok(Some(bytes))) if bytes == storage.storage_request_bytes => return Ok(()), + _ => {} } // expansion case - we would have failed when updating the pvcs if this was a contraction diff --git a/src/reconcilers/mod.rs b/src/reconcilers/mod.rs index be88be0..9c78548 100644 --- a/src/reconcilers/mod.rs +++ b/src/reconcilers/mod.rs @@ -4,6 +4,7 @@ use k8s_openapi::apimachinery::pkg::apis::meta::v1::{LabelSelector, ObjectMeta}; pub mod compute; pub mod network_policies; +mod quantity_parser; pub mod signing_key; // resource_labels returns labels to apply to all created resources on top of the RestateCluster labels diff --git a/src/reconcilers/quantity_parser.rs b/src/reconcilers/quantity_parser.rs new file mode 100644 index 0000000..9ae3664 --- /dev/null +++ b/src/reconcilers/quantity_parser.rs @@ -0,0 +1,267 @@ +use k8s_openapi::apimachinery::pkg::api::resource::Quantity; +use regex::Regex; +use std::{num::ParseIntError, sync::OnceLock}; + +/// Adapted from https://github.com/sombralibre/k8s-quantity-parser to resolve dependency conflict +/// MIT licensed, Copyright (c) 2022 Alejandro Llanes + +#[allow(non_camel_case_types)] +enum QuantityMemoryUnits { + Ki, + Mi, + Gi, + Ti, + Pi, + Ei, + k, + M, + G, + T, + P, + E, + m, + Invalid, +} + +impl QuantityMemoryUnits { + fn new(unit: &str) -> Self { + match unit { + "Ki" => Self::Ki, + "Mi" => Self::Mi, + "Gi" => Self::Gi, + "Ti" => Self::Ti, + "Pi" => Self::Pi, + "Ei" => Self::Ei, + "k" => Self::k, + "M" => Self::M, + "G" => Self::G, + "T" => Self::T, + "P" => Self::P, + "E" => Self::E, + "m" => Self::m, + _ => Self::Invalid, + } + } +} + +/// This trait works as a parser for the values retrieved from BTreeMap collections +/// in `k8s_openapi::api::core::v1::Pod` and `k8s_openapi::api::core::v1::Node` +/// +/// # Errors +/// The parser will fails if encounters an invalid unit letters or failed to parse String to i64 + +pub trait QuantityParser { + /// This method will parse the cpu resource values returned by Kubernetes Api + /// + /// ```rust + /// # use k8s_openapi::apimachinery::pkg::api::resource::Quantity; + /// # use k8s_quantity_parser::QuantityParser; + /// # + /// let cpu = Quantity("4".into()); + /// let ret: i64 = 4000; + /// assert_eq!(cpu.to_milli_cpus().ok().flatten().unwrap(), ret) + /// ``` + /// + /// # Errors + /// + /// The parser will fails if encounters an invalid unit letters or failed to parse String to i64 + /// + fn to_milli_cpus(&self) -> Result, ParseError>; + /// This method will parse the memory resource values returned by Kubernetes Api + /// + /// ```rust + /// # use k8s_openapi::apimachinery::pkg::api::resource::Quantity; + /// # use k8s_quantity_parser::QuantityParser; + /// # + /// let mib = Quantity("1Mi".into()); + /// let ret: i64 = 1048576; + /// assert_eq!(mib.to_bytes().ok().flatten().unwrap(), ret); + /// ``` + /// + /// # Errors + /// + /// The parser will fails if encounters an invalid unit letters or failed to parse String to i64 + /// + fn to_bytes(&self) -> Result, ParseError>; +} + +#[derive(Debug, thiserror::Error)] +pub enum ParseError { + #[error(transparent)] + ParseIntError(#[from] ParseIntError), + #[error("Invalid memory unit")] + InvalidMemoryUnit, +} + +impl QuantityParser for Quantity { + fn to_milli_cpus(&self) -> Result, ParseError> { + let unit_str = &self.0; + static REGEX: OnceLock = OnceLock::new(); + let cap = REGEX + .get_or_init(|| Regex::new(r"([m]{1}$)").unwrap()) + .captures(unit_str); + if cap.is_none() { + return Ok(Some(unit_str.parse::()? * 1000)); + }; + let mt = cap.unwrap().get(0).unwrap(); + let unit_str = unit_str.replace(mt.as_str(), ""); + Ok(Some(unit_str.parse::()?)) + } + + fn to_bytes(&self) -> Result, ParseError> { + let unit_str = &self.0; + static REGEX: OnceLock = OnceLock::new(); + let cap = REGEX + .get_or_init(|| Regex::new(r"([[:alpha:]]{1,2}$)").unwrap()) + .captures(unit_str); + + if cap.is_none() { + return Ok(Some(unit_str.parse::()?)); + }; + + // Is safe to use unwrap here, as the value is already checked. + match cap.unwrap().get(0) { + Some(m) => match QuantityMemoryUnits::new(m.as_str()) { + QuantityMemoryUnits::Ki => { + let unit_str = unit_str.replace(m.as_str(), ""); + let amount = unit_str.parse::()?; + Ok(Some(amount * 1024)) + } + QuantityMemoryUnits::Mi => { + let unit_str = unit_str.replace(m.as_str(), ""); + let amount = unit_str.parse::()?; + Ok(Some((amount * 1024) * 1024)) + } + QuantityMemoryUnits::Gi => { + let unit_str = unit_str.replace(m.as_str(), ""); + let amount = unit_str.parse::()?; + Ok(Some(((amount * 1024) * 1024) * 1024)) + } + QuantityMemoryUnits::Ti => { + let unit_str = unit_str.replace(m.as_str(), ""); + let amount = unit_str.parse::()?; + Ok(Some((((amount * 1024) * 1024) * 1024) * 1024)) + } + QuantityMemoryUnits::Pi => { + let unit_str = unit_str.replace(m.as_str(), ""); + let amount = unit_str.parse::()?; + Ok(Some(((((amount * 1024) * 1024) * 1024) * 1024) * 1024)) + } + QuantityMemoryUnits::Ei => { + let unit_str = unit_str.replace(m.as_str(), ""); + let amount = unit_str.parse::()?; + Ok(Some( + (((((amount * 1024) * 1024) * 1024) * 1024) * 1024) * 1024, + )) + } + QuantityMemoryUnits::k => { + let unit_str = unit_str.replace(m.as_str(), ""); + let amount = unit_str.parse::()?; + Ok(Some(amount * 1000)) + } + QuantityMemoryUnits::M => { + let unit_str = unit_str.replace(m.as_str(), ""); + let amount = unit_str.parse::()?; + Ok(Some((amount * 1000) * 1000)) + } + QuantityMemoryUnits::G => { + let unit_str = unit_str.replace(m.as_str(), ""); + let amount = unit_str.parse::()?; + Ok(Some(((amount * 1000) * 1000) * 1000)) + } + QuantityMemoryUnits::T => { + let unit_str = unit_str.replace(m.as_str(), ""); + let amount = unit_str.parse::()?; + Ok(Some((((amount * 1000) * 1000) * 1000) * 1000)) + } + QuantityMemoryUnits::P => { + let unit_str = unit_str.replace(m.as_str(), ""); + let amount = unit_str.parse::()?; + Ok(Some(((((amount * 1000) * 1000) * 1000) * 1000) * 1000)) + } + QuantityMemoryUnits::E => { + let unit_str = unit_str.replace(m.as_str(), ""); + let amount = unit_str.parse::()?; + Ok(Some( + (((((amount * 1000) * 1000) * 1000) * 1000) * 1000) * 1000, + )) + } + QuantityMemoryUnits::m => { + let unit_str = unit_str.replace(m.as_str(), ""); + let amount = unit_str.parse::()?; + Ok(Some(amount / 1000)) + } + QuantityMemoryUnits::Invalid => Err(ParseError::InvalidMemoryUnit), + }, + None => Ok(None), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn to_bytes_works() { + assert!(Quantity("12345".into()).to_bytes().is_ok()) + } + + #[test] + fn to_bytes_is_some() { + assert!(Quantity("12345".into()).to_bytes().unwrap().is_some()) + } + + #[test] + fn to_milli_cpus_works() { + assert!(Quantity("12345m".into()).to_milli_cpus().is_ok()) + } + + #[test] + fn to_milli_cpus_is_some() { + assert!(Quantity("12345m".into()).to_milli_cpus().unwrap().is_some()) + } + + #[test] + fn invalid_unit_fails() { + assert!(Quantity("12345r".into()).to_bytes().is_err()) + } + + #[test] + fn parse_i64_fails() { + assert!(Quantity("123.123".into()).to_bytes().is_err()) + } + + #[test] + fn is_none_value() { + assert!(Quantity("0Mi".into()).to_bytes().unwrap().is_some()) + } + + #[test] + fn pow2_mb_to_bytes() { + let mib = Quantity("1Mi".into()); + let ret: i64 = 1048576; + assert_eq!(mib.to_bytes().ok().flatten().unwrap(), ret); + } + + #[test] + fn pow10_gb_to_bytes() { + let mib = Quantity("1G".into()); + let ret: i64 = 1000000000; + assert_eq!(mib.to_bytes().ok().flatten().unwrap(), ret); + } + + #[test] + fn cpu_units_value_to_millis() { + let cpu = Quantity("1536m".into()); + let ret: i64 = 1536; + assert_eq!(cpu.to_milli_cpus().ok().flatten().unwrap(), ret) + } + + #[test] + fn cpu_cores_value_to_millis() { + let cpu = Quantity("4".into()); + let ret: i64 = 4000; + assert_eq!(cpu.to_milli_cpus().ok().flatten().unwrap(), ret) + } +}