Skip to content

Commit c8ec571

Browse files
feat: support cast logs query chunking (#12692)
* feat: support cast logs query chunking * comments * comments * docs * comments
1 parent d296f9e commit c8ec571

File tree

3 files changed

+143
-5
lines changed

3 files changed

+143
-5
lines changed

crates/cast/src/cmd/logs.rs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,14 +45,27 @@ pub struct LogsArgs {
4545
#[arg(long)]
4646
subscribe: bool,
4747

48+
/// Number of blocks to query in each chunk when the provider has range limits.
49+
/// Defaults to 10000 blocks per chunk.
50+
#[arg(long, default_value_t = 10000)]
51+
query_size: u64,
52+
4853
#[command(flatten)]
4954
eth: EthereumOpts,
5055
}
5156

5257
impl LogsArgs {
5358
pub async fn run(self) -> Result<()> {
54-
let Self { from_block, to_block, address, sig_or_topic, topics_or_args, subscribe, eth } =
55-
self;
59+
let Self {
60+
from_block,
61+
to_block,
62+
address,
63+
sig_or_topic,
64+
topics_or_args,
65+
subscribe,
66+
query_size,
67+
eth,
68+
} = self;
5669

5770
let config = eth.load_config()?;
5871
let provider = utils::get_provider(&config)?;
@@ -77,7 +90,7 @@ impl LogsArgs {
7790
let filter = build_filter(from_block, to_block, addresses, sig_or_topic, topics_or_args)?;
7891

7992
if !subscribe {
80-
let logs = cast.filter_logs(filter).await?;
93+
let logs = cast.filter_logs_chunked(filter, query_size).await?;
8194
sh_println!("{logs}")?;
8295
return Ok(());
8396
}

crates/cast/src/lib.rs

Lines changed: 109 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ use alloy_provider::{
1919
};
2020
use alloy_rlp::Decodable;
2121
use alloy_rpc_types::{
22-
BlockId, BlockNumberOrTag, BlockOverrides, Filter, TransactionRequest, state::StateOverride,
22+
BlockId, BlockNumberOrTag, BlockOverrides, Filter, FilterBlockOption, Log, TransactionRequest,
23+
state::StateOverride,
2324
};
2425
use alloy_serde::WithOtherFields;
2526
use base::{Base, NumberWithBase, ToBase};
@@ -76,7 +77,7 @@ pub struct Cast<P> {
7677
provider: P,
7778
}
7879

79-
impl<P: Provider<AnyNetwork>> Cast<P> {
80+
impl<P: Provider<AnyNetwork> + Clone + Unpin> Cast<P> {
8081
/// Creates a new Cast instance from the provided client
8182
///
8283
/// # Example
@@ -964,7 +965,19 @@ impl<P: Provider<AnyNetwork>> Cast<P> {
964965

965966
pub async fn filter_logs(&self, filter: Filter) -> Result<String> {
966967
let logs = self.provider.get_logs(&filter).await?;
968+
Self::format_logs(logs)
969+
}
967970

971+
/// Retrieves logs using chunked requests to handle large block ranges.
972+
///
973+
/// Automatically divides large block ranges into smaller chunks to avoid provider limits
974+
/// and processes them with controlled concurrency to prevent rate limiting.
975+
pub async fn filter_logs_chunked(&self, filter: Filter, chunk_size: u64) -> Result<String> {
976+
let logs = self.get_logs_chunked(&filter, chunk_size).await?;
977+
Self::format_logs(logs)
978+
}
979+
980+
fn format_logs(logs: Vec<Log>) -> Result<String> {
968981
let res = if shell::is_json() {
969982
serde_json::to_string(&logs)?
970983
} else {
@@ -981,6 +994,100 @@ impl<P: Provider<AnyNetwork>> Cast<P> {
981994
Ok(res)
982995
}
983996

997+
fn extract_block_range(filter: &Filter) -> (Option<u64>, Option<u64>) {
998+
let FilterBlockOption::Range { from_block, to_block } = &filter.block_option else {
999+
return (None, None);
1000+
};
1001+
1002+
(from_block.and_then(|b| b.as_number()), to_block.and_then(|b| b.as_number()))
1003+
}
1004+
1005+
/// Retrieves logs with automatic chunking fallback.
1006+
///
1007+
/// First tries to fetch logs for the entire range. If that fails,
1008+
/// falls back to concurrent chunked requests with rate limiting.
1009+
async fn get_logs_chunked(&self, filter: &Filter, chunk_size: u64) -> Result<Vec<Log>>
1010+
where
1011+
P: Clone + Unpin,
1012+
{
1013+
// Try the full range first
1014+
if let Ok(logs) = self.provider.get_logs(filter).await {
1015+
return Ok(logs);
1016+
}
1017+
1018+
// Fallback: use concurrent chunked approach
1019+
self.get_logs_chunked_concurrent(filter, chunk_size).await
1020+
}
1021+
1022+
/// Retrieves logs using concurrent chunked requests with rate limiting.
1023+
///
1024+
/// Divides the block range into chunks and processes them with a maximum of
1025+
/// 5 concurrent requests. Falls back to single-block queries if chunks fail.
1026+
async fn get_logs_chunked_concurrent(
1027+
&self,
1028+
filter: &Filter,
1029+
chunk_size: u64,
1030+
) -> Result<Vec<Log>>
1031+
where
1032+
P: Clone + Unpin,
1033+
{
1034+
let (from_block, to_block) = Self::extract_block_range(filter);
1035+
let (Some(from), Some(to)) = (from_block, to_block) else {
1036+
return self.provider.get_logs(filter).await.map_err(Into::into);
1037+
};
1038+
1039+
if from >= to {
1040+
return Ok(vec![]);
1041+
}
1042+
1043+
// Create chunk ranges using iterator
1044+
let chunk_ranges: Vec<(u64, u64)> = (from..to)
1045+
.step_by(chunk_size as usize)
1046+
.map(|chunk_start| (chunk_start, (chunk_start + chunk_size).min(to)))
1047+
.collect();
1048+
1049+
// Process chunks with controlled concurrency using buffered stream
1050+
let mut all_results: Vec<(u64, Vec<Log>)> = futures::stream::iter(chunk_ranges)
1051+
.map(|(start_block, chunk_end)| {
1052+
let chunk_filter = filter.clone().from_block(start_block).to_block(chunk_end - 1);
1053+
let provider = self.provider.clone();
1054+
1055+
async move {
1056+
// Try direct chunk request with simplified fallback
1057+
match provider.get_logs(&chunk_filter).await {
1058+
Ok(logs) => (start_block, logs),
1059+
Err(_) => {
1060+
// Simple fallback: try individual blocks in this chunk
1061+
let mut fallback_logs = Vec::new();
1062+
for single_block in start_block..chunk_end {
1063+
let single_filter = chunk_filter
1064+
.clone()
1065+
.from_block(single_block)
1066+
.to_block(single_block);
1067+
if let Ok(logs) = provider.get_logs(&single_filter).await {
1068+
fallback_logs.extend(logs);
1069+
}
1070+
}
1071+
(start_block, fallback_logs)
1072+
}
1073+
}
1074+
}
1075+
})
1076+
.buffered(5) // Limit to 5 concurrent requests to avoid rate limits
1077+
.collect()
1078+
.await;
1079+
1080+
// Sort once at the end by block number and flatten
1081+
all_results.sort_by_key(|(block_num, _)| *block_num);
1082+
1083+
let mut all_logs = Vec::new();
1084+
for (_, logs) in all_results {
1085+
all_logs.extend(logs);
1086+
}
1087+
1088+
Ok(all_logs)
1089+
}
1090+
9841091
/// Converts a block identifier into a block number.
9851092
///
9861093
/// If the block identifier is a block number, then this function returns the block number. If

crates/cast/tests/cli/main.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1611,6 +1611,24 @@ casttest!(logs_sig_2, |_prj, cmd| {
16111611
.stdout_eq(file!["../fixtures/cast_logs.stdout"]);
16121612
});
16131613

1614+
// Ensures `cast logs` succeeds over a wide block span when `--query-size`
// forces the range to be split into many smaller chunked requests.
casttest!(logs_chunked_large_range, |_prj, cmd| {
    let endpoint = next_http_archive_rpc_url();
    cmd.args([
        "logs",
        "--rpc-url",
        endpoint.as_str(),
        "--from-block",
        "18000000",
        "--to-block",
        "18050000",
        "--query-size",
        "1000",
        "Transfer(address indexed from, address indexed to, uint256 value)",
        "0xA0b86a33E6441d02dd8C6B2b7E5D1E3eD7F73b4b",
    ])
    .assert_success();
});
1631+
16141632
casttest!(mktx, |_prj, cmd| {
16151633
cmd.args([
16161634
"mktx",

0 commit comments

Comments
 (0)