diff --git a/components/chainhook-cli/src/archive/mod.rs b/components/chainhook-cli/src/archive/mod.rs index 084ddc3c0..6d8c971ef 100644 --- a/components/chainhook-cli/src/archive/mod.rs +++ b/components/chainhook-cli/src/archive/mod.rs @@ -132,7 +132,7 @@ pub async fn download_stacks_dataset_if_required( ) -> Result { if config.is_initial_ingestion_required() { // Download default tsv. - if config.rely_on_remote_stacks_tsv() && config.should_download_remote_stacks_tsv() { + if config.contains_remote_stacks_tsv_url() && config.should_download_remote_stacks_tsv() { let url = config.expected_remote_stacks_tsv_url()?; let mut tsv_file_path = config.expected_cache_path(); tsv_file_path.push(default_tsv_file_path(&config.network.stacks_network)); diff --git a/components/chainhook-cli/src/cli/mod.rs b/components/chainhook-cli/src/cli/mod.rs index 219950f36..b962fcdd1 100644 --- a/components/chainhook-cli/src/cli/mod.rs +++ b/components/chainhook-cli/src/cli/mod.rs @@ -2,7 +2,7 @@ use crate::config::generator::generate_config; use crate::config::Config; use crate::scan::bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate; use crate::scan::stacks::{ - consolidate_local_stacks_chainstate_using_csv, scan_stacks_chainstate_via_csv_using_predicate, + import_stacks_chainstate_from_remote_tsv, scan_stacks_chainstate_via_csv_using_predicate, scan_stacks_chainstate_via_rocksdb_using_predicate, }; use crate::service::http_api::document_predicate_api_server; @@ -24,6 +24,7 @@ use chainhook_sdk::chainhooks::stacks::StacksChainhookSpecificationNetworkMap; use chainhook_sdk::chainhooks::stacks::StacksPredicate; use chainhook_sdk::chainhooks::stacks::StacksPrintEventBasedPredicate; use chainhook_sdk::chainhooks::types::{ChainhookSpecificationNetworkMap, FileHook, HookAction}; +use chainhook_sdk::try_info; use chainhook_sdk::types::{BitcoinNetwork, BlockIdentifier, StacksNetwork}; use chainhook_sdk::utils::{BlockHeights, Context}; use clap::{Parser, Subcommand}; @@ -342,19 +343,17 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> { ServiceCommand::Start(cmd) => { let mut config = Config::default(cmd.devnet, cmd.testnet, cmd.mainnet, &cmd.config_path)?; - if cmd.prometheus_monitoring_port.is_some() { config.monitoring.prometheus_monitoring_port = cmd.prometheus_monitoring_port; } - let predicates = cmd .predicates_paths .iter() .map(|p| load_predicate_from_path(p)) .collect::, _>>()?; - info!(ctx.expect_logger(), "Starting service...",); - + try_info!(ctx, "Starting chainhook service"); + import_stacks_chainstate_from_remote_tsv(&mut config, &ctx).await?; let mut service = Service::new(config, ctx); return service.run(predicates, None).await; } @@ -541,7 +540,7 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> { }; match open_readonly_stacks_db_conn(&config.expected_cache_path(), &ctx) { Ok(_) => { - let _ = consolidate_local_stacks_chainstate_using_csv( + let _ = import_stacks_chainstate_from_remote_tsv( &mut config, &ctx, ) @@ -812,7 +811,7 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> { } StacksCommand::Db(StacksDbCommand::Update(cmd)) => { let mut config = Config::default(false, false, false, &cmd.config_path)?; - consolidate_local_stacks_chainstate_using_csv(&mut config, &ctx).await?; + import_stacks_chainstate_from_remote_tsv(&mut config, &ctx).await?; } StacksCommand::Db(StacksDbCommand::Check(cmd)) => { let config = Config::default(false, false, false, &cmd.config_path)?; diff --git a/components/chainhook-cli/src/config/mod.rs b/components/chainhook-cli/src/config/mod.rs index cba31806e..768555469 100644 --- a/components/chainhook-cli/src/config/mod.rs +++ b/components/chainhook-cli/src/config/mod.rs @@ -304,6 +304,12 @@ impl Config { destination_path } + pub fn is_cache_path_empty(&self) -> Result { + let mut dir = std::fs::read_dir(self.expected_cache_path()) + .map_err(|e| format!("unable to read cache directory: {e}"))?; + Ok(dir.next().is_none()) + } + fn expected_remote_stacks_tsv_base_url(&self) -> Result<&String, String> { for source in self.event_sources.iter() { if let EventSourceConfig::StacksTsvUrl(config) = source { @@ -323,7 +329,7 @@ impl Config { .map(|url| format!("{}.gz", url)) } - pub fn rely_on_remote_stacks_tsv(&self) -> bool { + pub fn contains_remote_stacks_tsv_url(&self) -> bool { for source in self.event_sources.iter() { if let EventSourceConfig::StacksTsvUrl(_config) = source { return true; diff --git a/components/chainhook-cli/src/scan/stacks.rs b/components/chainhook-cli/src/scan/stacks.rs index 77d74dad5..ebcae0d73 100644 --- a/components/chainhook-cli/src/scan/stacks.rs +++ b/components/chainhook-cli/src/scan/stacks.rs @@ -23,6 +23,7 @@ use crate::{ use chainhook_sdk::{ chainhooks::stacks::evaluate_stacks_chainhook_on_blocks, indexer::{self, stacks::standardize_stacks_serialized_block_header, Indexer}, + try_info, utils::Context, }; use chainhook_sdk::{ @@ -338,11 +339,8 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate( &mut db_conns.signers_db, &block_data.block_identifier, )?; - let (hits_per_events, _) = evaluate_stacks_predicate_on_non_consensus_events( - &events, - predicate_spec, - ctx, - ); + let (hits_per_events, _) = + evaluate_stacks_predicate_on_non_consensus_events(&events, predicate_spec, ctx); if hits_per_blocks.is_empty() && hits_per_events.is_empty() { continue; @@ -584,101 +582,98 @@ pub async fn scan_stacks_chainstate_via_csv_using_predicate( Ok(last_block_scanned) } -pub async fn consolidate_local_stacks_chainstate_using_csv( +/// Downloads a remote archive TSV that contains Stacks node events and imports it into chainhook in order to fill up the Stacks +/// blocks database. This import will only happen if chainhook is starting from a fresh install with an empty index. +pub async fn import_stacks_chainstate_from_remote_tsv( config: &mut Config, ctx: &Context, ) -> Result<(), String> { + if !config.is_cache_path_empty()? { + try_info!(ctx, "A Stacks chainstate already exists, skipping TSV chainstante import"); + return Ok(()); + } + if !config.contains_remote_stacks_tsv_url() { + try_info!(ctx, "No remote Stacks TSV location was specified in config file, skipping TSV chainstante import"); + return Ok(()); + } + try_info!(ctx, "Importing Stacks chainstate from TSV"); + + download_stacks_dataset_if_required(config, ctx).await?; + let stacks_db = open_readonly_stacks_db_conn_with_retry(&config.expected_cache_path(), 3, ctx)?; + let confirmed_tip = get_last_block_height_inserted(&stacks_db, ctx); + let mut canonical_fork: VecDeque<(BlockIdentifier, BlockIdentifier, u64)> = + get_canonical_fork_from_tsv(config, confirmed_tip, ctx).await?; + + let mut indexer = Indexer::new(config.network.clone()); + let mut blocks_inserted = 0; + let mut blocks_read = 0; + let blocks_to_insert = canonical_fork.len(); + let stacks_db_rw = open_readwrite_stacks_db_conn(&config.expected_cache_path(), ctx)?; info!( ctx.expect_logger(), - "Building local chainstate from Stacks archive file" + "Beginning import of {} Stacks blocks into rocks db", blocks_to_insert ); + // TODO: To avoid repeating code with `scan_stacks_chainstate_via_csv_using_predicate`, we should move this block + // retrieval code into a reusable function. + let tsv_path = config.expected_local_stacks_tsv_file()?.clone(); + let mut tsv_reader = BufReader::new(File::open(tsv_path).map_err(|e| e.to_string())?); + let mut tsv_current_line = 0; + for (block_identifier, _parent_block_identifier, tsv_line_number) in canonical_fork.drain(..) { + blocks_read += 1; - let downloaded_new_dataset = download_stacks_dataset_if_required(config, ctx).await?; - if downloaded_new_dataset { - let stacks_db = - open_readonly_stacks_db_conn_with_retry(&config.expected_cache_path(), 3, ctx)?; - let confirmed_tip = get_last_block_height_inserted(&stacks_db, ctx); - let mut canonical_fork: VecDeque<(BlockIdentifier, BlockIdentifier, u64)> = - get_canonical_fork_from_tsv(config, confirmed_tip, ctx).await?; - - let mut indexer = Indexer::new(config.network.clone()); - let mut blocks_inserted = 0; - let mut blocks_read = 0; - let blocks_to_insert = canonical_fork.len(); - let stacks_db_rw = open_readwrite_stacks_db_conn(&config.expected_cache_path(), ctx)?; - info!( - ctx.expect_logger(), - "Beginning import of {} Stacks blocks into rocks db", blocks_to_insert - ); - // TODO: To avoid repeating code with `scan_stacks_chainstate_via_csv_using_predicate`, we should move this block - // retrieval code into a reusable function. - let tsv_path = config.expected_local_stacks_tsv_file()?.clone(); - let mut tsv_reader = BufReader::new(File::open(tsv_path).map_err(|e| e.to_string())?); - let mut tsv_current_line = 0; - for (block_identifier, _parent_block_identifier, tsv_line_number) in - canonical_fork.drain(..) - { - blocks_read += 1; + // If blocks already stored, move on + if is_stacks_block_present(&block_identifier, 3, &stacks_db_rw) { + continue; + } + blocks_inserted += 1; - // If blocks already stored, move on - if is_stacks_block_present(&block_identifier, 3, &stacks_db_rw) { - continue; - } - blocks_inserted += 1; - - // Seek to required line from TSV and retrieve its block payload. - let mut tsv_line = String::new(); - while tsv_current_line < tsv_line_number { - tsv_line.clear(); - let bytes_read = tsv_reader - .read_line(&mut tsv_line) - .map_err(|e| e.to_string())?; - if bytes_read == 0 { - return Err("Unexpected EOF when reading TSV".to_string()); - } - tsv_current_line += 1; + // Seek to required line from TSV and retrieve its block payload. + let mut tsv_line = String::new(); + while tsv_current_line < tsv_line_number { + tsv_line.clear(); + let bytes_read = tsv_reader + .read_line(&mut tsv_line) + .map_err(|e| e.to_string())?; + if bytes_read == 0 { + return Err("Unexpected EOF when reading TSV".to_string()); } - let Some(serialized_block) = tsv_line.split('\t').last() else { - return Err("Unable to retrieve serialized block from TSV line".to_string()); - }; - - let block_data = match indexer::stacks::standardize_stacks_serialized_block( - &indexer.config, - serialized_block, - &mut indexer.stacks_context, - ctx, - ) { - Ok(block) => block, - Err(e) => { - error!( - &ctx.expect_logger(), - "Failed to standardize stacks block: {e}" - ); - continue; - } - }; - - // TODO(rafaelcr): Store signer messages - insert_entry_in_stacks_blocks(&block_data, &stacks_db_rw, ctx)?; + tsv_current_line += 1; + } + let Some(serialized_block) = tsv_line.split('\t').last() else { + return Err("Unable to retrieve serialized block from TSV line".to_string()); + }; - if blocks_inserted % 2500 == 0 { - info!( - ctx.expect_logger(), - "Importing Stacks blocks into rocks db: {}/{}", blocks_read, blocks_to_insert + let block_data = match indexer::stacks::standardize_stacks_serialized_block( + &indexer.config, + serialized_block, + &mut indexer.stacks_context, + ctx, + ) { + Ok(block) => block, + Err(e) => { + error!( + &ctx.expect_logger(), + "Failed to standardize stacks block: {e}" ); - let _ = stacks_db_rw.flush(); + continue; } + }; + + // TODO(rafaelcr): Store signer messages + insert_entry_in_stacks_blocks(&block_data, &stacks_db_rw, ctx)?; + + if blocks_inserted % 2500 == 0 { + info!( + ctx.expect_logger(), + "Importing Stacks blocks into rocks db: {}/{}", blocks_read, blocks_to_insert + ); + let _ = stacks_db_rw.flush(); } - let _ = stacks_db_rw.flush(); - info!( - ctx.expect_logger(), - "{blocks_read} Stacks blocks read, {blocks_inserted} inserted" - ); - } else { - info!( - ctx.expect_logger(), - "Skipping database consolidation - no new archive found since last consolidation." - ); } + let _ = stacks_db_rw.flush(); + info!( + ctx.expect_logger(), + "{blocks_read} Stacks blocks read, {blocks_inserted} inserted" + ); Ok(()) } diff --git a/components/chainhook-cli/src/service/mod.rs b/components/chainhook-cli/src/service/mod.rs index ea22ef976..a23e1bbfb 100644 --- a/components/chainhook-cli/src/service/mod.rs +++ b/components/chainhook-cli/src/service/mod.rs @@ -2,7 +2,6 @@ pub(crate) mod http_api; mod runloops; use crate::config::{Config, PredicatesApi, PredicatesApiConfig}; -use crate::scan::stacks::consolidate_local_stacks_chainstate_using_csv; use crate::service::http_api::{load_predicates_from_redis, start_predicate_api_server}; use crate::service::runloops::{start_bitcoin_scan_runloop, start_stacks_scan_runloop}; use crate::storage::signers::{initialize_signers_db, store_signer_db_messages}; @@ -164,11 +163,6 @@ impl Service { let mut event_observer_config = self.config.get_event_observer_config(); event_observer_config.registered_chainhooks = chainhook_store; - // Download and ingest a Stacks dump - if self.config.rely_on_remote_stacks_tsv() { - consolidate_local_stacks_chainstate_using_csv(&mut self.config, &self.ctx).await?; - } - // Stacks scan operation threadpool let (stacks_scan_op_tx, stacks_scan_op_rx) = crossbeam_channel::unbounded(); let ctx = self.ctx.clone(); @@ -292,8 +286,6 @@ impl Service { self.ctx.clone(), ); - let mut stacks_event = 0; - let ctx = self.ctx.clone(); match self.config.http_api { PredicatesApi::On(ref api_config) => { @@ -586,7 +578,6 @@ impl Service { match &chain_event { StacksChainEvent::ChainUpdatedWithBlocks(data) => { - stacks_event += 1; for confirmed_block in &data.confirmed_blocks { if let Some(expired_predicate_uuids) = expire_predicates_for_block( @@ -649,24 +640,6 @@ impl Service { &ctx, ); }; - - // Every 32 blocks, we will check if there's a new Stacks file archive to ingest - if stacks_event > 32 { - stacks_event = 0; - if self.config.rely_on_remote_stacks_tsv() { - if let Err(e) = consolidate_local_stacks_chainstate_using_csv( - &mut self.config, - &self.ctx, - ) - .await - { - error!( - self.ctx.expect_logger(), - "Failed to update database from archive: {e}" - ) - }; - } - } } ObserverEvent::PredicateInterrupted(PredicateInterruptedData { predicate_key, diff --git a/components/chainhook-cli/src/service/tests/helpers/mock_service.rs b/components/chainhook-cli/src/service/tests/helpers/mock_service.rs index e42e39d5a..5de156ad5 100644 --- a/components/chainhook-cli/src/service/tests/helpers/mock_service.rs +++ b/components/chainhook-cli/src/service/tests/helpers/mock_service.rs @@ -2,7 +2,7 @@ use crate::config::{ Config, EventSourceConfig, LimitsConfig, MonitoringConfig, PathConfig, PredicatesApi, PredicatesApiConfig, StorageConfig, DEFAULT_REDIS_URI, }; -use crate::scan::stacks::consolidate_local_stacks_chainstate_using_csv; +use crate::scan::stacks::import_stacks_chainstate_from_remote_tsv; use crate::service::{ http_api::start_predicate_api_server, update_predicate_spec, update_predicate_status, PredicateStatus, Service, @@ -442,7 +442,7 @@ pub async fn setup_stacks_chainhook_test( Some(prometheus_port), ); - consolidate_local_stacks_chainstate_using_csv(&mut config, &ctx) + import_stacks_chainstate_from_remote_tsv(&mut config, &ctx) .await .unwrap_or_else(|e| { std::fs::remove_dir_all(&working_dir).unwrap(); diff --git a/components/chainhook-cli/src/service/tests/runloop_tests.rs b/components/chainhook-cli/src/service/tests/runloop_tests.rs index 43a930464..093f07f8d 100644 --- a/components/chainhook-cli/src/service/tests/runloop_tests.rs +++ b/components/chainhook-cli/src/service/tests/runloop_tests.rs @@ -12,7 +12,7 @@ use chainhook_sdk::{ use crate::{ config::{Config, EventSourceConfig, PathConfig}, - scan::stacks::consolidate_local_stacks_chainstate_using_csv, + scan::stacks::import_stacks_chainstate_from_remote_tsv, service::{ runloops::{ start_bitcoin_scan_runloop, start_stacks_scan_runloop, BitcoinScanOp, StacksScanOp, @@ -49,7 +49,7 @@ async fn test_stacks_runloop_kill_scan() { tracer: false, }; - consolidate_local_stacks_chainstate_using_csv(&mut config, &ctx) + import_stacks_chainstate_from_remote_tsv(&mut config, &ctx) .await .unwrap_or_else(|e| { std::fs::remove_dir_all(&working_dir).unwrap();