Skip to content

Commit

Permalink
Refactoring: Move partition snapshotting to PPM (#2303)
Browse files Browse the repository at this point in the history
This change moves the responsibility for orchestrating snapshot creation to the
PartitionProcessorManager, allowing the PartitionProcessor to be more focused
on its core task of processing journal operations.
  • Loading branch information
pcholakov authored Nov 21, 2024
1 parent b523e98 commit 6293d25
Show file tree
Hide file tree
Showing 13 changed files with 452 additions and 285 deletions.
10 changes: 10 additions & 0 deletions crates/admin/src/cluster_controller/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,7 @@ mod tests {

struct NodeStateHandler {
persisted_lsn: Arc<AtomicU64>,
archived_lsn: Arc<AtomicU64>,
// Set of node ids for which the handler won't send a response to the caller; this allows
// simulating dead nodes.
block_list: BTreeSet<GenerationalNodeId>,
Expand All @@ -578,6 +579,7 @@ mod tests {

let partition_processor_status = PartitionProcessorStatus {
last_persisted_log_lsn: Some(Lsn::from(self.persisted_lsn.load(Ordering::Relaxed))),
last_archived_log_lsn: Some(Lsn::from(self.archived_lsn.load(Ordering::Relaxed))),
..PartitionProcessorStatus::new()
};

Expand Down Expand Up @@ -606,8 +608,11 @@ mod tests {
};

let persisted_lsn = Arc::new(AtomicU64::new(0));
let archived_lsn = Arc::new(AtomicU64::new(0));

let get_node_state_handler = Arc::new(NodeStateHandler {
persisted_lsn: Arc::clone(&persisted_lsn),
archived_lsn: Arc::clone(&archived_lsn),
block_list: BTreeSet::new(),
});

Expand Down Expand Up @@ -689,8 +694,11 @@ mod tests {
};

let persisted_lsn = Arc::new(AtomicU64::new(0));
let archived_lsn = Arc::new(AtomicU64::new(0));

let get_node_state_handler = Arc::new(NodeStateHandler {
persisted_lsn: Arc::clone(&persisted_lsn),
archived_lsn: Arc::clone(&archived_lsn),
block_list: BTreeSet::new(),
});
let (node_env, bifrost) = create_test_env(config, |builder| {
Expand Down Expand Up @@ -768,6 +776,7 @@ mod tests {
};

let persisted_lsn = Arc::new(AtomicU64::new(0));
let archived_lsn = Arc::new(AtomicU64::new(0));

let (node_env, bifrost) = create_test_env(config, |builder| {
let black_list = builder
Expand All @@ -780,6 +789,7 @@ mod tests {

let get_node_state_handler = NodeStateHandler {
persisted_lsn: Arc::clone(&persisted_lsn),
archived_lsn: Arc::clone(&archived_lsn),
block_list: black_list,
};

Expand Down
55 changes: 51 additions & 4 deletions crates/core/src/worker_api/partition_processor_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;
use std::io;

use tokio::sync::{mpsc, oneshot};

Expand All @@ -21,7 +22,7 @@ use crate::ShutdownError;

#[derive(Debug)]
pub enum ProcessorsManagerCommand {
CreateSnapshot(PartitionId, oneshot::Sender<anyhow::Result<SnapshotId>>),
CreateSnapshot(PartitionId, oneshot::Sender<SnapshotResult>),
GetState(oneshot::Sender<BTreeMap<PartitionId, PartitionProcessorStatus>>),
}

Expand All @@ -33,12 +34,20 @@ impl ProcessorsManagerHandle {
Self(sender)
}

pub async fn create_snapshot(&self, partition_id: PartitionId) -> anyhow::Result<SnapshotId> {
pub async fn create_snapshot(&self, partition_id: PartitionId) -> SnapshotResult {
let (tx, rx) = oneshot::channel();
self.0
.send(ProcessorsManagerCommand::CreateSnapshot(partition_id, tx))
.await?;
rx.await?
.await
.map_err(|_| {
SnapshotError::Internal(
partition_id,
"Unable to send command to PartitionProcessorManager".to_string(),
)
})?;
rx.await.map_err(|_| {
SnapshotError::Internal(partition_id, "Unable to receive response".to_string())
})?
}

pub async fn get_state(
Expand All @@ -52,3 +61,41 @@ impl ProcessorsManagerHandle {
rx.await.map_err(|_| ShutdownError)
}
}

/// Outcome of a snapshot creation request: a [`SnapshotCreated`] on success, or a
/// [`SnapshotError`] describing why the snapshot could not be produced.
pub type SnapshotResult = Result<SnapshotCreated, SnapshotError>;

/// Success payload of a snapshot creation request.
///
/// The `Display` implementation renders only the `snapshot_id`.
#[derive(Debug, Clone, derive_more::Display)]
#[display("{}", snapshot_id)]
pub struct SnapshotCreated {
/// Id of the snapshot that was created.
pub snapshot_id: SnapshotId,
/// Id of the partition associated with the snapshot.
pub partition_id: PartitionId,
}

/// Reasons a partition snapshot request can fail.
///
/// Every variant carries the [`PartitionId`] the failure relates to as its first field.
#[derive(Debug, thiserror::Error)]
pub enum SnapshotError {
/// No partition with the given id was found.
#[error("Partition {0} not found")]
PartitionNotFound(PartitionId),
/// A snapshot is already being created for this partition.
#[error("Snapshot creation already in progress for partition {0}")]
SnapshotInProgress(PartitionId),
/// The partition processor's current state does not allow exporting a snapshot.
#[error("Partition processor state does not allow snapshot export {0}")]
InvalidState(PartitionId),
/// Exporting the snapshot failed; the underlying cause is attached as the error source.
#[error("Snapshot failed for partition {0}: {1}")]
SnapshotExportError(PartitionId, #[source] anyhow::Error),
/// An I/O failure, attached as the error source. Judging by the variant name this relates to
/// the snapshot metadata header — TODO confirm against the call sites.
/// NOTE: the display text is identical to that of `SnapshotExportError`.
#[error("Snapshot failed for partition {0}: {1}")]
SnapshotMetadataHeaderError(PartitionId, #[source] io::Error),
/// Any other failure, described by a free-form message.
#[error("Internal error creating snapshot for partition {0}: {1}")]
Internal(PartitionId, String),
}

impl SnapshotError {
pub fn partition_id(&self) -> PartitionId {
match self {
SnapshotError::PartitionNotFound(partition_id) => *partition_id,
SnapshotError::SnapshotInProgress(partition_id) => *partition_id,
SnapshotError::InvalidState(partition_id) => *partition_id,
SnapshotError::SnapshotExportError(partition_id, _) => *partition_id,
SnapshotError::SnapshotMetadataHeaderError(partition_id, _) => *partition_id,
SnapshotError::Internal(partition_id, _) => *partition_id,
}
}
}
21 changes: 17 additions & 4 deletions crates/partition-store/src/partition_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::sync::Arc;
use bytes::Bytes;
use bytes::BytesMut;
use codederror::CodedError;
use enum_map::Enum;
use restate_rocksdb::CfName;
use restate_rocksdb::IoMode;
use restate_rocksdb::Priority;
Expand All @@ -28,8 +29,8 @@ use rocksdb::PrefixRange;
use rocksdb::ReadOptions;
use rocksdb::{BoundColumnFamily, SliceTransform};
use static_assertions::const_assert_eq;
use tracing::trace;

use enum_map::Enum;
use restate_core::ShutdownError;
use restate_rocksdb::{RocksDb, RocksError};
use restate_storage_api::{Storage, StorageError, Transaction};
Expand Down Expand Up @@ -444,11 +445,19 @@ impl PartitionStore {
.await
.map_err(|err| StorageError::Generic(err.into()))?;

trace!(
cf_name = ?self.data_cf_name,
%applied_lsn,
"Exported column family snapshot to {:?}",
snapshot_dir
);

Ok(LocalPartitionSnapshot {
base_dir: snapshot_dir,
files: metadata.get_files(),
db_comparator_name: metadata.get_db_comparator_name(),
min_applied_lsn: applied_lsn,
key_range: self.key_range.clone(),
})
}
}
Expand All @@ -474,8 +483,9 @@ impl Storage for PartitionStore {

impl StorageAccess for PartitionStore {
type DBAccess<'a>
= DB where
Self: 'a,;
= DB
where
Self: 'a;

fn iterator_from<K: TableKey>(
&self,
Expand Down Expand Up @@ -646,7 +656,10 @@ impl<'a> Transaction for PartitionStoreTransaction<'a> {
}

impl<'a> StorageAccess for PartitionStoreTransaction<'a> {
type DBAccess<'b> = DB where Self: 'b;
type DBAccess<'b>
= DB
where
Self: 'b;

fn iterator_from<K: TableKey>(
&self,
Expand Down
38 changes: 31 additions & 7 deletions crates/partition-store/src/partition_store_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,25 @@

use std::collections::BTreeMap;
use std::ops::RangeInclusive;
use std::path::Path;
use std::sync::Arc;

use rocksdb::ExportImportFilesMetaData;
use tokio::sync::Mutex;
use tracing::{debug, error, info, warn};

use crate::cf_options;
use crate::snapshots::LocalPartitionSnapshot;
use crate::PartitionStore;
use crate::DB;
use restate_core::worker_api::SnapshotError;
use restate_rocksdb::{
CfName, CfPrefixPattern, DbName, DbSpecBuilder, RocksDb, RocksDbManager, RocksError,
};
use restate_types::config::{RocksDbOptions, StorageOptions};
use restate_types::identifiers::{PartitionId, PartitionKey};
use restate_types::identifiers::{PartitionId, PartitionKey, SnapshotId};
use restate_types::live::{BoxedLiveLoad, LiveLoad};

use crate::cf_options;
use crate::snapshots::LocalPartitionSnapshot;
use crate::PartitionStore;
use crate::DB;

const DB_NAME: &str = "db";
const PARTITION_CF_PREFIX: &str = "data-";

Expand Down Expand Up @@ -133,7 +134,7 @@ impl PartitionStoreManager {
/// Imports a partition snapshot and opens it as a partition store.
/// The database must not have an existing column family for the partition id;
/// it will be created based on the supplied snapshot.
pub async fn restore_partition_store_snapshot(
pub async fn open_partition_store_from_snapshot(
&self,
partition_id: PartitionId,
partition_key_range: RangeInclusive<PartitionKey>,
Expand Down Expand Up @@ -194,6 +195,29 @@ impl PartitionStoreManager {
Ok(partition_store)
}

/// Exports a snapshot of the partition's store into a directory named after `snapshot_id`
/// located beneath `snapshot_base_path`.
///
/// # Errors
///
/// - [`SnapshotError::PartitionNotFound`] if no partition store is available for `partition_id`.
/// - [`SnapshotError::SnapshotExportError`] if `snapshot_base_path` cannot be created or the
///   partition store fails to create the snapshot.
pub async fn export_partition_snapshot(
    &self,
    partition_id: PartitionId,
    snapshot_id: SnapshotId,
    snapshot_base_path: &Path,
) -> Result<LocalPartitionSnapshot, SnapshotError> {
    // Bail out before touching the filesystem when the partition is unknown.
    let Some(mut store) = self.get_partition_store(partition_id).await else {
        return Err(SnapshotError::PartitionNotFound(partition_id));
    };

    // RocksDB will create the snapshot directory but the parent must exist first:
    if let Err(io_err) = tokio::fs::create_dir_all(snapshot_base_path).await {
        return Err(SnapshotError::SnapshotExportError(
            partition_id,
            io_err.into(),
        ));
    }

    let target_dir = snapshot_base_path.join(snapshot_id.to_string());
    match store.create_snapshot(target_dir).await {
        Ok(snapshot) => Ok(snapshot),
        Err(err) => Err(SnapshotError::SnapshotExportError(
            partition_id,
            err.into(),
        )),
    }
}

pub async fn drop_partition(&self, partition_id: PartitionId) {
let mut guard = self.lookup.lock().await;
self.raw_db
Expand Down
1 change: 1 addition & 0 deletions crates/partition-store/src/snapshots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ pub struct LocalPartitionSnapshot {
pub min_applied_lsn: Lsn,
pub db_comparator_name: String,
pub files: Vec<LiveFile>,
pub key_range: RangeInclusive<PartitionKey>,
}

/// RocksDB SST file that is part of a snapshot. Serialization wrapper around [LiveFile].
Expand Down
6 changes: 4 additions & 2 deletions crates/partition-store/src/tests/snapshots_test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@ pub(crate) async fn run_tests(manager: PartitionStoreManager, mut partition_stor

let snapshot = partition_store.create_snapshot(path_buf).await.unwrap();

let key_range = partition_store.partition_key_range().clone();
let snapshot_meta = PartitionSnapshotMetadata {
version: SnapshotFormatVersion::V1,
cluster_name: "cluster_name".to_string(),
partition_id,
node_name: "node".to_string(),
created_at: humantime::Timestamp::from(SystemTime::from(MillisSinceEpoch::new(0))),
snapshot_id: SnapshotId::from_parts(0, 0),
key_range: partition_store.partition_key_range().clone(),
key_range: key_range.clone(),
min_applied_lsn: snapshot.min_applied_lsn,
db_comparator_name: snapshot.db_comparator_name.clone(),
files: snapshot.files.clone(),
Expand All @@ -48,12 +49,13 @@ pub(crate) async fn run_tests(manager: PartitionStoreManager, mut partition_stor
min_applied_lsn: snapshot_meta.min_applied_lsn,
db_comparator_name: snapshot_meta.db_comparator_name.clone(),
files: snapshot_meta.files.clone(),
key_range,
};

let worker_options = Live::from_value(WorkerOptions::default());

let mut new_partition_store = manager
.restore_partition_store_snapshot(
.open_partition_store_from_snapshot(
partition_id,
RangeInclusive::new(0, PartitionKey::MAX - 1),
snapshot,
Expand Down
Loading

0 comments on commit 6293d25

Please sign in to comment.