Skip to content

Commit

Permalink
Merge pull request #1443 from dandi/bf-rf-zarr-download
Browse files Browse the repository at this point in the history
OPT+RF of zarr downloads: do not wait for full files listing + compute %done from total zarr size
  • Loading branch information
yarikoptic authored May 30, 2024
2 parents 3f0318c + 08a4050 commit 59f5da0
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 71 deletions.
83 changes: 53 additions & 30 deletions dandi/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -481,8 +481,8 @@ def agg_done(self, done_sizes: Iterator[int]) -> list[str]:
return v


def _skip_file(msg: Any) -> dict:
return {"status": "skipped", "message": str(msg)}
def _skip_file(msg: Any, **kwargs: Any) -> dict:
return {"status": "skipped", "message": str(msg), **kwargs}


def _populate_dandiset_yaml(
Expand Down Expand Up @@ -514,7 +514,7 @@ def _populate_dandiset_yaml(
existing is DownloadExisting.REFRESH
and os.lstat(dandiset_yaml).st_mtime >= mtime.timestamp()
):
yield _skip_file("already exists")
yield _skip_file("already exists", size=os.lstat(dandiset_yaml).st_mtime)
return
ds = Dandiset(dandiset_path, allow_empty=True)
ds.path_obj.mkdir(exist_ok=True) # exist_ok in case of parallel race
Expand Down Expand Up @@ -637,7 +637,7 @@ def _download_file(
# but mtime is different
if same == ["mtime", "size"]:
# TODO: add recording and handling of .nwb object_id
yield _skip_file("same time and size")
yield _skip_file("same time and size", size=size)
return
lgr.debug(f"{path!r} - same attributes: {same}. Redownloading")

Expand Down Expand Up @@ -878,33 +878,40 @@ def _download_zarr(
# Avoid heavy import by importing within function:
from .support.digests import get_zarr_checksum

download_gens = {}
entries = list(asset.iterfiles())
# we will collect them all while starting the download
# with the first page of entries received from the server.
entries = []
digests = {}
pc = ProgressCombiner(zarr_size=asset.size)

def digest_callback(path: str, algoname: str, d: str) -> None:
if algoname == "md5":
digests[path] = d

for entry in entries:
etag = entry.digest
assert etag.algorithm is DigestType.md5
download_gens[str(entry)] = _download_file(
entry.get_download_file_iter(),
download_path / str(entry),
toplevel_path=toplevel_path,
size=entry.size,
mtime=entry.modified,
existing=existing,
digests={"md5": etag.value},
lock=lock,
digest_callback=partial(digest_callback, str(entry)),
)
def downloads_gen():
for entry in asset.iterfiles():
entries.append(entry)
etag = entry.digest
assert etag.algorithm is DigestType.md5
yield pairing(
str(entry),
_download_file(
entry.get_download_file_iter(),
download_path / str(entry),
toplevel_path=toplevel_path,
size=entry.size,
mtime=entry.modified,
existing=existing,
digests={"md5": etag.value},
lock=lock,
digest_callback=partial(digest_callback, str(entry)),
),
)
pc.file_qty = len(entries)

pc = ProgressCombiner(zarr_size=asset.size, file_qty=len(download_gens))
final_out: dict | None = None
with interleave(
[pairing(p, gen) for p, gen in download_gens.items()],
downloads_gen(),
onerror=FINISH_CURRENT,
max_workers=jobs or 4,
) as it:
Expand Down Expand Up @@ -988,7 +995,7 @@ class DownloadProgress:
@dataclass
class ProgressCombiner:
zarr_size: int
file_qty: int
file_qty: int | None = None # set to specific known value whenever full sweep is complete
files: dict[str, DownloadProgress] = field(default_factory=dict)
#: Total size of all files that were not skipped and did not error out
#: during download
Expand Down Expand Up @@ -1021,18 +1028,25 @@ def get_done(self) -> dict:
total_downloaded = sum(
s.downloaded
for s in self.files.values()
if s.state in (DLState.DOWNLOADING, DLState.CHECKSUM_ERROR, DLState.DONE)
if s.state
in (
DLState.DOWNLOADING,
DLState.CHECKSUM_ERROR,
DLState.SKIPPED,
DLState.DONE,
)
)
return {
"done": total_downloaded,
"done%": total_downloaded / self.maxsize * 100,
"done%": total_downloaded / self.zarr_size * 100 if self.zarr_size else 0,
}

def set_status(self, statusdict: dict) -> None:
state_qtys = Counter(s.state for s in self.files.values())
total = len(self.files)
if (
total == self.file_qty
self.file_qty is not None # if already known
and total == self.file_qty
and state_qtys[DLState.STARTING] == state_qtys[DLState.DOWNLOADING] == 0
):
# All files have finished
Expand All @@ -1053,16 +1067,25 @@ def set_status(self, statusdict: dict) -> None:
def feed(self, path: str, status: dict) -> Iterator[dict]:
keys = list(status.keys())
self.files.setdefault(path, DownloadProgress())
size = status.get("size")
if size is not None:
if not self.yielded_size:
# this thread will yield
self.yielded_size = True
yield {"size": self.zarr_size}
if status.get("status") == "skipped":
self.files[path].state = DLState.SKIPPED
out = {"message": self.message}
# Treat skipped as "downloaded" for the matter of accounting
if size is not None:
self.files[path].downloaded = size
self.maxsize += size
self.set_status(out)
yield out
if self.zarr_size:
yield self.get_done()
elif keys == ["size"]:
if not self.yielded_size:
yield {"size": self.zarr_size}
self.yielded_size = True
self.files[path].size = status["size"]
self.files[path].size = size
self.maxsize += status["size"]
if any(s.state is DLState.DOWNLOADING for s in self.files.values()):
yield self.get_done()
Expand Down
99 changes: 58 additions & 41 deletions dandi/tests/test_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,9 +485,10 @@ def test_download_zarr_subdir_has_only_subdirs(


@pytest.mark.parametrize(
"file_qty,inputs,expected",
"zarr_size,file_qty,inputs,expected",
[
(
( # 0
42,
1,
[
("lonely.txt", {"size": 42}),
Expand All @@ -501,7 +502,7 @@ def test_download_zarr_subdir_has_only_subdirs(
("lonely.txt", {"status": "done"}),
],
[
{"size": 69105},
{"size": 42},
{"status": "downloading"},
{"done": 0, "done%": 0.0},
{"done": 20, "done%": 20 / 42 * 100},
Expand All @@ -510,7 +511,8 @@ def test_download_zarr_subdir_has_only_subdirs(
{"status": "done", "message": "1 done"},
],
),
(
( # 1
169,
2,
[
("apple.txt", {"size": 42}),
Expand All @@ -534,7 +536,7 @@ def test_download_zarr_subdir_has_only_subdirs(
("banana.txt", {"status": "done"}),
],
[
{"size": 69105},
{"size": 169},
{"status": "downloading"},
{"done": 0, "done%": 0.0},
{"done": 0, "done%": 0.0},
Expand All @@ -549,7 +551,8 @@ def test_download_zarr_subdir_has_only_subdirs(
{"status": "done", "message": "2 done"},
],
),
(
( # 2
169,
2,
[
("apple.txt", {"size": 42}),
Expand All @@ -573,10 +576,10 @@ def test_download_zarr_subdir_has_only_subdirs(
("banana.txt", {"status": "done"}),
],
[
{"size": 69105},
{"size": 169},
{"status": "downloading"},
{"done": 0, "done%": 0.0},
{"done": 20, "done%": 20 / 42 * 100},
{"done": 20, "done%": 20 / 169 * 100},
{"done": 20, "done%": 20 / 169 * 100},
{"done": 40, "done%": 40 / 169 * 100},
{"done": 42, "done%": 42 / 169 * 100},
Expand All @@ -589,7 +592,8 @@ def test_download_zarr_subdir_has_only_subdirs(
{"status": "done", "message": "2 done"},
],
),
(
( # 3
169,
2,
[
("apple.txt", {"size": 42}),
Expand All @@ -613,12 +617,12 @@ def test_download_zarr_subdir_has_only_subdirs(
("banana.txt", {"status": "done"}),
],
[
{"size": 69105},
{"size": 169},
{"status": "downloading"},
{"done": 0, "done%": 0.0},
{"done": 20, "done%": 20 / 42 * 100},
{"done": 40, "done%": 40 / 42 * 100},
{"done": 42, "done%": 42 / 42 * 100},
{"done": 20, "done%": 20 / 169 * 100},
{"done": 40, "done%": 40 / 169 * 100},
{"done": 42, "done%": 42 / 169 * 100},
{"message": "1 done"},
{"done": 42, "done%": 42 / 169 * 100},
{"done": 82, "done%": 82 / 169 * 100},
Expand All @@ -628,7 +632,8 @@ def test_download_zarr_subdir_has_only_subdirs(
{"status": "done", "message": "2 done"},
],
),
(
( # 4
169,
2,
[
("apple.txt", {"size": 42}),
Expand All @@ -647,29 +652,34 @@ def test_download_zarr_subdir_has_only_subdirs(
("apple.txt", {"status": "done"}),
],
[
{"size": 69105},
{"size": 169},
{"status": "downloading"},
{"done": 0, "done%": 0.0},
{"done": 0, "done%": 0.0},
{"done": 20, "done%": 20 / 169 * 100},
{"done": 60, "done%": 60 / 169 * 100},
{"done": 80, "done%": 80 / 169 * 100},
{"message": "1 errored"},
{"done": 40, "done%": 40 / 42 * 100},
{"done": 42, "done%": 100.0},
{"done": 40, "done%": 40 / 169 * 100},
{"done": 42, "done%": 42 / 169 * 100},
{"status": "error", "message": "1 done, 1 errored"},
],
),
(
( # 5
0,
1,
[("lonely.txt", {"status": "skipped", "message": "already exists"})],
[{"status": "skipped", "message": "1 skipped"}],
),
(
( # 6
169,
2,
[
("apple.txt", {"size": 42}),
("banana.txt", {"status": "skipped", "message": "already exists"}),
(
"banana.txt",
{"size": 127, "status": "skipped", "message": "already exists"},
),
("apple.txt", {"status": "downloading"}),
("apple.txt", {"done": 0, "done%": 0.0}),
("apple.txt", {"done": 20, "done%": 20 / 42 * 100}),
Expand All @@ -680,17 +690,19 @@ def test_download_zarr_subdir_has_only_subdirs(
("apple.txt", {"status": "done"}),
],
[
{"size": 69105},
{"size": 169},
{"message": "1 skipped"},
{"done": 127, "done%": (127 + 0) / 169 * 100},
{"status": "downloading"},
{"done": 0, "done%": 0.0},
{"done": 20, "done%": 20 / 42 * 100},
{"done": 40, "done%": 40 / 42 * 100},
{"done": 42, "done%": 100.0},
{"done": 127 + 0, "done%": (127 + 0) / 169 * 100},
{"done": 127 + 20, "done%": (127 + 20) / 169 * 100},
{"done": 127 + 40, "done%": (127 + 40) / 169 * 100},
{"done": 127 + 42, "done%": 100.0},
{"status": "done", "message": "1 done, 1 skipped"},
],
),
(
( # 7
169,
2,
[
("apple.txt", {"size": 42}),
Expand Down Expand Up @@ -719,7 +731,7 @@ def test_download_zarr_subdir_has_only_subdirs(
("apple.txt", {"status": "done"}),
],
[
{"size": 69105},
{"size": 169},
{"status": "downloading"},
{"done": 0, "done%": 0.0},
{"done": 0, "done%": 0.0},
Expand All @@ -734,14 +746,18 @@ def test_download_zarr_subdir_has_only_subdirs(
{"status": "error", "message": "1 done, 1 errored"},
],
),
(
( # 8
179,
3,
[
("apple.txt", {"size": 42}),
("banana.txt", {"size": 127}),
("apple.txt", {"status": "downloading"}),
("banana.txt", {"status": "downloading"}),
("coconut", {"status": "skipped", "message": "already exists"}),
(
"coconut",
{"size": 10, "status": "skipped", "message": "already exists"},
),
("apple.txt", {"done": 0, "done%": 0.0}),
("banana.txt", {"done": 0, "done%": 0.0}),
("apple.txt", {"done": 20, "done%": 20 / 42 * 100}),
Expand All @@ -764,28 +780,29 @@ def test_download_zarr_subdir_has_only_subdirs(
("banana.txt", {"status": "done"}),
],
[
{"size": 69105},
{"size": 179},
{"status": "downloading"},
{"message": "1 skipped"},
{"done": 0, "done%": 0.0},
{"done": 0, "done%": 0.0},
{"done": 20, "done%": 20 / 169 * 100},
{"done": 60, "done%": 60 / 169 * 100},
{"done": 80, "done%": 80 / 169 * 100},
{"done": 120, "done%": 120 / 169 * 100},
{"done": 122, "done%": 122 / 169 * 100},
{"done": 10, "done%": 10 / 179 * 100},
{"done": 10, "done%": 10 / 179 * 100},
{"done": 10, "done%": 10 / 179 * 100},
{"done": 10 + 20, "done%": (10 + 20) / 179 * 100},
{"done": 10 + 60, "done%": (10 + 60) / 179 * 100},
{"done": 10 + 80, "done%": (10 + 80) / 179 * 100},
{"done": 10 + 120, "done%": (10 + 120) / 179 * 100},
{"done": 10 + 122, "done%": (10 + 122) / 179 * 100},
{"message": "1 errored, 1 skipped"},
{"done": 162, "done%": 162 / 169 * 100},
{"done": 169, "done%": 100.0},
{"done": 10 + 162, "done%": (10 + 162) / 179 * 100},
{"done": 179, "done%": 100.0},
{"status": "error", "message": "1 done, 1 errored, 1 skipped"},
],
),
],
)
def test_progress_combiner(
file_qty: int, inputs: list[tuple[str, dict]], expected: list[dict]
zarr_size: int, file_qty: int, inputs: list[tuple[str, dict]], expected: list[dict]
) -> None:
pc = ProgressCombiner(zarr_size=69105, file_qty=file_qty)
pc = ProgressCombiner(zarr_size=zarr_size, file_qty=file_qty)
outputs: list[dict] = []
for path, status in inputs:
outputs.extend(pc.feed(path, status))
Expand Down

0 comments on commit 59f5da0

Please sign in to comment.