Skip to content

Commit

Permalink
fix: make predicate status timestamps optional (#425)
Browse files Browse the repository at this point in the history
### Description

This changes the predicate status timestamps:
 - from String (converted from a u128) to a u64
- this means we are now storing the data in seconds rather than
milliseconds
- makes them optional (so they will not appear in a serialized status if
set to `None`)

#### Breaking change?

Previously the timestamp was in milliseconds, now it's in seconds.
Anyone depending on the timestamp being in milliseconds could be
impacted by this change.


---

### Checklist

- [ ] All tests pass
- [ ] Tests added in this PR (if applicable)
  • Loading branch information
MicaiahReid authored Oct 4, 2023
1 parent ec76f29 commit 10d6475
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 29 deletions.
43 changes: 19 additions & 24 deletions components/chainhook-cli/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -547,21 +547,18 @@ pub enum PredicateStatus {
}

#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
// note: last_occurrence (and other time-based fields on the status structs) originally were
// of type u128. serde can't handle deserializing u128s when using an adjacently tagged enum,
// so we're having to convert to a string. serde issue: https://github.com/serde-rs/json/issues/740
pub struct ScanningData {
pub number_of_blocks_to_scan: u64,
pub number_of_blocks_evaluated: u64,
pub number_of_times_triggered: u64,
pub last_occurrence: String,
pub last_occurrence: Option<u64>,
pub last_evaluated_block_height: u64,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct StreamingData {
pub last_occurrence: String,
pub last_evaluation: String,
pub last_occurrence: Option<u64>,
pub last_evaluation: u64,
pub number_of_times_triggered: u64,
pub number_of_blocks_evaluated: u64,
pub last_evaluated_block_height: u64,
Expand All @@ -571,7 +568,7 @@ pub struct StreamingData {
pub struct ExpiredData {
pub number_of_blocks_evaluated: u64,
pub number_of_times_triggered: u64,
pub last_occurrence: String,
pub last_occurrence: Option<u64>,
pub last_evaluated_block_height: u64,
pub expired_at_block_height: u64,
}
Expand Down Expand Up @@ -664,11 +661,10 @@ fn set_predicate_streaming_status(
predicates_db_conn: &mut Connection,
ctx: &Context,
) {
let now_ms = SystemTime::now()
let now_secs = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Could not get current time in ms")
.as_millis()
.to_string();
.as_secs();
let (
last_occurrence,
number_of_blocks_evaluated,
Expand Down Expand Up @@ -720,7 +716,7 @@ fn set_predicate_streaming_status(
unreachable!("unreachable predicate status: {:?}", status)
}
},
None => (format!("0"), 0, 0, 0),
None => (None, 0, 0, 0),
}
};
let (
Expand All @@ -733,7 +729,7 @@ fn set_predicate_streaming_status(
last_triggered_height,
triggered_count,
} => (
now_ms.clone(),
Some(now_secs.clone()),
number_of_times_triggered + triggered_count,
number_of_blocks_evaluated + triggered_count,
last_triggered_height,
Expand All @@ -759,7 +755,7 @@ fn set_predicate_streaming_status(
predicate_key,
PredicateStatus::Streaming(StreamingData {
last_occurrence,
last_evaluation: now_ms,
last_evaluation: now_secs,
number_of_times_triggered,
last_evaluated_block_height,
number_of_blocks_evaluated,
Expand All @@ -781,47 +777,46 @@ pub fn set_predicate_scanning_status(
predicates_db_conn: &mut Connection,
ctx: &Context,
) {
let now_ms = SystemTime::now()
let now_secs = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Could not get current time in ms")
.as_millis()
.to_string();
.as_secs();
let current_status = retrieve_predicate_status(&predicate_key, predicates_db_conn);
let last_occurrence = match current_status {
Some(status) => match status {
PredicateStatus::Scanning(scanning_data) => {
if number_of_times_triggered > scanning_data.number_of_times_triggered {
now_ms
Some(now_secs)
} else {
scanning_data.last_occurrence
}
}
PredicateStatus::Streaming(streaming_data) => {
if number_of_times_triggered > streaming_data.number_of_times_triggered {
now_ms
Some(now_secs)
} else {
streaming_data.last_occurrence
}
}
PredicateStatus::UnconfirmedExpiration(expired_data) => {
if number_of_times_triggered > expired_data.number_of_times_triggered {
now_ms
Some(now_secs)
} else {
expired_data.last_occurrence
}
}
PredicateStatus::New => {
if number_of_times_triggered > 0 {
now_ms
Some(now_secs)
} else {
format!("0")
None
}
}
PredicateStatus::Interrupted(_) | PredicateStatus::ConfirmedExpiration(_) => {
unreachable!("unreachable predicate status: {:?}", status)
}
},
None => format!("0"),
None => None,
};

update_predicate_status(
Expand Down Expand Up @@ -868,7 +863,7 @@ pub fn set_unconfirmed_expiration_status(
last_occurrence,
last_evaluated_block_height,
),
PredicateStatus::New => (0, 0, format!("0"), 0),
PredicateStatus::New => (0, 0, None, 0),
PredicateStatus::Streaming(StreamingData {
last_occurrence,
last_evaluation: _,
Expand Down Expand Up @@ -904,7 +899,7 @@ pub fn set_unconfirmed_expiration_status(
return;
}
},
None => (0, 0, format!("0"), 0),
None => (0, 0, None, 0),
};
update_predicate_status(
predicate_key,
Expand Down
10 changes: 5 additions & 5 deletions components/chainhook-cli/src/service/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -966,21 +966,21 @@ async fn test_deregister_predicate(chain: Chain) {
number_of_blocks_evaluated: 4,
number_of_blocks_to_scan: 1,
number_of_times_triggered: 0,
last_occurrence: format!("0"),
last_occurrence: None,
last_evaluated_block_height: 4
}), 6 => using assert_confirmed_expiration_status; "preloaded predicate with scanning status should get scanned until completion")]
#[test_case(Streaming(StreamingData {
number_of_blocks_evaluated: 4,
number_of_times_triggered: 0,
last_occurrence: format!("0"),
last_evaluation: format!("0"),
last_occurrence: None,
last_evaluation: 0,
last_evaluated_block_height: 4
}), 6 => using assert_confirmed_expiration_status; "preloaded predicate with streaming status and last evaluated height below tip should get scanned until completion")]
#[test_case(Streaming(StreamingData {
number_of_blocks_evaluated: 5,
number_of_times_triggered: 0,
last_occurrence: format!("0"),
last_evaluation: format!("0"),
last_occurrence: None,
last_evaluation: 0,
last_evaluated_block_height: 5
}), 5 => using assert_streaming_status; "preloaded predicate with streaming status and last evaluated height at tip should be streamed")]
#[tokio::test]
Expand Down

0 comments on commit 10d6475

Please sign in to comment.