Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,7 @@ lcov.info

**/build/**

Move.lock
Move.lock

# local configs
*.local.yaml
24 changes: 22 additions & 2 deletions crates/key-server/src/aggregator/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ impl NetworkConfig for AggregatorOptions {
struct AppState {
aggregator_metrics: Arc<AggregatorMetrics>,
grpc_client: SuiGrpcClient,
http_client: reqwest::Client,
threshold: Arc<RwLock<u16>>,
committee_members: Arc<RwLock<VecMap<Address, PartialKeyServer>>>,
options: AggregatorOptions,
Expand Down Expand Up @@ -318,6 +319,7 @@ async fn handle_fetch_key(
let ks_version_req = &state.options.key_server_version_requirement;
let api_credentials = &state.options.api_credentials;
let metrics = state.aggregator_metrics.clone();
let http_client = state.http_client.clone();
let mut fetch_tasks: FuturesUnordered<_> = state
.committee_members
.read()
Expand All @@ -331,6 +333,7 @@ async fn handle_fetch_key(
let ks_version_req = ks_version_req.clone();
let api_creds = api_credentials.get(&partial_key_server.name).cloned();
let metrics = metrics.clone();
let http_client = http_client.clone();
async move {
// Check if API credentials exist for this server.
let creds = match api_creds {
Expand All @@ -351,6 +354,7 @@ async fn handle_fetch_key(
req_id,
&ks_version_req,
creds,
&http_client,
)
.await
{
Expand Down Expand Up @@ -446,13 +450,12 @@ async fn fetch_from_member(
req_id: &str,
ks_version_req: &VersionReq,
api_credentials: ApiCredentials,
client: &reqwest::Client,
) -> Result<FetchKeyResponse, ErrorResponse> {
info!(
"Fetching from party {} at {} (req_id: {})",
member.party_id, member.url, req_id
);

let client = reqwest::Client::new();
let request_builder = client
.post(format!("{}/v1/fetch_key", member.url))
.header(HEADER_CLIENT_SDK_TYPE, SDK_TYPE_AGGREGATOR)
Expand Down Expand Up @@ -664,9 +667,19 @@ async fn load_committee_state(
// Check and warn about missing API credentials for current committee.
check_missing_api_credentials(&members, &options.api_credentials);

// Create HTTP client with increased connection pool limits for localhost testing
// and high-concurrency production scenarios
let http_client = reqwest::Client::builder()
.pool_max_idle_per_host(500) // Allow 500 idle connections per host (for localhost testing)
.pool_idle_timeout(Duration::from_secs(90))
.timeout(Duration::from_secs(10))
.build()
.context("Failed to create HTTP client")?;

Ok(AppState {
aggregator_metrics: metrics,
grpc_client,
http_client,
committee_members: Arc::new(RwLock::new(members)),
threshold: Arc::new(RwLock::new(threshold)),
options,
Expand Down Expand Up @@ -841,10 +854,17 @@ mod tests {
let registry = Registry::new();
let metrics = Arc::new(AggregatorMetrics::new(&registry));
let grpc_client = SuiGrpcClient::new(options.node_url()).unwrap();
let http_client = reqwest::Client::builder()
.pool_max_idle_per_host(500)
.pool_idle_timeout(Duration::from_secs(90))
.timeout(Duration::from_secs(10))
.build()
.unwrap();

AppState {
aggregator_metrics: metrics,
grpc_client,
http_client,
threshold: Arc::new(RwLock::new(threshold)),
committee_members: Arc::new(RwLock::new(VecMap(SuiVecMap {
contents: committee_contents,
Expand Down
2 changes: 1 addition & 1 deletion crates/key-server/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use moka::sync::Cache;
use std::hash::Hash;
use std::time::Duration;

pub(crate) const DEFAULT_SIZE: u64 = 1000;
pub(crate) const DEFAULT_SIZE: u64 = 0;
pub(crate) const DEFAULT_TTL_IN_MILLIS: u64 = 60 * 60 * 1000; // 1 hour

/// Creates a new thread-safe LRU cache with the specified TTL and size.
Expand Down
Loading
Loading