Skip to content

Commit

Permalink
Merge pull request #42 from dandi/adj-locks
Browse files Browse the repository at this point in the history
Fix/improve locking when processing items
  • Loading branch information
jwodder authored Nov 25, 2024
2 parents e39ae10 + 9e8b497 commit eaede98
Showing 1 changed file with 4 additions and 9 deletions.
13 changes: 4 additions & 9 deletions src/syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ impl Syncer {
if let Some(item) = r {
let this = self.clone();
object_dl_pool
.spawn(move |token| async move { this.process_item(item, token).await });
.spawn(move |token| async move { Box::pin(this.process_item(item, token)).await });
} else {
all_objects_txed = true;
}
Expand Down Expand Up @@ -179,6 +179,7 @@ impl Syncer {
if item.is_latest {
tracing::debug!("Object is latest version of key");
let latest_path = parentdir.join(filename);
let _guard = self.lock_path(latest_path.clone());
if latest_path.fs_err_try_exists()? {
let current_md = mdmanager.get().await?;
if md == current_md {
Expand All @@ -187,14 +188,12 @@ impl Syncer {
tracing::debug!(path = %latest_path.display(), "Backup path already exists but metadata does not match; renaming current file and downloading correct version");
// TODO: Add cancellation & cleanup logic around the rest
// of this block:
let guard = self.lock_path(latest_path.clone());
self.move_object_file(
&latest_path,
&parentdir.join(current_md.old_filename(filename)),
)?;
self.download_item(&item, &parentdir, latest_path, token)
.await?;
drop(guard);
mdmanager.set(md).await?;
}
} else {
Expand All @@ -203,18 +202,14 @@ impl Syncer {
tracing::debug!(path = %latest_path.display(), oldpath = %oldpath.display(), "Backup path does not exist but \"old\" path does; will rename");
// TODO: Add cancellation & cleanup logic around the rest
// of this block:
let guard = self.lock_path(latest_path.clone());
self.move_object_file(&oldpath, &latest_path)?;
drop(guard);
mdmanager.set(md).await?;
} else {
tracing::debug!(path = %latest_path.display(), "Backup path does not exist; will download");
// TODO: Add cancellation & cleanup logic around the rest
// of this block:
let guard = self.lock_path(latest_path.clone());
self.download_item(&item, &parentdir, latest_path, token)
.await?;
drop(guard);
mdmanager.set(md).await?;
}
}
Expand All @@ -225,19 +220,19 @@ impl Syncer {
tracing::debug!(path = %oldpath.display(), "Backup path already exists; doing nothing");
} else {
let latest_path = parentdir.join(filename);
let guard = self.lock_path(latest_path.clone());
if latest_path.fs_err_try_exists()? && md == mdmanager.get().await? {
tracing::debug!(path = %oldpath.display(), "Backup path does not exist, but \"latest\" file has matching metadata; renaming \"latest\" file");
// TODO: Add cancellation & cleanup logic around the rest
// of this block:
let guard = self.lock_path(latest_path.clone());
self.move_object_file(&latest_path, &oldpath)?;
drop(guard);
mdmanager.delete().await?;
} else {
tracing::debug!(path = %oldpath.display(), "Backup path does not exist; will download");
// No need for locking here, as this is an "old" path that
// doesn't exist, so no other tasks should be working on
// it.
drop(guard);
self.download_item(&item, &parentdir, oldpath, token)
.await?;
}
Expand Down

0 comments on commit eaede98

Please sign in to comment.