Skip to content

Commit

Permalink
expose wal archiver timestamp (#758)
Browse files Browse the repository at this point in the history
  • Loading branch information
nhudson authored May 7, 2024
1 parent 0b6581b commit 96adf61
Show file tree
Hide file tree
Showing 10 changed files with 93 additions and 12 deletions.
2 changes: 1 addition & 1 deletion charts/tembo-operator/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: tembo-operator
description: 'Helm chart to deploy the tembo-operator'
type: application
icon: https://cloud.tembo.io/images/TemboElephant.png
version: 0.5.1
version: 0.5.2
home: https://tembo.io
sources:
- https://github.com/tembo-io/tembo
Expand Down
4 changes: 4 additions & 0 deletions charts/tembo-operator/templates/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2790,6 +2790,10 @@ spec:
format: date-time
nullable: true
type: string
last_archiver_status:
format: date-time
nullable: true
type: string
last_fully_reconciled_at:
format: date-time
nullable: true
Expand Down
1 change: 1 addition & 0 deletions tembo-operator/src/apis/coredb_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,7 @@ pub struct CoreDBStatus {
pub first_recoverability_time: Option<DateTime<Utc>>,
pub pg_postmaster_start_time: Option<DateTime<Utc>>,
pub last_fully_reconciled_at: Option<DateTime<Utc>>,
pub last_archiver_status: Option<DateTime<Utc>>,
}

#[cfg(test)]
Expand Down
1 change: 1 addition & 0 deletions tembo-operator/src/cloudnativepg/archive/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub(crate) mod wal;
44 changes: 44 additions & 0 deletions tembo-operator/src/cloudnativepg/archive/wal.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
use crate::{
apis::coredb_types::CoreDB,
cloudnativepg::{clusters::ClusterStatusConditionsStatus, cnpg::get_cluster},
Context,
};
use chrono::{DateTime, Utc};
use kube::{runtime::controller::Action, ResourceExt};
use std::sync::Arc;
use tracing::error;

// Find status of the last time a WAL archive was successful and retrun the date
pub async fn reconcile_last_archive_status(
cdb: &CoreDB,
ctx: Arc<Context>,
) -> Result<Option<DateTime<Utc>>, Action> {
let name = cdb.name_any();

let cluster = get_cluster(cdb, ctx.clone()).await;
match cluster {
Some(cluster) => {
if let Some(status) = &cluster.status {
if let Some(conditions) = &status.conditions {
for condition in conditions {
if condition.r#type == "ContinuousArchiving"
&& condition.status == ClusterStatusConditionsStatus::True
{
let last_transition_time = &condition.last_transition_time;
if let Ok(last_transition_time) =
DateTime::parse_from_rfc3339(last_transition_time)
{
return Ok(Some(last_transition_time.with_timezone(&Utc)));
}
}
}
}
}
Ok(None)
}
None => {
error!("Failed to get cluster: {}", &name);
Err(Action::requeue(tokio::time::Duration::from_secs(300)))
}
}
}
42 changes: 36 additions & 6 deletions tembo-operator/src/cloudnativepg/cnpg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1476,17 +1476,18 @@ fn schedule_expression_from_cdb(cdb: &CoreDB) -> String {
match &cdb.spec.backup.schedule {
None => default,
Some(expression) => {
let mut terms = expression.split(' ').collect::<Vec<&str>>();
if terms.len() == 5 {
let terms: Vec<&str> = expression.split(' ').collect();
let terms = if terms.len() == 5 {
// pre-pend "0" to the vector
let mut new_terms = vec!["0"];
new_terms.extend(terms);
terms = new_terms.clone();
}
if terms.len() != 6 {
new_terms
} else if terms.len() == 6 {
terms
} else {
warn!("Invalid schedule expression, expected five or six terms. Setting as default. Found expression: '{}'", expression);
return default;
}
};
// check that all terms are either parsable as int32 or "*"
for term in &terms {
if *term != "*" {
Expand Down Expand Up @@ -2920,6 +2921,35 @@ mod tests {
assert_eq!(cnpg_cluster_storage_class(&cdb_no_storage_class), None);
}

#[test]
fn test_schedule_expression_from_cdb() {
let mut coredb = CoreDB::test();

// Test case 1: No schedule specified, should return default
coredb.spec.backup.schedule = None;
assert_eq!(schedule_expression_from_cdb(&coredb), "0 0 0 * * *");

// Test case 2: Valid 6-term schedule expression
coredb.spec.backup.schedule = Some("30 12 * * * *".to_string());
assert_eq!(schedule_expression_from_cdb(&coredb), "30 12 * * * *");

// Test case 3: Valid 5-term schedule expression
coredb.spec.backup.schedule = Some("30 12 * * *".to_string());
assert_eq!(schedule_expression_from_cdb(&coredb), "0 30 12 * * *");

// Test case 4: Invalid schedule expression with less than 5 terms
coredb.spec.backup.schedule = Some("30 12 * *".to_string());
assert_eq!(schedule_expression_from_cdb(&coredb), "0 0 0 * * *");

// Test case 5: Invalid schedule expression with more than 6 terms
coredb.spec.backup.schedule = Some("30 12 * * * * *".to_string());
assert_eq!(schedule_expression_from_cdb(&coredb), "0 0 0 * * *");

// Test case 6: Invalid schedule expression with non-integer term
coredb.spec.backup.schedule = Some("30 12 * * * abc".to_string());
assert_eq!(schedule_expression_from_cdb(&coredb), "0 0 0 * * *");
}

// #[test]
// fn test_cnpg_cluster_volume_snapshot() {
// let cdb_yaml = r#"
Expand Down
1 change: 1 addition & 0 deletions tembo-operator/src/cloudnativepg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod backups;
pub mod clusters;
pub(crate) mod cnpg;
// pub(crate) mod cnpg_backups;
pub(crate) mod archive;
pub mod cnpg_utils;
pub mod hibernate;
pub(crate) mod placement;
Expand Down
3 changes: 3 additions & 0 deletions tembo-operator/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::{
apis::coredb_types::{CoreDB, CoreDBStatus, VolumeSnapshot},
app_service::manager::reconcile_app_services,
cloudnativepg::{
archive::wal::reconcile_last_archive_status,
backups::Backup,
cnpg::{
cnpg_cluster_from_cdb, reconcile_cnpg, reconcile_cnpg_scheduled_backup,
Expand Down Expand Up @@ -376,6 +377,7 @@ impl CoreDB {
reconcile_extensions(self, ctx.clone(), &coredbs, &name).await?;

let recovery_time = self.get_recovery_time(ctx.clone()).await?;
let last_archiver_status = reconcile_last_archive_status(self, ctx.clone()).await?;

let current_config_values = get_current_config_values(self, ctx.clone()).await?;
let mut new_status = CoreDBStatus {
Expand All @@ -389,6 +391,7 @@ impl CoreDB {
first_recoverability_time: recovery_time,
pg_postmaster_start_time,
last_fully_reconciled_at: None,
last_archiver_status,
};

let current_time = Utc::now();
Expand Down
2 changes: 1 addition & 1 deletion tembo-operator/src/extensions/toggle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ async fn get_trunk_project_version(
.is_ok();
// If trunk project exists for this version, use it
if trunk_project_version_exists {
trunk_project_version = location_to_toggle.version.clone();
trunk_project_version.clone_from(&location_to_toggle.version);
}
// Otherwise, fall back to latest version
else {
Expand Down
5 changes: 1 addition & 4 deletions tembo-operator/src/extensions/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,7 @@ pub fn generate_extension_enable_cmd(
}
let mut command_suffix: String = "".to_string();
if EXTRA_COMMANDS_TO_ENABLE_EXTENSION.contains_key(ext_name) {
command_suffix = EXTRA_COMMANDS_TO_ENABLE_EXTENSION
.get(ext_name)
.unwrap()
.clone();
command_suffix.clone_from(EXTRA_COMMANDS_TO_ENABLE_EXTENSION.get(ext_name).unwrap());
}
// only specify the schema if it provided
let command = match ext_loc.enabled {
Expand Down

0 comments on commit 96adf61

Please sign in to comment.