Skip to content

Commit

Permalink
fix: only import stacks tsv if chainstate is empty
Browse files Browse the repository at this point in the history
  • Loading branch information
rafaelcr committed Nov 11, 2024
1 parent defd86f commit 5012aaa
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 127 deletions.
2 changes: 1 addition & 1 deletion components/chainhook-cli/src/archive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ pub async fn download_stacks_dataset_if_required(
) -> Result<bool, String> {
if config.is_initial_ingestion_required() {
// Download default tsv.
if config.rely_on_remote_stacks_tsv() && config.should_download_remote_stacks_tsv() {
if config.contains_remote_stacks_tsv_url() && config.should_download_remote_stacks_tsv() {
let url = config.expected_remote_stacks_tsv_url()?;
let mut tsv_file_path = config.expected_cache_path();
tsv_file_path.push(default_tsv_file_path(&config.network.stacks_network));
Expand Down
13 changes: 6 additions & 7 deletions components/chainhook-cli/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::config::generator::generate_config;
use crate::config::Config;
use crate::scan::bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate;
use crate::scan::stacks::{
consolidate_local_stacks_chainstate_using_csv, scan_stacks_chainstate_via_csv_using_predicate,
import_stacks_chainstate_from_remote_tsv, scan_stacks_chainstate_via_csv_using_predicate,
scan_stacks_chainstate_via_rocksdb_using_predicate,
};
use crate::service::http_api::document_predicate_api_server;
Expand All @@ -24,6 +24,7 @@ use chainhook_sdk::chainhooks::stacks::StacksChainhookSpecificationNetworkMap;
use chainhook_sdk::chainhooks::stacks::StacksPredicate;
use chainhook_sdk::chainhooks::stacks::StacksPrintEventBasedPredicate;
use chainhook_sdk::chainhooks::types::{ChainhookSpecificationNetworkMap, FileHook, HookAction};
use chainhook_sdk::try_info;
use chainhook_sdk::types::{BitcoinNetwork, BlockIdentifier, StacksNetwork};
use chainhook_sdk::utils::{BlockHeights, Context};
use clap::{Parser, Subcommand};
Expand Down Expand Up @@ -342,19 +343,17 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
ServiceCommand::Start(cmd) => {
let mut config =
Config::default(cmd.devnet, cmd.testnet, cmd.mainnet, &cmd.config_path)?;

if cmd.prometheus_monitoring_port.is_some() {
config.monitoring.prometheus_monitoring_port = cmd.prometheus_monitoring_port;
}

let predicates = cmd
.predicates_paths
.iter()
.map(|p| load_predicate_from_path(p))
.collect::<Result<Vec<ChainhookSpecificationNetworkMap>, _>>()?;

info!(ctx.expect_logger(), "Starting service...",);

try_info!(ctx, "Starting chainhook service");
import_stacks_chainstate_from_remote_tsv(&mut config, &ctx).await?;
let mut service = Service::new(config, ctx);
return service.run(predicates, None).await;
}
Expand Down Expand Up @@ -541,7 +540,7 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
};
match open_readonly_stacks_db_conn(&config.expected_cache_path(), &ctx) {
Ok(_) => {
let _ = consolidate_local_stacks_chainstate_using_csv(
let _ = import_stacks_chainstate_from_remote_tsv(
&mut config,
&ctx,
)
Expand Down Expand Up @@ -812,7 +811,7 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
}
StacksCommand::Db(StacksDbCommand::Update(cmd)) => {
let mut config = Config::default(false, false, false, &cmd.config_path)?;
consolidate_local_stacks_chainstate_using_csv(&mut config, &ctx).await?;
import_stacks_chainstate_from_remote_tsv(&mut config, &ctx).await?;
}
StacksCommand::Db(StacksDbCommand::Check(cmd)) => {
let config = Config::default(false, false, false, &cmd.config_path)?;
Expand Down
8 changes: 7 additions & 1 deletion components/chainhook-cli/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,12 @@ impl Config {
destination_path
}

/// Returns `true` if the configured cache directory contains no entries at all.
///
/// Errors if the cache directory cannot be read (e.g. it does not exist or
/// permissions are insufficient).
/// NOTE(review): used by the TSV-import path to decide whether a chainstate
/// already exists — presumably any file in the cache dir counts as "not empty";
/// confirm that unrelated files (e.g. a leftover .tsv download) should also
/// suppress the import.
pub fn is_cache_path_empty(&self) -> Result<bool, String> {
    // Reading just the first directory entry is enough: an exhausted iterator
    // on the first call means the directory is empty.
    let mut dir = std::fs::read_dir(self.expected_cache_path())
        .map_err(|e| format!("unable to read cache directory: {e}"))?;
    Ok(dir.next().is_none())
}

fn expected_remote_stacks_tsv_base_url(&self) -> Result<&String, String> {
for source in self.event_sources.iter() {
if let EventSourceConfig::StacksTsvUrl(config) = source {
Expand All @@ -323,7 +329,7 @@ impl Config {
.map(|url| format!("{}.gz", url))
}

pub fn rely_on_remote_stacks_tsv(&self) -> bool {
pub fn contains_remote_stacks_tsv_url(&self) -> bool {
for source in self.event_sources.iter() {
if let EventSourceConfig::StacksTsvUrl(_config) = source {
return true;
Expand Down
169 changes: 82 additions & 87 deletions components/chainhook-cli/src/scan/stacks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::{
use chainhook_sdk::{
chainhooks::stacks::evaluate_stacks_chainhook_on_blocks,
indexer::{self, stacks::standardize_stacks_serialized_block_header, Indexer},
try_info,
utils::Context,
};
use chainhook_sdk::{
Expand Down Expand Up @@ -338,11 +339,8 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate(
&mut db_conns.signers_db,
&block_data.block_identifier,
)?;
let (hits_per_events, _) = evaluate_stacks_predicate_on_non_consensus_events(
&events,
predicate_spec,
ctx,
);
let (hits_per_events, _) =
evaluate_stacks_predicate_on_non_consensus_events(&events, predicate_spec, ctx);

if hits_per_blocks.is_empty() && hits_per_events.is_empty() {
continue;
Expand Down Expand Up @@ -584,101 +582,98 @@ pub async fn scan_stacks_chainstate_via_csv_using_predicate(
Ok(last_block_scanned)
}

pub async fn consolidate_local_stacks_chainstate_using_csv(
/// Downloads a remote archive TSV that contains Stacks node events and imports it into chainhook in order to fill up the Stacks
/// blocks database. This import will only happen if chainhook is starting from a fresh install with an empty index.
pub async fn import_stacks_chainstate_from_remote_tsv(
config: &mut Config,
ctx: &Context,
) -> Result<(), String> {
if !config.is_cache_path_empty()? {
try_info!(ctx, "A Stacks chainstate already exists, skipping TSV chainstate import");
return Ok(());
}
if !config.contains_remote_stacks_tsv_url() {
try_info!(ctx, "No remote Stacks TSV location was specified in config file, skipping TSV chainstate import");
return Ok(());
}
try_info!(ctx, "Importing Stacks chainstate from TSV");

download_stacks_dataset_if_required(config, ctx).await?;
let stacks_db = open_readonly_stacks_db_conn_with_retry(&config.expected_cache_path(), 3, ctx)?;
let confirmed_tip = get_last_block_height_inserted(&stacks_db, ctx);
let mut canonical_fork: VecDeque<(BlockIdentifier, BlockIdentifier, u64)> =
get_canonical_fork_from_tsv(config, confirmed_tip, ctx).await?;

let mut indexer = Indexer::new(config.network.clone());
let mut blocks_inserted = 0;
let mut blocks_read = 0;
let blocks_to_insert = canonical_fork.len();
let stacks_db_rw = open_readwrite_stacks_db_conn(&config.expected_cache_path(), ctx)?;
info!(
ctx.expect_logger(),
"Building local chainstate from Stacks archive file"
"Beginning import of {} Stacks blocks into rocks db", blocks_to_insert
);
// TODO: To avoid repeating code with `scan_stacks_chainstate_via_csv_using_predicate`, we should move this block
// retrieval code into a reusable function.
let tsv_path = config.expected_local_stacks_tsv_file()?.clone();
let mut tsv_reader = BufReader::new(File::open(tsv_path).map_err(|e| e.to_string())?);
let mut tsv_current_line = 0;
for (block_identifier, _parent_block_identifier, tsv_line_number) in canonical_fork.drain(..) {
blocks_read += 1;

let downloaded_new_dataset = download_stacks_dataset_if_required(config, ctx).await?;
if downloaded_new_dataset {
let stacks_db =
open_readonly_stacks_db_conn_with_retry(&config.expected_cache_path(), 3, ctx)?;
let confirmed_tip = get_last_block_height_inserted(&stacks_db, ctx);
let mut canonical_fork: VecDeque<(BlockIdentifier, BlockIdentifier, u64)> =
get_canonical_fork_from_tsv(config, confirmed_tip, ctx).await?;

let mut indexer = Indexer::new(config.network.clone());
let mut blocks_inserted = 0;
let mut blocks_read = 0;
let blocks_to_insert = canonical_fork.len();
let stacks_db_rw = open_readwrite_stacks_db_conn(&config.expected_cache_path(), ctx)?;
info!(
ctx.expect_logger(),
"Beginning import of {} Stacks blocks into rocks db", blocks_to_insert
);
// TODO: To avoid repeating code with `scan_stacks_chainstate_via_csv_using_predicate`, we should move this block
// retrieval code into a reusable function.
let tsv_path = config.expected_local_stacks_tsv_file()?.clone();
let mut tsv_reader = BufReader::new(File::open(tsv_path).map_err(|e| e.to_string())?);
let mut tsv_current_line = 0;
for (block_identifier, _parent_block_identifier, tsv_line_number) in
canonical_fork.drain(..)
{
blocks_read += 1;
// If blocks already stored, move on
if is_stacks_block_present(&block_identifier, 3, &stacks_db_rw) {
continue;
}
blocks_inserted += 1;

// If blocks already stored, move on
if is_stacks_block_present(&block_identifier, 3, &stacks_db_rw) {
continue;
}
blocks_inserted += 1;

// Seek to required line from TSV and retrieve its block payload.
let mut tsv_line = String::new();
while tsv_current_line < tsv_line_number {
tsv_line.clear();
let bytes_read = tsv_reader
.read_line(&mut tsv_line)
.map_err(|e| e.to_string())?;
if bytes_read == 0 {
return Err("Unexpected EOF when reading TSV".to_string());
}
tsv_current_line += 1;
// Seek to required line from TSV and retrieve its block payload.
let mut tsv_line = String::new();
while tsv_current_line < tsv_line_number {
tsv_line.clear();
let bytes_read = tsv_reader
.read_line(&mut tsv_line)
.map_err(|e| e.to_string())?;
if bytes_read == 0 {
return Err("Unexpected EOF when reading TSV".to_string());
}
let Some(serialized_block) = tsv_line.split('\t').last() else {
return Err("Unable to retrieve serialized block from TSV line".to_string());
};

let block_data = match indexer::stacks::standardize_stacks_serialized_block(
&indexer.config,
serialized_block,
&mut indexer.stacks_context,
ctx,
) {
Ok(block) => block,
Err(e) => {
error!(
&ctx.expect_logger(),
"Failed to standardize stacks block: {e}"
);
continue;
}
};

// TODO(rafaelcr): Store signer messages
insert_entry_in_stacks_blocks(&block_data, &stacks_db_rw, ctx)?;
tsv_current_line += 1;
}
let Some(serialized_block) = tsv_line.split('\t').last() else {
return Err("Unable to retrieve serialized block from TSV line".to_string());
};

if blocks_inserted % 2500 == 0 {
info!(
ctx.expect_logger(),
"Importing Stacks blocks into rocks db: {}/{}", blocks_read, blocks_to_insert
let block_data = match indexer::stacks::standardize_stacks_serialized_block(
&indexer.config,
serialized_block,
&mut indexer.stacks_context,
ctx,
) {
Ok(block) => block,
Err(e) => {
error!(
&ctx.expect_logger(),
"Failed to standardize stacks block: {e}"
);
let _ = stacks_db_rw.flush();
continue;
}
};

// TODO(rafaelcr): Store signer messages
insert_entry_in_stacks_blocks(&block_data, &stacks_db_rw, ctx)?;

if blocks_inserted % 2500 == 0 {
info!(
ctx.expect_logger(),
"Importing Stacks blocks into rocks db: {}/{}", blocks_read, blocks_to_insert
);
let _ = stacks_db_rw.flush();
}
let _ = stacks_db_rw.flush();
info!(
ctx.expect_logger(),
"{blocks_read} Stacks blocks read, {blocks_inserted} inserted"
);
} else {
info!(
ctx.expect_logger(),
"Skipping database consolidation - no new archive found since last consolidation."
);
}
let _ = stacks_db_rw.flush();
info!(
ctx.expect_logger(),
"{blocks_read} Stacks blocks read, {blocks_inserted} inserted"
);
Ok(())
}
27 changes: 0 additions & 27 deletions components/chainhook-cli/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ pub(crate) mod http_api;
mod runloops;

use crate::config::{Config, PredicatesApi, PredicatesApiConfig};
use crate::scan::stacks::consolidate_local_stacks_chainstate_using_csv;
use crate::service::http_api::{load_predicates_from_redis, start_predicate_api_server};
use crate::service::runloops::{start_bitcoin_scan_runloop, start_stacks_scan_runloop};
use crate::storage::signers::{initialize_signers_db, store_signer_db_messages};
Expand Down Expand Up @@ -164,11 +163,6 @@ impl Service {
let mut event_observer_config = self.config.get_event_observer_config();
event_observer_config.registered_chainhooks = chainhook_store;

// Download and ingest a Stacks dump
if self.config.rely_on_remote_stacks_tsv() {
consolidate_local_stacks_chainstate_using_csv(&mut self.config, &self.ctx).await?;
}

// Stacks scan operation threadpool
let (stacks_scan_op_tx, stacks_scan_op_rx) = crossbeam_channel::unbounded();
let ctx = self.ctx.clone();
Expand Down Expand Up @@ -292,8 +286,6 @@ impl Service {
self.ctx.clone(),
);

let mut stacks_event = 0;

let ctx = self.ctx.clone();
match self.config.http_api {
PredicatesApi::On(ref api_config) => {
Expand Down Expand Up @@ -586,7 +578,6 @@ impl Service {

match &chain_event {
StacksChainEvent::ChainUpdatedWithBlocks(data) => {
stacks_event += 1;
for confirmed_block in &data.confirmed_blocks {
if let Some(expired_predicate_uuids) =
expire_predicates_for_block(
Expand Down Expand Up @@ -649,24 +640,6 @@ impl Service {
&ctx,
);
};

// Every 32 blocks, we will check if there's a new Stacks file archive to ingest
if stacks_event > 32 {
stacks_event = 0;
if self.config.rely_on_remote_stacks_tsv() {
if let Err(e) = consolidate_local_stacks_chainstate_using_csv(
&mut self.config,
&self.ctx,
)
.await
{
error!(
self.ctx.expect_logger(),
"Failed to update database from archive: {e}"
)
};
}
}
}
ObserverEvent::PredicateInterrupted(PredicateInterruptedData {
predicate_key,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::config::{
Config, EventSourceConfig, LimitsConfig, MonitoringConfig, PathConfig, PredicatesApi,
PredicatesApiConfig, StorageConfig, DEFAULT_REDIS_URI,
};
use crate::scan::stacks::consolidate_local_stacks_chainstate_using_csv;
use crate::scan::stacks::import_stacks_chainstate_from_remote_tsv;
use crate::service::{
http_api::start_predicate_api_server, update_predicate_spec, update_predicate_status,
PredicateStatus, Service,
Expand Down Expand Up @@ -442,7 +442,7 @@ pub async fn setup_stacks_chainhook_test(
Some(prometheus_port),
);

consolidate_local_stacks_chainstate_using_csv(&mut config, &ctx)
import_stacks_chainstate_from_remote_tsv(&mut config, &ctx)
.await
.unwrap_or_else(|e| {
std::fs::remove_dir_all(&working_dir).unwrap();
Expand Down
4 changes: 2 additions & 2 deletions components/chainhook-cli/src/service/tests/runloop_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use chainhook_sdk::{

use crate::{
config::{Config, EventSourceConfig, PathConfig},
scan::stacks::consolidate_local_stacks_chainstate_using_csv,
scan::stacks::import_stacks_chainstate_from_remote_tsv,
service::{
runloops::{
start_bitcoin_scan_runloop, start_stacks_scan_runloop, BitcoinScanOp, StacksScanOp,
Expand Down Expand Up @@ -49,7 +49,7 @@ async fn test_stacks_runloop_kill_scan() {
tracer: false,
};

consolidate_local_stacks_chainstate_using_csv(&mut config, &ctx)
import_stacks_chainstate_from_remote_tsv(&mut config, &ctx)
.await
.unwrap_or_else(|e| {
std::fs::remove_dir_all(&working_dir).unwrap();
Expand Down

0 comments on commit 5012aaa

Please sign in to comment.