Skip to content

Commit

Permalink
chore: Add sidecar
Browse files Browse the repository at this point in the history
  • Loading branch information
gonzalezzfelipe committed Oct 31, 2024
1 parent a5406f8 commit 7aef558
Show file tree
Hide file tree
Showing 9 changed files with 635 additions and 24 deletions.
390 changes: 389 additions & 1 deletion Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ tokio-native-tls = "0.3.1"
tracing = "0.1.40"
lazy_static = "1.5.0"
tracing-subscriber = "0.3.18"
reqwest = "0.12.9"
prometheus-parse = "0.2.5"

[profile.release]
debug = true
7 changes: 5 additions & 2 deletions bootstrap/stage1/crd.tf
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ resource "kubernetes_manifest" "customresourcedefinition_hydradoomnodes_hydra_do
"seedInput" = {
"type" = "string"
}
"sidecarImage" = {
"nullable" = true
"type" = "string"
}
}
"required" = [
"commitInputs",
Expand All @@ -109,8 +113,7 @@ resource "kubernetes_manifest" "customresourcedefinition_hydradoomnodes_hydra_do
"type" = "string"
}
"transactions" = {
"format" = "uint"
"minimum" = 0
"format" = "int64"
"type" = "integer"
}
}
Expand Down
9 changes: 7 additions & 2 deletions bootstrap/stage2/deployment.tf
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,17 @@ resource "kubernetes_deployment_v1" "operator" {

env {
name = "IMAGE"
value = var.hydra_pod_image
value = var.hydra_node_image
}

env {
name = "OPEN_HEAD_IMAGE"
value = var.hydra_pod_open_head_image
value = var.open_head_image
}

env {
name = "SIDECAR_IMAGE"
value = var.sidecar_image
}

env {
Expand Down
8 changes: 6 additions & 2 deletions bootstrap/stage2/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@ variable "image" {
type = string
}

variable "hydra_pod_image" {
variable "hydra_node_image" {
type = string
default = "ghcr.io/cardano-scaling/hydra-node"
}

variable "hydra_pod_open_head_image" {
variable "open_head_image" {
type = string
}

variable "sidecar_image" {
type = string
}

Expand Down
12 changes: 7 additions & 5 deletions src/bin/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use tracing::{error, info, instrument};

use doom_patrol::{
config::Config,
controller::{error_policy, reconcile, K8sContext},
controller::{error_policy, patch_statuses, reconcile, K8sContext},
custom_resource::HydraDoomNode,
};

Expand All @@ -25,15 +25,17 @@ async fn main() -> Result<()> {
// Create controller for MyApp custom resource
let api: Api<HydraDoomNode> = Api::all(client);
info!("Running controller.");
Controller::new(api, Default::default())
.run(reconcile, error_policy, context)
let controller = Controller::new(api, Default::default())
.run(reconcile, error_policy, context.clone())
.for_each(|res| async move {
match res {
Ok(o) => info!("Reconciled {:?}", o),
Err(e) => error!("Reconcile failed: {:?}", e),
}
})
.await;
});
let patch_statuses_controller = patch_statuses(context.clone());

let _ = tokio::join!(controller, patch_statuses_controller);

Ok(())
}
2 changes: 2 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub fn get_config() -> &'static Config {
pub struct Config {
pub image: String,
pub open_head_image: String,
pub sidecar_image: String,
pub configmap: String,
pub blockfrost_key: String,
pub external_domain: String,
Expand All @@ -24,6 +25,7 @@ impl Config {
Self {
image: env::var("IMAGE").unwrap_or("ghcr.io/cardano-scaling/hydra-node".into()),
open_head_image: env::var("OPEN_HEAD_IMAGE").expect("Missing OPEN_HEAD_IMAGE env var"),
sidecar_image: env::var("SIDECAR_IMAGE").expect("Missing SIDECAR_IMAGE env var"),
configmap: env::var("CONFIGMAP").expect("Missing CONFIGMAP env var"),
blockfrost_key: env::var("BLOCKFROST_KEY").expect("Missing BLOCKFROST_KEY env var"),
external_domain: env::var("EXTERNAL_DOMAIN").expect("Missing EXTERNAL_DOMAIN env var."),
Expand Down
159 changes: 157 additions & 2 deletions src/controller.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,46 @@
use anyhow::bail;
use k8s_openapi::api::{apps::v1::Deployment, core::v1::Service, networking::v1::Ingress};
use kube::{
api::{DeleteParams, Patch, PatchParams},
api::{DeleteParams, ListParams, Patch, PatchParams},
runtime::controller::Action,
Api, Client, ResourceExt,
};
use serde_json::json;
use std::{collections::BTreeMap, sync::Arc, time::Duration};
use thiserror::Error;
use tracing::{error, info};
use tracing::{error, info, warn};

use crate::{config::Config, custom_resource::HydraDoomNodeStatus};

use super::custom_resource::{HydraDoomNode, HYDRA_DOOM_NODE_FINALIZER};

pub enum HydraDoomNodeState {
Offline,
Online,
HeadIsInitializing,
HeadIsOpen,
}
impl From<f64> for HydraDoomNodeState {
fn from(value: f64) -> Self {
match value {
1.0 => Self::Online,
2.0 => Self::HeadIsInitializing,
3.0 => Self::HeadIsOpen,
_ => Self::Offline,
}
}
}
impl From<HydraDoomNodeState> for String {
fn from(val: HydraDoomNodeState) -> Self {
match val {
HydraDoomNodeState::Offline => "Offline".to_string(),
HydraDoomNodeState::Online => "Online".to_string(),
HydraDoomNodeState::HeadIsInitializing => "HeadIsInitializing".to_string(),
HydraDoomNodeState::HeadIsOpen => "HeadIsOpen".to_string(),
}
}
}

pub struct K8sConstants {
pub config_dir: String,
pub data_dir: String,
Expand All @@ -22,6 +49,10 @@ pub struct K8sConstants {
pub port: i32,
pub ingress_class_name: String,
pub ingress_annotations: BTreeMap<String, String>,
pub metrics_port: i32,
pub metrics_endpoint: String,
pub state_metric: String,
pub transactions_metric: String,
}
impl Default for K8sConstants {
fn default() -> Self {
Expand All @@ -31,6 +62,10 @@ impl Default for K8sConstants {
persistence_dir: "/var/persistence".to_string(),
node_port: 5001,
port: 4001,
metrics_port: 8000,
metrics_endpoint: "/metrics".to_string(),
state_metric: "hydra_doom_node_state".to_string(),
transactions_metric: "hydra_doom_node_transactions".to_string(),
ingress_class_name: "nginx".to_string(),
ingress_annotations: [
(
Expand Down Expand Up @@ -247,6 +282,126 @@ impl K8sContext {
Err(e) => Err(e.into()),
}
}

async fn get_status_from_crd(&self, crd: &HydraDoomNode) -> HydraDoomNodeStatus {
let url = format!(
"http://{}:{}{}",
crd.internal_host(),
self.constants.metrics_port,
self.constants.metrics_endpoint
);
let default = HydraDoomNodeStatus::offline(crd, &self.config, &self.constants);

match reqwest::get(&url).await {
Ok(response) => match response.text().await {
Ok(body) => {
let lines: Vec<_> = body.lines().map(|s| Ok(s.to_owned())).collect();
match prometheus_parse::Scrape::parse(lines.into_iter()) {
Ok(metrics) => {
let state = metrics
.clone()
.samples
.into_iter()
.find(|sample| sample.metric == self.constants.state_metric)
.map(|sample| match sample.value {
prometheus_parse::Value::Gauge(value) => {
HydraDoomNodeState::from(value)
}
_ => HydraDoomNodeState::Offline,
});

let transactions = metrics
.clone()
.samples
.into_iter()
.find(|sample| sample.metric == self.constants.state_metric)
.map(|sample| match sample.value {
prometheus_parse::Value::Counter(count) => count.round() as i64,
_ => 0,
});
match (state, transactions) {
(Some(state), Some(transactions)) => HydraDoomNodeStatus {
transactions,
state: state.into(),
local_url: format!(
"ws://{}:{}",
crd.internal_host(),
self.constants.port
),
external_url: format!(
"ws://{}:{}",
crd.external_host(&self.config, &self.constants),
self.config.external_port
),
},
_ => default,
}
}
Err(err) => {
warn!(
err = err.to_string(),
"Failed to parse metrics for {}",
crd.name_any()
);
default
}
}
}
Err(err) => {
warn!(
err = err.to_string(),
"Failed to parse request response to metrics endpoint for {}",
crd.name_any()
);
default
}
},
Err(err) => {
warn!(
err = err.to_string(),
"Failed to request metrics for {}",
crd.name_any()
);
default
}
}
}

async fn patch_statuses(&self) -> anyhow::Result<()> {
let api: Api<HydraDoomNode> = Api::all(self.client.clone());

let crds = api.list(&ListParams::default()).await?;

let mut awaitables = vec![];
for crd in &crds {
awaitables.push(async {
let name = crd.name_any();
if let Err(err) = api
.patch_status(
&name,
&PatchParams::default(),
&Patch::Merge(json!({ "status": self.get_status_from_crd(crd).await })),
)
.await
{
warn!(err = err.to_string(), "Failed to status for CRD.");
};
})
}

futures::future::join_all(awaitables).await;

Ok(())
}
}

pub async fn patch_statuses(context: Arc<K8sContext>) -> Result<()> {
info!("Running status patcher loop.");

loop {
context.patch_statuses().await?;
tokio::time::sleep(Duration::from_secs(5)).await;
}
}

// Auxiliary error value because K8s controller api doesnt go along with anyhow.
Expand Down
Loading

0 comments on commit 7aef558

Please sign in to comment.