Skip to content

Commit

Permalink
feat: track last request time for each user id (#93)
Browse files Browse the repository at this point in the history
  • Loading branch information
MicaiahReid authored Apr 22, 2024
1 parent 33b8c41 commit 71e070e
Show file tree
Hide file tree
Showing 5 changed files with 372 additions and 39 deletions.
8 changes: 4 additions & 4 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl StacksDevnetConfig {
pub fn to_validated_config(
self,
user_id: &str,
ctx: Context,
ctx: &Context,
) -> Result<ValidatedStacksDevnetConfig, DevNetError> {
let context = format!(
"failed to validate config for NAMESPACE: {}",
Expand Down Expand Up @@ -232,7 +232,7 @@ mod tests {
let _guard = hiro_system_kit::log::setup_global_logger(logger.clone());
let ctx = Context::empty();
let validated_config = template
.to_validated_config(user_id, ctx)
.to_validated_config(user_id, &ctx)
.unwrap_or_else(|e| panic!("config validation test failed: {}", e.message));

let expected_project_manifest = read_file("src/tests/fixtures/project-manifest.yaml");
Expand Down Expand Up @@ -278,7 +278,7 @@ mod tests {
template.network_manifest.devnet = None;
let user_id = template.clone().namespace;
template
.to_validated_config(&user_id, ctx)
.to_validated_config(&user_id, &ctx)
.unwrap_or_else(|e| panic!("config validation test failed: {}", e.message));
}

Expand All @@ -287,7 +287,7 @@ mod tests {
let template = get_template_config("src/tests/fixtures/stacks-devnet-config.json");
let namespace = template.namespace.clone();
let user_id = "wrong";
match template.to_validated_config(user_id, Context::empty()) {
match template.to_validated_config(user_id, &Context::empty()) {
Ok(_) => {
panic!("config validation with non-matching user_id should have been rejected")
}
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1876,7 +1876,7 @@ impl StacksDevnetApiK8sManager {
namespace.map_left(|del| {
assert_eq!(del.name_any(), namespace_str);
self.ctx
.try_log(|logger| slog::error!(logger, "Deleting namespace started"));
.try_log(|logger| slog::info!(logger, "Deleting namespace started"));
});
Ok(())
}
Expand Down
60 changes: 55 additions & 5 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,17 @@ use stacks_devnet_api::routes::{
handle_get_status, handle_new_devnet, handle_try_proxy_service, API_PATH,
};
use stacks_devnet_api::{Context, StacksDevnetApiK8sManager};
use std::collections::HashMap;
use std::env;
use std::sync::{Arc, Mutex};
use std::time::{SystemTime, UNIX_EPOCH};
use std::{convert::Infallible, net::SocketAddr};

#[tokio::main]
async fn main() {
const HOST: &str = "0.0.0.0";
const PORT: &str = "8477";
let endpoint: String = HOST.to_owned() + ":" + PORT;
let port: &str = &env::var("PORT").unwrap_or("8477".to_string());
let endpoint: String = HOST.to_owned() + ":" + port;
let addr: SocketAddr = endpoint.parse().expect("Could not parse ip:port.");

let logger = hiro_system_kit::log::setup_logger();
Expand All @@ -36,14 +39,22 @@ async fn main() {
}
};
let config = ApiConfig::from_path(&config_path);
let request_store = Arc::new(Mutex::new(HashMap::new()));

let make_svc = make_service_fn(|_| {
let k8s_manager = k8s_manager.clone();
let ctx = ctx.clone();
let config = config.clone();
let request_store = request_store.clone();
async move {
Ok::<_, Infallible>(service_fn(move |req| {
handle_request(req, k8s_manager.clone(), config.clone(), ctx.clone())
handle_request(
req,
k8s_manager.clone(),
config.clone(),
request_store.clone(),
ctx.clone(),
)
}))
}
});
Expand All @@ -64,6 +75,7 @@ async fn handle_request(
http_response_config,
auth_config,
}: ApiConfig,
request_store: Arc<Mutex<HashMap<String, u64>>>,
ctx: Context,
) -> Result<Response<Body>, Infallible> {
let uri = request.uri();
Expand Down Expand Up @@ -109,10 +121,23 @@ async fn handle_request(
None => return responder.err_bad_request("missing required auth header".into()),
};

let request_time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Could not get current time in secs")
.as_secs() as u64;
if path == "/api/v1/networks" {
return match method {
&Method::POST => {
handle_new_devnet(request, &user_id, k8s_manager, responder, ctx).await
handle_new_devnet(
request,
&user_id,
k8s_manager,
responder,
request_store,
request_time,
&ctx,
)
.await
}
_ => responder.err_method_not_allowed("network creation must be a POST request".into()),
};
Expand Down Expand Up @@ -149,10 +174,25 @@ async fn handle_request(
if path_parts.subroute.is_none() {
return match method {
&Method::DELETE => {
match request_store.lock() {
Ok(mut store) => {
store.remove(&user_id);
}
Err(_) => {}
}
handle_delete_devnet(k8s_manager, &network, &user_id, responder).await
}
&Method::GET => {
handle_get_devnet(k8s_manager, &network, &user_id, responder, ctx).await
handle_get_devnet(
k8s_manager,
&network,
&user_id,
responder,
request_store,
request_time,
ctx,
)
.await
}
&Method::HEAD => {
handle_check_devnet(k8s_manager, &network, &user_id, responder).await
Expand All @@ -161,6 +201,16 @@ async fn handle_request(
.err_method_not_allowed("can only GET/DELETE/HEAD at provided route".into()),
};
}
// the above methods with no subroute are initiated from our infra,
// but any remaning requests would come from the actual user, so we'll
// track this request as the last time a user made a request
match request_store.lock() {
Ok(mut store) => {
store.insert(user_id.to_string(), request_time);
}
Err(_) => {}
}

let subroute = path_parts.subroute.unwrap();
if subroute == "commands" {
return responder.err_not_implemented("commands route in progress".into());
Expand Down
78 changes: 63 additions & 15 deletions src/routes.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
use hiro_system_kit::slog;
use hyper::{Body, Client, Request, Response, Uri};
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::{convert::Infallible, str::FromStr};
use std::{
collections::HashMap,
convert::Infallible,
str::FromStr,
sync::{Arc, Mutex},
};

use crate::{
config::StacksDevnetConfig,
resources::service::{get_service_from_path_part, get_service_url, get_user_facing_port},
responder::Responder,
Context, StacksDevnetApiK8sManager,
Context, StacksDevnetApiK8sManager, StacksDevnetInfoResponse,
};

const VERSION: &str = env!("CARGO_PKG_VERSION");
Expand Down Expand Up @@ -36,7 +42,9 @@ pub async fn handle_new_devnet(
user_id: &str,
k8s_manager: StacksDevnetApiK8sManager,
responder: Responder,
ctx: Context,
request_store: Arc<Mutex<HashMap<String, u64>>>,
request_time: u64,
ctx: &Context,
) -> Result<Response<Body>, Infallible> {
let body = hyper::body::to_bytes(request.into_body()).await;
if body.is_err() {
Expand All @@ -49,7 +57,15 @@ pub async fn handle_new_devnet(
match config {
Ok(config) => match config.to_validated_config(user_id, ctx) {
Ok(config) => match k8s_manager.deploy_devnet(config).await {
Ok(_) => responder.ok(),
Ok(_) => {
match request_store.lock() {
Ok(mut store) => {
store.insert(user_id.to_string(), request_time);
}
Err(_) => {}
}
responder.ok()
}
Err(e) => responder.respond(e.code, e.message),
},
Err(e) => responder.respond(e.code, e.message),
Expand All @@ -75,26 +91,58 @@ pub async fn handle_delete_devnet(
}
}

#[derive(Serialize, Deserialize, Debug)]
pub struct DevnetMetadata {
pub secs_since_last_request: u64,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct StacksDevnetInfoWithMetadata {
#[serde(flatten)]
pub data: StacksDevnetInfoResponse,
pub metadata: DevnetMetadata,
}

pub async fn handle_get_devnet(
k8s_manager: StacksDevnetApiK8sManager,
network: &str,
user_id: &str,
responder: Responder,
request_store: Arc<Mutex<HashMap<String, u64>>>,
request_time: u64,
ctx: Context,
) -> Result<Response<Body>, Infallible> {
match k8s_manager.get_devnet_info(&network, user_id).await {
Ok(devnet_info) => match serde_json::to_vec(&devnet_info) {
Ok(body) => responder.ok_with_json(Body::from(body)),
Err(e) => {
let msg = format!(
"failed to form response body: NAMESPACE: {}, ERROR: {}",
&network,
e.to_string()
);
ctx.try_log(|logger: &hiro_system_kit::Logger| slog::error!(logger, "{}", msg));
responder.err_internal(msg)
Ok(devnet_info) => {
let last_request_time = match request_store.lock() {
Ok(mut store) => match store.get(user_id) {
Some(last_request_time) => *last_request_time,
None => {
store.insert(user_id.to_string(), request_time);
request_time
}
},
Err(_) => 0,
};
let devnet_info_with_metadata = StacksDevnetInfoWithMetadata {
data: devnet_info,
metadata: DevnetMetadata {
secs_since_last_request: request_time.saturating_sub(last_request_time),
},
};
match serde_json::to_vec(&devnet_info_with_metadata) {
Ok(body) => responder.ok_with_json(Body::from(body)),
Err(e) => {
let msg = format!(
"failed to form response body: NAMESPACE: {}, ERROR: {}",
&network,
e.to_string()
);
ctx.try_log(|logger: &hiro_system_kit::Logger| slog::error!(logger, "{}", msg));
responder.err_internal(msg)
}
}
},
}
Err(e) => responder.respond(e.code, e.message),
}
}
Expand Down
Loading

0 comments on commit 71e070e

Please sign in to comment.