From 80737addce9d6df7035b5586da11f33640ee72d2 Mon Sep 17 00:00:00 2001 From: Ludo Galabru Date: Wed, 11 Oct 2023 10:37:16 -0400 Subject: [PATCH] fix: redis conn (#442) This PR is fixing the management of the redis connection. In the current approach, we're reusing an existing connection that is being opened at startup for test purposes, but that can get broken after some time if blocks take time to arrive. Fix: we open/close a new connection every time a new block hits the API. --- components/chainhook-cli/src/service/mod.rs | 179 +++++++++--------- .../chainhook-sdk/src/indexer/stacks/mod.rs | 2 +- 2 files changed, 92 insertions(+), 89 deletions(-) diff --git a/components/chainhook-cli/src/service/mod.rs b/components/chainhook-cli/src/service/mod.rs index aaf204679..48d509d5c 100644 --- a/components/chainhook-cli/src/service/mod.rs +++ b/components/chainhook-cli/src/service/mod.rs @@ -225,11 +225,12 @@ impl Service { let mut stacks_event = 0; let ctx = self.ctx.clone(); - let mut predicates_db_conn = match self.config.http_api { + match self.config.http_api { PredicatesApi::On(ref api_config) => { - Some(open_readwrite_predicates_db_conn_or_panic(api_config, &ctx)) + // Test redis connection + open_readwrite_predicates_db_conn(api_config)?; } - PredicatesApi::Off => None, + PredicatesApi::Off => {} }; for predicate_with_last_scanned_block in leftover_scans { @@ -265,17 +266,8 @@ impl Service { // If no start block specified, depending on the nature the hook, we'd like to retrieve: // - contract-id if let PredicatesApi::On(ref config) = self.config.http_api { - let mut predicates_db_conn = match open_readwrite_predicates_db_conn(config) - { - Ok(con) => con, - Err(e) => { - error!( - self.ctx.expect_logger(), - "unable to register predicate: {}", - e.to_string() - ); - continue; - } + let Ok(mut predicates_db_conn) = open_readwrite_predicates_db_conn_verbose(&config, &ctx) else { + continue; }; update_predicate_spec( &spec.key(), @@ -301,17 +293,8 @@ impl Service { } ObserverEvent::PredicateEnabled(spec) => { if let PredicatesApi::On(ref config) = self.config.http_api { - let mut predicates_db_conn = match open_readwrite_predicates_db_conn(config) - { - Ok(con) => con, - Err(e) => { - error!( - self.ctx.expect_logger(), - "unable to enable predicate: {}", - e.to_string() - ); - continue; - } + let Ok(mut predicates_db_conn) = open_readwrite_predicates_db_conn_verbose(&config, &ctx) else { + continue; }; update_predicate_spec( &spec.key(), @@ -329,17 +312,8 @@ impl Service { } ObserverEvent::PredicateDeregistered(spec) => { if let PredicatesApi::On(ref config) = self.config.http_api { - let mut predicates_db_conn = match open_readwrite_predicates_db_conn(config) - { - Ok(con) => con, - Err(e) => { - error!( - self.ctx.expect_logger(), - "unable to deregister predicate: {}", - e.to_string() - ); - continue; - } + let Ok(mut predicates_db_conn) = open_readwrite_predicates_db_conn_verbose(&config, &ctx) else { + continue; }; let predicate_key = spec.key(); let res: Result<(), redis::RedisError> = @@ -355,14 +329,20 @@ impl Service { } ObserverEvent::BitcoinChainEvent((chain_update, report)) => { debug!(self.ctx.expect_logger(), "Bitcoin update not stored"); - match chain_update { - chainhook_sdk::types::BitcoinChainEvent::ChainUpdatedWithBlocks(data) => { - if let Some(ref mut predicates_db_conn) = predicates_db_conn { + if let PredicatesApi::On(ref config) = self.config.http_api { + let Ok(mut predicates_db_conn) = open_readwrite_predicates_db_conn_verbose(&config, &ctx) else { + continue; + }; + + match chain_update { + chainhook_sdk::types::BitcoinChainEvent::ChainUpdatedWithBlocks( + data, + ) => { for confirmed_block in &data.confirmed_blocks { match expire_predicates_for_block( &Chain::Bitcoin, confirmed_block.block_identifier.index, - predicates_db_conn, + &mut predicates_db_conn, &ctx, ) { Some(expired_predicate_uuids) => { @@ -383,14 +363,14 @@ impl Service { } } } - } - chainhook_sdk::types::BitcoinChainEvent::ChainUpdatedWithReorg(data) => { - if let Some(ref mut predicates_db_conn) = predicates_db_conn { + chainhook_sdk::types::BitcoinChainEvent::ChainUpdatedWithReorg( + data, + ) => { for confirmed_block in &data.confirmed_blocks { match expire_predicates_for_block( &Chain::Bitcoin, confirmed_block.block_identifier.index, - predicates_db_conn, + &mut predicates_db_conn, &ctx, ) { Some(expired_predicate_uuids) => { @@ -412,9 +392,12 @@ impl Service { } } } - } - if let Some(ref mut predicates_db_conn) = predicates_db_conn { - update_stats_from_report(Chain::Bitcoin, report, predicates_db_conn, &ctx); + update_stats_from_report( + Chain::Bitcoin, + report, + &mut predicates_db_conn, + &ctx, + ); } } ObserverEvent::StacksChainEvent((chain_event, report)) => { @@ -432,15 +415,49 @@ impl Service { continue; } }; + match &chain_event { StacksChainEvent::ChainUpdatedWithBlocks(data) => { - stacks_event += 1; - if let Some(ref mut predicates_db_conn) = predicates_db_conn { + confirm_entries_in_stacks_blocks( + &data.confirmed_blocks, + &stacks_db_conn_rw, + &self.ctx, + ); + draft_entries_in_stacks_blocks( + &data.new_blocks, + &stacks_db_conn_rw, + &self.ctx, + ) + } + StacksChainEvent::ChainUpdatedWithReorg(data) => { + confirm_entries_in_stacks_blocks( + &data.confirmed_blocks, + &stacks_db_conn_rw, + &self.ctx, + ); + draft_entries_in_stacks_blocks( + &data.blocks_to_apply, + &stacks_db_conn_rw, + &self.ctx, + ) + } + StacksChainEvent::ChainUpdatedWithMicroblocks(_) + | StacksChainEvent::ChainUpdatedWithMicroblocksReorg(_) => {} + }; + + if let PredicatesApi::On(ref config) = self.config.http_api { + let Ok(mut predicates_db_conn) = open_readwrite_predicates_db_conn_verbose(&config, &ctx) else { + continue; + }; + + match &chain_event { + StacksChainEvent::ChainUpdatedWithBlocks(data) => { + stacks_event += 1; for confirmed_block in &data.confirmed_blocks { match expire_predicates_for_block( &Chain::Stacks, confirmed_block.block_identifier.index, - predicates_db_conn, + &mut predicates_db_conn, &ctx, ) { Some(expired_predicate_uuids) => { @@ -461,24 +478,12 @@ impl Service { } } } - confirm_entries_in_stacks_blocks( - &data.confirmed_blocks, - &stacks_db_conn_rw, - &self.ctx, - ); - draft_entries_in_stacks_blocks( - &data.new_blocks, - &stacks_db_conn_rw, - &self.ctx, - ) - } - StacksChainEvent::ChainUpdatedWithReorg(data) => { - if let Some(ref mut predicates_db_conn) = predicates_db_conn { + StacksChainEvent::ChainUpdatedWithReorg(data) => { for confirmed_block in &data.confirmed_blocks { match expire_predicates_for_block( &Chain::Stacks, confirmed_block.block_identifier.index, - predicates_db_conn, + &mut predicates_db_conn, &ctx, ) { Some(expired_predicate_uuids) => { @@ -499,23 +504,17 @@ impl Service { } } } - confirm_entries_in_stacks_blocks( - &data.confirmed_blocks, - &stacks_db_conn_rw, - &self.ctx, - ); - draft_entries_in_stacks_blocks( - &data.blocks_to_apply, - &stacks_db_conn_rw, - &self.ctx, - ) - } - StacksChainEvent::ChainUpdatedWithMicroblocks(_) - | StacksChainEvent::ChainUpdatedWithMicroblocksReorg(_) => {} + StacksChainEvent::ChainUpdatedWithMicroblocks(_) + | StacksChainEvent::ChainUpdatedWithMicroblocksReorg(_) => {} + }; + update_stats_from_report( + Chain::Stacks, + report, + &mut predicates_db_conn, + &ctx, + ); }; - if let Some(ref mut predicates_db_conn) = predicates_db_conn { - update_stats_from_report(Chain::Stacks, report, predicates_db_conn, &ctx); - } + // Every 32 blocks, we will check if there's a new Stacks file archive to ingest if stacks_event > 32 { stacks_event = 0; @@ -1103,18 +1102,22 @@ pub fn open_readwrite_predicates_db_conn( .map_err(|e| format!("unable to connect to db: {}", e.to_string())) } +pub fn open_readwrite_predicates_db_conn_verbose( + config: &PredicatesApiConfig, + ctx: &Context, +) -> Result { + let res = open_readwrite_predicates_db_conn(config); + if let Err(ref e) = res { + error!(ctx.expect_logger(), "{}", e.to_string()); + } + res +} + pub fn open_readwrite_predicates_db_conn_or_panic( config: &PredicatesApiConfig, ctx: &Context, ) -> Connection { - let redis_con = match open_readwrite_predicates_db_conn(config) { - Ok(con) => con, - Err(message) => { - error!(ctx.expect_logger(), "Redis: {}", message.to_string()); - panic!(); - } - }; - redis_con + open_readwrite_predicates_db_conn_verbose(config, ctx).expect("unable to open redis conn") } #[cfg(test)] diff --git a/components/chainhook-sdk/src/indexer/stacks/mod.rs b/components/chainhook-sdk/src/indexer/stacks/mod.rs index 49025e621..5d7e3a060 100644 --- a/components/chainhook-sdk/src/indexer/stacks/mod.rs +++ b/components/chainhook-sdk/src/indexer/stacks/mod.rs @@ -342,7 +342,7 @@ pub fn standardize_stacks_block( index: match block.block_height { 0 => 0, _ => block.block_height - 1, - } + }, }, timestamp: block.parent_burn_block_timestamp, metadata: StacksBlockMetadata {