Skip to content

Commit

Permalink
fix: allow aborting scans
Browse files Browse the repository at this point in the history
  • Loading branch information
MicaiahReid committed May 30, 2024
1 parent 57bfc39 commit 50d0f9a
Show file tree
Hide file tree
Showing 10 changed files with 474 additions and 145 deletions.
2 changes: 2 additions & 0 deletions components/chainhook-cli/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,7 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
&predicate_spec,
None,
&config,
None,
&ctx,
)
.await?;
Expand Down Expand Up @@ -545,6 +546,7 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
None,
&db_conn,
&config,
None,
&ctx,
)
.await?;
Expand Down
23 changes: 19 additions & 4 deletions components/chainhook-cli/src/scan/bitcoin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,17 @@ use chainhook_sdk::types::{
};
use chainhook_sdk::utils::{file_append, send_request, Context};
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

use super::common::PredicateScanResult;

pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
predicate_spec: &BitcoinChainhookSpecification,
unfinished_scan_data: Option<ScanningData>,
config: &Config,
kill_signal: Option<Arc<RwLock<bool>>>,
ctx: &Context,
) -> Result<bool, String> {
) -> Result<PredicateScanResult, String> {
let predicate_uuid = &predicate_spec.uuid;
let auth = Auth::UserPass(
config.network.bitcoind_rpc_username.clone(),
Expand Down Expand Up @@ -62,7 +66,7 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
let mut block_heights_to_scan = match block_heights_to_scan {
Some(h) => h,
// no blocks to scan, go straight to streaming
None => return Ok(false),
None => return Ok(PredicateScanResult::ChainTipReached),
};

let mut predicates_db_conn = match config.http_api {
Expand Down Expand Up @@ -99,6 +103,17 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
let http_client = build_http_client();

while let Some(current_block_height) = block_heights_to_scan.pop_front() {
if let Some(kill_signal) = kill_signal.clone() {
match kill_signal.read() {
Ok(kill_signal) => {
// if true, we're received the kill signal, so break out of the loop
if *kill_signal {
return Ok(PredicateScanResult::Derigistered);
}
}
Err(_) => {}
}
}
if let Some(ref mut predicates_db_conn) = predicates_db_conn {
if number_of_blocks_scanned % 10 == 0 || number_of_blocks_scanned == 0 {
set_predicate_scanning_status(
Expand Down Expand Up @@ -235,10 +250,10 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
set_confirmed_expiration_status(&predicate_spec.key(), predicates_db_conn, ctx);
}
}
return Ok(true);
return Ok(PredicateScanResult::Expired);
}

return Ok(false);
return Ok(PredicateScanResult::ChainTipReached);
}

pub async fn process_block_with_predicates(
Expand Down
6 changes: 6 additions & 0 deletions components/chainhook-cli/src/scan/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,9 @@ pub fn get_block_heights_to_scan(
};
Ok(block_heights_to_scan)
}

pub enum PredicateScanResult {
ChainTipReached,
Expired,
Derigistered,
}
37 changes: 30 additions & 7 deletions components/chainhook-cli/src/scan/stacks.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::collections::{HashMap, VecDeque};
use std::{
collections::{HashMap, VecDeque},
sync::{Arc, RwLock},
};

use crate::{
archive::download_stacks_dataset_if_required,
Expand Down Expand Up @@ -29,6 +32,8 @@ use chainhook_sdk::{
};
use rocksdb::DB;

use super::common::PredicateScanResult;

#[derive(Debug, Clone, Eq, PartialEq)]
pub enum DigestingCommand {
DigestSeedBlock(BlockIdentifier),
Expand Down Expand Up @@ -170,16 +175,17 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate(
unfinished_scan_data: Option<ScanningData>,
stacks_db_conn: &DB,
config: &Config,
kill_signal: Option<Arc<RwLock<bool>>>,
ctx: &Context,
) -> Result<(Option<BlockIdentifier>, bool), String> {
) -> Result<PredicateScanResult, String> {
let predicate_uuid = &predicate_spec.uuid;
let mut chain_tip = match get_last_unconfirmed_block_height_inserted(stacks_db_conn, ctx) {
Some(chain_tip) => chain_tip,
None => match get_last_block_height_inserted(stacks_db_conn, ctx) {
Some(chain_tip) => chain_tip,
None => {
info!(ctx.expect_logger(), "No blocks inserted in db; cannot determine Stacks chain tip. Skipping scan of predicate {}", predicate_uuid);
return Ok((None, false));
return Ok(PredicateScanResult::ChainTipReached);
}
},
};
Expand All @@ -194,7 +200,13 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate(
let mut block_heights_to_scan = match block_heights_to_scan {
Some(h) => h,
// no blocks to scan, go straight to streaming
None => return Ok((None, false)),
None => {
debug!(
ctx.expect_logger(),
"Stacks chainstate scan completed. 0 blocks scanned."
);
return Ok(PredicateScanResult::ChainTipReached);
}
};

let mut predicates_db_conn = match config.http_api {
Expand Down Expand Up @@ -225,6 +237,17 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate(
};

while let Some(current_block_height) = block_heights_to_scan.pop_front() {
if let Some(kill_signal) = kill_signal.clone() {
match kill_signal.read() {
Ok(kill_signal) => {
// if true, we're received the kill signal, so break out of the loop
if *kill_signal {
return Ok(PredicateScanResult::Derigistered);
}
}
Err(_) => {}
}
}
if let Some(ref mut predicates_db_conn) = predicates_db_conn {
if number_of_blocks_scanned % 10 == 0 || number_of_blocks_scanned == 0 {
set_predicate_scanning_status(
Expand All @@ -248,7 +271,7 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate(
Some(chain_tip) => chain_tip,
None => {
warn!(ctx.expect_logger(), "No blocks inserted in db; cannot determine Stacks chain tip. Skipping scan of predicate {}", predicate_uuid);
return Ok((None, false));
return Ok(PredicateScanResult::ChainTipReached);
}
},
};
Expand Down Expand Up @@ -403,10 +426,10 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate(
set_confirmed_expiration_status(&predicate_spec.key(), predicates_db_conn, ctx);
}
}
return Ok((Some(last_block_scanned), true));
return Ok(PredicateScanResult::Expired);
}

Ok((Some(last_block_scanned), false))
Ok(PredicateScanResult::ChainTipReached)
}

pub async fn scan_stacks_chainstate_via_csv_using_predicate(
Expand Down
44 changes: 37 additions & 7 deletions components/chainhook-cli/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ use chainhook_sdk::chainhooks::types::{ChainhookConfig, ChainhookFullSpecificati
use chainhook_sdk::chainhooks::types::ChainhookSpecification;
use chainhook_sdk::observer::{
start_event_observer, HookExpirationData, ObserverCommand, ObserverEvent,
PredicateEvaluationReport, PredicateInterruptedData, StacksObserverStartupContext,
PredicateDeregisteredEvent, PredicateEvaluationReport, PredicateInterruptedData,
StacksObserverStartupContext,
};
use chainhook_sdk::types::{Chain, StacksBlockData, StacksChainEvent};
use chainhook_sdk::utils::Context;
Expand All @@ -26,6 +27,7 @@ use std::sync::mpsc::{channel, Receiver, Sender};
use std::time::{SystemTime, UNIX_EPOCH};

use self::http_api::get_entry_from_predicates_db;
use self::runloops::{BitcoinScanOp, StacksScanOp};

pub struct Service {
config: Config,
Expand Down Expand Up @@ -304,10 +306,16 @@ impl Service {
for predicate_with_last_scanned_block in leftover_scans {
match predicate_with_last_scanned_block {
(ChainhookSpecification::Stacks(spec), last_scanned_block) => {
let _ = stacks_scan_op_tx.send((spec, last_scanned_block));
let _ = stacks_scan_op_tx.send(StacksScanOp::StartScan {
predicate_spec: spec,
unfinished_scan_data: last_scanned_block,
});
}
(ChainhookSpecification::Bitcoin(spec), last_scanned_block) => {
let _ = bitcoin_scan_op_tx.send((spec, last_scanned_block));
let _ = bitcoin_scan_op_tx.send(BitcoinScanOp::StartScan {
predicate_spec: spec,
unfinished_scan_data: last_scanned_block,
});
}
}
}
Expand Down Expand Up @@ -354,10 +362,16 @@ impl Service {
}
match spec {
ChainhookSpecification::Stacks(predicate_spec) => {
let _ = stacks_scan_op_tx.send((predicate_spec, None));
let _ = stacks_scan_op_tx.send(StacksScanOp::StartScan {
predicate_spec,
unfinished_scan_data: None,
});
}
ChainhookSpecification::Bitcoin(predicate_spec) => {
let _ = bitcoin_scan_op_tx.send((predicate_spec, None));
let _ = bitcoin_scan_op_tx.send(BitcoinScanOp::StartScan {
predicate_spec,
unfinished_scan_data: None,
});
}
}
}
Expand All @@ -382,14 +396,30 @@ impl Service {
);
}
}
ObserverEvent::PredicateDeregistered(uuid) => {
ObserverEvent::PredicateDeregistered(PredicateDeregisteredEvent {
predicate_uuid,
chain,
}) => {
if let PredicatesApi::On(ref config) = self.config.http_api {
let Ok(mut predicates_db_conn) =
open_readwrite_predicates_db_conn_verbose(&config, &ctx)
else {
continue;
};
let predicate_key = ChainhookSpecification::either_stx_or_btc_key(&uuid);

match chain {
Chain::Bitcoin => {
let _ = bitcoin_scan_op_tx
.send(BitcoinScanOp::KillScan(predicate_uuid.clone()));
}
Chain::Stacks => {
let _ = stacks_scan_op_tx
.send(StacksScanOp::KillScan(predicate_uuid.clone()));
}
};

let predicate_key =
ChainhookSpecification::either_stx_or_btc_key(&predicate_uuid);
let res: Result<(), redis::RedisError> =
predicates_db_conn.del(predicate_key.clone());
if let Err(e) = res {
Expand Down
Loading

0 comments on commit 50d0f9a

Please sign in to comment.