Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: store signer messages in local sqlite database #664

Merged
merged 11 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions components/chainhook-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ reqwest = { version = "0.12", default-features = false, features = [
"rustls-tls",
] }
tokio = { version = "1.38.1", features = ["full"] }
rusqlite = { version = "0.31.0", features = ["bundled"] }
slog = { version = "2.7.0" }
futures-util = "0.3.24"
flate2 = "1.0.24"
tar = "0.4.38"
Expand Down
9 changes: 4 additions & 5 deletions components/chainhook-cli/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ use crate::storage::{
delete_confirmed_entry_from_stacks_blocks, delete_unconfirmed_entry_from_stacks_blocks,
get_last_block_height_inserted, get_last_unconfirmed_block_height_inserted,
get_stacks_block_at_block_height, insert_unconfirmed_entry_in_stacks_blocks,
is_stacks_block_present, open_readonly_stacks_db_conn, open_readonly_stacks_db_conn_with_retry,
open_readwrite_stacks_db_conn, set_last_confirmed_insert_key,
is_stacks_block_present, open_readonly_stacks_db_conn, open_readwrite_stacks_db_conn,
set_last_confirmed_insert_key, StacksDbConnections,
};
use chainhook_sdk::chainhooks::bitcoin::BitcoinChainhookSpecification;
use chainhook_sdk::chainhooks::bitcoin::BitcoinChainhookSpecificationNetworkMap;
Expand Down Expand Up @@ -547,15 +547,14 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
)
.await;
// Refresh DB connection so it picks up recent changes made by TSV consolidation.
let new_conn = open_readonly_stacks_db_conn_with_retry(
let mut db_conns = StacksDbConnections::open_readonly(
&config.expected_cache_path(),
5,
&ctx,
)?;
scan_stacks_chainstate_via_rocksdb_using_predicate(
&predicate_spec,
None,
&new_conn,
&mut db_conns,
&config,
None,
&ctx,
Expand Down
29 changes: 21 additions & 8 deletions components/chainhook-cli/src/scan/stacks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,25 @@ use crate::{
get_last_block_height_inserted, get_last_unconfirmed_block_height_inserted,
get_stacks_block_at_block_height, insert_entry_in_stacks_blocks, is_stacks_block_present,
open_readonly_stacks_db_conn_with_retry, open_readwrite_stacks_db_conn,
signers::get_signer_db_messages_received_at_block, StacksDbConnections,
},
};
use chainhook_sdk::types::{BlockIdentifier, Chain};
use chainhook_sdk::{
chainhooks::stacks::evaluate_stacks_chainhook_on_blocks,
indexer::{self, stacks::standardize_stacks_serialized_block_header, Indexer},
utils::Context,
};
use chainhook_sdk::{
chainhooks::stacks::evaluate_stacks_predicate_on_non_consensus_events,
types::{BlockIdentifier, Chain},
};
use chainhook_sdk::{
chainhooks::stacks::{
handle_stacks_hook_action, StacksChainhookInstance, StacksChainhookOccurrence,
StacksTriggerChainhook,
},
utils::{file_append, send_request, AbstractStacksBlock},
};
use rocksdb::DB;

use super::common::PredicateScanResult;

Expand Down Expand Up @@ -180,11 +183,12 @@ pub async fn get_canonical_fork_from_tsv(
pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate(
predicate_spec: &StacksChainhookInstance,
unfinished_scan_data: Option<ScanningData>,
stacks_db_conn: &DB,
db_conns: &mut StacksDbConnections,
config: &Config,
kill_signal: Option<Arc<RwLock<bool>>>,
ctx: &Context,
) -> Result<PredicateScanResult, String> {
let stacks_db_conn = &db_conns.stacks_db;
let predicate_uuid = &predicate_spec.uuid;
let mut chain_tip = match get_last_unconfirmed_block_height_inserted(stacks_db_conn, ctx) {
Some(chain_tip) => chain_tip,
Expand Down Expand Up @@ -327,20 +331,28 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate(
last_block_scanned = block_data.block_identifier.clone();

let blocks: Vec<&dyn AbstractStacksBlock> = vec![&block_data];

let (hits_per_blocks, _predicates_expired) =
evaluate_stacks_chainhook_on_blocks(blocks, predicate_spec, ctx);

if hits_per_blocks.is_empty() {
let events = get_signer_db_messages_received_at_block(
&mut db_conns.signers_db,
&block_data.block_identifier,
)?;
let (hits_per_events, _) = evaluate_stacks_predicate_on_non_consensus_events(
&events,
predicate_spec,
ctx,
);

if hits_per_blocks.is_empty() && hits_per_events.is_empty() {
continue;
}

let trigger = StacksTriggerChainhook {
chainhook: predicate_spec,
apply: hits_per_blocks,
rollback: vec![],
// TODO(rafaelcr): Query for non consensus events which fall between block timestamps to fill in here
events: vec![]
events: hits_per_events,
};
let res = match handle_stacks_hook_action(
trigger,
Expand Down Expand Up @@ -536,7 +548,7 @@ pub async fn scan_stacks_chainstate_via_csv_using_predicate(
apply: hits_per_blocks,
rollback: vec![],
// TODO(rafaelcr): Consider StackerDB chunks that come from TSVs.
events: vec![]
events: vec![],
};
match handle_stacks_hook_action(trigger, &proofs, &config.get_event_observer_config(), ctx)
{
Expand Down Expand Up @@ -646,6 +658,7 @@ pub async fn consolidate_local_stacks_chainstate_using_csv(
}
};

// TODO(rafaelcr): Store signer messages
insert_entry_in_stacks_blocks(&block_data, &stacks_db_rw, ctx)?;

if blocks_inserted % 2500 == 0 {
Expand Down
81 changes: 50 additions & 31 deletions components/chainhook-cli/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::config::{Config, PredicatesApi, PredicatesApiConfig};
use crate::scan::stacks::consolidate_local_stacks_chainstate_using_csv;
use crate::service::http_api::{load_predicates_from_redis, start_predicate_api_server};
use crate::service::runloops::{start_bitcoin_scan_runloop, start_stacks_scan_runloop};
use crate::storage::signers::{initialize_signers_db, store_signer_db_messages};
use crate::storage::{
confirm_entries_in_stacks_blocks, draft_entries_in_stacks_blocks, get_all_unconfirmed_blocks,
get_last_block_height_inserted, open_readonly_stacks_db_conn_with_retry,
Expand All @@ -19,6 +20,7 @@ use chainhook_sdk::observer::{
PredicateDeregisteredEvent, PredicateEvaluationReport, PredicateInterruptedData,
StacksObserverStartupContext,
};
use chainhook_sdk::{try_error, try_info};
use chainhook_sdk::types::{Chain, StacksBlockData, StacksChainEvent};
use chainhook_sdk::utils::Context;
use redis::{Commands, Connection};
Expand Down Expand Up @@ -152,10 +154,12 @@ impl Service {
}
}

initialize_signers_db(&self.config.expected_cache_path(), &self.ctx)
.map_err(|e| format!("unable to initialize signers db: {e}"))?;

let (observer_command_tx, observer_command_rx) =
observer_commands_tx_rx.unwrap_or(channel());
let (observer_event_tx, observer_event_rx) = crossbeam_channel::unbounded();
// let (ordinal_indexer_command_tx, ordinal_indexer_command_rx) = channel();

let mut event_observer_config = self.config.get_event_observer_config();
event_observer_config.registered_chainhooks = chainhook_store;
Expand Down Expand Up @@ -441,12 +445,14 @@ impl Service {
data,
) => {
for confirmed_block in &data.confirmed_blocks {
if let Some(expired_predicate_uuids) = expire_predicates_for_block(
&Chain::Bitcoin,
confirmed_block.block_identifier.index,
&mut predicates_db_conn,
&ctx,
) {
if let Some(expired_predicate_uuids) =
expire_predicates_for_block(
&Chain::Bitcoin,
confirmed_block.block_identifier.index,
&mut predicates_db_conn,
&ctx,
)
{
for uuid in expired_predicate_uuids.into_iter() {
let _ = observer_command_tx.send(
ObserverCommand::ExpireBitcoinPredicate(
Expand All @@ -466,12 +472,14 @@ impl Service {
data,
) => {
for confirmed_block in &data.confirmed_blocks {
if let Some(expired_predicate_uuids) = expire_predicates_for_block(
&Chain::Bitcoin,
confirmed_block.block_identifier.index,
&mut predicates_db_conn,
&ctx,
) {
if let Some(expired_predicate_uuids) =
expire_predicates_for_block(
&Chain::Bitcoin,
confirmed_block.block_identifier.index,
&mut predicates_db_conn,
&ctx,
)
{
for uuid in expired_predicate_uuids.into_iter() {
let _ = observer_command_tx.send(
ObserverCommand::ExpireBitcoinPredicate(
Expand Down Expand Up @@ -547,10 +555,16 @@ impl Service {
};
}
StacksChainEvent::ChainUpdatedWithMicroblocks(_)
| StacksChainEvent::ChainUpdatedWithMicroblocksReorg(_) => {},
| StacksChainEvent::ChainUpdatedWithMicroblocksReorg(_) => {}
StacksChainEvent::ChainUpdatedWithNonConsensusEvents(data) => {
// TODO(rafaelcr): Store signer data.
println!("signer message: {:?}", data);
if let Err(e) = store_signer_db_messages(
&self.config.expected_cache_path(),
&data.events,
&self.ctx,
) {
try_error!(self.ctx, "unable to store signer messages: {e}");
};
try_info!(self.ctx, "Stored {} stacks non-consensus events", data.events.len());
}
},
Err(e) => {
Expand All @@ -574,12 +588,14 @@ impl Service {
StacksChainEvent::ChainUpdatedWithBlocks(data) => {
stacks_event += 1;
for confirmed_block in &data.confirmed_blocks {
if let Some(expired_predicate_uuids) = expire_predicates_for_block(
&Chain::Stacks,
confirmed_block.block_identifier.index,
&mut predicates_db_conn,
&ctx,
) {
if let Some(expired_predicate_uuids) =
expire_predicates_for_block(
&Chain::Stacks,
confirmed_block.block_identifier.index,
&mut predicates_db_conn,
&ctx,
)
{
for uuid in expired_predicate_uuids.into_iter() {
let _ = observer_command_tx.send(
ObserverCommand::ExpireStacksPredicate(
Expand All @@ -597,12 +613,14 @@ impl Service {
}
StacksChainEvent::ChainUpdatedWithReorg(data) => {
for confirmed_block in &data.confirmed_blocks {
if let Some(expired_predicate_uuids) = expire_predicates_for_block(
&Chain::Stacks,
confirmed_block.block_identifier.index,
&mut predicates_db_conn,
&ctx,
) {
if let Some(expired_predicate_uuids) =
expire_predicates_for_block(
&Chain::Stacks,
confirmed_block.block_identifier.index,
&mut predicates_db_conn,
&ctx,
)
{
for uuid in expired_predicate_uuids.into_iter() {
let _ = observer_command_tx.send(
ObserverCommand::ExpireStacksPredicate(
Expand All @@ -619,10 +637,10 @@ impl Service {
}
}
StacksChainEvent::ChainUpdatedWithMicroblocks(_)
| StacksChainEvent::ChainUpdatedWithMicroblocksReorg(_) => {},
| StacksChainEvent::ChainUpdatedWithMicroblocksReorg(_) => {}
StacksChainEvent::ChainUpdatedWithNonConsensusEvents(_) => {
// TODO(rafaelcr): Expire signer message predicates when appropriate
},
}
};
update_status_from_report(
Chain::Stacks,
Expand All @@ -640,7 +658,8 @@ impl Service {
&mut self.config,
&self.ctx,
)
.await {
.await
{
error!(
self.ctx.expect_logger(),
"Failed to update database from archive: {e}"
Expand Down
7 changes: 3 additions & 4 deletions components/chainhook-cli/src/service/runloops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ use crate::{
bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate, common::PredicateScanResult,
stacks::scan_stacks_chainstate_via_rocksdb_using_predicate,
},
service::{open_readwrite_predicates_db_conn_or_panic, set_predicate_interrupted_status},
storage::open_readonly_stacks_db_conn,
service::{open_readwrite_predicates_db_conn_or_panic, set_predicate_interrupted_status}, storage::StacksDbConnections,
};

use super::ScanningData;
Expand Down Expand Up @@ -54,7 +53,7 @@ pub fn start_stacks_scan_runloop(
let kill_signal = Arc::new(RwLock::new(false));
kill_signals.insert(predicate_spec.uuid.clone(), kill_signal.clone());
stacks_scan_pool.execute(move || {
let stacks_db_conn = match open_readonly_stacks_db_conn(
let mut db_conns = match StacksDbConnections::open_readonly(
&moved_config.expected_cache_path(),
&moved_ctx,
) {
Expand All @@ -75,7 +74,7 @@ pub fn start_stacks_scan_runloop(
let op = scan_stacks_chainstate_via_rocksdb_using_predicate(
&predicate_spec,
unfinished_scan_data,
&stacks_db_conn,
&mut db_conns,
&moved_config,
Some(kill_signal),
&moved_ctx,
Expand Down
Loading
Loading