Skip to content

Commit

Permalink
Garbage collect shards in SQS Filesource
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Aug 23, 2024
1 parent a2ffa08 commit 784bb16
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

use std::fmt;
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};

use itertools::Itertools;
use quickwit_actors::{ActorExitStatus, Mailbox};
Expand Down Expand Up @@ -105,6 +105,10 @@ impl QueueCoordinator {
reacquire_grace_period: Duration::from_secs(
2 * source_runtime.indexing_setting.commit_timeout_secs as u64,
),
last_pruning: Instant::now(),
max_age: None,
max_count: None,
pruning_interval: Duration::from_secs(60),
},
local_state: QueueLocalState::default(),
pipeline_id: source_runtime.pipeline_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,17 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::BTreeMap;
use std::time::Duration;
use std::time::{Duration, Instant};

use anyhow::{bail, Context};
use quickwit_metastore::checkpoint::PartitionId;
use quickwit_proto::metastore::{
AcquireShardsRequest, MetastoreService, MetastoreServiceClient, OpenShardSubrequest,
OpenShardsRequest,
OpenShardsRequest, PruneShardsRequest,
};
use quickwit_proto::types::{DocMappingUid, IndexUid, Position, ShardId};
use time::OffsetDateTime;
use tracing::info;
use tracing::{error, info};

use super::message::PreProcessedMessage;

Expand All @@ -40,9 +40,31 @@ pub struct QueueSharedState {
/// Duration after which the processing of a shard is considered stale and
/// should be reacquired
pub reacquire_grace_period: Duration,
pub max_age: Option<u32>,
pub max_count: Option<u32>,
pub last_pruning: Instant,
pub pruning_interval: Duration,
}

impl QueueSharedState {
async fn clean_partitions(&self) {
if self.max_count.is_none() && self.max_age.is_none() {
return;
}
let result = self
.metastore
.prune_shards(PruneShardsRequest {
index_uid: Some(self.index_uid.clone()),
source_id: self.source_id.clone(),
max_age: self.max_age,
max_count: self.max_count,
})
.await;
if let Err(err) = result {
error!(error = ?err, "failed to prune shards");
}
}

/// Tries to acquire the ownership for the provided messages from the global
/// shared context. For each partition id, if the ownership was successfully
/// acquired or the partition was already successfully indexed, the position
Expand All @@ -53,6 +75,9 @@ impl QueueSharedState {
publish_token: &str,
partitions: Vec<PartitionId>,
) -> anyhow::Result<Vec<(PartitionId, Position)>> {
if self.last_pruning.elapsed() > self.pruning_interval {
self.clean_partitions().await;
}
let open_shard_subrequests = partitions
.iter()
.enumerate()
Expand Down Expand Up @@ -294,6 +319,10 @@ pub mod shared_state_for_tests {
index_uid,
source_id: "test-queue-src".to_string(),
reacquire_grace_period: Duration::from_secs(10),
last_pruning: Instant::now(),
max_age: None,
max_count: None,
pruning_interval: Duration::from_secs(10),
}
}
}
Expand Down Expand Up @@ -345,6 +374,10 @@ mod tests {
index_uid,
source_id: "test-sqs-source".to_string(),
reacquire_grace_period: Duration::from_secs(10),
last_pruning: Instant::now(),
max_age: None,
max_count: None,
pruning_interval: Duration::from_secs(10),
};

let aquired = shared_state
Expand Down Expand Up @@ -374,6 +407,10 @@ mod tests {
index_uid,
source_id: "test-sqs-source".to_string(),
reacquire_grace_period: Duration::from_secs(10),
last_pruning: Instant::now(),
max_age: None,
max_count: None,
pruning_interval: Duration::from_secs(10),
};

let acquired = shared_state
Expand Down Expand Up @@ -403,6 +440,10 @@ mod tests {
index_uid,
source_id: "test-sqs-source".to_string(),
reacquire_grace_period: Duration::from_secs(10),
last_pruning: Instant::now(),
max_age: None,
max_count: None,
pruning_interval: Duration::from_secs(10),
};

let aquired = shared_state
Expand Down Expand Up @@ -436,6 +477,10 @@ mod tests {
index_uid,
source_id: "test-sqs-source".to_string(),
reacquire_grace_period: Duration::from_secs(10),
last_pruning: Instant::now(),
max_age: None,
max_count: None,
pruning_interval: Duration::from_secs(10),
};

let checkpointed_msg = checkpoint_messages(&shared_state, "token1", source_messages)
Expand Down

0 comments on commit 784bb16

Please sign in to comment.