Skip to content

Commit

Permalink
Add readiness checks
Browse files Browse the repository at this point in the history
And propagate pod readiness into RestateCluster status
  • Loading branch information
jackkleeman committed Apr 16, 2024
1 parent 60bb400 commit e1aa976
Showing 1 changed file with 47 additions and 9 deletions.
56 changes: 47 additions & 9 deletions src/reconcilers/compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@ use std::collections::{BTreeMap, HashSet};
use std::convert::Into;
use std::time::Duration;

use k8s_openapi::api::apps::v1::{StatefulSet, StatefulSetSpec};
use k8s_openapi::api::apps::v1::{StatefulSet, StatefulSetSpec, StatefulSetStatus};
use k8s_openapi::api::core::v1::{
Container, ContainerPort, EnvVar, PersistentVolumeClaim, PersistentVolumeClaimSpec, Pod,
PodSecurityContext, PodSpec, PodTemplateSpec, SeccompProfile, SecurityContext, Service,
ServiceAccount, ServicePort, ServiceSpec, Volume, VolumeMount, VolumeResourceRequirements,
Container, ContainerPort, EnvVar, HTTPGetAction, PersistentVolumeClaim,
PersistentVolumeClaimSpec, Pod, PodSecurityContext, PodSpec, PodTemplateSpec, Probe,
SeccompProfile, SecurityContext, Service, ServiceAccount, ServicePort, ServiceSpec, Volume,
VolumeMount, VolumeResourceRequirements,
};
use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use k8s_openapi::apimachinery::pkg::util::intstr::IntOrString;
use kube::api::{DeleteParams, Preconditions, PropagationPolicy};
use kube::core::PartialObjectMeta;
use kube::runtime::reflector::{ObjectRef, Store};
Expand Down Expand Up @@ -220,6 +222,14 @@ fn restate_statefulset(
..Default::default()
},
]),
readiness_probe: Some(Probe {
http_get: Some(HTTPGetAction {
port: IntOrString::Int(9070),
path: Some("/health".into()),
..Default::default()
}),
..Default::default()
}),
resources: compute.resources.clone(),
security_context: Some(SecurityContext {
read_only_root_filesystem: Some(true),
Expand Down Expand Up @@ -430,11 +440,7 @@ pub async fn reconcile_compute(
)
.await?;

let replicas = ss.status.map(|s| s.replicas).unwrap_or(0);
let expected_replicas = spec.compute.replicas.unwrap_or(1);
if replicas != expected_replicas {
return Err(Error::NotReady { reason: "StatefulSetScaling".into(), message: format!("StatefulSet has {replicas} replicas instead of the expected {expected_replicas}; it may be scaling up or down", ), requeue_after: None });
}
validate_stateful_set_status(ss.status, spec.compute.replicas.unwrap_or(1))?;

Ok(())
}
Expand Down Expand Up @@ -723,3 +729,35 @@ async fn apply_stateful_set(
debug!("Applying Stateful Set {} in namespace {}", name, namespace);
Ok(ss_api.patch(name, &params, &Patch::Apply(&ss)).await?)
}

fn validate_stateful_set_status(
status: Option<StatefulSetStatus>,
expected_replicas: i32,
) -> Result<(), Error> {
let status = if let Some(status) = status {
status
} else {
return Err(Error::NotReady {
message: "StatefulSetNoStatus".into(),
reason: "StatefulSet has no status set; it may have just been created".into(),
requeue_after: None,
});
};

let StatefulSetStatus {
replicas,
ready_replicas,
..
} = status;
if replicas != expected_replicas {
return Err(Error::NotReady { reason: "StatefulSetScaling".into(), message: format!("StatefulSet has {replicas} replicas instead of the expected {expected_replicas}; it may be scaling up or down"), requeue_after: None });
};

let ready_replicas = ready_replicas.unwrap_or(0);

if ready_replicas != expected_replicas {
return Err(Error::NotReady { reason: "StatefulSetPodNotReady".into(), message: format!("StatefulSet has {ready_replicas} ready replicas instead of the expected {expected_replicas}; a pod may not be ready"), requeue_after: None });
}

Ok(())
}

0 comments on commit e1aa976

Please sign in to comment.