From bb2c99d94ec5f4a4143aeba12af51cfb8c56ad7e Mon Sep 17 00:00:00 2001 From: Micaiah Reid Date: Wed, 26 Jun 2024 20:51:56 -0400 Subject: [PATCH] fix: allow aborting a predicate scan (#601) Previously, when Chainhook was run as a service and the runloop to scan stacks/bitcoin predicates was set, we had no way to abort that scan. If a predicate was set to scan 1m blocks, but the user discovered the predicate was wrong and needed to delete, the user could delete the predicate from the store, but the scan thread had already started and would run until completion. This PR adds an abort signal to an ongoing scan so that when a predicate is deregistered, the scan is canceled. --- components/chainhook-cli/src/cli/mod.rs | 2 + components/chainhook-cli/src/scan/bitcoin.rs | 23 +- components/chainhook-cli/src/scan/common.rs | 6 + components/chainhook-cli/src/scan/stacks.rs | 39 ++- components/chainhook-cli/src/service/mod.rs | 44 ++- .../chainhook-cli/src/service/runloops.rs | 277 ++++++++++-------- .../chainhook-cli/src/service/tests/mod.rs | 1 + .../src/service/tests/runloop_tests.rs | 169 +++++++++++ components/chainhook-sdk/src/observer/mod.rs | 40 ++- .../chainhook-sdk/src/observer/tests/mod.rs | 22 +- components/chainhook-types-rs/src/lib.rs | 1 + 11 files changed, 477 insertions(+), 147 deletions(-) create mode 100644 components/chainhook-cli/src/service/tests/runloop_tests.rs diff --git a/components/chainhook-cli/src/cli/mod.rs b/components/chainhook-cli/src/cli/mod.rs index 005c4662a..387b6dac8 100644 --- a/components/chainhook-cli/src/cli/mod.rs +++ b/components/chainhook-cli/src/cli/mod.rs @@ -517,6 +517,7 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> { &predicate_spec, None, &config, + None, &ctx, ) .await?; @@ -545,6 +546,7 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> { None, &db_conn, &config, + None, &ctx, ) .await?; diff --git a/components/chainhook-cli/src/scan/bitcoin.rs b/components/chainhook-cli/src/scan/bitcoin.rs index b3d3185ce..0f80e7f54 100644 --- a/components/chainhook-cli/src/scan/bitcoin.rs +++ b/components/chainhook-cli/src/scan/bitcoin.rs @@ -22,13 +22,17 @@ use chainhook_sdk::types::{ }; use chainhook_sdk::utils::{file_append, send_request, Context}; use std::collections::HashMap; +use std::sync::{Arc, RwLock}; + +use super::common::PredicateScanResult; pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate( predicate_spec: &BitcoinChainhookSpecification, unfinished_scan_data: Option, config: &Config, + kill_signal: Option>>, ctx: &Context, -) -> Result { +) -> Result { let predicate_uuid = &predicate_spec.uuid; let auth = Auth::UserPass( config.network.bitcoind_rpc_username.clone(), @@ -62,7 +66,7 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate( let mut block_heights_to_scan = match block_heights_to_scan { Some(h) => h, // no blocks to scan, go straight to streaming - None => return Ok(false), + None => return Ok(PredicateScanResult::ChainTipReached), }; let mut predicates_db_conn = match config.http_api { @@ -100,6 +104,17 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate( let mut loop_did_trigger = false; while let Some(current_block_height) = block_heights_to_scan.pop_front() { + if let Some(kill_signal) = kill_signal.clone() { + match kill_signal.read() { + Ok(kill_signal) => { + // if true, we're received the kill signal, so break out of the loop + if *kill_signal { + return Ok(PredicateScanResult::Deregistered); + } + } + Err(_) => {} + } + } if let Some(ref mut predicates_db_conn) = predicates_db_conn { if number_of_blocks_scanned % 100 == 0 || number_of_blocks_scanned == 0 @@ -242,10 +257,10 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate( set_confirmed_expiration_status(&predicate_spec.key(), predicates_db_conn, ctx); } } - return Ok(true); + return Ok(PredicateScanResult::Expired); } - return Ok(false); + return Ok(PredicateScanResult::ChainTipReached); } pub async fn process_block_with_predicates( diff --git a/components/chainhook-cli/src/scan/common.rs b/components/chainhook-cli/src/scan/common.rs index c6261dff8..ee5ddef61 100644 --- a/components/chainhook-cli/src/scan/common.rs +++ b/components/chainhook-cli/src/scan/common.rs @@ -65,3 +65,9 @@ pub fn get_block_heights_to_scan( }; Ok(block_heights_to_scan) } + +pub enum PredicateScanResult { + ChainTipReached, + Expired, + Deregistered, +} diff --git a/components/chainhook-cli/src/scan/stacks.rs b/components/chainhook-cli/src/scan/stacks.rs index b626da29d..5212af0ec 100644 --- a/components/chainhook-cli/src/scan/stacks.rs +++ b/components/chainhook-cli/src/scan/stacks.rs @@ -1,4 +1,7 @@ -use std::collections::{HashMap, VecDeque}; +use std::{ + collections::{HashMap, VecDeque}, + sync::{Arc, RwLock}, +}; use crate::{ archive::download_stacks_dataset_if_required, @@ -29,6 +32,8 @@ use chainhook_sdk::{ }; use rocksdb::DB; +use super::common::PredicateScanResult; + #[derive(Debug, Clone, Eq, PartialEq)] pub enum DigestingCommand { DigestSeedBlock(BlockIdentifier), @@ -170,16 +175,17 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate( unfinished_scan_data: Option, stacks_db_conn: &DB, config: &Config, + kill_signal: Option>>, ctx: &Context, -) -> Result<(Option, bool), String> { +) -> Result { let predicate_uuid = &predicate_spec.uuid; let mut chain_tip = match get_last_unconfirmed_block_height_inserted(stacks_db_conn, ctx) { Some(chain_tip) => chain_tip, None => match get_last_block_height_inserted(stacks_db_conn, ctx) { Some(chain_tip) => chain_tip, None => { - info!(ctx.expect_logger(), "No blocks inserted in db; cannot determing Stacks chain tip. Skipping scan of predicate {}", predicate_uuid); - return Ok((None, false)); + info!(ctx.expect_logger(), "No blocks inserted in db; cannot determine Stacks chain tip. Skipping scan of predicate {}", predicate_uuid); + return Ok(PredicateScanResult::ChainTipReached); } }, }; @@ -194,7 +200,13 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate( let mut block_heights_to_scan = match block_heights_to_scan { Some(h) => h, // no blocks to scan, go straight to streaming - None => return Ok((None, false)), + None => { + debug!( + ctx.expect_logger(), + "Stacks chainstate scan completed. 0 blocks scanned." + ); + return Ok(PredicateScanResult::ChainTipReached); + } }; let mut predicates_db_conn = match config.http_api { @@ -226,6 +238,17 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate( let mut loop_did_trigger = false; while let Some(current_block_height) = block_heights_to_scan.pop_front() { + if let Some(kill_signal) = kill_signal.clone() { + match kill_signal.read() { + Ok(kill_signal) => { + // if true, we're received the kill signal, so break out of the loop + if *kill_signal { + return Ok(PredicateScanResult::Deregistered); + } + } + Err(_) => {} + } + } if let Some(ref mut predicates_db_conn) = predicates_db_conn { if number_of_blocks_scanned % 1000 == 0 || number_of_blocks_scanned == 0 @@ -255,7 +278,7 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate( Some(chain_tip) => chain_tip, None => { warn!(ctx.expect_logger(), "No blocks inserted in db; cannot determine Stacks chain tip. Skipping scan of predicate {}", predicate_uuid); - return Ok((None, false)); + return Ok(PredicateScanResult::ChainTipReached); } }, }; @@ -411,10 +434,10 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate( set_confirmed_expiration_status(&predicate_spec.key(), predicates_db_conn, ctx); } } - return Ok((Some(last_block_scanned), true)); + return Ok(PredicateScanResult::Expired); } - Ok((Some(last_block_scanned), false)) + Ok(PredicateScanResult::ChainTipReached) } pub async fn scan_stacks_chainstate_via_csv_using_predicate( diff --git a/components/chainhook-cli/src/service/mod.rs b/components/chainhook-cli/src/service/mod.rs index 2e74bb5bb..e62f76bb8 100644 --- a/components/chainhook-cli/src/service/mod.rs +++ b/components/chainhook-cli/src/service/mod.rs @@ -16,7 +16,8 @@ use chainhook_sdk::chainhooks::types::{ChainhookConfig, ChainhookFullSpecificati use chainhook_sdk::chainhooks::types::ChainhookSpecification; use chainhook_sdk::observer::{ start_event_observer, HookExpirationData, ObserverCommand, ObserverEvent, - PredicateEvaluationReport, PredicateInterruptedData, StacksObserverStartupContext, + PredicateDeregisteredEvent, PredicateEvaluationReport, PredicateInterruptedData, + StacksObserverStartupContext, }; use chainhook_sdk::types::{Chain, StacksBlockData, StacksChainEvent}; use chainhook_sdk::utils::Context; @@ -26,6 +27,7 @@ use std::sync::mpsc::{channel, Receiver, Sender}; use std::time::{SystemTime, UNIX_EPOCH}; use self::http_api::get_entry_from_predicates_db; +use self::runloops::{BitcoinScanOp, StacksScanOp}; pub struct Service { config: Config, @@ -304,10 +306,16 @@ impl Service { for predicate_with_last_scanned_block in leftover_scans { match predicate_with_last_scanned_block { (ChainhookSpecification::Stacks(spec), last_scanned_block) => { - let _ = stacks_scan_op_tx.send((spec, last_scanned_block)); + let _ = stacks_scan_op_tx.send(StacksScanOp::StartScan { + predicate_spec: spec, + unfinished_scan_data: last_scanned_block, + }); } (ChainhookSpecification::Bitcoin(spec), last_scanned_block) => { - let _ = bitcoin_scan_op_tx.send((spec, last_scanned_block)); + let _ = bitcoin_scan_op_tx.send(BitcoinScanOp::StartScan { + predicate_spec: spec, + unfinished_scan_data: last_scanned_block, + }); } } } @@ -354,10 +362,16 @@ impl Service { } match spec { ChainhookSpecification::Stacks(predicate_spec) => { - let _ = stacks_scan_op_tx.send((predicate_spec, None)); + let _ = stacks_scan_op_tx.send(StacksScanOp::StartScan { + predicate_spec, + unfinished_scan_data: None, + }); } ChainhookSpecification::Bitcoin(predicate_spec) => { - let _ = bitcoin_scan_op_tx.send((predicate_spec, None)); + let _ = bitcoin_scan_op_tx.send(BitcoinScanOp::StartScan { + predicate_spec, + unfinished_scan_data: None, + }); } } } @@ -382,14 +396,30 @@ impl Service { ); } } - ObserverEvent::PredicateDeregistered(uuid) => { + ObserverEvent::PredicateDeregistered(PredicateDeregisteredEvent { + predicate_uuid, + chain, + }) => { if let PredicatesApi::On(ref config) = self.config.http_api { let Ok(mut predicates_db_conn) = open_readwrite_predicates_db_conn_verbose(&config, &ctx) else { continue; }; - let predicate_key = ChainhookSpecification::either_stx_or_btc_key(&uuid); + + match chain { + Chain::Bitcoin => { + let _ = bitcoin_scan_op_tx + .send(BitcoinScanOp::KillScan(predicate_uuid.clone())); + } + Chain::Stacks => { + let _ = stacks_scan_op_tx + .send(StacksScanOp::KillScan(predicate_uuid.clone())); + } + }; + + let predicate_key = + ChainhookSpecification::either_stx_or_btc_key(&predicate_uuid); let res: Result<(), redis::RedisError> = predicates_db_conn.del(predicate_key.clone()); if let Err(e) = res { diff --git a/components/chainhook-cli/src/service/runloops.rs b/components/chainhook-cli/src/service/runloops.rs index a0b85b472..c2f28f394 100644 --- a/components/chainhook-cli/src/service/runloops.rs +++ b/components/chainhook-cli/src/service/runloops.rs @@ -1,4 +1,7 @@ -use std::sync::mpsc::Sender; +use std::{ + collections::HashMap, + sync::{mpsc::Sender, Arc, RwLock}, +}; use chainhook_sdk::{ chainhooks::types::{ @@ -12,7 +15,7 @@ use threadpool::ThreadPool; use crate::{ config::{Config, PredicatesApi}, scan::{ - bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate, + bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate, common::PredicateScanResult, stacks::scan_stacks_chainstate_via_rocksdb_using_predicate, }, service::{open_readwrite_predicates_db_conn_or_panic, set_predicate_interrupted_status}, @@ -21,149 +24,191 @@ use crate::{ use super::ScanningData; +pub enum StacksScanOp { + StartScan { + predicate_spec: StacksChainhookSpecification, + unfinished_scan_data: Option, + }, + KillScan(String), +} + pub fn start_stacks_scan_runloop( config: &Config, - stacks_scan_op_rx: crossbeam_channel::Receiver<( - StacksChainhookSpecification, - Option, - )>, + stacks_scan_op_rx: crossbeam_channel::Receiver, observer_command_tx: Sender, ctx: &Context, ) { let stacks_scan_pool = ThreadPool::new(config.limits.max_number_of_concurrent_stacks_scans); - while let Ok((predicate_spec, unfinished_scan_data)) = stacks_scan_op_rx.recv() { - let moved_ctx = ctx.clone(); - let moved_config = config.clone(); - let observer_command_tx = observer_command_tx.clone(); - stacks_scan_pool.execute(move || { - let stacks_db_conn = - match open_readonly_stacks_db_conn(&moved_config.expected_cache_path(), &moved_ctx) - { - Ok(db_conn) => db_conn, - Err(e) => { - // todo: if we repeatedly can't connect to the database, we should restart the - // service to get to a healthy state. I don't know if this has been an issue, though - // so we can monitor and possibly remove this todo - error!( - moved_ctx.expect_logger(), - "unable to open stacks db: {}", - e.to_string() - ); - unimplemented!() - } - }; + let mut kill_signals = HashMap::new(); - let op = scan_stacks_chainstate_via_rocksdb_using_predicate( - &predicate_spec, + while let Ok(op) = stacks_scan_op_rx.recv() { + match op { + StacksScanOp::StartScan { + predicate_spec, unfinished_scan_data, - &stacks_db_conn, - &moved_config, - &moved_ctx, - ); - let res = hiro_system_kit::nestable_block_on(op); - let (last_block_scanned, predicate_is_expired) = match res { - Ok(last_block_scanned) => last_block_scanned, - Err(e) => { - warn!( - moved_ctx.expect_logger(), - "Unable to evaluate predicate on Stacks chainstate: {e}", + } => { + let moved_ctx = ctx.clone(); + let moved_config = config.clone(); + let observer_command_tx = observer_command_tx.clone(); + let kill_signal = Arc::new(RwLock::new(false)); + kill_signals.insert(predicate_spec.uuid.clone(), kill_signal.clone()); + stacks_scan_pool.execute(move || { + let stacks_db_conn = match open_readonly_stacks_db_conn( + &moved_config.expected_cache_path(), + &moved_ctx, + ) { + Ok(db_conn) => db_conn, + Err(e) => { + // todo: if we repeatedly can't connect to the database, we should restart the + // service to get to a healthy state. I don't know if this has been an issue, though + // so we can monitor and possibly remove this todo + error!( + moved_ctx.expect_logger(), + "unable to open stacks db: {}", + e.to_string() + ); + unimplemented!() + } + }; + + let op = scan_stacks_chainstate_via_rocksdb_using_predicate( + &predicate_spec, + unfinished_scan_data, + &stacks_db_conn, + &moved_config, + Some(kill_signal), + &moved_ctx, ); + let res = hiro_system_kit::nestable_block_on(op); + match res { + Ok(PredicateScanResult::Expired) + | Ok(PredicateScanResult::Deregistered) => {} + Ok(PredicateScanResult::ChainTipReached) => { + let _ = observer_command_tx.send(ObserverCommand::EnablePredicate( + ChainhookSpecification::Stacks(predicate_spec), + )); + } + Err(e) => { + warn!( + moved_ctx.expect_logger(), + "Unable to evaluate predicate on Stacks chainstate: {e}", + ); - // Update predicate status in redis - if let PredicatesApi::On(ref api_config) = moved_config.http_api { - let error = - format!("Unable to evaluate predicate on Stacks chainstate: {e}"); - let mut predicates_db_conn = - open_readwrite_predicates_db_conn_or_panic(api_config, &moved_ctx); - set_predicate_interrupted_status( - error, - &predicate_spec.key(), - &mut predicates_db_conn, - &moved_ctx, - ); - } + // Update predicate status in redis + if let PredicatesApi::On(ref api_config) = moved_config.http_api { + let error = format!( + "Unable to evaluate predicate on Stacks chainstate: {e}" + ); + let mut predicates_db_conn = + open_readwrite_predicates_db_conn_or_panic( + api_config, &moved_ctx, + ); + set_predicate_interrupted_status( + error, + &predicate_spec.key(), + &mut predicates_db_conn, + &moved_ctx, + ); + } - return; - } - }; - match last_block_scanned { - Some(last_block_scanned) => { - info!( - moved_ctx.expect_logger(), - "Stacks chainstate scan completed up to block: {}", - last_block_scanned.index - ); - } - None => { - info!( - moved_ctx.expect_logger(), - "Stacks chainstate scan completed. 0 blocks scanned." - ); - } + return; + } + }; + }); } - if !predicate_is_expired { - let _ = observer_command_tx.send(ObserverCommand::EnablePredicate( - ChainhookSpecification::Stacks(predicate_spec), - )); + StacksScanOp::KillScan(predicate_uuid) => { + let Some(kill_signal) = kill_signals.remove(&predicate_uuid) else { + continue; + }; + let mut kill_signal_writer = kill_signal.write().unwrap(); + *kill_signal_writer = true; } - }); + } } let _ = stacks_scan_pool.join(); } +pub enum BitcoinScanOp { + StartScan { + predicate_spec: BitcoinChainhookSpecification, + unfinished_scan_data: Option, + }, + KillScan(String), +} + pub fn start_bitcoin_scan_runloop( config: &Config, - bitcoin_scan_op_rx: crossbeam_channel::Receiver<( - BitcoinChainhookSpecification, - Option, - )>, + bitcoin_scan_op_rx: crossbeam_channel::Receiver, observer_command_tx: Sender, ctx: &Context, ) { let bitcoin_scan_pool = ThreadPool::new(config.limits.max_number_of_concurrent_bitcoin_scans); + let mut kill_signals = HashMap::new(); - while let Ok((predicate_spec, unfinished_scan_data)) = bitcoin_scan_op_rx.recv() { - let moved_ctx = ctx.clone(); - let moved_config = config.clone(); - let observer_command_tx = observer_command_tx.clone(); - bitcoin_scan_pool.execute(move || { - let op = scan_bitcoin_chainstate_via_rpc_using_predicate( - &predicate_spec, + while let Ok(op) = bitcoin_scan_op_rx.recv() { + match op { + BitcoinScanOp::StartScan { + predicate_spec, unfinished_scan_data, - &moved_config, - &moved_ctx, - ); + } => { + let moved_ctx = ctx.clone(); + let moved_config = config.clone(); + let observer_command_tx = observer_command_tx.clone(); + let kill_signal = Arc::new(RwLock::new(false)); + kill_signals.insert(predicate_spec.uuid.clone(), kill_signal.clone()); - let predicate_is_expired = match hiro_system_kit::nestable_block_on(op) { - Ok(predicate_is_expired) => predicate_is_expired, - Err(e) => { - warn!( - moved_ctx.expect_logger(), - "Unable to evaluate predicate on Bitcoin chainstate: {e}", + bitcoin_scan_pool.execute(move || { + let op = scan_bitcoin_chainstate_via_rpc_using_predicate( + &predicate_spec, + unfinished_scan_data, + &moved_config, + Some(kill_signal), + &moved_ctx, ); - // Update predicate status in redis - if let PredicatesApi::On(ref api_config) = moved_config.http_api { - let error = - format!("Unable to evaluate predicate on Bitcoin chainstate: {e}"); - let mut predicates_db_conn = - open_readwrite_predicates_db_conn_or_panic(api_config, &moved_ctx); - set_predicate_interrupted_status( - error, - &predicate_spec.key(), - &mut predicates_db_conn, - &moved_ctx, - ) - } - return; - } - }; - if !predicate_is_expired { - let _ = observer_command_tx.send(ObserverCommand::EnablePredicate( - ChainhookSpecification::Bitcoin(predicate_spec), - )); + match hiro_system_kit::nestable_block_on(op) { + Ok(PredicateScanResult::Expired) + | Ok(PredicateScanResult::Deregistered) => {} + Ok(PredicateScanResult::ChainTipReached) => { + let _ = observer_command_tx.send(ObserverCommand::EnablePredicate( + ChainhookSpecification::Bitcoin(predicate_spec), + )); + } + Err(e) => { + warn!( + moved_ctx.expect_logger(), + "Unable to evaluate predicate on Bitcoin chainstate: {e}", + ); + + // Update predicate status in redis + if let PredicatesApi::On(ref api_config) = moved_config.http_api { + let error = format!( + "Unable to evaluate predicate on Bitcoin chainstate: {e}" + ); + let mut predicates_db_conn = + open_readwrite_predicates_db_conn_or_panic( + api_config, &moved_ctx, + ); + set_predicate_interrupted_status( + error, + &predicate_spec.key(), + &mut predicates_db_conn, + &moved_ctx, + ) + } + return; + } + }; + }); + } + BitcoinScanOp::KillScan(predicate_uuid) => { + let Some(kill_signal) = kill_signals.remove(&predicate_uuid) else { + continue; + }; + let mut kill_signal_writer = kill_signal.write().unwrap(); + *kill_signal_writer = true; } - }); + } } let _ = bitcoin_scan_pool.join(); } diff --git a/components/chainhook-cli/src/service/tests/mod.rs b/components/chainhook-cli/src/service/tests/mod.rs index be20a78cc..71e03ca3f 100644 --- a/components/chainhook-cli/src/service/tests/mod.rs +++ b/components/chainhook-cli/src/service/tests/mod.rs @@ -34,6 +34,7 @@ use super::http_api::document_predicate_api_server; pub mod helpers; mod observer_tests; +mod runloop_tests; async fn test_register_predicate(predicate: JsonValue) -> Result<(), (String, Shutdown)> { // perhaps a little janky, we bind to the port 0 to find an open one, then diff --git a/components/chainhook-cli/src/service/tests/runloop_tests.rs b/components/chainhook-cli/src/service/tests/runloop_tests.rs new file mode 100644 index 000000000..e8d8410fd --- /dev/null +++ b/components/chainhook-cli/src/service/tests/runloop_tests.rs @@ -0,0 +1,169 @@ +use std::{path::PathBuf, sync::mpsc::channel, thread::sleep, time::Duration}; + +use chainhook_sdk::{ + chainhooks::types::{ + BitcoinChainhookSpecification, BitcoinPredicateType, BlockIdentifierIndexRule, HookAction, + StacksChainhookSpecification, StacksPredicate, + }, + types::{BitcoinNetwork, StacksNetwork}, + utils::Context, +}; + +use crate::{ + config::{Config, EventSourceConfig, PathConfig}, + scan::stacks::consolidate_local_stacks_chainstate_using_csv, + service::{ + runloops::{ + start_bitcoin_scan_runloop, start_stacks_scan_runloop, BitcoinScanOp, StacksScanOp, + }, + tests::helpers::{ + mock_bitcoin_rpc::mock_bitcoin_rpc, mock_service::setup_chainhook_service_ports, + }, + }, +}; + +use super::helpers::mock_stacks_node::{create_tmp_working_dir, write_stacks_blocks_to_tsv}; + +#[tokio::test] +async fn test_stacks_runloop_kill_scan() { + let (working_dir, tsv_dir) = create_tmp_working_dir().unwrap_or_else(|e| { + panic!("test failed with error: {e}"); + }); + + write_stacks_blocks_to_tsv(1000, &tsv_dir).unwrap_or_else(|e| { + std::fs::remove_dir_all(&working_dir).unwrap(); + panic!("test failed with error: {e}"); + }); + + let mut config = Config::devnet_default(); + config.storage.working_dir = working_dir.clone(); + config.event_sources = vec![EventSourceConfig::StacksTsvPath(PathConfig { + file_path: PathBuf::from(tsv_dir), + })]; + + let logger = hiro_system_kit::log::setup_logger(); + let _guard = hiro_system_kit::log::setup_global_logger(logger.clone()); + let ctx = Context { + logger: Some(logger), + tracer: false, + }; + + consolidate_local_stacks_chainstate_using_csv(&mut config, &ctx) + .await + .unwrap_or_else(|e| { + std::fs::remove_dir_all(&working_dir).unwrap(); + panic!("test failed with error: {e}"); + }); + + let (scan_op_tx, scan_op_rx) = crossbeam_channel::unbounded(); + let (observer_command_tx, _observer_command_rx) = channel(); + + let _ = hiro_system_kit::thread_named("Stacks scan runloop") + .spawn(move || { + start_stacks_scan_runloop(&config, scan_op_rx, observer_command_tx.clone(), &ctx); + }) + .expect("unable to spawn thread"); + + let uuid = "test".to_string(); + let predicate_spec = StacksChainhookSpecification { + uuid: uuid.clone(), + owner_uuid: None, + name: "idc".to_string(), + network: StacksNetwork::Devnet, + version: 0, + blocks: None, + start_block: Some(1), + end_block: Some(1_000), + expire_after_occurrence: None, + capture_all_events: None, + decode_clarity_values: None, + include_contract_abi: None, + predicate: StacksPredicate::BlockHeight(BlockIdentifierIndexRule::LowerThan(0)), + action: HookAction::Noop, + enabled: false, + expired_at: None, + }; + let op = StacksScanOp::StartScan { + predicate_spec, + unfinished_scan_data: None, + }; + let _ = scan_op_tx.send(op); + sleep(Duration::new(0, 500_000)); + let _ = scan_op_tx.send(StacksScanOp::KillScan(uuid)); + sleep(Duration::new(0, 500_000)); + // todo: currently the scanning runloop is a bit of a black box. we have no insight + // into what or how many predicates are being scanned. so for this test, there's no + // good way to determine if we successfully killed the scan. + // this [issue](https://github.com/hirosystems/chainhook/issues/509) will give us + // more data on these threads. When this is done we should update these tests + // to do some actual verification that the predicate is no longer being scanned + std::fs::remove_dir_all(&working_dir).unwrap(); +} + +#[tokio::test] +async fn test_stacks_bitcoin_kill_scan() { + let (_, _, _, _, bitcoin_rpc_port, _) = + setup_chainhook_service_ports().unwrap_or_else(|e| panic!("test failed with error: {e}")); + + let _ = hiro_system_kit::thread_named("Bitcoin rpc service") + .spawn(move || { + let future = mock_bitcoin_rpc(bitcoin_rpc_port, 1_000); + let _ = hiro_system_kit::nestable_block_on(future); + }) + .expect("unable to spawn thread"); + + sleep(Duration::new(1, 0)); + let mut config = Config::devnet_default(); + config.network.bitcoind_rpc_url = format!("http://0.0.0.0:{bitcoin_rpc_port}"); + + let logger = hiro_system_kit::log::setup_logger(); + let _guard = hiro_system_kit::log::setup_global_logger(logger.clone()); + let ctx = Context { + logger: Some(logger), + tracer: false, + }; + + let (scan_op_tx, scan_op_rx) = crossbeam_channel::unbounded(); + let (observer_command_tx, _observer_command_rx) = channel(); + + let _ = hiro_system_kit::thread_named("Stacks scan runloop") + .spawn(move || { + start_bitcoin_scan_runloop(&config, scan_op_rx, observer_command_tx.clone(), &ctx); + }) + .expect("unable to spawn thread"); + + let uuid = "test".to_string(); + let predicate_spec = BitcoinChainhookSpecification { + uuid: uuid.clone(), + owner_uuid: None, + name: "idc".to_string(), + network: BitcoinNetwork::Regtest, + version: 0, + blocks: None, + start_block: Some(1), + end_block: Some(1_000), + expire_after_occurrence: None, + predicate: BitcoinPredicateType::Block, + action: HookAction::Noop, + enabled: false, + expired_at: None, + include_proof: false, + include_inputs: false, + include_outputs: false, + include_witness: false, + }; + + let op = BitcoinScanOp::StartScan { + predicate_spec, + unfinished_scan_data: None, + }; + let _ = scan_op_tx.send(op); + sleep(Duration::new(0, 50_000_000)); + let _ = scan_op_tx.send(BitcoinScanOp::KillScan(uuid)); + // todo: currently the scanning runloop is a bit of a black box. we have no insight + // into what or how many predicates are being scanned. so for this test, there's no + // good way to determine if we successfully killed the scan. + // this [issue](https://github.com/hirosystems/chainhook/issues/509) will give us + // more data on these threads. When this is done we should update these tests + // to do some actual verification that the predicate is no longer being scanned +} diff --git a/components/chainhook-sdk/src/observer/mod.rs b/components/chainhook-sdk/src/observer/mod.rs index 4f5197371..c4d2dbc40 100644 --- a/components/chainhook-sdk/src/observer/mod.rs +++ b/components/chainhook-sdk/src/observer/mod.rs @@ -26,7 +26,7 @@ use bitcoincore_rpc::bitcoin::{BlockHash, Txid}; use bitcoincore_rpc::{Auth, Client, RpcApi}; use chainhook_types::{ BitcoinBlockData, BitcoinBlockSignaling, BitcoinChainEvent, BitcoinChainUpdatedWithBlocksData, - BitcoinChainUpdatedWithReorgData, BitcoinNetwork, BlockIdentifier, BlockchainEvent, + BitcoinChainUpdatedWithReorgData, BitcoinNetwork, BlockIdentifier, BlockchainEvent, Chain, StacksBlockData, StacksChainEvent, StacksNetwork, StacksNodeConfig, TransactionIdentifier, DEFAULT_STACKS_NODE_RPC, }; @@ -312,7 +312,7 @@ pub enum ObserverEvent { StacksChainEvent((StacksChainEvent, PredicateEvaluationReport)), NotifyBitcoinTransactionProxied, PredicateRegistered(ChainhookSpecification), - PredicateDeregistered(String), + PredicateDeregistered(PredicateDeregisteredEvent), PredicateEnabled(ChainhookSpecification), BitcoinPredicateTriggered(BitcoinChainhookOccurrencePayload), StacksPredicateTriggered(StacksChainhookOccurrencePayload), @@ -322,6 +322,12 @@ pub enum ObserverEvent { StacksChainMempoolEvent(StacksChainMempoolEvent), } +#[derive(Clone, Debug)] +pub struct PredicateDeregisteredEvent { + pub predicate_uuid: String, + pub chain: Chain, +} + #[derive(Debug, Clone, Deserialize, Serialize)] /// JSONRPC Request pub struct BitcoinRPCRequest { @@ -1203,7 +1209,12 @@ pub async fn start_observer_commands_handler( prometheus_monitoring.btc_metrics_deregister_predicate(); } if let Some(ref tx) = observer_events_tx { - let _ = tx.send(ObserverEvent::PredicateDeregistered(hook_uuid.clone())); + let _ = tx.send(ObserverEvent::PredicateDeregistered( + PredicateDeregisteredEvent { + predicate_uuid: hook_uuid.clone(), + chain: Chain::Bitcoin, + }, + )); } } @@ -1384,7 +1395,12 @@ pub async fn start_observer_commands_handler( prometheus_monitoring.stx_metrics_deregister_predicate(); } if let Some(ref tx) = observer_events_tx { - let _ = tx.send(ObserverEvent::PredicateDeregistered(hook_uuid.clone())); + let _ = tx.send(ObserverEvent::PredicateDeregistered( + PredicateDeregisteredEvent { + predicate_uuid: hook_uuid.clone(), + chain: Chain::Stacks, + }, + )); } } @@ -1502,7 +1518,12 @@ pub async fn start_observer_commands_handler( }; // event if the predicate wasn't in the `chainhook_store`, propogate this event to delete from redis if let Some(tx) = &observer_events_tx { - let _ = tx.send(ObserverEvent::PredicateDeregistered(hook_uuid)); + let _ = tx.send(ObserverEvent::PredicateDeregistered( + PredicateDeregisteredEvent { + predicate_uuid: hook_uuid, + chain: Chain::Stacks, + }, + )); }; } ObserverCommand::DeregisterBitcoinPredicate(hook_uuid) => { @@ -1518,9 +1539,14 @@ pub async fn start_observer_commands_handler( // so only those that we find in the store should be removed prometheus_monitoring.btc_metrics_deregister_predicate(); }; - // event if the predicate wasn't in the `chainhook_store`, propogate this event to delete from redis + // even if the predicate wasn't in the `chainhook_store`, propogate this event to delete from redis if let Some(tx) = &observer_events_tx { - let _ = tx.send(ObserverEvent::PredicateDeregistered(hook_uuid)); + let _ = tx.send(ObserverEvent::PredicateDeregistered( + PredicateDeregisteredEvent { + predicate_uuid: hook_uuid.clone(), + chain: Chain::Bitcoin, + }, + )); }; } ObserverCommand::ExpireStacksPredicate(HookExpirationData { diff --git a/components/chainhook-sdk/src/observer/tests/mod.rs b/components/chainhook-sdk/src/observer/tests/mod.rs index 557854726..51525de73 100644 --- a/components/chainhook-sdk/src/observer/tests/mod.rs +++ b/components/chainhook-sdk/src/observer/tests/mod.rs @@ -14,7 +14,7 @@ use crate::indexer::tests::helpers::{ use crate::monitoring::PrometheusMonitoring; use crate::observer::{ start_observer_commands_handler, ChainhookStore, EventObserverConfig, ObserverCommand, - ObserverSidecar, + ObserverSidecar, PredicateDeregisteredEvent, }; use crate::utils::{AbstractBlock, Context}; use chainhook_types::{ @@ -501,7 +501,10 @@ fn test_stacks_chainhook_register_deregister() { chainhook.uuid.clone(), )); assert!(match observer_events_rx.recv() { - Ok(ObserverEvent::PredicateDeregistered(deregistered_chainhook)) => { + Ok(ObserverEvent::PredicateDeregistered(PredicateDeregisteredEvent { + predicate_uuid: deregistered_chainhook, + .. + })) => { assert_eq!(chainhook.uuid, deregistered_chainhook); true } @@ -690,7 +693,10 @@ fn test_stacks_chainhook_auto_deregister() { // Should signal that a hook was deregistered assert!(match observer_events_rx.recv() { - Ok(ObserverEvent::PredicateDeregistered(deregistered_hook)) => { + Ok(ObserverEvent::PredicateDeregistered(PredicateDeregisteredEvent { + predicate_uuid: deregistered_hook, + .. + })) => { assert_eq!(deregistered_hook, chainhook.uuid); true } @@ -856,7 +862,10 @@ fn test_bitcoin_chainhook_register_deregister() { chainhook.uuid.clone(), )); assert!(match observer_events_rx.recv() { - Ok(ObserverEvent::PredicateDeregistered(deregistered_chainhook)) => { + Ok(ObserverEvent::PredicateDeregistered(PredicateDeregisteredEvent { + predicate_uuid: deregistered_chainhook, + .. + })) => { assert_eq!(chainhook.uuid, deregistered_chainhook); true } @@ -1064,7 +1073,10 @@ fn test_bitcoin_chainhook_auto_deregister() { // Should signal that a hook was deregistered assert!(match observer_events_rx.recv() { - Ok(ObserverEvent::PredicateDeregistered(deregistered_hook)) => { + Ok(ObserverEvent::PredicateDeregistered(PredicateDeregisteredEvent { + predicate_uuid: deregistered_hook, + .. + })) => { assert_eq!(deregistered_hook, chainhook.uuid); true } diff --git a/components/chainhook-types-rs/src/lib.rs b/components/chainhook-types-rs/src/lib.rs index b5242d163..b015b6c75 100644 --- a/components/chainhook-types-rs/src/lib.rs +++ b/components/chainhook-types-rs/src/lib.rs @@ -18,6 +18,7 @@ pub use rosetta::*; pub const DEFAULT_STACKS_NODE_RPC: &str = "http://localhost:20443"; +#[derive(Clone, Debug)] pub enum Chain { Bitcoin, Stacks,