|
| 1 | +use std::fmt::Debug; |
| 2 | +use std::ops::Deref; |
| 3 | +use std::sync::Arc; |
| 4 | +use std::{collections::HashMap, time::Duration}; |
| 5 | + |
| 6 | +use axum::extract::FromRef; |
| 7 | +use axum::http::StatusCode; |
| 8 | +use axum::Json; |
| 9 | +use chrono::Utc; |
| 10 | +use dashmap::mapref::one::{MappedRef, RefMut}; |
| 11 | +use dashmap::{mapref::one::Ref, DashMap}; |
| 12 | +use parking_lot::RwLock; |
| 13 | +use sea_orm::prelude::DateTimeUtc; |
| 14 | +use tokio::{sync::broadcast, time::Instant}; |
| 15 | +use tracing::instrument; |
| 16 | + |
| 17 | +use crate::PwmResponse; |
| 18 | + |
| 19 | +#[derive(Debug)] |
| 20 | +pub struct Cacher<T: Send + Sync> { |
| 21 | + cached_values: DashMap<String, (T, DateTimeUtc)>, |
| 22 | + cache_expiry: Duration, |
| 23 | + last_cleaned: RwLock<Instant>, |
| 24 | +} |
| 25 | +impl<T: Send + Sync> Cacher<T> { |
| 26 | + pub fn new(cache_expiry: Duration) -> Self { |
| 27 | + Self { |
| 28 | + cached_values: DashMap::new(), |
| 29 | + cache_expiry, |
| 30 | + last_cleaned: RwLock::new(Instant::now()), |
| 31 | + } |
| 32 | + } |
| 33 | + pub fn invalidate(&self) { |
| 34 | + self.cached_values.clear(); |
| 35 | + } |
| 36 | + pub fn invalidate_key(&self, key: &str) { |
| 37 | + self.cached_values.remove(key); |
| 38 | + } |
| 39 | + pub fn get(&self, key: &str) -> Option<MappedRef<'_, String, (T, DateTimeUtc), T>> { |
| 40 | + self.clean_cache(); |
| 41 | + let x = self.cached_values.get(key).map(|x| x.map(|x| &x.0)); |
| 42 | + x |
| 43 | + } |
| 44 | + pub fn insert(&self, key: String, value: T) { |
| 45 | + self.clean_cache(); |
| 46 | + self.cached_values.insert(key, (value, Utc::now())); |
| 47 | + } |
| 48 | + fn get_no_check(&self, key: &str) -> Option<Ref<String, (T, DateTimeUtc)>> { |
| 49 | + self.cached_values.get(key) |
| 50 | + } |
| 51 | + fn clean_cache(&self) { |
| 52 | + let cleaning_due = { |
| 53 | + let last_cleaned = self.last_cleaned.read(); |
| 54 | + let elapsed = last_cleaned.elapsed(); |
| 55 | + (elapsed > self.cache_expiry, elapsed) |
| 56 | + }; |
| 57 | + if cleaning_due.0 { |
| 58 | + self.cached_values |
| 59 | + .retain(|_k, _| cleaning_due.1 < self.cache_expiry); |
| 60 | + *self.last_cleaned.write() = Instant::now(); |
| 61 | + } |
| 62 | + } |
| 63 | +} |
| 64 | +pub type ReadyCacheStore<T> = Result<T, (StatusCode, Json<PwmResponse>)>; |
| 65 | +#[derive(Debug, Clone)] |
| 66 | +pub struct ReadyCache<T: Send + Sync> { |
| 67 | + pub cache: Arc<Cacher<ReadyCacheStore<T>>>, |
| 68 | + pub in_progress: Arc<DashMap<String, broadcast::Sender<ReadyCacheStore<T>>>>, |
| 69 | +} |
| 70 | +impl<T: Send + Sync + Debug + Clone> ReadyCache<T> { |
| 71 | + pub fn new(cache_expiry: Duration) -> Self { |
| 72 | + Self { |
| 73 | + cache: Arc::new(Cacher::new(cache_expiry)), |
| 74 | + in_progress: Arc::new(DashMap::new()), |
| 75 | + } |
| 76 | + } |
| 77 | + #[instrument] |
| 78 | + pub fn start_processing(&self, key: String) { |
| 79 | + let (tx, rx) = broadcast::channel(1); |
| 80 | + self.in_progress.insert(key, tx.clone()); |
| 81 | + } |
| 82 | + #[instrument] |
| 83 | + pub fn finish_processing(&self, key: &str, value: ReadyCacheStore<T>) { |
| 84 | + self.cache.insert(key.to_string(), value); |
| 85 | + let Some(ref_to_new) = self.cache.get_no_check(key) else { |
| 86 | + return; |
| 87 | + }; |
| 88 | + let in_progress = self.in_progress.get(key).map(|x| x.clone()); |
| 89 | + if let Some(in_progress) = in_progress { |
| 90 | + in_progress.send(ref_to_new.value().clone().0).ok(); |
| 91 | + // self.in_progress.remove(key); |
| 92 | + } |
| 93 | + } |
| 94 | +} |
0 commit comments