Skip to content

Commit

Permalink
refactor: store less information per streaming client (#618)
Browse files Browse the repository at this point in the history
This PR refactors the broadcaster and what it stores per client:

    I've created a new StreamingQuery struct that has only the data we need,
    I've moved the token storing from client group level to individual client level (and created a ClientData struct)

I realized when setting up the tests for this that the Query, EdgeToken, and FilterQuery all contain more or less the same bits of data. But all we really need is:

    The actual token string (so that we can disconnect ("boot") clients whose token expires)
    Name prefix, projects, and env.

In the Unleash Types Query type, projects and env are optional, but we need them to be present to perform the calculation.

So I created a StreamingQuery struct, which consolidates the data we need from the Query and EdgeToken. I also copied in the methods we use for this elsewhere in Unleash and slightly adapted them. I've added notes inline.
  • Loading branch information
thomasheartman authored Jan 6, 2025
1 parent 435ef59 commit 7ecb244
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 66 deletions.
4 changes: 1 addition & 3 deletions server/src/client_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,7 @@ pub async fn stream_features(
let (validated_token, _filter_set, query) =
get_feature_filter(&edge_token, &token_cache, filter_query.clone())?;

broadcaster
.connect(validated_token, filter_query, query)
.await
broadcaster.connect(validated_token, query).await
}

#[utoipa::path(
Expand Down
155 changes: 92 additions & 63 deletions server/src/http/broadcaster.rs
Original file line number Diff line number Diff line change
@@ -1,54 +1,70 @@
use std::{
hash::{Hash, Hasher},
sync::Arc,
time::Duration,
};
use std::{hash::Hash, sync::Arc, time::Duration};

use actix_web::{
rt::time::interval,
web::{Json, Query},
};
use actix_web::{rt::time::interval, web::Json};
use actix_web_lab::{
sse::{self, Event, Sse},
util::InfallibleStream,
};
use dashmap::DashMap;
use futures::future;
use prometheus::{register_int_gauge, IntGauge};
use serde::Serialize;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tracing::{debug, warn};
use unleash_types::client_features::{ClientFeatures, Query as FlagQuery};
use unleash_types::client_features::{ClientFeatures, Query};

use crate::{
error::EdgeError,
feature_cache::FeatureCache,
filters::{filter_client_features, name_prefix_filter, project_filter, FeatureFilterSet},
tokens::cache_key,
types::{EdgeJsonResult, EdgeResult, EdgeToken, FeatureFilters},
filters::{filter_client_features, name_prefix_filter, FeatureFilter, FeatureFilterSet},
types::{EdgeJsonResult, EdgeResult, EdgeToken},
};

#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
struct QueryWrapper {
query: FlagQuery,
/// The minimal set of request data needed to resolve features for a
/// streaming client. Derives `Hash`/`Eq` because it is used as the key
/// of the broadcaster's `active_connections` map: clients with the same
/// projects, name prefix, and environment share one client group.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct StreamingQuery {
    // Projects the client's token grants access to (from `EdgeToken::projects`).
    pub projects: Vec<String>,
    // Optional feature-name prefix filter (from the request `Query`).
    pub name_prefix: Option<String>,
    // Environment to read from the feature cache. Note: when the token has
    // no environment, the raw token string is stored here instead (see the
    // `From<(&Query, &EdgeToken)>` conversion).
    pub environment: String,
}

impl Hash for QueryWrapper {
fn hash<H: Hasher>(&self, state: &mut H) {
serde_json::to_string(&self.query).unwrap().hash(state);
impl From<StreamingQuery> for Query {
fn from(value: StreamingQuery) -> Self {
Self {
tags: None,
name_prefix: value.name_prefix,
environment: Some(value.environment),
inline_segment_constraints: Some(false),
projects: Some(value.projects),
}
}
}

impl From<(&Query, &EdgeToken)> for StreamingQuery {
fn from((query, token): (&Query, &EdgeToken)) -> Self {
Self {
projects: token.projects.clone(),
name_prefix: query.name_prefix.clone(),
environment: match token.environment {
Some(ref env) => env.clone(),
None => token.token.clone(),
},
}
}
}

/// Per-client connection state kept inside a `ClientGroup`.
#[derive(Clone, Debug)]
struct ClientData {
    // The raw token string this client connected with — stored per client
    // (rather than per group) so that individual clients can be booted
    // when their token expires.
    token: String,
    // SSE channel used to push events (keep-alives, feature updates) to
    // this client.
    sender: mpsc::Sender<sse::Event>,
}

#[derive(Clone, Debug)]
struct ClientGroup {
clients: Vec<mpsc::Sender<sse::Event>>,
filter_set: Query<FeatureFilters>,
token: EdgeToken,
clients: Vec<ClientData>,
}

pub struct Broadcaster {
active_connections: DashMap<QueryWrapper, ClientGroup>,
active_connections: DashMap<StreamingQuery, ClientGroup>,
features_cache: Arc<FeatureCache>,
}

Expand Down Expand Up @@ -101,88 +117,88 @@ impl Broadcaster {
async fn heartbeat(&self) {
let mut active_connections = 0i64;
for mut group in self.active_connections.iter_mut() {
let mut ok_clients = Vec::new();
let clients = std::mem::take(&mut group.clients);
let ok_clients = &mut group.clients;

for client in &group.clients {
if client
for ClientData { token, sender } in clients {
if sender
.send(sse::Event::Comment("keep-alive".into()))
.await
.is_ok()
{
ok_clients.push(client.clone());
ok_clients.push(ClientData { token, sender });
}
}

active_connections += ok_clients.len() as i64;
group.clients = ok_clients;
}
CONNECTED_STREAMING_CLIENTS.set(active_connections)
}

/// Registers client with broadcaster, returning an SSE response body.
pub async fn connect(
&self,
token: EdgeToken,
filter_set: Query<FeatureFilters>,
query: unleash_types::client_features::Query,
query: Query,
) -> EdgeResult<Sse<InfallibleStream<ReceiverStream<sse::Event>>>> {
let (tx, rx) = mpsc::channel(10);
self.create_connection(StreamingQuery::from((&query, &token)), &token.token)
.await
.map(Sse::from_infallible_receiver)
}

let features = &self
.resolve_features(&token, filter_set.clone(), query.clone())
.await?;
async fn create_connection(
&self,
query: StreamingQuery,
token: &str,
) -> EdgeResult<mpsc::Receiver<sse::Event>> {
let (tx, rx) = mpsc::channel(10);

let features = self.resolve_features(query.clone()).await?;
tx.send(
sse::Data::new_json(features)?
sse::Data::new_json(&features)?
.event("unleash-connected")
.into(),
)
.await?;

self.active_connections
.entry(QueryWrapper { query })
.entry(query)
.and_modify(|group| {
group.clients.push(tx.clone());
group.clients.push(ClientData {
token: token.into(),
sender: tx.clone(),
});
})
.or_insert(ClientGroup {
clients: vec![tx.clone()],
filter_set,
token,
clients: vec![ClientData {
token: token.into(),
sender: tx.clone(),
}],
});
Ok(Sse::from_infallible_receiver(rx))
}

fn get_query_filters(
filter_query: Query<FeatureFilters>,
token: &EdgeToken,
) -> FeatureFilterSet {
let query_filters = filter_query.into_inner();
Ok(rx)
}

let filter_set = if let Some(name_prefix) = query_filters.name_prefix {
FeatureFilterSet::from(Box::new(name_prefix_filter(name_prefix)))
fn get_query_filters(query: &StreamingQuery) -> FeatureFilterSet {
let filter_set = if let Some(name_prefix) = &query.name_prefix {
FeatureFilterSet::from(Box::new(name_prefix_filter(name_prefix.clone())))
} else {
FeatureFilterSet::default()
}
.with_filter(project_filter(token));
.with_filter(project_filter(query.projects.clone()));
filter_set
}

async fn resolve_features(
&self,
validated_token: &EdgeToken,
filter_set: Query<FeatureFilters>,
query: FlagQuery,
) -> EdgeJsonResult<ClientFeatures> {
let filter_set = Broadcaster::get_query_filters(filter_set.clone(), validated_token);
async fn resolve_features(&self, query: StreamingQuery) -> EdgeJsonResult<ClientFeatures> {
let filter_set = Broadcaster::get_query_filters(&query);

let features = self
.features_cache
.get(&cache_key(validated_token))
.get(&query.environment)
.map(|client_features| filter_client_features(&client_features, &filter_set));

match features {
Some(features) => Ok(Json(ClientFeatures {
query: Some(query),
query: Some(query.into()),
..features
})),
// Note: this is a simplification for now, using the following assumptions:
Expand All @@ -196,11 +212,12 @@ impl Broadcaster {
/// Broadcast new features to all clients.
pub async fn broadcast(&self) {
let mut client_events = Vec::new();

for entry in self.active_connections.iter() {
let (query, group) = entry.pair();

let event_data = self
.resolve_features(&group.token, group.filter_set.clone(), query.query.clone())
.resolve_features(query.clone())
.await
.and_then(|features| sse::Data::new_json(&features).map_err(|e| e.into()));

Expand All @@ -221,8 +238,20 @@ impl Broadcaster {
// disconnected clients will get swept up by `remove_stale_clients`
let send_events = client_events
.iter()
.map(|(client, event)| client.send(event.clone()));
.map(|(ClientData { sender, .. }, event)| sender.send(event.clone()));

let _ = future::join_all(send_events).await;
}
}

/// Returns a filter that keeps a feature only when it belongs to one of
/// `projects`. An empty project list or a `"*"` wildcard entry matches
/// every project; features without a project are always rejected.
fn project_filter(projects: Vec<String>) -> FeatureFilter {
    Box::new(move |feature| {
        match &feature.project {
            Some(feature_project) => {
                // Compare as &str instead of `contains(&"*".to_string())`,
                // which allocated a fresh String for every feature checked
                // (clippy::cmp_owned).
                projects.is_empty()
                    || projects
                        .iter()
                        .any(|p| p == "*" || p == feature_project)
            }
            None => false,
        }
    })
}

0 comments on commit 7ecb244

Please sign in to comment.