Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions crates/flagd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ rpc = ["dep:tonic", "dep:tonic-prost", "dep:prost", "dep:prost-types", "dep:hype
# REST evaluation mode - uses HTTP/OFREP to connect to flagd service
rest = ["dep:reqwest"]
# In-process evaluation mode - local evaluation with gRPC sync or file-based configuration
in-process = ["dep:tonic", "dep:tonic-prost", "dep:prost", "dep:prost-types", "dep:datalogic-rs", "dep:murmurhash3", "dep:semver"]
in-process = ["dep:tonic", "dep:tonic-prost", "dep:prost", "dep:prost-types", "dep:datalogic-rs", "dep:murmurhash3", "dep:semver", "dep:notify"]

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

Expand All @@ -38,7 +38,6 @@ tonic-prost-build = "0.14"

[dev-dependencies]
cucumber = "0.22"
tokio-stream = "0.1"
futures-core = "0.3"
testcontainers = { version = "0.26.0", features = ["http_wait", "blocking"] }
wiremock = "0.6.5"
Expand All @@ -50,24 +49,23 @@ test-log = { version = "0.2", features = ["trace"] }
[dependencies]
open-feature = "0.2"
async-trait = "0.1"
tokio = { version = "1.48", features = ["full"] }
tokio = { version = "1.48", features = ["sync", "time", "fs", "rt", "rt-multi-thread", "macros", "net"] }
serde_json = "1.0"
serde = { version = "1.0", features = ["derive"] }
lru = "0.16"
futures = "0.3"
tokio-stream = "0.1"
tracing = "0.1"
anyhow = "1.0.100"
thiserror = "2.0"

# RPC and In-Process shared dependencies (gRPC)
tonic = { version = "0.14", optional = true }
tonic = { version = "0.14", default-features = false, features = ["transport", "codegen"], optional = true }
tonic-prost = { version = "0.14", optional = true }
prost = { version = "0.14", optional = true }
prost-types = { version = "0.14", optional = true }

# RPC-specific dependencies
hyper-util = { version = "0.1", features = ["tokio"], optional = true }
tower = { version = "0.5", optional = true }
tower = { version = "0.5", default-features = false, features = ["util"], optional = true }

# REST-specific dependencies
reqwest = { version = "0.12", default-features = false, features = ["json", "stream", "rustls-tls"], optional = true }
Expand All @@ -76,3 +74,4 @@ reqwest = { version = "0.12", default-features = false, features = ["json", "str
datalogic-rs = { version = "4.0.4", optional = true }
murmurhash3 = { version = "0.0.5", optional = true }
semver = { version = "1.0.27", optional = true }
notify = { version = "8.0", optional = true }
2 changes: 1 addition & 1 deletion crates/flagd/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ Configurations can be provided as constructor options or via environment variabl
| TLS | FLAGD_TLS | boolean | false | RPC, In-Process |
| Socket Path | FLAGD_SOCKET_PATH | string | "" | RPC |
| Certificate Path | FLAGD_SERVER_CERT_PATH | string | "" | RPC, In-Process |
| Cache Type (LRU / In-Memory / Disabled) | FLAGD_CACHE | string ("lru", "mem", "disabled") | lru | RPC, In-Process, File |
| Cache Type (LRU / In-Memory / Disabled) | FLAGD_CACHE | string ("lru", "mem", "disabled") | In-Process: disabled, others: lru | RPC, In-Process, File |
| Cache TTL (Seconds) | FLAGD_CACHE_TTL | number | 60 | RPC, In-Process, File |
| Max Cache Size | FLAGD_MAX_CACHE_SIZE | number | 1000 | RPC, In-Process, File |
| Offline File Path | FLAGD_OFFLINE_FLAG_SOURCE_PATH | string | "" | File |
Expand Down
42 changes: 42 additions & 0 deletions crates/flagd/examples/common/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
//! Common utilities for flagd examples

use std::time::Duration;
use testcontainers::{
ContainerAsync, GenericImage, ImageExt,
core::{ContainerPort, Mount, WaitFor, logs::LogSource, wait::LogWaitStrategy},
runners::AsyncRunner,
};

pub const FLAGD_SYNC_PORT: u16 = 8015;

/// Start a flagd container configured for in-process sync (port 8015)
pub async fn start_flagd_sync(
flags_path: &str,
flags_file: &str,
) -> Result<(ContainerAsync<GenericImage>, u16), Box<dyn std::error::Error>> {
// Use fsnotify provider for faster file change detection
let sources_config = format!(
r#"[{{"uri":"/flags/{}","provider":"fsnotify"}}]"#,
flags_file
);

let container = GenericImage::new("ghcr.io/open-feature/flagd", "latest")
.with_exposed_port(ContainerPort::Tcp(FLAGD_SYNC_PORT))
.with_wait_for(WaitFor::Log(LogWaitStrategy::new(
LogSource::StdErr,
"Flag IResolver listening at",
)))
.with_mount(Mount::bind_mount(flags_path.to_string(), "/flags"))
.with_cmd(["start", "--sources", &sources_config])
.start()
.await?;

let sync_port = container
.get_host_port_ipv4(ContainerPort::Tcp(FLAGD_SYNC_PORT))
.await?;

// Give flagd a moment to fully initialize
tokio::time::sleep(Duration::from_millis(500)).await;

Ok((container, sync_port))
}
14 changes: 14 additions & 0 deletions crates/flagd/examples/flags/basic-flags.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"$schema": "https://flagd.dev/schema/v0/flags.json",
"flags": {
"basic-boolean": {
"state": "ENABLED",
"defaultVariant": "false",
"variants": {
"true": true,
"false": false
},
"targeting": {}
}
}
}
55 changes: 55 additions & 0 deletions crates/flagd/examples/in_process.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
//! In-process evaluation example using testcontainers with flagd
//!
//! This example demonstrates in-process flag evaluation by periodically
//! evaluating a boolean flag. Edit `examples/flags/basic-flags.json` while
//! running to see live flag updates.
//!
//! Run with: cargo run --example in_process --all-features
//!
//! Then edit basic-flags.json and change "defaultVariant": "false" to "true"
//! to see the flag value change in real-time.

mod common;

use common::start_flagd_sync;
use open_feature::EvaluationContext;
use open_feature::provider::FeatureProvider;
use open_feature_flagd::{FlagdOptions, FlagdProvider, ResolverType};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let manifest_dir = env!("CARGO_MANIFEST_DIR");
let flags_path = format!("{}/examples/flags", manifest_dir);

println!("Starting flagd container...");
let (_container, sync_port) = start_flagd_sync(&flags_path, "basic-flags.json").await?;
println!("flagd sync service available on port {}", sync_port);

// Configure the flagd provider for in-process evaluation
let provider = FlagdProvider::new(FlagdOptions {
host: "localhost".to_string(),
port: sync_port,
resolver_type: ResolverType::InProcess,
..Default::default()
})
.await
.expect("Failed to create provider");

let ctx = EvaluationContext::default();

println!("\nEvaluating 'basic-boolean' flag every 2 seconds...");
println!("Edit examples/flags/basic-flags.json to change the flag value.");
println!("Press Ctrl+C to stop.\n");

loop {
let result = provider
.resolve_bool_value("basic-boolean", &ctx)
.await
.expect("Failed to resolve flag");

println!("basic-boolean = {}", result.value);

tokio::time::sleep(Duration::from_secs(2)).await;
}
}
26 changes: 22 additions & 4 deletions crates/flagd/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use thiserror::Error;

/// Error type for flagd operations
#[derive(Error, Debug)]
pub enum FlagdError {
#[error("Provider error: {0}")]
Expand All @@ -8,9 +9,20 @@ pub enum FlagdError {
Connection(String),
#[error("Invalid configuration: {0}")]
Config(String),
#[error("Sync error: {0}")]
Sync(String),
#[error("Parse error: {0}")]
Parse(String),
#[error("Timeout: {0}")]
Timeout(String),
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
#[error("JSON error: {0}")]
Json(#[from] serde_json::Error),
#[error("Channel send error: {0}")]
Channel(String),
}

// Add implementations for error conversion
impl From<Box<dyn std::error::Error>> for FlagdError {
fn from(error: Box<dyn std::error::Error>) -> Self {
FlagdError::Provider(error.to_string())
Expand All @@ -23,8 +35,14 @@ impl From<Box<dyn std::error::Error + Send + Sync>> for FlagdError {
}
}

impl From<anyhow::Error> for FlagdError {
fn from(error: anyhow::Error) -> Self {
FlagdError::Provider(error.to_string())
impl<T> From<tokio::sync::mpsc::error::SendError<T>> for FlagdError {
fn from(error: tokio::sync::mpsc::error::SendError<T>) -> Self {
FlagdError::Channel(error.to_string())
}
}

impl From<tokio::time::error::Elapsed> for FlagdError {
fn from(error: tokio::time::error::Elapsed) -> Self {
FlagdError::Timeout(error.to_string())
}
}
16 changes: 15 additions & 1 deletion crates/flagd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@
//! | TLS | FLAGD_TLS | boolean | false | RPC, In-Process |
//! | Socket Path | FLAGD_SOCKET_PATH | string | "" | RPC |
//! | Certificate Path | FLAGD_SERVER_CERT_PATH | string | "" | RPC, In-Process |
//! | Cache Type (LRU / In-Memory / Disabled) | FLAGD_CACHE | string ("lru", "mem", "disabled") | lru | RPC, In-Process, File |
//! | Cache Type (LRU / In-Memory / Disabled) | FLAGD_CACHE | string ("lru", "mem", "disabled") | In-Process: disabled, others: lru | RPC, In-Process, File |
//! | Cache TTL (Seconds) | FLAGD_CACHE_TTL | number | 60 | RPC, In-Process, File |
//! | Max Cache Size | FLAGD_MAX_CACHE_SIZE | number | 1000 | RPC, In-Process, File |
//! | Offline File Path | FLAGD_OFFLINE_FLAG_SOURCE_PATH | string | "" | File |
Expand Down Expand Up @@ -297,6 +297,9 @@ pub struct FlagdOptions {
/// The deadline in milliseconds for event streaming operations. Set to 0 to disable.
/// Recommended to prevent infrastructure from killing idle connections.
pub stream_deadline_ms: u32,
/// HTTP/2 keepalive time in milliseconds. Sends pings to keep connections alive during
/// idle periods, allowing RPCs to start quickly without delay. Set to 0 to disable.
pub keep_alive_time_ms: u64,
/// Offline polling interval in milliseconds
pub offline_poll_interval_ms: Option<u32>,
/// Provider ID for identifying this provider instance to flagd
Expand Down Expand Up @@ -358,6 +361,10 @@ impl Default for FlagdOptions {
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(600000),
keep_alive_time_ms: std::env::var("FLAGD_KEEP_ALIVE_TIME_MS")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(0), // Disabled by default, per gherkin spec
socket_path: std::env::var("FLAGD_SOCKET_PATH").ok(),
selector: std::env::var("FLAGD_SOURCE_SELECTOR").ok(),
cache_settings: Some(CacheSettings::default()),
Expand All @@ -378,6 +385,13 @@ impl Default for FlagdOptions {
// Only override to File if FLAGD_RESOLVER wasn't explicitly set
options.resolver_type = ResolverType::File;
}
// Disable caching for in-process/file modes per spec (caching is RPC-only)
if matches!(
options.resolver_type,
ResolverType::InProcess | ResolverType::File
) {
options.cache_settings = None;
}
}

options
Expand Down
41 changes: 21 additions & 20 deletions crates/flagd/src/resolver/common/upstream.rs
Original file line number Diff line number Diff line change
@@ -1,43 +1,45 @@
use anyhow::{Context, Result};
use crate::error::FlagdError;
use std::str::FromStr;
use tonic::transport::{Endpoint, Uri};
use tracing::debug;

pub struct UpstreamConfig {
endpoint: Endpoint,
authority: Uri,
authority: Option<String>, // Only set for custom name resolution (envoy://)
}

impl UpstreamConfig {
pub fn new(target: String, is_in_process: bool) -> Result<Self> {
pub fn new(target: String, is_in_process: bool) -> Result<Self, FlagdError> {
debug!("Creating upstream config for target: {}", target);

if target.starts_with("http://") {
debug!("Target is already an HTTP endpoint");
let uri = Uri::from_str(&target)?;
let endpoint = Endpoint::from_shared(target)?;
let endpoint = Endpoint::from_shared(target)
.map_err(|e| FlagdError::Config(format!("Invalid endpoint: {}", e)))?;
return Ok(Self {
endpoint,
authority: uri
.authority()
.map(|a| a.as_str())
.unwrap_or_default()
.parse()?,
authority: None, // Standard HTTP doesn't need custom authority
});
}

let (endpoint_str, authority) = if target.starts_with("envoy://") {
let uri = Uri::from_str(&target).context("Failed to parse target URI")?;
let uri = Uri::from_str(&target)
.map_err(|e| FlagdError::Config(format!("Failed to parse target URI: {}", e)))?;
let authority = uri.path().trim_start_matches('/');

if authority.is_empty() {
return Err(anyhow::anyhow!("Service name (authority) cannot be empty"));
return Err(FlagdError::Config(
"Service name (authority) cannot be empty".to_string(),
));
}

let host = uri.host().unwrap_or("localhost");
let port = uri.port_u16().unwrap_or(9211); // Use Envoy port directly

(format!("http://{}:{}", host, port), authority.to_string())
(
format!("http://{}:{}", host, port),
Some(authority.to_string()),
)
} else {
let parts: Vec<&str> = target.split(':').collect();
let host = parts.first().unwrap_or(&"localhost").to_string();
Expand All @@ -47,24 +49,23 @@ impl UpstreamConfig {
.unwrap_or(if is_in_process { 8015 } else { 8013 });

debug!("Using standard resolution with {}:{}", host, port);
(format!("http://{}:{}", host, port), host)
(format!("http://{}:{}", host, port), None) // Standard resolution doesn't need custom authority
};

let endpoint = Endpoint::from_shared(endpoint_str)?;
let authority_uri =
Uri::from_str(authority.as_str()).context("Failed to parse authority")?;
let endpoint = Endpoint::from_shared(endpoint_str)
.map_err(|e| FlagdError::Config(format!("Invalid endpoint: {}", e)))?;

Ok(Self {
endpoint,
authority: authority_uri,
authority,
})
}

pub fn endpoint(&self) -> &Endpoint {
&self.endpoint
}

pub fn authority(&self) -> &Uri {
&self.authority
pub fn authority(&self) -> Option<String> {
self.authority.clone()
}
}
10 changes: 6 additions & 4 deletions crates/flagd/src/resolver/in_process/model/flag_parser.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,24 @@
use super::feature_flag::FeatureFlag;
use super::feature_flag::ParsingResult;
use anyhow::Result;
use crate::error::FlagdError;
use serde_json::{Map, Value};
use std::collections::HashMap;

pub struct FlagParser;

impl FlagParser {
pub fn parse_string(configuration: &str) -> Result<ParsingResult> {
pub fn parse_string(configuration: &str) -> Result<ParsingResult, FlagdError> {
let value: Value = serde_json::from_str(configuration)?;
let obj = value
.as_object()
.ok_or_else(|| anyhow::anyhow!("Invalid JSON structure"))?;
.ok_or_else(|| FlagdError::Parse("Invalid JSON structure".to_string()))?;

let flags = obj
.get("flags")
.and_then(|v| v.as_object())
.ok_or_else(|| anyhow::anyhow!("No flag configurations found in the payload"))?;
.ok_or_else(|| {
FlagdError::Parse("No flag configurations found in the payload".to_string())
})?;

let flag_set_metadata = obj
.get("metadata")
Expand Down
Loading