Skip to content

Commit

Permalink
Reaquire shards only if they are stale
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Aug 27, 2024
1 parent b2aad01 commit 34dce73
Show file tree
Hide file tree
Showing 3 changed files with 228 additions and 86 deletions.
117 changes: 88 additions & 29 deletions quickwit/quickwit-indexing/src/source/queue_sources/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ pub struct QueueCoordinator {
shared_state: QueueSharedState,
local_state: QueueLocalState,
publish_token: String,
visible_settings: VisibilitySettings,
visibility_settings: VisibilitySettings,
}

impl fmt::Debug for QueueCoordinator {
Expand All @@ -102,6 +102,9 @@ impl QueueCoordinator {
metastore: source_runtime.metastore,
source_id: source_runtime.pipeline_id.source_id.clone(),
index_uid: source_runtime.pipeline_id.index_uid.clone(),
reacquire_grace_period: Duration::from_secs(
2 * source_runtime.indexing_setting.commit_timeout_secs as u64,
),
},
local_state: QueueLocalState::default(),
pipeline_id: source_runtime.pipeline_id,
Expand All @@ -113,7 +116,7 @@ impl QueueCoordinator {
message_type,
publish_lock: PublishLock::default(),
publish_token: Ulid::new().to_string(),
visible_settings: VisibilitySettings::from_commit_timeout(
visibility_settings: VisibilitySettings::from_commit_timeout(
source_runtime.indexing_setting.commit_timeout_secs,
),
}
Expand Down Expand Up @@ -157,7 +160,7 @@ impl QueueCoordinator {
async fn poll_messages(&mut self, ctx: &SourceContext) -> Result<(), ActorExitStatus> {
let raw_messages = self
.queue_receiver
.receive(1, self.visible_settings.deadline_for_receive)
.receive(1, self.visibility_settings.deadline_for_receive)
.await?;

let mut format_errors = Vec::new();
Expand Down Expand Up @@ -215,7 +218,7 @@ impl QueueCoordinator {
self.queue.clone(),
message.metadata.ack_id.clone(),
message.metadata.initial_deadline,
self.visible_settings.clone(),
self.visibility_settings.clone(),
),
content: message,
position,
Expand Down Expand Up @@ -254,7 +257,7 @@ impl QueueCoordinator {
.await?;
if in_progress_ref.batch_reader.is_eof() {
self.local_state
.drop_currently_read(self.visible_settings.deadline_for_last_extension)
.drop_currently_read(self.visibility_settings.deadline_for_last_extension)
.await?;
self.observable_state.num_messages_processed += 1;
}
Expand Down Expand Up @@ -319,7 +322,7 @@ mod tests {
use crate::source::doc_file_reader::file_test_helpers::{generate_dummy_doc_file, DUMMY_DOC};
use crate::source::queue_sources::memory_queue::MemoryQueueForTests;
use crate::source::queue_sources::message::PreProcessedPayload;
use crate::source::queue_sources::shared_state::shared_state_for_tests::shared_state_for_tests;
use crate::source::queue_sources::shared_state::shared_state_for_tests::init_state;
use crate::source::{SourceActor, BATCH_NUM_BYTES_LIMIT};

fn setup_coordinator(
Expand Down Expand Up @@ -347,7 +350,7 @@ mod tests {
source_type: SourceType::Unspecified,
storage_resolver: StorageResolver::for_test(),
publish_token: Ulid::new().to_string(),
visible_settings: VisibilitySettings::from_commit_timeout(5),
visibility_settings: VisibilitySettings::from_commit_timeout(5),
}
}

Expand Down Expand Up @@ -401,7 +404,7 @@ mod tests {
#[tokio::test]
async fn test_process_empty_queue() {
let queue = Arc::new(MemoryQueueForTests::new());
let shared_state = shared_state_for_tests("test-index", Default::default());
let shared_state = init_state("test-index", Default::default());
let mut coordinator = setup_coordinator(queue.clone(), shared_state);
let batches = process_messages(&mut coordinator, queue, &[]).await;
assert_eq!(batches.len(), 0);
Expand All @@ -410,7 +413,7 @@ mod tests {
#[tokio::test]
async fn test_process_one_small_message() {
let queue = Arc::new(MemoryQueueForTests::new());
let shared_state = shared_state_for_tests("test-index", Default::default());
let shared_state = init_state("test-index", Default::default());
let mut coordinator = setup_coordinator(queue.clone(), shared_state.clone());
let (dummy_doc_file, _) = generate_dummy_doc_file(false, 10).await;
let test_uri = Uri::from_str(dummy_doc_file.path().to_str().unwrap()).unwrap();
Expand All @@ -424,7 +427,7 @@ mod tests {
#[tokio::test]
async fn test_process_one_big_message() {
let queue = Arc::new(MemoryQueueForTests::new());
let shared_state = shared_state_for_tests("test-index", Default::default());
let shared_state = init_state("test-index", Default::default());
let mut coordinator = setup_coordinator(queue.clone(), shared_state);
let lines = BATCH_NUM_BYTES_LIMIT as usize / DUMMY_DOC.len() + 1;
let (dummy_doc_file, _) = generate_dummy_doc_file(true, lines).await;
Expand All @@ -437,7 +440,7 @@ mod tests {
#[tokio::test]
async fn test_process_two_messages_different_compression() {
let queue = Arc::new(MemoryQueueForTests::new());
let shared_state = shared_state_for_tests("test-index", Default::default());
let shared_state = init_state("test-index", Default::default());
let mut coordinator = setup_coordinator(queue.clone(), shared_state);
let (dummy_doc_file_1, _) = generate_dummy_doc_file(false, 10).await;
let test_uri_1 = Uri::from_str(dummy_doc_file_1.path().to_str().unwrap()).unwrap();
Expand All @@ -456,7 +459,7 @@ mod tests {
#[tokio::test]
async fn test_process_local_duplicate_message() {
let queue = Arc::new(MemoryQueueForTests::new());
let shared_state = shared_state_for_tests("test-index", Default::default());
let shared_state = init_state("test-index", Default::default());
let mut coordinator = setup_coordinator(queue.clone(), shared_state);
let (dummy_doc_file, _) = generate_dummy_doc_file(false, 10).await;
let test_uri = Uri::from_str(dummy_doc_file.path().to_str().unwrap()).unwrap();
Expand All @@ -477,11 +480,15 @@ mod tests {
let partition_id = PreProcessedPayload::ObjectUri(test_uri.clone()).partition_id();

let queue = Arc::new(MemoryQueueForTests::new());
let shared_state = shared_state_for_tests(
let shared_state = init_state(
"test-index",
&[(
partition_id.clone(),
("existing_token".to_string(), Position::eof(file_size)),
(
"existing_token".to_string(),
Position::eof(file_size),
false,
),
)],
);
let mut coordinator = setup_coordinator(queue.clone(), shared_state.clone());
Expand All @@ -492,30 +499,82 @@ mod tests {
assert!(coordinator.local_state.is_completed(&partition_id));
}

#[tokio::test]
async fn test_process_existing_messages() {
let (dummy_doc_file_1, _) = generate_dummy_doc_file(false, 10).await;
let test_uri_1 = Uri::from_str(dummy_doc_file_1.path().to_str().unwrap()).unwrap();
let partition_id_1 = PreProcessedPayload::ObjectUri(test_uri_1.clone()).partition_id();

let (dummy_doc_file_2, _) = generate_dummy_doc_file(false, 10).await;
let test_uri_2 = Uri::from_str(dummy_doc_file_2.path().to_str().unwrap()).unwrap();
let partition_id_2 = PreProcessedPayload::ObjectUri(test_uri_2.clone()).partition_id();

let (dummy_doc_file_3, _) = generate_dummy_doc_file(false, 10).await;
let test_uri_3 = Uri::from_str(dummy_doc_file_3.path().to_str().unwrap()).unwrap();
let partition_id_3 = PreProcessedPayload::ObjectUri(test_uri_3.clone()).partition_id();

let queue = Arc::new(MemoryQueueForTests::new());
let shared_state = init_state(
"test-index",
&[
(
partition_id_1.clone(),
("existing_token_1".to_string(), Position::Beginning, true),
),
(
partition_id_2.clone(),
(
"existing_token_2".to_string(),
Position::offset((DUMMY_DOC.len() + 1) * 2),
true,
),
),
(
partition_id_3.clone(),
(
"existing_token_3".to_string(),
Position::offset((DUMMY_DOC.len() + 1) * 6),
false, // should not be processed because not stale yet
),
),
],
);
let mut coordinator = setup_coordinator(queue.clone(), shared_state.clone());
let batches = process_messages(
&mut coordinator,
queue,
&[
(&test_uri_1, "ack-id-1"),
(&test_uri_2, "ack-id-2"),
(&test_uri_3, "ack-id-3"),
],
)
.await;
assert_eq!(batches.len(), 2);
assert_eq!(batches.iter().map(|b| b.docs.len()).sum::<usize>(), 18);
assert!(coordinator.local_state.is_awaiting_commit(&partition_id_1));
assert!(coordinator.local_state.is_awaiting_commit(&partition_id_2));
}

#[tokio::test]
async fn test_process_multiple_coordinator() {
let queue = Arc::new(MemoryQueueForTests::new());
let shared_state = shared_state_for_tests("test-index", Default::default());
let mut proc_1 = setup_coordinator(queue.clone(), shared_state.clone());
let mut proc_2 = setup_coordinator(queue.clone(), shared_state.clone());
let shared_state = init_state("test-index", Default::default());
let mut coord_1 = setup_coordinator(queue.clone(), shared_state.clone());
let mut coord_2 = setup_coordinator(queue.clone(), shared_state.clone());
let (dummy_doc_file, _) = generate_dummy_doc_file(false, 10).await;
let test_uri = Uri::from_str(dummy_doc_file.path().to_str().unwrap()).unwrap();
let partition_id = PreProcessedPayload::ObjectUri(test_uri.clone()).partition_id();

let batches_1 = process_messages(&mut proc_1, queue.clone(), &[(&test_uri, "ack1")]).await;
let batches_2 = process_messages(&mut proc_2, queue, &[(&test_uri, "ack2")]).await;
let batches_1 = process_messages(&mut coord_1, queue.clone(), &[(&test_uri, "ack1")]).await;
let batches_2 = process_messages(&mut coord_2, queue, &[(&test_uri, "ack2")]).await;

assert_eq!(batches_1.len(), 1);
assert_eq!(batches_1[0].docs.len(), 10);
assert!(proc_1.local_state.is_awaiting_commit(&partition_id));
// proc_2 doesn't know for sure what is happening with the message
// (proc_1 might have crashed), so it just acquires it and takes over
// processing
//
// TODO: this test should fail once we implement the grace
// period before a partition can be re-acquired
assert_eq!(batches_2.len(), 1);
assert_eq!(batches_2[0].docs.len(), 10);
assert!(proc_2.local_state.is_awaiting_commit(&partition_id));
assert!(coord_1.local_state.is_awaiting_commit(&partition_id));
// proc_2 learns from shared state that the message is likely still
// being processed and skips it
assert_eq!(batches_2.len(), 0);
assert!(!coord_2.local_state.is_tracked(&partition_id));
}
}
Loading

0 comments on commit 34dce73

Please sign in to comment.