Skip to content

Commit

Permalink
fix(publisher): Allow healthcheck while awaiting relayer node sync (#347
Browse files Browse the repository at this point in the history
)
  • Loading branch information
Jurshsmith authored Dec 5, 2024
1 parent b35dfd4 commit e369592
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ mod tests {
fn blocks_subscription(&self) -> broadcast::Receiver<FuelCoreImporterResult>;
async fn start(&self) -> anyhow::Result<()>;
fn is_started(&self) -> bool;
async fn await_synced_at_least_once(&self) -> anyhow::Result<()>;
async fn stop(&self);
fn base_asset_id(&self) -> &FuelCoreAssetId;
fn chain_id(&self) -> &FuelCoreChainId;
Expand Down
8 changes: 5 additions & 3 deletions crates/fuel-streams-publisher/src/publisher/fuel_core_like.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub type OffchainDatabase = GenericDatabase<
pub trait FuelCoreLike: Sync + Send {
async fn start(&self) -> anyhow::Result<()>;
fn is_started(&self) -> bool;
async fn await_synced_at_least_once(&self) -> anyhow::Result<()>;
async fn stop(&self);

fn base_asset_id(&self) -> &FuelCoreAssetId;
Expand Down Expand Up @@ -163,8 +164,6 @@ impl FuelCore {
let fuel_service =
fuel_core_bin::cli::run::get_service(command).await?;

fuel_service.await_relayer_synced().await?;

let fuel_core: Self = fuel_service.into();

Ok(fuel_core.arc())
Expand All @@ -183,10 +182,13 @@ impl FuelCoreLike for FuelCore {

Ok(())
}

fn is_started(&self) -> bool {
self.fuel_service.state().started()
}
async fn await_synced_at_least_once(&self) -> anyhow::Result<()> {
self.fuel_service.await_relayer_synced().await?;
Ok(())
}

async fn stop(&self) {
if matches!(
Expand Down
11 changes: 11 additions & 0 deletions crates/fuel-streams-publisher/src/publisher/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ impl Publisher {
})
}

pub fn is_healthy(&self) -> bool {
// TODO: Update this condition to include more health checks
self.fuel_core.is_started() && self.nats_client.is_connected()
}

#[cfg(feature = "test-helpers")]
pub async fn default(
nats_client: &NatsClient,
Expand Down Expand Up @@ -99,6 +104,12 @@ impl Publisher {
&self,
shutdown_token: ShutdownToken,
) -> anyhow::Result<()> {
tracing::info!("Awaiting FuelCore Sync...");

self.fuel_core.await_synced_at_least_once().await?;

tracing::info!("FuelCore has synced successfully!");

tracing::info!("Publishing started...");

let mut blocks_stream = build_blocks_stream(
Expand Down
8 changes: 1 addition & 7 deletions crates/fuel-streams-publisher/src/server/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,7 @@ impl ServerState {

impl ServerState {
pub fn is_healthy(&self) -> bool {
if !self.publisher.fuel_core.is_started() {
return false;
}
if !self.publisher.nats_client.is_connected() {
return false;
}
true
self.publisher.is_healthy()
}

pub async fn get_health(&self) -> HealthResponse {
Expand Down
3 changes: 3 additions & 0 deletions tests/tests/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ impl FuelCoreLike for TestFuelCore {
fn is_started(&self) -> bool {
true
}
async fn await_synced_at_least_once(&self) -> anyhow::Result<()> {
Ok(())
}
async fn stop(&self) {}

async fn await_offchain_db_sync(
Expand Down

0 comments on commit e369592

Please sign in to comment.