diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index 8781859..29f80c5 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -26,7 +26,7 @@ jobs: - name: Run llvm-cov run: | - cargo llvm-cov nextest --release --lcov --output-path lcov.info + cargo llvm-cov nextest --release --lcov --output-path lcov.info --test-threads=1 - name: Upload coverage to codecov.io uses: codecov/codecov-action@v3 diff --git a/Cargo.lock b/Cargo.lock index 2be6c7f..a53256b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -467,6 +467,12 @@ version = "1.0.81" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0952808a6c2afd1aa8947271f3a60f1a6763c7b912d210184c5149b5cf147247" +[[package]] +name = "arc-swap" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" + [[package]] name = "ark-ff" version = "0.3.0" @@ -3902,6 +3908,8 @@ dependencies = [ name = "orchestrator" version = "0.1.0" dependencies = [ + "arc-swap", + "async-std", "async-trait", "axum 0.7.5", "axum-macros", @@ -4330,7 +4338,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "efb6c9a1dd1def8e2124d17e83a20af56f1570d6c2d2bd9e266ccb768df3840e" dependencies = [ "anyhow", - "itertools 0.10.5", + "itertools 0.11.0", "proc-macro2", "quote", "syn 2.0.58", diff --git a/Cargo.toml b/Cargo.toml index fce6b65..1de2544 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,5 +42,6 @@ uuid = { version = "1.7.0" } num-bigint = { version = "0.4.4" } httpmock = { version = "0.7.0" } utils = { path = "crates/utils" } +arc-swap = { version = "1.7.1" } num-traits = "0.2" lazy_static = "1.4.0" diff --git a/crates/orchestrator/Cargo.toml b/crates/orchestrator/Cargo.toml index 99aa2ad..0dd7365 100644 --- a/crates/orchestrator/Cargo.toml +++ b/crates/orchestrator/Cargo.toml @@ -12,7 +12,8 @@ name = "orchestrator" path = "src/main.rs" [dependencies] - +arc-swap = { workspace = true } +async-std = "1.12.0" async-trait = { workspace = true } axum = { workspace = true, features = ["macros"] } axum-macros = { workspace = true } diff --git a/crates/orchestrator/src/config.rs b/crates/orchestrator/src/config.rs index 1e3d81f..ef05d9e 100644 --- a/crates/orchestrator/src/config.rs +++ b/crates/orchestrator/src/config.rs @@ -4,6 +4,7 @@ use crate::database::{Database, DatabaseConfig}; use crate::queue::sqs::SqsQueue; use crate::queue::QueueProvider; use crate::utils::env_utils::get_env_var_or_panic; +use arc_swap::{ArcSwap, Guard}; use da_client_interface::DaClient; use da_client_interface::DaConfig; use dotenvy::dotenv; @@ -79,11 +80,29 @@ impl Config { /// The app config. It can be accessed from anywhere inside the service. /// It's initialized only once. -pub static CONFIG: OnceCell = OnceCell::const_new(); +/// We are using `ArcSwap` as it allow us to replace the new `Config` with +/// a new one which is required when running test cases. This approach was +/// inspired from here - https://github.com/matklad/once_cell/issues/127 +pub static CONFIG: OnceCell> = OnceCell::const_new(); /// Returns the app config. Initializes if not already done. -pub async fn config() -> &'static Config { - CONFIG.get_or_init(init_config).await +pub async fn config() -> Guard> { + let cfg = CONFIG.get_or_init(|| async { ArcSwap::from_pointee(init_config().await) }).await; + cfg.load() +} + +/// OnceCell only allows us to initialize the config once and that's how it should be on production. +/// However, when running tests, we often want to reinitialize because we want to clear the DB and +/// set it up again for reuse in new tests. By calling `config_force_init` we replace the already +/// stored config inside `ArcSwap` with the new configuration and pool settings. +#[cfg(test)] +pub async fn config_force_init(config: Config) { + match CONFIG.get() { + Some(arc) => arc.store(Arc::new(config)), + None => { + CONFIG.get_or_init(|| async { ArcSwap::from_pointee(config) }).await; + } + } } /// Builds the DA client based on the environment variable DA_LAYER diff --git a/crates/orchestrator/src/database/mod.rs b/crates/orchestrator/src/database/mod.rs index e9639cf..3da862a 100644 --- a/crates/orchestrator/src/database/mod.rs +++ b/crates/orchestrator/src/database/mod.rs @@ -33,6 +33,7 @@ pub trait Database: Send + Sync { ) -> Result<()>; async fn update_metadata(&self, job: &JobItem, metadata: HashMap) -> Result<()>; + async fn get_latest_job_by_type_and_internal_id(&self, job_type: JobType) -> Result>; } pub trait DatabaseConfig { diff --git a/crates/orchestrator/src/database/mongodb/mod.rs b/crates/orchestrator/src/database/mongodb/mod.rs index b86d4bd..308fb43 100644 --- a/crates/orchestrator/src/database/mongodb/mod.rs +++ b/crates/orchestrator/src/database/mongodb/mod.rs @@ -5,7 +5,7 @@ use async_trait::async_trait; use color_eyre::eyre::eyre; use color_eyre::Result; use mongodb::bson::Document; -use mongodb::options::UpdateOptions; +use mongodb::options::{FindOneOptions, UpdateOptions}; use mongodb::{ bson::doc, options::{ClientOptions, ServerApi, ServerApiVersion}, @@ -115,4 +115,16 @@ impl Database for MongoDb { self.update_job_optimistically(job, update).await?; Ok(()) } + + async fn get_latest_job_by_type_and_internal_id(&self, job_type: JobType) -> Result> { + let filter = doc! { + "job_type": mongodb::bson::to_bson(&job_type)?, + }; + let find_options = FindOneOptions::builder().sort(doc! { "internal_id": -1 }).build(); + Ok(self + .get_job_collection() + .find_one(filter, find_options) + .await + .expect("Failed to fetch latest job by given job type")) + } } diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index e19d4c3..8e357a0 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -61,7 +61,7 @@ pub async fn create_job(job_type: JobType, internal_id: String, metadata: HashMa } let job_handler = get_job_handler(&job_type); - let job_item = job_handler.create_job(config, internal_id, metadata).await?; + let job_item = job_handler.create_job(config.as_ref(), internal_id, metadata).await?; config.database().create_job(job_item.clone()).await?; add_job_to_process_queue(job_item.id).await?; @@ -90,7 +90,7 @@ pub async fn process_job(id: Uuid) -> Result<()> { config.database().update_job_status(&job, JobStatus::LockedForProcessing).await?; let job_handler = get_job_handler(&job.job_type); - let external_id = job_handler.process_job(config, &job).await?; + let external_id = job_handler.process_job(config.as_ref(), &job).await?; let metadata = increment_key_in_metadata(&job.metadata, JOB_PROCESS_ATTEMPT_METADATA_KEY)?; config @@ -122,7 +122,7 @@ pub async fn verify_job(id: Uuid) -> Result<()> { } let job_handler = get_job_handler(&job.job_type); - let verification_status = job_handler.verify_job(config, &job).await?; + let verification_status = job_handler.verify_job(config.as_ref(), &job).await?; match verification_status { JobVerificationStatus::Verified => { @@ -170,6 +170,7 @@ pub async fn verify_job(id: Uuid) -> Result<()> { fn get_job_handler(job_type: &JobType) -> Box { match job_type { JobType::DataSubmission => Box::new(da_job::DaJob), + JobType::SnosRun => Box::new(snos_job::SnosJob), _ => unimplemented!("Job type not implemented yet."), } } diff --git a/crates/orchestrator/src/jobs/types.rs b/crates/orchestrator/src/jobs/types.rs index bcd8556..7640cca 100644 --- a/crates/orchestrator/src/jobs/types.rs +++ b/crates/orchestrator/src/jobs/types.rs @@ -7,7 +7,7 @@ use std::collections::HashMap; use uuid::Uuid; /// An external id. -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] #[serde(untagged)] pub enum ExternalId { /// A string. @@ -98,7 +98,7 @@ pub enum JobStatus { VerificationFailed, } -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] pub struct JobItem { /// an uuid to identify a job #[serde(with = "uuid_1_as_binary")] diff --git a/crates/orchestrator/src/main.rs b/crates/orchestrator/src/main.rs index b650268..4df82c9 100644 --- a/crates/orchestrator/src/main.rs +++ b/crates/orchestrator/src/main.rs @@ -26,7 +26,7 @@ async fn main() { // init consumer init_consumers().await.expect("Failed to init consumers"); - // spawn a thread for each worker + // spawn a thread for each workers // changes in rollup mode - sovereign, validity, validiums etc. // will likely involve changes in these workers as well tokio::spawn(start_cron(Box::new(SnosWorker), 60)); @@ -40,7 +40,7 @@ async fn main() { async fn start_cron(worker: Box, interval: u64) { loop { - worker.run_worker().await; + worker.run_worker().await.expect("Error in running the worker."); tokio::time::sleep(tokio::time::Duration::from_secs(interval)).await; } } diff --git a/crates/orchestrator/src/queue/job_queue.rs b/crates/orchestrator/src/queue/job_queue.rs index 78fbd28..709d3aa 100644 --- a/crates/orchestrator/src/queue/job_queue.rs +++ b/crates/orchestrator/src/queue/job_queue.rs @@ -14,8 +14,8 @@ const JOB_PROCESSING_QUEUE: &str = "madara_orchestrator_job_processing_queue"; const JOB_VERIFICATION_QUEUE: &str = "madara_orchestrator_job_verification_queue"; #[derive(Debug, Serialize, Deserialize)] -struct JobQueueMessage { - id: Uuid, +pub struct JobQueueMessage { + pub(crate) id: Uuid, } pub async fn add_job_to_process_queue(id: Uuid) -> Result<()> { diff --git a/crates/orchestrator/src/tests/mod.rs b/crates/orchestrator/src/tests/mod.rs index 1da4304..cbd1bc0 100644 --- a/crates/orchestrator/src/tests/mod.rs +++ b/crates/orchestrator/src/tests/mod.rs @@ -7,3 +7,4 @@ pub mod server; pub mod queue; pub mod common; +mod workers; diff --git a/crates/orchestrator/src/tests/workers/mod.rs b/crates/orchestrator/src/tests/workers/mod.rs new file mode 100644 index 0000000..18f0ee8 --- /dev/null +++ b/crates/orchestrator/src/tests/workers/mod.rs @@ -0,0 +1,99 @@ +use crate::config::config_force_init; +use crate::database::MockDatabase; +use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType}; +use crate::queue::MockQueueProvider; +use crate::tests::common::init_config; +use crate::workers::snos::SnosWorker; +use crate::workers::Worker; +use da_client_interface::MockDaClient; +use httpmock::MockServer; +use mockall::predicate::eq; +use rstest::rstest; +use serde_json::json; +use std::collections::HashMap; +use std::error::Error; +use uuid::Uuid; + +#[rstest] +#[case(false)] +#[case(true)] +#[tokio::test] +async fn test_snos_worker(#[case] db_val: bool) -> Result<(), Box> { + let server = MockServer::start(); + let da_client = MockDaClient::new(); + let mut db = MockDatabase::new(); + let mut queue = MockQueueProvider::new(); + let start_job_index; + let block; + + const JOB_PROCESSING_QUEUE: &str = "madara_orchestrator_job_processing_queue"; + + // Mocking db function expectations + if !db_val { + db.expect_get_latest_job_by_type_and_internal_id().times(1).with(eq(JobType::SnosRun)).returning(|_| Ok(None)); + start_job_index = 1; + block = 5; + } else { + let uuid_temp = Uuid::new_v4(); + + db.expect_get_latest_job_by_type_and_internal_id() + .with(eq(JobType::SnosRun)) + .returning(move |_| Ok(Some(get_job_item_mock_by_id("1".to_string(), uuid_temp)))); + block = 6; + start_job_index = 2; + } + + for i in start_job_index..block + 1 { + // Getting jobs for check expectations + db.expect_get_job_by_internal_id_and_type() + .times(1) + .with(eq(i.clone().to_string()), eq(JobType::SnosRun)) + .returning(|_, _| Ok(None)); + + let uuid = Uuid::new_v4(); + + // creating jobs call expectations + db.expect_create_job() + .times(1) + .withf(move |item| item.internal_id == i.clone().to_string()) + .returning(move |_| Ok(get_job_item_mock_by_id(i.clone().to_string(), uuid))); + } + + // Queue function call simulations + queue + .expect_send_message_to_queue() + .returning(|_, _, _| Ok(())) + .withf(|queue, _payload, _delay| queue == JOB_PROCESSING_QUEUE); + + // mock block number (madara) : 5 + let rpc_response_block_number = block; + let response = json!({ "id": 1,"jsonrpc":"2.0","result": rpc_response_block_number }); + let config = + init_config(Some(format!("http://localhost:{}", server.port())), Some(db), Some(queue), Some(da_client)).await; + config_force_init(config).await; + + // mocking block call + let rpc_block_call_mock = server.mock(|when, then| { + when.path("/").body_contains("starknet_blockNumber"); + then.status(200).body(serde_json::to_vec(&response).unwrap()); + }); + + let snos_worker = SnosWorker {}; + snos_worker.run_worker().await?; + + rpc_block_call_mock.assert(); + + Ok(()) +} + +fn get_job_item_mock_by_id(id: String, uuid: Uuid) -> JobItem { + JobItem { + id: uuid, + internal_id: id.clone(), + job_type: JobType::SnosRun, + status: JobStatus::Created, + external_id: ExternalId::Number(0), + metadata: HashMap::new(), + version: 0, + } +} diff --git a/crates/orchestrator/src/workers/mod.rs b/crates/orchestrator/src/workers/mod.rs index 846d0c8..7d198c0 100644 --- a/crates/orchestrator/src/workers/mod.rs +++ b/crates/orchestrator/src/workers/mod.rs @@ -1,4 +1,5 @@ use async_trait::async_trait; +use std::error::Error; pub mod proof_registration; pub mod proving; @@ -7,5 +8,5 @@ pub mod update_state; #[async_trait] pub trait Worker: Send + Sync { - async fn run_worker(&self); + async fn run_worker(&self) -> Result<(), Box>; } diff --git a/crates/orchestrator/src/workers/proof_registration.rs b/crates/orchestrator/src/workers/proof_registration.rs index 5ad5bc2..a51bd41 100644 --- a/crates/orchestrator/src/workers/proof_registration.rs +++ b/crates/orchestrator/src/workers/proof_registration.rs @@ -1,5 +1,6 @@ use crate::workers::Worker; use async_trait::async_trait; +use std::error::Error; pub struct ProofRegistrationWorker; @@ -8,7 +9,7 @@ impl Worker for ProofRegistrationWorker { /// 1. Fetch all blocks with a successful proving job run /// 2. Group blocks that have the same proof /// 3. For each group, create a proof registration job with from and to block in metadata - async fn run_worker(&self) { + async fn run_worker(&self) -> Result<(), Box> { todo!() } } diff --git a/crates/orchestrator/src/workers/proving.rs b/crates/orchestrator/src/workers/proving.rs index 9476ea7..61bc19c 100644 --- a/crates/orchestrator/src/workers/proving.rs +++ b/crates/orchestrator/src/workers/proving.rs @@ -1,5 +1,6 @@ use crate::workers::Worker; use async_trait::async_trait; +use std::error::Error; pub struct ProvingWorker; @@ -7,7 +8,7 @@ pub struct ProvingWorker; impl Worker for ProvingWorker { /// 1. Fetch all successful SNOS job runs that don't have a proving job /// 2. Create a proving job for each SNOS job run - async fn run_worker(&self) { + async fn run_worker(&self) -> Result<(), Box> { todo!() } } diff --git a/crates/orchestrator/src/workers/snos.rs b/crates/orchestrator/src/workers/snos.rs index bdc0416..f116cf6 100644 --- a/crates/orchestrator/src/workers/snos.rs +++ b/crates/orchestrator/src/workers/snos.rs @@ -1,5 +1,11 @@ +use crate::config::config; +use crate::jobs::create_job; +use crate::jobs::types::JobType; use crate::workers::Worker; use async_trait::async_trait; +use starknet::providers::Provider; +use std::collections::HashMap; +use std::error::Error; pub struct SnosWorker; @@ -8,7 +14,31 @@ impl Worker for SnosWorker { /// 1. Fetch the latest completed block from the Starknet chain /// 2. Fetch the last block that had a SNOS job run. /// 3. Create SNOS run jobs for all the remaining blocks - async fn run_worker(&self) { - todo!() + async fn run_worker(&self) -> Result<(), Box> { + let config = config().await; + let provider = config.starknet_client(); + let latest_block_number = provider.block_number().await?; + let latest_block_processed_data = config + .database() + .get_latest_job_by_type_and_internal_id(JobType::SnosRun) + .await + .unwrap() + .map(|item| item.internal_id) + .unwrap_or("0".to_string()); + + let latest_block_processed: u64 = latest_block_processed_data.parse()?; + + let block_diff = latest_block_number - latest_block_processed; + + // if all blocks are processed + if block_diff == 0 { + return Ok(()); + } + + for x in latest_block_processed + 1..latest_block_number + 1 { + create_job(JobType::SnosRun, x.to_string(), HashMap::new()).await?; + } + + Ok(()) } } diff --git a/crates/orchestrator/src/workers/update_state.rs b/crates/orchestrator/src/workers/update_state.rs index c359e99..faca16a 100644 --- a/crates/orchestrator/src/workers/update_state.rs +++ b/crates/orchestrator/src/workers/update_state.rs @@ -1,5 +1,6 @@ use crate::workers::Worker; use async_trait::async_trait; +use std::error::Error; pub struct UpdateStateWorker; @@ -8,7 +9,7 @@ impl Worker for UpdateStateWorker { /// 1. Fetch the last succesful state update job /// 2. Fetch all succesful proving jobs covering blocks after the last state update /// 3. Create state updates for all the blocks that don't have a state update job - async fn run_worker(&self) { + async fn run_worker(&self) -> Result<(), Box> { todo!() } }