Skip to content

Commit 022b9a6

Browse files
committed
Move metadata serialization to repository
1 parent 962c89b commit 022b9a6

File tree

2 files changed

+15
-26
lines changed

2 files changed

+15
-26
lines changed

crates/worker/src/partition/snapshots/repository.rs

+8-5
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,16 @@ use aws_config::BehaviorVersion;
1818
use aws_credential_types::provider::ProvideCredentials;
1919
use object_store::aws::AmazonS3Builder;
2020
use object_store::{MultipartUpload, ObjectStore, PutPayload};
21+
use serde::{Deserialize, Serialize};
22+
use serde_with::serde_as;
2123
use tokio::io::AsyncReadExt;
2224
use tracing::{debug, trace};
2325
use url::Url;
2426

25-
use restate_partition_store::snapshots::PartitionSnapshotMetadata;
27+
use restate_partition_store::snapshots::{PartitionSnapshotMetadata, SnapshotFormatVersion};
2628
use restate_types::config::SnapshotsOptions;
29+
use restate_types::identifiers::{PartitionId, SnapshotId};
30+
use restate_types::logs::Lsn;
2731

2832
/// Provides read and write access to the long-term partition snapshot storage destination.
2933
///
@@ -155,12 +159,11 @@ impl SnapshotRepository {
155159
);
156160
}
157161

158-
// todo(pavel): don't write `metadata.json` to disk, serialize it from the struct directly.
159-
// this gives us a chance to include data file checksums into it removes some IO
160-
let metadata_json_path = local_snapshot_path.join("metadata.json");
161162
let metadata_key =
162163
object_store::path::Path::from(format!("{}/metadata.json", snapshot_prefix.as_str()));
163-
let metadata_json_payload = PutPayload::from(tokio::fs::read(metadata_json_path).await?);
164+
let metadata_json_payload = PutPayload::from(
165+
serde_json::to_string_pretty(snapshot).expect("Can always serialize JSON"),
166+
);
164167
let put_result = self
165168
.object_store
166169
.put(&metadata_key, metadata_json_payload)

crates/worker/src/partition/snapshots/snapshot_task.rs

+7-21
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ impl SnapshotPartitionTask {
6262
)
6363
.await?;
6464

65-
let metadata = self.write_snapshot_metadata_header(&snapshot).await?;
65+
let metadata = self.metadata(&snapshot, SystemTime::now());
6666

6767
self.snapshot_repository
6868
.put(&metadata, snapshot.base_dir)
@@ -72,36 +72,22 @@ impl SnapshotPartitionTask {
7272
Ok(metadata)
7373
}
7474

75-
async fn write_snapshot_metadata_header(
75+
fn metadata(
7676
&self,
7777
snapshot: &LocalPartitionSnapshot,
78-
) -> Result<PartitionSnapshotMetadata, SnapshotError> {
79-
let snapshot_meta = PartitionSnapshotMetadata {
78+
created_at: SystemTime,
79+
) -> PartitionSnapshotMetadata {
80+
PartitionSnapshotMetadata {
8081
version: SnapshotFormatVersion::V1,
8182
cluster_name: self.cluster_name.clone(),
8283
node_name: self.node_name.clone(),
8384
partition_id: self.partition_id,
84-
created_at: humantime::Timestamp::from(SystemTime::now()),
85+
created_at: humantime::Timestamp::from(created_at),
8586
snapshot_id: self.snapshot_id,
8687
key_range: snapshot.key_range.clone(),
8788
min_applied_lsn: snapshot.min_applied_lsn,
8889
db_comparator_name: snapshot.db_comparator_name.clone(),
8990
files: snapshot.files.clone(),
90-
};
91-
let metadata_json =
92-
serde_json::to_string_pretty(&snapshot_meta).expect("Can always serialize JSON");
93-
94-
let metadata_path = snapshot.base_dir.join("metadata.json");
95-
tokio::fs::write(metadata_path.clone(), metadata_json)
96-
.await
97-
.map_err(|e| SnapshotError::SnapshotIoError(self.partition_id, e))?;
98-
99-
debug!(
100-
lsn = %snapshot.min_applied_lsn,
101-
"Partition snapshot metadata written to {:?}",
102-
metadata_path
103-
);
104-
105-
Ok(snapshot_meta)
91+
}
10692
}
10793
}

0 commit comments

Comments
 (0)