Skip to content

Commit

Permalink
Merge pull request #1412 from dandi/gh-1410
Browse files Browse the repository at this point in the history
Report progress in deleting Zarr entries during upload
  • Loading branch information
jwodder authored Feb 29, 2024
2 parents aa55b89 + 5970a3b commit a943c08
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 12 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ repos:
- id: codespell
exclude: ^(dandi/_version\.py|dandi/due\.py|versioneer\.py)$
- repo: https://github.com/PyCQA/flake8
rev: 4.0.1
rev: 7.0.0
hooks:
- id: flake8
exclude: ^(dandi/_version\.py|dandi/due\.py|versioneer\.py)$
2 changes: 1 addition & 1 deletion dandi/files/bases.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ def iter_upload(
bytes_uploaded += out_part["size"]
yield {
"status": "uploading",
"upload": 100 * bytes_uploaded / total_size,
"progress": 100 * bytes_uploaded / total_size,
"current": bytes_uploaded,
}
parts_out.append(out_part)
Expand Down
39 changes: 34 additions & 5 deletions dandi/files/zarr.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from dandi import get_logger
from dandi.consts import (
MAX_ZARR_DEPTH,
ZARR_DELETE_BATCH_SIZE,
ZARR_MIME_TYPE,
ZARR_UPLOAD_BATCH_SIZE,
EmbargoStatus,
Expand Down Expand Up @@ -442,7 +443,11 @@ def mkzarr() -> str:
local_digest,
)
if to_delete:
a.rmfiles(to_delete, reingest=False)
yield from _rmfiles(
asset=a,
entries=to_delete,
status="deleting conflicting remote files",
)
else:
yield {"status": "traversing local Zarr"}
for local_entry in self.iterfiles():
Expand Down Expand Up @@ -497,7 +502,7 @@ def mkzarr() -> str:
bytes_uploaded += size
yield {
"status": "uploading",
"upload": 100
"progress": 100
* bytes_uploaded
/ to_upload.total_size,
"current": bytes_uploaded,
Expand All @@ -506,13 +511,16 @@ def mkzarr() -> str:
lgr.debug("%s: All files uploaded", asset_path)
old_zarr_files = list(old_zarr_entries.values())
if old_zarr_files:
yield {"status": "deleting extra remote files"}
lgr.debug(
"%s: Deleting %s in remote Zarr not present locally",
asset_path,
pluralize(len(old_zarr_files), "file"),
)
a.rmfiles(old_zarr_files, reingest=False)
yield from _rmfiles(
asset=a,
entries=old_zarr_files,
status="deleting extra remote files",
)
changed = True
if changed:
lgr.debug(
Expand All @@ -533,9 +541,9 @@ def mkzarr() -> str:
lgr.info(
"%s: Asset checksum mismatch (local: %s;"
" server: %s); redoing upload",
asset_path,
our_checksum,
server_checksum,
asset_path,
)
yield {"status": "Checksum mismatch"}
break
Expand Down Expand Up @@ -677,3 +685,24 @@ def _cmp_digests(
else:
lgr.debug("%s: File %s already on server; skipping", asset_path, local_entry)
return (local_entry, local_digest, False)


def _rmfiles(
    asset: RemoteZarrAsset, entries: list[RemoteZarrEntry], status: str
) -> Iterator[dict]:
    """
    Delete ``entries`` from ``asset`` in fixed-size batches, yielding a
    pyout-style progress record (``status``/``progress``/``current``) after
    each batch completes.

    Batching is done here instead of relying on ``rmfiles()``'s internal
    batching so that progress can be reported as each batch finishes.
    """
    total = len(entries)
    # Emit an initial zero-progress record so the status shows up
    # immediately, before the first batch is deleted.
    yield {
        "status": status,
        "progress": 0,
        "current": 0,
    }
    done = 0
    for start in range(0, total, ZARR_DELETE_BATCH_SIZE):
        batch = entries[start : start + ZARR_DELETE_BATCH_SIZE]
        asset.rmfiles(batch, reingest=False)
        done += len(batch)
        yield {
            "status": status,
            "progress": done / total * 100,
            "current": done,
        }
2 changes: 1 addition & 1 deletion dandi/support/pyout.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ def get_style(hide_if_missing=True):
),
aggregate=counts,
),
"upload": progress_style,
"progress": progress_style,
"done%": progress_style,
"checksum": dict(
align="center",
Expand Down
8 changes: 4 additions & 4 deletions dandi/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,13 +146,13 @@ def check_len(obj: io.IOBase, name: Any) -> None:
raise RuntimeError(
f"requests.utils.super_len() reported size of 0 for"
f" {name!r}, but os.stat() reported size"
f" {stat_size} bytes {i+1} tries later"
f" {stat_size} bytes {i + 1} tries later"
)
if fstat_size not in (None, 0):
raise RuntimeError(
f"requests.utils.super_len() reported size of 0 for"
f" {name!r}, but os.fstat() reported size"
f" {fstat_size} bytes {i+1} tries later"
f" {fstat_size} bytes {i + 1} tries later"
)
lgr.debug(
"- Size of %r still appears to be 0 after 10 rounds of"
Expand Down Expand Up @@ -403,9 +403,9 @@ def upload_agg(*ignored: Any) -> str:
return "%s/s" % naturalsize(speed)

pyout_style = pyouts.get_style(hide_if_missing=False)
pyout_style["upload"]["aggregate"] = upload_agg
pyout_style["progress"]["aggregate"] = upload_agg

rec_fields = ["path", "size", "errors", "upload", "status", "message"]
rec_fields = ["path", "size", "errors", "progress", "status", "message"]
out = pyouts.LogSafeTabular(
style=pyout_style, columns=rec_fields, max_workers=jobs or 5
)
Expand Down

0 comments on commit a943c08

Please sign in to comment.