Skip to content

Commit

Permalink
Fix quantity comparisons for storage scale operations
Browse files Browse the repository at this point in the history
  • Loading branch information
jackkleeman committed Apr 30, 2024
1 parent 1dee95c commit b281fcc
Show file tree
Hide file tree
Showing 5 changed files with 283 additions and 7 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ tonic = { version = "0.9", optional = true }
thiserror = "1.0.47"
anyhow = "1.0.75"
clap = { version = "4.1", features = ["derive", "env"] }
regex = "1.10.4"

[dev-dependencies]
assert-json-diff = "2.0.2"
Expand Down
16 changes: 11 additions & 5 deletions src/reconcilers/compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ use crate::securitygrouppolicies::{
};
use crate::{Context, Error, RestateClusterCompute, RestateClusterSpec, RestateClusterStorage};

use super::quantity_parser::QuantityParser;

fn restate_service_account(
base_metadata: &ObjectMeta,
annotations: Option<&BTreeMap<String, String>>,
Expand Down Expand Up @@ -717,16 +719,20 @@ async fn resize_statefulset_storage(
None => return Ok(()),
};

let existing_resources = existing
let existing_storage_request = existing
.spec
.as_ref()
.and_then(|spec| spec.volume_claim_templates.as_ref())
.and_then(|templates| templates.first())
.and_then(|storage| storage.spec.as_ref())
.and_then(|spec| spec.resources.as_ref());

if existing_resources == resources.as_ref() {
return Ok(()); // nothing to do
.and_then(|spec| spec.resources.as_ref())
.and_then(|resources| resources.requests.as_ref())
.and_then(|requests| requests.get("storage").map(|storage| storage.to_bytes()));

match existing_storage_request {
// check if we can interpret the statefulset as having the same storage request
Some(Ok(Some(bytes))) if bytes == storage.storage_request_bytes => return Ok(()),
_ => {}
}

// expansion case - we would have failed when updating the pvcs if this was a contraction
Expand Down
1 change: 1 addition & 0 deletions src/reconcilers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use k8s_openapi::apimachinery::pkg::apis::meta::v1::{LabelSelector, ObjectMeta};

pub mod compute;
pub mod network_policies;
mod quantity_parser;
pub mod signing_key;

// resource_labels returns labels to apply to all created resources on top of the RestateCluster labels
Expand Down
267 changes: 267 additions & 0 deletions src/reconcilers/quantity_parser.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
use regex::Regex;
use std::{num::ParseIntError, sync::OnceLock};

/// Adapted from https://github.com/sombralibre/k8s-quantity-parser to resolve dependency conflict
/// MIT licensed, Copyright (c) 2022 Alejandro Llanes

#[allow(non_camel_case_types)]
enum QuantityMemoryUnits {
Ki,
Mi,
Gi,
Ti,
Pi,
Ei,
k,
M,
G,
T,
P,
E,
m,
Invalid,
}

impl QuantityMemoryUnits {
fn new(unit: &str) -> Self {
match unit {
"Ki" => Self::Ki,
"Mi" => Self::Mi,
"Gi" => Self::Gi,
"Ti" => Self::Ti,
"Pi" => Self::Pi,
"Ei" => Self::Ei,
"k" => Self::k,
"M" => Self::M,
"G" => Self::G,
"T" => Self::T,
"P" => Self::P,
"E" => Self::E,
"m" => Self::m,
_ => Self::Invalid,
}
}
}

/// This trait works as a parser for the values retrieved from BTreeMap<String, Quantity> collections
/// in `k8s_openapi::api::core::v1::Pod` and `k8s_openapi::api::core::v1::Node`
///
/// # Errors
/// The parser will fails if encounters an invalid unit letters or failed to parse String to i64

pub trait QuantityParser {
/// This method will parse the cpu resource values returned by Kubernetes Api
///
/// ```rust
/// # use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
/// # use k8s_quantity_parser::QuantityParser;
/// #
/// let cpu = Quantity("4".into());
/// let ret: i64 = 4000;
/// assert_eq!(cpu.to_milli_cpus().ok().flatten().unwrap(), ret)
/// ```
///
/// # Errors
///
/// The parser will fails if encounters an invalid unit letters or failed to parse String to i64
///
fn to_milli_cpus(&self) -> Result<Option<i64>, ParseError>;
/// This method will parse the memory resource values returned by Kubernetes Api
///
/// ```rust
/// # use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
/// # use k8s_quantity_parser::QuantityParser;
/// #
/// let mib = Quantity("1Mi".into());
/// let ret: i64 = 1048576;
/// assert_eq!(mib.to_bytes().ok().flatten().unwrap(), ret);
/// ```
///
/// # Errors
///
/// The parser will fails if encounters an invalid unit letters or failed to parse String to i64
///
fn to_bytes(&self) -> Result<Option<i64>, ParseError>;
}

#[derive(Debug, thiserror::Error)]
pub enum ParseError {
#[error(transparent)]
ParseIntError(#[from] ParseIntError),
#[error("Invalid memory unit")]
InvalidMemoryUnit,
}

impl QuantityParser for Quantity {
fn to_milli_cpus(&self) -> Result<Option<i64>, ParseError> {
let unit_str = &self.0;
static REGEX: OnceLock<Regex> = OnceLock::new();
let cap = REGEX
.get_or_init(|| Regex::new(r"([m]{1}$)").unwrap())
.captures(unit_str);
if cap.is_none() {
return Ok(Some(unit_str.parse::<i64>()? * 1000));
};
let mt = cap.unwrap().get(0).unwrap();
let unit_str = unit_str.replace(mt.as_str(), "");
Ok(Some(unit_str.parse::<i64>()?))
}

fn to_bytes(&self) -> Result<Option<i64>, ParseError> {
let unit_str = &self.0;
static REGEX: OnceLock<Regex> = OnceLock::new();
let cap = REGEX
.get_or_init(|| Regex::new(r"([[:alpha:]]{1,2}$)").unwrap())
.captures(unit_str);

if cap.is_none() {
return Ok(Some(unit_str.parse::<i64>()?));
};

// Is safe to use unwrap here, as the value is already checked.
match cap.unwrap().get(0) {
Some(m) => match QuantityMemoryUnits::new(m.as_str()) {
QuantityMemoryUnits::Ki => {
let unit_str = unit_str.replace(m.as_str(), "");
let amount = unit_str.parse::<i64>()?;
Ok(Some(amount * 1024))
}
QuantityMemoryUnits::Mi => {
let unit_str = unit_str.replace(m.as_str(), "");
let amount = unit_str.parse::<i64>()?;
Ok(Some((amount * 1024) * 1024))
}
QuantityMemoryUnits::Gi => {
let unit_str = unit_str.replace(m.as_str(), "");
let amount = unit_str.parse::<i64>()?;
Ok(Some(((amount * 1024) * 1024) * 1024))
}
QuantityMemoryUnits::Ti => {
let unit_str = unit_str.replace(m.as_str(), "");
let amount = unit_str.parse::<i64>()?;
Ok(Some((((amount * 1024) * 1024) * 1024) * 1024))
}
QuantityMemoryUnits::Pi => {
let unit_str = unit_str.replace(m.as_str(), "");
let amount = unit_str.parse::<i64>()?;
Ok(Some(((((amount * 1024) * 1024) * 1024) * 1024) * 1024))
}
QuantityMemoryUnits::Ei => {
let unit_str = unit_str.replace(m.as_str(), "");
let amount = unit_str.parse::<i64>()?;
Ok(Some(
(((((amount * 1024) * 1024) * 1024) * 1024) * 1024) * 1024,
))
}
QuantityMemoryUnits::k => {
let unit_str = unit_str.replace(m.as_str(), "");
let amount = unit_str.parse::<i64>()?;
Ok(Some(amount * 1000))
}
QuantityMemoryUnits::M => {
let unit_str = unit_str.replace(m.as_str(), "");
let amount = unit_str.parse::<i64>()?;
Ok(Some((amount * 1000) * 1000))
}
QuantityMemoryUnits::G => {
let unit_str = unit_str.replace(m.as_str(), "");
let amount = unit_str.parse::<i64>()?;
Ok(Some(((amount * 1000) * 1000) * 1000))
}
QuantityMemoryUnits::T => {
let unit_str = unit_str.replace(m.as_str(), "");
let amount = unit_str.parse::<i64>()?;
Ok(Some((((amount * 1000) * 1000) * 1000) * 1000))
}
QuantityMemoryUnits::P => {
let unit_str = unit_str.replace(m.as_str(), "");
let amount = unit_str.parse::<i64>()?;
Ok(Some(((((amount * 1000) * 1000) * 1000) * 1000) * 1000))
}
QuantityMemoryUnits::E => {
let unit_str = unit_str.replace(m.as_str(), "");
let amount = unit_str.parse::<i64>()?;
Ok(Some(
(((((amount * 1000) * 1000) * 1000) * 1000) * 1000) * 1000,
))
}
QuantityMemoryUnits::m => {
let unit_str = unit_str.replace(m.as_str(), "");
let amount = unit_str.parse::<i64>()?;
Ok(Some(amount / 1000))
}
QuantityMemoryUnits::Invalid => Err(ParseError::InvalidMemoryUnit),
},
None => Ok(None),
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn to_bytes_works() {
assert!(Quantity("12345".into()).to_bytes().is_ok())
}

#[test]
fn to_bytes_is_some() {
assert!(Quantity("12345".into()).to_bytes().unwrap().is_some())
}

#[test]
fn to_milli_cpus_works() {
assert!(Quantity("12345m".into()).to_milli_cpus().is_ok())
}

#[test]
fn to_milli_cpus_is_some() {
assert!(Quantity("12345m".into()).to_milli_cpus().unwrap().is_some())
}

#[test]
fn invalid_unit_fails() {
assert!(Quantity("12345r".into()).to_bytes().is_err())
}

#[test]
fn parse_i64_fails() {
assert!(Quantity("123.123".into()).to_bytes().is_err())
}

#[test]
fn is_none_value() {
assert!(Quantity("0Mi".into()).to_bytes().unwrap().is_some())
}

#[test]
fn pow2_mb_to_bytes() {
let mib = Quantity("1Mi".into());
let ret: i64 = 1048576;
assert_eq!(mib.to_bytes().ok().flatten().unwrap(), ret);
}

#[test]
fn pow10_gb_to_bytes() {
let mib = Quantity("1G".into());
let ret: i64 = 1000000000;
assert_eq!(mib.to_bytes().ok().flatten().unwrap(), ret);
}

#[test]
fn cpu_units_value_to_millis() {
let cpu = Quantity("1536m".into());
let ret: i64 = 1536;
assert_eq!(cpu.to_milli_cpus().ok().flatten().unwrap(), ret)
}

#[test]
fn cpu_cores_value_to_millis() {
let cpu = Quantity("4".into());
let ret: i64 = 4000;
assert_eq!(cpu.to_milli_cpus().ok().flatten().unwrap(), ret)
}
}

0 comments on commit b281fcc

Please sign in to comment.