Introduce SnapshotRepository find_latest and wire up partition restore #2353

Open · wants to merge 1 commit into feat/snapshot-upload
13 changes: 8 additions & 5 deletions crates/partition-store/src/partition_store_manager.rs
@@ -83,9 +83,11 @@ impl PartitionStoreManager {
})
}

pub async fn has_partition(&self, partition_id: PartitionId) -> bool {
let guard = self.lookup.lock().await;
guard.live.contains_key(&partition_id)
/// Check whether we have a partition store for the given partition id, irrespective of whether
/// the store is open or not.
pub async fn has_partition_store(&self, partition_id: PartitionId) -> bool {
let cf_name = cf_for_partition(partition_id);
self.rocksdb.inner().cf_handle(&cf_name).is_some()
}

pub async fn get_partition_store(&self, partition_id: PartitionId) -> Option<PartitionStore> {
@@ -169,8 +171,9 @@ impl PartitionStoreManager {

info!(
?partition_id,
min_applied_lsn = ?snapshot.min_applied_lsn,
"Initializing partition store from snapshot"
lsn = ?snapshot.min_applied_lsn,
path = ?snapshot.base_dir,
"Importing partition store snapshot"
);

if let Err(e) = self
1 change: 1 addition & 0 deletions crates/worker/Cargo.toml
@@ -67,6 +67,7 @@ serde = { workspace = true }
serde_json = { workspace = true }
serde_with = { workspace = true }
strum = { workspace = true }
tempfile = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
101 changes: 99 additions & 2 deletions crates/worker/src/partition/snapshots/repository.rs
@@ -20,11 +20,14 @@ use object_store::aws::AmazonS3Builder;
use object_store::{MultipartUpload, ObjectStore, PutPayload};
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use tempfile::TempDir;
use tokio::io::AsyncReadExt;
use tracing::{debug, trace};
use tracing::{debug, info, trace};
use url::Url;

use restate_partition_store::snapshots::{PartitionSnapshotMetadata, SnapshotFormatVersion};
use restate_partition_store::snapshots::{
LocalPartitionSnapshot, PartitionSnapshotMetadata, SnapshotFormatVersion,
};
use restate_types::config::SnapshotsOptions;
use restate_types::identifiers::{PartitionId, SnapshotId};
use restate_types::logs::Lsn;
@@ -48,6 +51,7 @@ pub struct SnapshotRepository {
object_store: Arc<dyn ObjectStore>,
destination: Url,
prefix: String,
base_dir: PathBuf,
}

#[serde_as]
@@ -138,6 +142,7 @@ impl SnapshotRepository {
object_store,
destination,
prefix,
base_dir,
})
}

@@ -237,6 +242,98 @@ impl SnapshotRepository {

Ok(())
}

/// Discover and download the latest snapshot available. Dropping the returned
/// `LocalPartitionSnapshot` will delete the local snapshot data files.
Comment on lines +246 to +247

Contributor: Is it because the files are stored in a temp directory? On LocalPartitionSnapshot itself I couldn't find how the files are deleted when dropping it.

Contributor: Is the temp dir also the mechanism to clean things up if downloading it failed?

Contributor: It seems that TempDir::with_prefix_in takes care of it, since it deletes the files when it gets dropped. This is a nice solution!

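A minimal sketch (not from this PR) of the cleanup behaviour discussed above, assuming standard tempfile crate semantics: the temporary directory and everything inside it are removed when the TempDir guard is dropped, while into_path() hands ownership of the path to the caller and disables that automatic cleanup. The names below are illustrative.

use tempfile::TempDir;

fn tempdir_cleanup_demo() -> std::io::Result<()> {
    // Deleted automatically when `dir` is dropped, including on early-return error paths.
    let dir = TempDir::with_prefix("snapshot-demo-")?;
    let file = dir.path().join("data.sst");
    std::fs::write(&file, b"example")?;
    assert!(file.exists());

    drop(dir); // the directory and its contents are removed here
    assert!(!file.exists());
    Ok(())
}
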
pub(crate) async fn get_latest(
&self,
partition_id: PartitionId,
) -> anyhow::Result<Option<LocalPartitionSnapshot>> {
let latest_path = object_store::path::Path::from(format!(
"{prefix}{partition_id}/latest.json",
prefix = self.prefix,
partition_id = partition_id,
));

let latest = self.object_store.get(&latest_path).await;

let latest = match latest {
Ok(result) => result,
Err(object_store::Error::NotFound { .. }) => {
debug!("Latest snapshot data not found in repository");
return Ok(None);
}
Err(e) => return Err(e.into()),
};

let latest: LatestSnapshot =
serde_json::from_slice(latest.bytes().await?.iter().as_slice())?;
trace!("Latest snapshot metadata: {:?}", latest);

let snapshot_metadata_path = object_store::path::Path::from(format!(
"{prefix}{partition_id}/{path}/metadata.json",
prefix = self.prefix,
partition_id = partition_id,
path = latest.path,
));
let snapshot_metadata = self.object_store.get(&snapshot_metadata_path).await;

let snapshot_metadata = match snapshot_metadata {
Ok(result) => result,
Err(object_store::Error::NotFound { .. }) => {
info!("Latest snapshot points to a snapshot that was not found in the repository!");
return Ok(None); // arguably this could also be an error
}
Err(e) => return Err(e.into()),
};

let mut snapshot_metadata: PartitionSnapshotMetadata =
serde_json::from_slice(snapshot_metadata.bytes().await?.iter().as_slice())?;
if snapshot_metadata.version != SnapshotFormatVersion::V1 {
return Err(anyhow!(
"Unsupported snapshot format version: {:?}",
snapshot_metadata.version
));
}

// The snapshot ingest directory should be on the same filesystem as the partition store
// to minimize IO and disk space usage during import.
let snapshot_dir = TempDir::with_prefix_in(
format!("{}-", snapshot_metadata.snapshot_id),
&self.base_dir,
)?;
debug!(
snapshot_id = %snapshot_metadata.snapshot_id,
path = ?snapshot_dir.path(),
"Getting snapshot data",
);

// todo(pavel): stream the data from the object store
for file in &mut snapshot_metadata.files {
let filename = file.name.trim_start_matches("/");
let key = object_store::path::Path::from(format!(
"{prefix}{partition_id}/{path}/{filename}",
prefix = self.prefix,
partition_id = partition_id,
path = latest.path,
filename = filename,
));
let file_path = snapshot_dir.path().join(filename);
let file_data = self.object_store.get(&key).await?;
tokio::fs::write(&file_path, file_data.bytes().await?).await?;
Contributor: Yes, it would indeed be great to write the file in streaming fashion to disk. Especially once our SSTs grow.

Contributor: Maybe something like

use futures::StreamExt;
use tokio::io::AsyncWriteExt;

// Stream the object to disk chunk by chunk instead of buffering it all in memory.
let mut file_data = self.object_store.get(&key).await?.into_stream();
let mut snapshot_file = tokio::fs::File::create_new(&file_path).await?;
while let Some(data) = file_data.next().await {
    snapshot_file.write_all(&data?).await?;
}

can already be enough. Do you know how large the chunks of the stream returned by self.object_store.get(&key).await?.into_stream() will be?

trace!(%key, "Downloaded snapshot data file to {:?}", file_path);
// Patch paths to point to the local staging directory
file.directory = snapshot_dir.path().to_string_lossy().to_string();
}

Ok(Some(LocalPartitionSnapshot {
base_dir: snapshot_dir.into_path(),
min_applied_lsn: snapshot_metadata.min_applied_lsn,
db_comparator_name: snapshot_metadata.db_comparator_name,
files: snapshot_metadata.files,
key_range: snapshot_metadata.key_range.clone(),
}))
}
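
// Not part of this diff: a sketch of the object store layout that get_latest assumes,
// derived from the paths constructed above. Data file names come from metadata.files;
// the extension shown is illustrative only.
//
//   {prefix}{partition_id}/latest.json                  -> LatestSnapshot pointer
//   {prefix}{partition_id}/{latest.path}/metadata.json  -> PartitionSnapshotMetadata (format V1)
//   {prefix}{partition_id}/{latest.path}/{file.name}    -> snapshot data files (e.g. *.sst)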
}

/// S3 and other stores require a certain minimum size for the parts of a multipart upload. It is an
1 change: 1 addition & 0 deletions crates/worker/src/partition_processor_manager/mod.rs
@@ -870,6 +870,7 @@ impl PartitionProcessorManager {
self.metadata.clone(),
self.bifrost.clone(),
self.partition_store_manager.clone(),
self.snapshot_repository.clone(),
)
}

@@ -11,10 +11,11 @@
use std::ops::RangeInclusive;

use tokio::sync::{mpsc, watch};
use tracing::instrument;
use tracing::{info, instrument, trace};

use crate::invoker_integration::EntryEnricher;
use crate::partition::invoker_storage_reader::InvokerStorageReader;
use crate::partition::snapshots::SnapshotRepository;
use crate::partition_processor_manager::processor_state::StartedProcessor;
use crate::PartitionProcessorBuilder;
use restate_bifrost::Bifrost;
@@ -38,6 +39,7 @@ pub struct SpawnPartitionProcessorTask {
metadata: Metadata,
bifrost: Bifrost,
partition_store_manager: PartitionStoreManager,
snapshot_repository: SnapshotRepository,
}

impl SpawnPartitionProcessorTask {
@@ -51,6 +53,7 @@ impl SpawnPartitionProcessorTask {
metadata: Metadata,
bifrost: Bifrost,
partition_store_manager: PartitionStoreManager,
snapshot_repository: SnapshotRepository,
) -> Self {
Self {
task_name,
@@ -61,6 +64,7 @@
metadata,
bifrost,
partition_store_manager,
snapshot_repository,
}
}

@@ -82,6 +86,7 @@ impl SpawnPartitionProcessorTask {
metadata,
bifrost,
partition_store_manager,
snapshot_repository,
} = self;

let config = configuration.pinned();
@@ -129,14 +134,53 @@
{
let options = options.clone();
let key_range = key_range.clone();
let partition_store = partition_store_manager
.open_partition_store(
partition_id,
key_range,
OpenMode::CreateIfMissing,
&options.storage.rocksdb,
)
.await?;

let partition_store = if !partition_store_manager
.has_partition_store(pp_builder.partition_id)
.await
Comment on lines +138 to +140

Contributor: Out of scope of this PR: what is the plan for handling a PP that has some data but is lagging too far behind, such that starting the PP would result in a trim gap? Would we then drop the respective column family and restart it?

{
trace!(
partition_id = %partition_id,
"Looking for partition snapshot from which to bootstrap partition store",
);
let snapshot = snapshot_repository.get_latest(partition_id).await?;
if let Some(snapshot) = snapshot {
info!(
partition_id = %partition_id,
"Found snapshot to bootstrap partition, restoring it",
);
partition_store_manager
.open_partition_store_from_snapshot(
Contributor: In the linked code, we seem to copy the snapshot files to keep them intact. What is the reason for this? Wouldn't it be more efficient to move the files, since that wouldn't incur any I/O cost if the snapshot directory is on the same filesystem as the target directory?

partition_id,
key_range.clone(),
snapshot,
&options.storage.rocksdb,
)
.await?
} else {
info!(
partition_id = %partition_id,
"No snapshot found to bootstrap partition, creating new store",
);
partition_store_manager
.open_partition_store(
partition_id,
key_range,
OpenMode::CreateIfMissing,
&options.storage.rocksdb,
)
.await?
}
} else {
partition_store_manager
.open_partition_store(
partition_id,
key_range,
OpenMode::OpenExisting,
&options.storage.rocksdb,
)
.await?
};

move || async move {
tc.spawn_child(