Skip to content

Commit

Permalink
Add locking around object file operations
Browse files Browse the repository at this point in the history
  • Loading branch information
jwodder committed Nov 25, 2024
1 parent eeebed7 commit ba04002
Showing 1 changed file with 30 additions and 56 deletions.
86 changes: 30 additions & 56 deletions src/syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,87 +177,69 @@ impl Syncer {
fs_err::create_dir_all(&parentdir)?;
let mdmanager = MetadataManager::new(self, &parentdir, filename);

let actions = if item.is_latest {
if item.is_latest {
tracing::debug!("Object is latest version of key");
let latest_path = parentdir.join(filename);
if latest_path.fs_err_try_exists()? {
let current_md = mdmanager.get().await?;
if md == current_md {
tracing::debug!(path = %latest_path.display(), "Backup path already exists and metadata matches; doing nothing");
Vec::new()
} else {
tracing::debug!(path = %latest_path.display(), "Backup path already exists but metadata does not match; renaming current file and downloading correct version");
vec![
ObjectAction::Move {
src: latest_path.clone(),
dest: parentdir.join(current_md.old_filename(filename)),
},
ObjectAction::Download { path: latest_path },
ObjectAction::SaveMetadata,
]
let guard = self.lock_path(latest_path.clone());
self.move_object_file(
&latest_path,
&parentdir.join(current_md.old_filename(filename)),
)?;
self.download_item(&item, latest_path, token).await?;
drop(guard);
mdmanager.set(md).await?;
}
} else {
let oldpath = parentdir.join(md.old_filename(filename));
if oldpath.fs_err_try_exists()? {
tracing::debug!(path = %latest_path.display(), oldpath = %oldpath.display(), "Backup path does not exist but \"old\" path does; will rename");
vec![
ObjectAction::Move {
src: oldpath,
dest: latest_path,
},
ObjectAction::SaveMetadata,
]
let guard = self.lock_path(latest_path.clone());
self.move_object_file(&oldpath, &latest_path)?;
drop(guard);
mdmanager.set(md).await?;
} else {
tracing::debug!(path = %latest_path.display(), "Backup path does not exist; will download");
vec![
ObjectAction::Download { path: latest_path },
ObjectAction::SaveMetadata,
]
let guard = self.lock_path(latest_path.clone());
self.download_item(&item, latest_path, token).await?;
drop(guard);
mdmanager.set(md).await?;
}
}
} else {
tracing::debug!("Object is old version of key");
let oldpath = parentdir.join(md.old_filename(filename));
if oldpath.fs_err_try_exists()? {
tracing::debug!(path = %oldpath.display(), "Backup path already exists; doing nothing");
Vec::new()
} else {
let latest_path = parentdir.join(filename);
if latest_path.fs_err_try_exists()? && md == mdmanager.get().await? {
tracing::debug!(path = %oldpath.display(), "Backup path does not exist, but \"latest\" file has matching metadata; renaming \"latest\" file");
vec![
ObjectAction::Move {
src: latest_path,
dest: oldpath,
},
ObjectAction::DeleteMetadata,
]
let guard = self.lock_path(latest_path.clone());
self.move_object_file(&latest_path, &oldpath)?;
drop(guard);
mdmanager.delete().await?;
} else {
tracing::debug!(path = %oldpath.display(), "Backup path does not exist; will download");
vec![ObjectAction::Download { path: oldpath }]
// No need for locking here, as this is an "old" path that
// doesn't exist, so no other tasks should be working on
// it.
self.download_item(&item, oldpath, token).await?;
}
}
};

for act in actions {
match act {
ObjectAction::Move { src, dest } => {
tracing::debug!(src = %src.display(), dest = %dest.display(), "Moving object file");
fs_err::rename(src, dest)?;
}
ObjectAction::Download { path } => {
self.download_item(&item, path, token.clone()).await?;
}
// TODO: Handle cancellation/cleanup around metadata
// management:
ObjectAction::SaveMetadata => mdmanager.set(md.clone()).await?,
ObjectAction::DeleteMetadata => mdmanager.delete().await?,
}
}
Ok(())
// TODO: Handle cancellation/cleanup around metadata management
}

// TODO: Manage object metadata and old versions
// TODO: Handle concurrent downloads of the same key
fn move_object_file(&self, src: &Path, dest: &Path) -> std::io::Result<()> {
tracing::debug!(src = %src.display(), dest = %dest.display(), "Moving object file");
fs_err::rename(src, dest)
}

async fn download_item(
Expand Down Expand Up @@ -315,14 +297,6 @@ impl Syncer {
}
}

#[derive(Clone, Debug, Eq, PartialEq)]
enum ObjectAction {
Move { src: PathBuf, dest: PathBuf },
Download { path: PathBuf },
SaveMetadata,
DeleteMetadata,
}

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
struct Metadata {
version_id: String,
Expand Down

0 comments on commit ba04002

Please sign in to comment.