Skip to content

Commit

Permalink
Maintain a latest.json pointer object
Browse files Browse the repository at this point in the history
  • Loading branch information
pcholakov committed Nov 21, 2024
1 parent 022b9a6 commit cee99e6
Showing 1 changed file with 54 additions and 4 deletions.
58 changes: 54 additions & 4 deletions crates/worker/src/partition/snapshots/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,35 @@ pub struct SnapshotRepository {
prefix: String,
}

#[serde_as]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LatestSnapshot {
pub version: SnapshotFormatVersion,

/// Restate cluster name which produced the snapshot.
pub lsn: Lsn,

/// Restate partition id.
pub partition_id: PartitionId,

/// Node that produced this snapshot.
pub node_name: String,

/// Local node time when the snapshot was created.
#[serde(with = "serde_with::As::<serde_with::DisplayFromStr>")]
pub created_at: humantime::Timestamp,

/// Snapshot id.
pub snapshot_id: SnapshotId,

/// The minimum LSN guaranteed to be applied in this snapshot. The actual
/// LSN may be >= [minimum_lsn].
pub min_applied_lsn: Lsn,

/// The relative path within the snapshot repository where the snapshot data is stored.
pub path: String,
}

impl SnapshotRepository {
pub async fn create(
base_dir: PathBuf,
Expand Down Expand Up @@ -127,10 +156,10 @@ impl SnapshotRepository {
self.destination,
);

let relative_snapshot_path = format!("lsn_{lsn}", lsn = snapshot.min_applied_lsn);
let snapshot_prefix = format!(
"{prefix}{partition_id}/lsn_{lsn}",
"{prefix}{partition_id}/{relative_snapshot_path}",
prefix = self.prefix,
lsn = snapshot.min_applied_lsn,
);

debug!(
Expand All @@ -151,7 +180,7 @@ impl SnapshotRepository {
&key,

Check warning on line 180 in crates/worker/src/partition/snapshots/repository.rs

View workflow job for this annotation

GitHub Actions / Build and test (warp-ubuntu-latest-x64-16x)

Diff in /home/runner/work/restate/restate/crates/worker/src/partition/snapshots/repository.rs
&self.object_store,
)
.await?;
.await?;
debug!(
etag = put_result.e_tag.unwrap_or_default(),
?key,
Expand All @@ -174,7 +203,28 @@ impl SnapshotRepository {
"Successfully published snapshot metadata",
);

// todo(pavel): (re)write latest.json pointer object
let latest = LatestSnapshot {
version: snapshot.version,
lsn: snapshot.min_applied_lsn,
partition_id,
node_name: snapshot.node_name.clone(),
created_at: snapshot.created_at.clone(),
snapshot_id: snapshot.snapshot_id,
min_applied_lsn: snapshot.min_applied_lsn,
path: relative_snapshot_path,
};
let latest_path = object_store::path::Path::from(format!(
"{prefix}{partition_id}/latest.json",
prefix = self.prefix,
partition_id = partition_id,

Check warning on line 219 in crates/worker/src/partition/snapshots/repository.rs

View workflow job for this annotation

GitHub Actions / Build and test (warp-ubuntu-latest-x64-16x)

Diff in /home/runner/work/restate/restate/crates/worker/src/partition/snapshots/repository.rs
));
let latest_json_payload = PutPayload::from(serde_json::to_string_pretty(&latest)?);
let put_result = self.object_store.put(&latest_path, latest_json_payload).await?;
debug!(
etag = put_result.e_tag.unwrap_or_default(),
key = ?latest_path,
"Successfully updated latest snapshot pointer",
);

tokio::fs::remove_dir_all(local_snapshot_path.as_path()).await?;
trace!(
Expand Down

0 comments on commit cee99e6

Please sign in to comment.