Skip to content

Commit

Permalink
Download objects to tempfiles and then move
Browse files Browse the repository at this point in the history
  • Loading branch information
jwodder committed Nov 25, 2024
1 parent 18bab40 commit 4d05491
Showing 1 changed file with 44 additions and 18 deletions.
62 changes: 44 additions & 18 deletions src/syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use fs_err::PathExt;
use futures_util::StreamExt;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::fs::File;
use std::num::NonZeroUsize;
use std::path::{Path, PathBuf};
use std::sync::Arc;
Expand Down Expand Up @@ -193,7 +192,8 @@ impl Syncer {
&latest_path,
&parentdir.join(current_md.old_filename(filename)),
)?;
self.download_item(&item, latest_path, token).await?;
self.download_item(&item, &parentdir, latest_path, token)
.await?;
drop(guard);
mdmanager.set(md).await?;
}
Expand All @@ -212,7 +212,8 @@ impl Syncer {
// TODO: Add cancellation & cleanup logic around the rest
// of this block:
let guard = self.lock_path(latest_path.clone());
self.download_item(&item, latest_path, token).await?;
self.download_item(&item, &parentdir, latest_path, token)
.await?;
drop(guard);
mdmanager.set(md).await?;
}
Expand All @@ -237,7 +238,8 @@ impl Syncer {
// No need for locking here, as this is an "old" path that
// doesn't exist, so no other tasks should be working on
// it.
self.download_item(&item, oldpath, token).await?;
self.download_item(&item, &parentdir, oldpath, token)
.await?;
}
}
}
Expand All @@ -252,42 +254,66 @@ impl Syncer {
async fn download_item(
&self,
item: &InventoryItem,
parentdir: &Path,
path: PathBuf,
token: CancellationToken,
) -> anyhow::Result<()> {
// TODO: Download to a temp file and then move
tracing::trace!("Opening output file");
let outfile = File::create(&path)
.with_context(|| format!("failed to open output file {}", path.display()))?;
tracing::trace!("Opening temporary output file");
let outfile = tempfile::Builder::new()
.prefix(".s3invsync.download.")
.tempfile_in(parentdir)
.with_context(|| {
format!("failed to create temporary output file for {}", item.url())
})?;
match token
.run_until_cancelled(self.client.download_object(
&item.url(),
item.details.md5_digest(),
&outfile,
outfile.as_file(),
))
.await
{
Some(Ok(())) => outfile
.set_modified(item.last_modified_date.into())
.with_context(|| format!("failed to set mtime on {}", path.display())),
Some(Ok(())) => {
tracing::trace!(dest = %path.display(), "Moving temporary output file to destination");
let fp = outfile.persist(&path).with_context(|| {
format!(
"failed to persist temporary output file to {}",
path.display()
)
})?;
fp.set_modified(item.last_modified_date.into())
.with_context(|| format!("failed to set mtime on {}", path.display()))?;
Ok(())
}
Some(Err(e)) => {
tracing::error!(error = ?e, "Failed to download object");
if let Err(e2) = self.cleanup_download_path(&path) {
tracing::warn!(error = ?e2, "Failed to clean up download file");
if let Err(e2) = self.cleanup_download_path(item, outfile, &path) {
tracing::warn!(error = ?e2, "Failed to clean up download path");
}
Err(e.into())
}
None => {
tracing::debug!("Download cancelled");
self.cleanup_download_path(&path).map_err(Into::into)
self.cleanup_download_path(item, outfile, &path)
.map_err(Into::into)
}
}
}

fn cleanup_download_path(&self, dlfile: &Path) -> std::io::Result<()> {
fn cleanup_download_path(
&self,
item: &InventoryItem,
outfile: tempfile::NamedTempFile,
dlfile: &Path,
) -> anyhow::Result<()> {
// TODO: Synchronize calls to this method?
tracing::trace!(path = %dlfile.display(), "Cleaning up unfinished download file");
fs_err::remove_file(dlfile)?;
outfile.close().with_context(|| {
format!(
"failed to remove temporary download file for {}",
item.url()
)
})?;
let p = dlfile.parent();
while let Some(pp) = p {
if pp == self.outdir {
Expand All @@ -297,7 +323,7 @@ impl Syncer {
match fs_err::remove_dir(pp) {
Ok(()) => (),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()),
Err(e) => return Err(e),
Err(e) => return Err(e.into()),
}
}
}
Expand Down

0 comments on commit 4d05491

Please sign in to comment.