Skip to content

Commit

Permalink
chore(1-3216): send the same headers in streaming as for polling (#613)
Browse files Browse the repository at this point in the history
This pr adds the same headers to the streaming clients as we use for polling clients.
  • Loading branch information
thomasheartman authored Dec 20, 2024
1 parent 9fcdbff commit 2171000
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 7 deletions.
23 changes: 21 additions & 2 deletions server/src/http/feature_refresher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ use unleash_yggdrasil::EngineState;
use crate::error::{EdgeError, FeatureError};
use crate::feature_cache::FeatureCache;
use crate::filters::{filter_client_features, FeatureFilterSet};
use crate::http::headers::{
UNLEASH_APPNAME_HEADER, UNLEASH_CLIENT_SPEC_HEADER, UNLEASH_INSTANCE_ID_HEADER,
};
use crate::types::{build, EdgeResult, TokenType, TokenValidationStatus};
use crate::{
persistence::EdgePersistence,
Expand Down Expand Up @@ -271,17 +274,33 @@ impl FeatureRefresher {
}

/// This is where we set up a listener per token.
pub async fn start_streaming_features_background_task(&self) -> anyhow::Result<()> {
pub async fn start_streaming_features_background_task(
&self,
app_name: String,
custom_headers: Vec<(String, String)>,
) -> anyhow::Result<()> {
use anyhow::Context;

let refreshes = self.get_tokens_due_for_refresh();
for refresh in refreshes {
let token = refresh.token.clone();
let streaming_url = self.unleash_client.urls.client_features_stream_url.as_str();

let es_client = eventsource_client::ClientBuilder::for_url(streaming_url)
let mut es_client_builder = eventsource_client::ClientBuilder::for_url(streaming_url)
.context("Failed to create EventSource client for streaming")?
.header("Authorization", &token.token)?
.header(UNLEASH_APPNAME_HEADER, &app_name)?
.header(UNLEASH_INSTANCE_ID_HEADER, "unleash_edge")?
.header(
UNLEASH_CLIENT_SPEC_HEADER,
unleash_yggdrasil::SUPPORTED_SPEC_VERSION,
)?;

for (key, value) in custom_headers.clone() {
es_client_builder = es_client_builder.header(&key, &value)?;
}

let es_client = es_client_builder
.reconnect(
eventsource_client::ReconnectOptions::reconnect(true)
.retry_initial(true)
Expand Down
3 changes: 3 additions & 0 deletions server/src/http/headers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub(crate) const UNLEASH_APPNAME_HEADER: &str = "UNLEASH-APPNAME";
pub(crate) const UNLEASH_INSTANCE_ID_HEADER: &str = "UNLEASH-INSTANCEID";
pub(crate) const UNLEASH_CLIENT_SPEC_HEADER: &str = "Unleash-Client-Spec";
1 change: 1 addition & 0 deletions server/src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@
pub mod background_send_metrics;
pub mod broadcaster;
pub mod feature_refresher;
pub(crate) mod headers;
pub mod unleash_client;
7 changes: 3 additions & 4 deletions server/src/http/unleash_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ use unleash_types::client_metrics::ClientApplication;
use crate::cli::ClientIdentity;
use crate::error::EdgeError::EdgeMetricsRequestError;
use crate::error::{CertificateError, FeatureError};
use crate::http::headers::{
UNLEASH_APPNAME_HEADER, UNLEASH_CLIENT_SPEC_HEADER, UNLEASH_INSTANCE_ID_HEADER,
};
use crate::metrics::client_metrics::MetricsBatch;
use crate::tls::build_upstream_certificate;
use crate::types::{
Expand All @@ -28,10 +31,6 @@ use crate::types::{
use crate::urls::UnleashUrls;
use crate::{error::EdgeError, types::ClientFeaturesRequest};

const UNLEASH_APPNAME_HEADER: &str = "UNLEASH-APPNAME";
const UNLEASH_INSTANCE_ID_HEADER: &str = "UNLEASH-INSTANCEID";
const UNLEASH_CLIENT_SPEC_HEADER: &str = "Unleash-Client-Spec";

lazy_static! {
pub static ref CLIENT_REGISTER_FAILURES: IntGaugeVec = register_int_gauge_vec!(
Opts::new(
Expand Down
10 changes: 9 additions & 1 deletion server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,13 @@ async fn main() -> Result<(), anyhow::Error> {
instance_id: args.clone().instance_id,
};
let app_name = args.app_name.clone();
let custom_headers = match args.mode {
cli::EdgeMode::Edge(ref edge) => edge.custom_client_headers.clone(),
_ => vec![],
};

let internal_backstage_args = args.internal_backstage.clone();

let (
(token_cache, features_cache, engine_cache),
token_validator,
Expand Down Expand Up @@ -157,9 +163,11 @@ async fn main() -> Result<(), anyhow::Error> {
cli::EdgeMode::Edge(edge) => {
let refresher_for_background = feature_refresher.clone().unwrap();
if edge.streaming {
let app_name = app_name.clone();
let custom_headers = custom_headers.clone();
tokio::spawn(async move {
let _ = refresher_for_background
.start_streaming_features_background_task()
.start_streaming_features_background_task(app_name, custom_headers)
.await;
});
}
Expand Down

0 comments on commit 2171000

Please sign in to comment.