Commit f6ff304

Avoid deserializing QueryAST for every split.
fulmicoton committed Aug 1, 2024
1 parent 6fa3c29 · commit f6ff304
Showing 1 changed file with 52 additions and 30 deletions.
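
The idea behind the change, as a minimal standalone sketch (the stand-in QueryAst struct, the split ids, and the task bodies are hypothetical; the real QueryAst is quickwit's query AST type, and the sketch assumes the tokio, serde, and serde_json crates): parse the query JSON once in leaf_search, wrap it in an Arc, and hand each spawned per-split task a pointer clone instead of re-running serde_json::from_str for every split.

use std::sync::Arc;

use serde::Deserialize;

// Hypothetical stand-in for quickwit's QueryAst.
#[derive(Deserialize, Debug)]
struct QueryAst {
    field: String,
    query: String,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let query_json = r#"{"field": "body", "query": "quickwit"}"#;

    // Before this commit: each split's task deserialized the JSON itself.
    // After: deserialize once, then share the parsed AST behind an Arc.
    let query_ast: Arc<QueryAst> = serde_json::from_str::<QueryAst>(query_json)?.into();

    let mut handles = Vec::new();
    for split_id in ["split-a", "split-b", "split-c"] {
        let query_ast = Arc::clone(&query_ast); // cheap pointer copy, no re-parse
        handles.push(tokio::spawn(async move {
            // ... the per-split search would run here ...
            println!("searching {split_id} with {query_ast:?}");
        }));
    }
    for handle in handles {
        handle.await?;
    }
    Ok(())
}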
82 changes: 52 additions & 30 deletions quickwit/quickwit-search/src/leaf.rs
@@ -44,6 +44,7 @@ use tantivy::directory::FileSlice;
 use tantivy::fastfield::FastFieldReaders;
 use tantivy::schema::Field;
 use tantivy::{DateTime, Index, ReloadPolicy, Searcher, Term};
+use tokio::task::JoinError;
 use tracing::*;

 use crate::collector::{make_collector_for_split, make_merge_collector, IncrementalCollector};
@@ -342,9 +343,11 @@ fn get_leaf_resp_from_count(count: u64) -> LeafSearchResponse {
 }

 /// Apply a leaf search on a single split.
+#[allow(clippy::too_many_arguments)]
 async fn leaf_search_single_split(
     searcher_context: &SearcherContext,
     mut search_request: SearchRequest,
+    query_ast: Arc<QueryAst>,
     storage: Arc<dyn Storage>,
     split: SplitIdAndFooterOffsets,
     doc_mapper: Arc<dyn DocMapper>,
Expand All @@ -363,9 +366,6 @@ async fn leaf_search_single_split(
         return Ok(cached_answer);
     }

-    let query_ast: QueryAst = serde_json::from_str(search_request.query_ast.as_str())
-        .map_err(|err| SearchError::InvalidQuery(err.to_string()))?;
-
     // CanSplitDoBetter or rewrite_request may have changed the request to be a count only request
     // This may be the case for AllQuery with a sort by date and time filter, where the current
     // split can't have better results.
@@ -1217,12 +1217,17 @@ pub async fn leaf_search(

     let split_filter = Arc::new(RwLock::new(split_filter));

-    let mut leaf_search_single_split_futures: Vec<_> = Vec::with_capacity(split_with_req.len());
+    let mut leaf_search_single_split_join_handles: Vec<(String, tokio::task::JoinHandle<()>)> =
+        Vec::with_capacity(split_with_req.len());

     let merge_collector = make_merge_collector(&request, &aggregations_limits)?;
     let incremental_merge_collector = IncrementalCollector::new(merge_collector);
     let incremental_merge_collector = Arc::new(Mutex::new(incremental_merge_collector));

+    let query_ast: Arc<QueryAst> = serde_json::from_str::<QueryAst>(&request.query_ast)
+        .map_err(|err| SearchError::InvalidQuery(err.to_string()))?
+        .into();
+
     for (split, mut request) in split_with_req {
         let leaf_split_search_permit = searcher_context.leaf_search_split_semaphore
             .clone()
@@ -1236,44 +1241,59 @@
             continue;
         }

-        leaf_search_single_split_futures.push(tokio::spawn(
-            leaf_search_single_split_wrapper(
-                request,
-                searcher_context.clone(),
-                index_storage.clone(),
-                doc_mapper.clone(),
-                split,
-                split_filter.clone(),
-                incremental_merge_collector.clone(),
-                leaf_split_search_permit,
-                aggregations_limits.clone(),
-            )
-            .in_current_span(),
+        leaf_search_single_split_join_handles.push((
+            split.split_id.clone(),
+            tokio::spawn(
+                leaf_search_single_split_wrapper(
+                    request,
+                    query_ast.clone(),
+                    searcher_context.clone(),
+                    index_storage.clone(),
+                    doc_mapper.clone(),
+                    split,
+                    split_filter.clone(),
+                    incremental_merge_collector.clone(),
+                    leaf_split_search_permit,
+                    aggregations_limits.clone(),
+                )
+                .in_current_span(),
+            ),
         ));
     }

     // TODO we could cancel running splits when !run_all_splits and the running split can no
     // longer give better results after some other split answered.
-    let split_search_results: Vec<Result<(), _>> =
-        futures::future::join_all(leaf_search_single_split_futures).await;
+    let mut split_search_join_errors: Vec<(String, JoinError)> = Vec::new();

+    // There is no need to use `join_all`, as these are spawned tasks.
+    for (split, leaf_search_join_handle) in leaf_search_single_split_join_handles {
+        // splits that did not panic were already added to the collector
+        if let Err(join_error) = leaf_search_join_handle.await {
+            if join_error.is_cancelled() {
+                // An explicit task cancellation is not an error.
+                continue;
+            }
+            if join_error.is_panic() {
+                error!(split=%split, "leaf search task panicked");
+            } else {
+                error!(split=%split, "please report: leaf search was not cancelled, and could not extract panic. this should never happen");
+            }
+            split_search_join_errors.push((split, join_error));
+        }
+    }

     // we can't use unwrap_or_clone because mutexes aren't Clone
     let mut incremental_merge_collector = match Arc::try_unwrap(incremental_merge_collector) {
        Ok(filter_merger) => filter_merger.into_inner().unwrap(),
        Err(filter_merger) => filter_merger.lock().unwrap().clone(),
     };

-    for result in split_search_results {
-        // splits that did not panic were already added to the collector
-        if let Err(e) = result {
-            incremental_merge_collector.add_failed_split(SplitSearchError {
-                // we could reasonably add a wrapper to the JoinHandle to give us the
-                // split_id anyway
-                split_id: "unknown".to_string(),
-                error: format!("{}", SearchError::from(e)),
-                retryable_error: true,
-            })
-        }
+    for (split_id, split_search_join_error) in split_search_join_errors {
+        incremental_merge_collector.add_failed_split(SplitSearchError {
+            split_id,
+            error: format!("{}", SearchError::from(split_search_join_error)),
+            retryable_error: true,
+        });
     }

     let result = crate::search_thread_pool()
@@ -1289,6 +1309,7 @@ pub async fn leaf_search(
 #[instrument(skip_all, fields(split_id = split.split_id))]
 async fn leaf_search_single_split_wrapper(
     request: SearchRequest,
+    query_ast: Arc<QueryAst>,
     searcher_context: Arc<SearcherContext>,
     index_storage: Arc<dyn Storage>,
     doc_mapper: Arc<dyn DocMapper>,
@@ -1305,6 +1326,7 @@ async fn leaf_search_single_split_wrapper(
     let leaf_search_single_split_res = leaf_search_single_split(
         &searcher_context,
         request,
+        query_ast,
         index_storage,
         split.clone(),
         doc_mapper,
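
Beyond sharing the parsed AST, the commit also replaces join_all over anonymous futures with named (split_id, JoinHandle) pairs, so a panicked task can be attributed to its split and an explicitly cancelled task is no longer reported as a failure. A minimal sketch of that triage under the same assumptions (tokio only; the caller and the downstream error reporting are left out):

use tokio::task::{JoinError, JoinHandle};

// Sketch of the new join-error triage: cancellation is not an error, while
// panics (and any other join failure) are recorded against the split id so
// they can be reported downstream as retryable split failures.
async fn collect_join_errors(
    handles: Vec<(String, JoinHandle<()>)>,
) -> Vec<(String, JoinError)> {
    let mut join_errors = Vec::new();
    for (split_id, handle) in handles {
        if let Err(join_error) = handle.await {
            if join_error.is_cancelled() {
                continue; // explicit task cancellation is not a failure
            }
            join_errors.push((split_id, join_error));
        }
    }
    join_errors
}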
