Introduce SnapshotRepository and object store integration #2310
base: main
Conversation
Thank you @pcholakov for creating this PR. It looks good to me! I left 2 very minor comments.
crates/worker/src/lib.rs
Outdated
@@ -79,6 +80,9 @@ pub enum BuildError {
    ),
    #[code(unknown)]
    Invoker(#[from] restate_invoker_impl::BuildError),
    #[error("failed opening partition store: {0}")]
I am wondering whether this message is very specific to a single error case (opening), while the variant name and inner error are generic and wider.
Copy-pasta from the RocksDb error above, fixed :-)
        .into_string()
        .map(|path| format!("file://{path}"))
})
.map_err(|e| anyhow!("Unable to convert path to string: {:?}", e))?;
Suggested change:
-    .map_err(|e| anyhow!("Unable to convert path to string: {:?}", e))?;
+    .context("Unable to convert path to string")?;
This will still include the 'inner' error in the output string when printed.
That approach doesn't work here because OsString::into_string() returns Result<String, OsString>, which doesn't meet anyhow's trait bounds :-)
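For context, a minimal standalone sketch of that constraint (nothing beyond std and anyhow assumed): the error side of the Result is an OsString, which does not implement std::error::Error, so anyhow's Context extension can't wrap it and map_err is needed instead.

```rust
use anyhow::anyhow;
use std::ffi::OsString;

// OsString::into_string() returns Result<String, OsString>; OsString is not
// a std::error::Error, so .context(...) from anyhow cannot wrap it directly.
fn to_file_url(path: OsString) -> anyhow::Result<String> {
    path.into_string()
        .map(|p| format!("file://{p}"))
        .map_err(|e| anyhow!("Unable to convert path to string: {:?}", e))
}
```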
// All common object stores list objects in lexicographical order, with no option for
// reverse order. We inject an explicit sort key into the snapshot prefix to make sure that
// the latest snapshot is always first.
let inverted_sort_key = format!("{:016x}", u64::MAX - lsn.as_u64());
🙌🏼
That's really nice :-)
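As a quick illustration (not part of the PR), the inverted key really does make newer LSNs sort first under plain lexicographic ordering:

```rust
// Higher LSNs map to lexicographically smaller keys, so an ascending
// object-store listing yields the most recent snapshot first.
let keys: Vec<String> = [1u64, 10, 100]
    .iter()
    .map(|lsn| format!("{:016x}", u64::MAX - lsn))
    .collect();
assert!(keys[2] < keys[1] && keys[1] < keys[0]); // LSN 100 sorts before 10, before 1
```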
),
prefix => format!(
    "{trimmed_prefix}/{partition_id}/{sk}_{lsn}_{snapshot_id}.tar",
    trimmed_prefix = prefix.trim_start_matches('/').trim_end_matches('/'),
Why not do the trimming in create() and then store it trimmed on self instead?
I'll revisit this for sure, this was the first stab :-)
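A rough sketch of the reviewer's suggestion, just to show the shape; the create() context and field name are assumptions:

```rust
// Hypothetical: normalize the prefix once at construction time so the key
// formatting code can use it as-is.
fn normalized_prefix(raw: &str) -> String {
    raw.trim_start_matches('/').trim_end_matches('/').to_owned()
}

// e.g. inside SnapshotRepository::create(...):
// let prefix = normalized_prefix(&configured_prefix);
// Ok(Self { prefix, /* ... */ })
```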
Thanks for creating this PR @pcholakov. The changes look really good. The one question I have is whether there is a way to avoid materializing the tarball and re-reading into memory. It would be awesome if we can stream the tarballing into the object-store upload.
crates/types/Cargo.toml
Outdated
@@ -74,6 +74,7 @@ tracing = { workspace = true }
tracing-opentelemetry = { workspace = true }
ulid = { workspace = true }
xxhash-rust = { workspace = true, features = ["xxh3"] }
url = "2.5.3"
workspace dependency?
.map(|s| Ok(s.clone()))
.unwrap_or_else(|| {
    base_dir
        .join("pp-snapshots")
        .into_os_string()
        .into_string()
        .map(|path| format!("file://{path}"))
})
.map_err(|e| anyhow!("Unable to convert path to string: {:?}", e))?;
This part looks as if we are putting the inner destination into a Result in order to support the unwrap_or_else case, which can return an error. The last map_err only applies to the unwrap_or_else, but not to anything that can fail before it. Maybe using if let Some(...) = snapshot_options.destination {} else {} would be a tad bit simpler?
Yes, it was a bit crude - definitely reads better as if let, thank you!
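For reference, a sketch of the if let shape being discussed, reusing the names from the snippet above (snapshot_options, base_dir); the surrounding details are assumptions:

```rust
// Hypothetical restructuring: handle the configured destination and the
// file:// fallback as two explicit branches instead of Option/Result mapping.
let destination = if let Some(destination) = &snapshot_options.destination {
    destination.clone()
} else {
    let path = base_dir
        .join("pp-snapshots")
        .into_os_string()
        .into_string()
        .map_err(|e| anyhow!("Unable to convert path to string: {:?}", e))?;
    format!("file://{path}")
};
```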
/// Write a partition snapshot to the snapshot repository.
pub(crate) async fn put(
    &self,
    partition_id: PartitionId,
Isn't partition_id already part of PartitionSnapshotMetadata?
Removed :-)
let staging_path = self.staging_path.clone();
tokio::fs::create_dir_all(&staging_path).await?;
Could this happen when we create the SnapshotRepository instead of on every put?
Good call, will move!
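A minimal sketch of moving the directory creation into construction; the struct and signature here are trimmed-down assumptions, not the PR's actual API:

```rust
use std::path::PathBuf;

// Hypothetical, reduced SnapshotRepository just to illustrate the shape.
struct SnapshotRepository {
    staging_path: PathBuf,
}

impl SnapshotRepository {
    // Create the staging directory once here so put() can assume it exists.
    async fn create(staging_path: PathBuf) -> anyhow::Result<Self> {
        tokio::fs::create_dir_all(&staging_path).await?;
        Ok(SnapshotRepository { staging_path })
    }
}
```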
    ),
};

let staging_path = self.staging_path.clone();
You can save the clone if you move NamedTempFile::new_in(&staging_path) to before you spawn the blocking task.
I've moved away from NamedTempFile and cleaned this up in the latest iteration :-)
let packaging_task = task_center().spawn_blocking_unmanaged(
    "package-snapshot",
    Some(partition_id),
    async move {
        trace_span!("package-snapshot", %snapshot_id).in_scope(|| {
            let mut tarball = tar::Builder::new(NamedTempFile::new_in(&staging_path)?);
            debug!(
                "Creating snapshot tarball of {:?} in: {:?}...",
                &staging_path,
                tarball.get_ref()
            );
            tarball.append_dir_all(".", snapshot_path)?;
            tarball.finish()?;
            tarball.into_inner()
        })
    },
);
Why are we creating a tar? Because it is simpler to download it? Because it saves some put/get requests (cost efficiency)? Because it makes the upload more efficient (performance)?
Is there a way to avoid the materialization of the tarball? It looks as if this could entail quite a bit of I/O if the snapshot consists of many SSTs or is very large.
It was a suggestion from both Jack and Ahmed that I went along with, believing I could stream the archive directly to S3. The upside is:
- simpler bucket layout with a single file (though that'll be short-lived as we move towards incremental snapshots)
- simpler to split up into chunks for multi-part upload
The major downside is that we have 2x the on-node storage footprint, AND we do a bunch of I/O to boot. I don't think it's pulling its weight, but I only just realized yesterday that I couldn't make streaming the tar work in memory (blocked on tokio-rs/tokio#6914, which doesn't seem to be going anywhere).
I'll be removing the tar archiving in a follow-up iteration.
// todo(pavel): don't buffer the entire snapshot in memory!
let payload = PutPayload::from(tokio::fs::read(tarball.path()).await?);
That would indeed be great. Especially once we have larger snapshots.
ObjectStore already supports multipart upload; you can use that to upload the tar in chunks instead.
Implemented in the latest revision! 🎉
// the latest snapshot is always first.
let inverted_sort_key = format!("{:016x}", u64::MAX - lsn.as_u64());

// The snapshot data / metadata key format is: [<base_prefix>/]<partition_id>/<sort_key>_<lsn>_<snapshot_id>.tar
What's the idea for distinguishing full from incremental snapshots in the future? Would the latter have a completely different path or contain a marker file that denotes them as incremental?
I'm about to introduce this shortly to this PR - the key idea is to upload the tar archives and metadata JSON files separately, so that interested nodes can easily query just the metadata. We can gradually introduce additional attributes to the metadata JSON schema to support referencing the constituent parts of an incremental snapshot. The snapshot format version field within the metadata blob will allow nodes to know how to interpret it - or fail loudly if the Restate server is an older version that doesn't understand it.
The paths will be something like:
[<prefix>/]metadata/<partition_id>/<sort_key>-<snapshot_id>-{lsn}.json
[<prefix>/]snapshot/<partition_id>/<sort_key>-<snapshot_id>-{lsn}.tar
I imagine that at some point we'll add incremental snapshots and the repository format will then look something along the lines of:
[<prefix>/]metadata/<partition_id>/<sort_key>-<snapshot_id>-{lsn}.json
(V2) [<prefix>/]files/<partition_id>/<snapshot_id>-{filename}.sst
In this world, there will no longer be 1:1 metadata-to-snapshot correspondence but rather a 1:n relationship. Additionally, we may want to write some sort of index metadata to make it cheaper to garbage collect disused SSTs - but I haven't thought too much about that yet.
crates/worker/Cargo.toml
Outdated
@@ -63,13 +67,16 @@ serde = { workspace = true }
serde_json = { workspace = true }
serde_with = { workspace = true }
strum = { workspace = true }
tar = "0.4.43"
I think this might need to be moved to the workspace dependencies.
Will be replaced in the next revision anyway :-)
Force-pushed from 9f6d162 to d686e7e
Thanks @tillrohrmann and @muhamadazmy for your early input, it was really valuable! I've pushed a new revision but I still want to remove tar archiving before I mark it ready for review.
let mut tarball = tar::Builder::new(NamedTempFile::new_in(&staging_path)?);
debug!(
    "Creating snapshot tarball of {:?} in: {:?}...",
    &staging_path,
I've renamed staging_path to local_snapshot_path for clarity - that's the raw RocksDB column family export directory with the SSTs plus our own metadata JSON blob. We then tar that directory up into an archive at the path snapshot_archive_path.
Force-pushed from 768bddf to 56e659f
Force-pushed from 56e659f to cee99e6
Force-pushed from 76f4843 to 38268d6
Substantial changes since initial revision
Thanks for creating this PR @pcholakov. The changes look really nice. I left a few minor comments. The one question I had was whether concurrent modifications of a snapshot metadata.json or the latest.json can be a problem (e.g. if an old and new leader upload a snapshot at the same time)?
SnapshotExportError(PartitionId, #[source] anyhow::Error),
#[error("Snapshot failed for partition {0}: {1}")]
SnapshotMetadataHeaderError(PartitionId, #[source] io::Error),
#[error("Internal error creating snapshot for partition {0}: {1}")]
#[error("Snapshot IO error: {1}")]
SnapshotIoError(PartitionId, #[source] io::Error),
#[error("Snapshot repository IO error: {1}")]
RepositoryIoError(PartitionId, #[source] anyhow::Error),
You probably don't have to add the "Error" suffix. It's a variant of SnapshotError, so it should be clear that it is an error.
/// Restate cluster name which produced the snapshot.
pub lsn: Lsn,
Comment seems to be a bit off.
let relative_snapshot_path = format!("lsn_{lsn}", lsn = snapshot.min_applied_lsn);
let snapshot_prefix = format!(
    "{prefix}{partition_id}/{relative_snapshot_path}",
    prefix = self.prefix,
Didn't know that this is possible. Interesting.
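For anyone reading along, a tiny standalone example of the format! feature being referenced: named arguments bound inline, mixed with implicit capture of identifiers in scope (the values here are made up):

```rust
let partition_id = 7;
// `{partition_id}` is captured from scope; `prefix` and `lsn` are named args.
let key = format!("{prefix}{partition_id}/lsn_{lsn}", prefix = "snapshots/", lsn = 42);
assert_eq!(key, "snapshots/7/lsn_42");
```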
debug!(
    %lsn,
    "Publishing partition snapshot to: {}",
    self.destination,
);
You can instrument put via #[instrument()] and include the lsn, snapshot id, etc.
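A hedged sketch of that suggestion; the function signature and field names below are illustrative, not the PR's actual put():

```rust
use tracing::instrument;

// The span created by #[instrument] carries the ids, so the body no longer
// needs to repeat them in each log statement.
#[instrument(level = "debug", skip(payload), fields(partition_id = %partition_id, lsn = %lsn))]
async fn put_snapshot(partition_id: u64, lsn: u64, payload: Vec<u8>) {
    tracing::debug!("Publishing partition snapshot ({} bytes)", payload.len());
}
```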
let put_result = self
    .object_store
    .put(&metadata_key, metadata_json_payload)
    .await?;
Is there a possibility for two processes taking a snapshot for the same lsn (e.g. an old leader and a new one) which aren't exactly the same because the effective lsn is different? If this is possible, is this a problem?
let put_result = self
    .object_store
    .put(&latest_path, latest_json_payload)
    .await?;
Same question here, but for different lsns. How are we going to use the latest.json? I could imagine a slow old leader completing a snapshot after a newer snapshot has already been completed.
for file in &snapshot.files {
    let filename = file.name.trim_start_matches("/");
    let key = object_store::path::Path::from(format!(
        "{}/{}",
        snapshot_prefix.as_str(),
        filename
    ));
    let put_result = put_snapshot_object(
        local_snapshot_path.join(filename).as_path(),
        &key,
        &self.object_store,
    )
    .await?;
    debug!(
        etag = put_result.e_tag.unwrap_or_default(),
        ?key,
        "Put snapshot data file completed",
    );
}
Uploading multiple files concurrently would probably only cause higher and less predictable resource utilization. And we aren't in a rush, I guess.
} else {
    let mut upload = object_store.put_multipart(key).await?;
    loop {
        let mut buf = vec![0; MULTIPART_UPLOAD_THRESHOLD_BYTES];
Could we reuse this buffer across iterations and ideally also across different file uploads?
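One possible shape of that reuse, assuming the same snapshot reader and upload handle as in the hunk above; copying only the filled prefix into a Bytes also avoids uploading trailing bytes of the buffer on a short read:

```rust
// Reuse a single allocation across parts; each part copies just &buf[..n].
let mut buf = vec![0u8; MULTIPART_UPLOAD_THRESHOLD_BYTES];
loop {
    let n = snapshot.read(&mut buf).await?;
    if n == 0 {
        break;
    }
    let part = PutPayload::from(bytes::Bytes::copy_from_slice(&buf[..n]));
    upload
        .put_part(part)
        .await
        .context("Failed to put snapshot part in repository")?;
}
```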
loop {
    let mut buf = vec![0; MULTIPART_UPLOAD_THRESHOLD_BYTES];
    let n = snapshot.read(&mut buf).await?;
    if n == 0 {
        break;
    }
    let part = PutPayload::from(buf);
    upload
        .put_part(part)
        .await
        .context("Failed to put snapshot part in repository")?;
    trace!("Uploaded chunk of {} bytes", n);
}
upload
    .complete()
    .await
    .context("Failed to put snapshot in repository")
Should we call upload.abort in case an error occurs?
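Sketched below is one way that could look, assuming the read/put_part loop from the hunk above is factored into a hypothetical stream_parts helper returning anyhow::Result<()>; abort() is part of object_store's MultipartUpload trait:

```rust
let mut upload = object_store.put_multipart(key).await?;
match stream_parts(&mut snapshot, upload.as_mut()).await {
    Ok(()) => upload
        .complete()
        .await
        .context("Failed to put snapshot in repository"),
    Err(e) => {
        // Best-effort cleanup of already-uploaded parts; report the original error.
        let _ = upload.abort().await;
        Err(e)
    }
}
```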
/// - `[<prefix>/]<partition_id>/YYYY-MM-DD/{lsn}/metadata.json` - snapshot descriptor
/// - `[<prefix>/]<partition_id>/YYYY-MM-DD/{lsn}/*.sst` - data files (explicitly named in `metadata.json`)
Is the YYYY-MM-DD still up to date? I couldn't find it in the path when we upload snapshots.
This change introduces a SnapshotRepository responsible for uploading snapshots to a remote object store.

Sample usage

Configuration:

Currently only s3:// and file:// URLs are supported and work just as expected.

Snapshot creation:

Future work:
Closes: #2197