diff --git a/DEVELOPMENT.md b/DEVELOPMENT.md
index 6fa6281b1..db079339e 100644
--- a/DEVELOPMENT.md
+++ b/DEVELOPMENT.md
@@ -85,6 +85,10 @@ development command line options.
  function will patch `requests` to log the results of calls to
  `requests.utils.super_len()`
 
+- `DANDI_DOWNLOAD_AGGRESSIVE_RETRY` -- When set, makes `download()` retry very
+  aggressively: it keeps retrying as long as each attempt downloads at least
+  some bytes. Typically not needed; needing it may indicate network issues.
+
 ## Sourcegraph
 
 The [Sourcegraph](https://sourcegraph.com) browser extension can be used to
diff --git a/dandi/cli/command.py b/dandi/cli/command.py
index 27f104af0..37c4818fc 100644
--- a/dandi/cli/command.py
+++ b/dandi/cli/command.py
@@ -114,14 +114,20 @@ def main(ctx, log_level, pdb=False):
         lambda r: r.name != "pyout" and not r.name.startswith("pyout.")
     )
     root.addHandler(handler)
+    exts = (
+        "dandischema",
+        "h5py",
+        "hdmf",
+        "pynwb",
+        "requests",
+        "urllib3",
+    )
     lgr.info(
-        "dandi v%s, dandischema v%s, hdmf v%s, pynwb v%s, h5py v%s",
+        "python %s, dandi %s, "
+        + ", ".join("%s %s" % (e, get_module_version(e)) for e in sorted(exts)),
+        sys.version.split()[0],
         __version__,
-        get_module_version("dandischema"),
-        get_module_version("hdmf"),
-        get_module_version("pynwb"),
-        get_module_version("h5py"),
         extra={"file_only": True},
     )
     lgr.info("sys.argv = %r", sys.argv, extra={"file_only": True})
diff --git a/dandi/cli/tests/test_service_scripts.py b/dandi/cli/tests/test_service_scripts.py
index 790d298a3..e1f3a85d3 100644
--- a/dandi/cli/tests/test_service_scripts.py
+++ b/dandi/cli/tests/test_service_scripts.py
@@ -11,7 +11,6 @@
 from click.testing import CliRunner
 from dandischema.consts import DANDI_SCHEMA_VERSION
 import pytest
-import vcr
 
 from dandi import __version__
 from dandi.tests.fixtures import SampleDandiset
@@ -76,9 +75,14 @@ def test_update_dandiset_from_doi(
     dandiset_id = new_dandiset.dandiset_id
     repository = new_dandiset.api.instance.gui
     monkeypatch.setenv("DANDI_API_KEY", new_dandiset.api.api_key)
-    if os.environ.get("DANDI_TESTS_NO_VCR", ""):
+    if os.environ.get("DANDI_TESTS_NO_VCR", "") or sys.version_info < (3, 10):
+        # Older vcrpy has an issue under Python 3.9 with urllib3 >= 2, but we
+        # require urllib3 >= 2 for more correct operation and still support
+        # Python 3.9.  Remove this workaround when 3.9 support is dropped.
         ctx = nullcontext()
     else:
+        import vcr
+
         ctx = vcr.use_cassette(
             str(DATA_DIR / "update_dandiset_from_doi" / f"{name}.vcr.yaml"),
             before_record_request=record_only_doi_requests,
diff --git a/dandi/dandiapi.py b/dandi/dandiapi.py
index 5ea504de7..88951a2cd 100644
--- a/dandi/dandiapi.py
+++ b/dandi/dandiapi.py
@@ -1592,10 +1592,23 @@ def downloader(start_at: int = 0) -> Iterator[bytes]:
             # TODO: apparently we might need retries here as well etc
             # if result.status_code not in (200, 201):
             result.raise_for_status()
+            nbytes, nchunks = 0, 0
             for chunk in result.iter_content(chunk_size=chunk_size):
+                nchunks += 1
                 if chunk:  # could be some "keep alive"?
+                    nbytes += len(chunk)
                     yield chunk
-            lgr.info("Asset %s successfully downloaded", self.identifier)
+                else:
+                    lgr.debug("'Empty' chunk downloaded for %s", url)
+            lgr.info(
+                "Asset %s (%d bytes in %d chunks starting from %d) successfully "
+                "downloaded from %s",
+                self.identifier,
+                nbytes,
+                nchunks,
+                start_at,
+                url,
+            )
 
         return downloader
diff --git a/dandi/download.py b/dandi/download.py
index f67e95454..249732e1b 100644
--- a/dandi/download.py
+++ b/dandi/download.py
@@ -13,6 +13,7 @@
 from pathlib import Path
 import random
 from shutil import rmtree
+import sys
 from threading import Lock
 import time
 from types import TracebackType
@@ -692,7 +693,10 @@ def _download_file(
     # TODO: how do we discover the total size????
     # TODO: do not do it in-place, but rather into some "hidden" file
     resuming = False
-    for attempt in range(3):
+    attempt = 0
+    nattempts = 3  # number of attempts to make; may grow if an attempt downloaded some bytes
+    while attempt <= nattempts:
+        attempt += 1
         try:
             if digester:
                 downloaded_digest = digester()  # start empty
@@ -700,9 +704,15 @@
             # I wonder if we could make writing async with downloader
             with DownloadDirectory(path, digests or {}) as dldir:
                 assert dldir.offset is not None
+                downloaded_in_attempt = 0
                 downloaded = dldir.offset
                 resuming = downloaded > 0
                 if size is not None and downloaded == size:
+                    lgr.debug(
+                        "%s - downloaded size matches target size of %d, exiting the loop",
+                        path,
+                        size,
+                    )
                     # Exit early when downloaded == size, as making a Range
                     # request in such a case results in a 416 error from S3.
                     # Problems will result if `size` is None but we've already
@@ -713,6 +723,7 @@
                     assert downloaded_digest is not None
                     downloaded_digest.update(block)
                     downloaded += len(block)
+                    downloaded_in_attempt += len(block)
                     # TODO: yield progress etc
                     out: dict[str, Any] = {"done": downloaded}
                     if size:
@@ -738,30 +749,83 @@
         # Catching RequestException lets us retry on timeout & connection
         # errors (among others) in addition to HTTP status errors.
         except requests.RequestException as exc:
+            sleep_amount = random.random() * 5 * attempt
+            if os.environ.get("DANDI_DOWNLOAD_AGGRESSIVE_RETRY"):
+                # in such a case, if we downloaded a little more --
+                # consider it a successful attempt
+                if downloaded_in_attempt > 0:
+                    lgr.debug(
+                        "%s - download failed on attempt #%d: %s, "
+                        "but did download %d bytes, so considering "
+                        "it a success and incrementing number of allowed attempts.",
+                        path,
+                        attempt,
+                        exc,
+                        downloaded_in_attempt,
+                    )
+                    nattempts += 1
             # TODO: actually we should probably retry only on selected codes,
-            # and also respect Retry-After
-            if attempt >= 2 or (
-                exc.response is not None
-                and exc.response.status_code
-                not in (
+            if exc.response is not None:
+                if exc.response.status_code not in (
                     400,  # Bad Request, but happened with girder:
                     # https://github.com/dandi/dandi-cli/issues/87
                     *RETRY_STATUSES,
+                ):
+                    lgr.debug(
+                        "%s - download failed due to response %d: %s",
+                        path,
+                        exc.response.status_code,
+                        exc,
+                    )
+                    yield {"status": "error", "message": str(exc)}
+                    return
+                elif retry_after := exc.response.headers.get("Retry-After"):
+                    # playing safe
+                    if not str(retry_after).isdigit():
+                        # our code is wrong; do not crash, but issue a warning so
+                        # we might get a report and fix it up
+                        lgr.warning(
+                            "%s - download failed due to response %d with non-integer"
+                            " Retry-After=%r: %s",
+                            path,
+                            exc.response.status_code,
+                            retry_after,
+                            exc,
+                        )
+                        yield {"status": "error", "message": str(exc)}
+                        return
+                    sleep_amount = int(retry_after)
+                    lgr.debug(
+                        "%s - download failed due to response %d with "
+                        "Retry-After=%d: %s, will sleep and retry",
+                        path,
+                        exc.response.status_code,
+                        sleep_amount,
+                        exc,
+                    )
+                else:
+                    lgr.debug("%s - download failed: %s", path, exc)
+                    yield {"status": "error", "message": str(exc)}
+                    return
+            elif attempt >= nattempts:
+                lgr.debug(
+                    "%s - download failed after %d attempts: %s", path, attempt, exc
                 )
-            ):
-                lgr.debug("%s - download failed: %s", path, exc)
                 yield {"status": "error", "message": str(exc)}
                 return
             # if is_access_denied(exc) or attempt >= 2:
             #     raise
             # sleep a little and retry
-            lgr.debug(
-                "%s - failed to download on attempt #%d: %s, will sleep a bit and retry",
-                path,
-                attempt,
-                exc,
-            )
-            time.sleep(random.random() * 5)
+            else:
+                lgr.debug(
+                    "%s - download failed on attempt #%d: %s, will sleep a bit and retry",
+                    path,
+                    attempt,
+                    exc,
+                )
+            time.sleep(sleep_amount)
+    else:
+        lgr.warning("downloader logic: We should not be here!")
 
     if downloaded_digest and not resuming:
         assert downloaded_digest is not None
@@ -829,16 +893,22 @@ def __enter__(self) -> DownloadDirectory:
         ):
             # Pick up where we left off, writing to the end of the file
             lgr.debug(
-                "Download directory exists and has matching checksum; resuming download"
+                "%s - download directory exists and has matching checksum(s) %s; resuming download",
+                self.dirpath,
+                matching_algs,
             )
             self.fp = self.writefile.open("ab")
         else:
             # Delete the file (if it even exists) and start anew
             if not chkpath.exists():
-                lgr.debug("Starting new download in new download directory")
+                lgr.debug(
+                    "%s - no prior digests found; starting new download", self.dirpath
+                )
             else:
                 lgr.debug(
-                    "Download directory found, but digests do not match; starting new download"
+                    "%s - download directory found, but digests do not match;"
+                    " starting new download",
+                    self.dirpath,
                 )
             try:
                 self.writefile.unlink()
@@ -857,12 +927,35 @@ def __exit__(
         exc_tb: TracebackType | None,
     ) -> None:
         assert self.fp is not None
+        if exc_type is not None or exc_val is not None or exc_tb is not None:
+            lgr.debug(
+                "%s - entered __exit__ with position %d with exception: %s, %s",
+                self.dirpath,
+                self.fp.tell(),
+                exc_type,
+                exc_val,
+            )
+        else:
+            lgr.debug(
+                "%s - entered __exit__ with position %d without any exception",
+                self.dirpath,
+                self.fp.tell(),
+            )
         self.fp.close()
         try:
             if exc_type is None:
                 try:
                     self.writefile.replace(self.filepath)
-                except IsADirectoryError:
+                except (IsADirectoryError, PermissionError) as exc:
+                    if isinstance(exc, PermissionError):
+                        if not (
+                            sys.platform.startswith("win") and self.filepath.is_dir()
+                        ):
+                            raise
+                    lgr.debug(
+                        "Destination path %s is a directory; removing it and retrying",
+                        self.filepath,
+                    )
                     rmtree(self.filepath)
                     self.writefile.replace(self.filepath)
         finally:
diff --git a/dandi/tests/test_download.py b/dandi/tests/test_download.py
index cd74c1609..a4ad631c5 100644
--- a/dandi/tests/test_download.py
+++ b/dandi/tests/test_download.py
@@ -1,7 +1,10 @@
 from __future__ import annotations
 
 from collections.abc import Callable
+from glob import glob
 import json
+import logging
+from multiprocessing import Manager, Process
 import os
 import os.path
 from pathlib import Path
@@ -21,6 +24,7 @@
 from ..consts import DRAFT, dandiset_metadata_file
 from ..dandiarchive import DandisetURL
 from ..download import (
+    DownloadDirectory,
     Downloader,
     DownloadExisting,
     DownloadFormat,
@@ -1038,3 +1042,89 @@ def test_pyouthelper_time_remaining_1339():
         assert len(done) == 2
     else:
         assert done[-1] == f"ETA: {10 - i} seconds<"
+
+
+@mark.skipif_on_windows  # https://github.com/pytest-dev/pytest/issues/12964
+def test_DownloadDirectory_basic(tmp_path: Path) -> None:
+    with DownloadDirectory(tmp_path, digests={}) as dl:
+        assert dl.dirpath.exists()
+        assert dl.writefile.exists()
+        assert dl.writefile.stat().st_size == 0
+        assert dl.offset == 0
+
+        dl.append(b"123")
+        assert dl.fp is not None
+        dl.fp.flush()  # appends are not flushed automatically
+        assert dl.writefile.stat().st_size == 3
+        assert dl.offset == 0  # doesn't change
+
+        dl.append(b"456")
+        inode_number = dl.writefile.stat().st_ino
+        assert inode_number != tmp_path.stat().st_ino
+
+    # but after we are done - should be a full file!
+    assert tmp_path.stat().st_size == 6
+    assert tmp_path.read_bytes() == b"123456"
+    # we moved the file, didn't copy (expensive)
+    assert inode_number == tmp_path.stat().st_ino
+
+    # no problem with overwriting with new content
+    with DownloadDirectory(tmp_path, digests={}) as dl:
+        dl.append(b"789")
+    assert tmp_path.read_bytes() == b"789"
+
+    # even if path is a directory which we "overwrite"
+    tmp_path.unlink()
+    tmp_path.mkdir()
+    (tmp_path / "somedata.dat").write_text("content")
+    with DownloadDirectory(tmp_path, digests={}) as dl:
+        assert set(glob(f"{tmp_path}*")) == {str(tmp_path), str(dl.dirpath)}
+        dl.append(b"123")
+    assert tmp_path.read_bytes() == b"123"
+
+    # no temp .dandidownload folder is left behind
+    assert set(glob(f"{tmp_path}*")) == {str(tmp_path)}
+
+    # test locking
+    with Manager() as manager:
+        results = manager.list()
+        with DownloadDirectory(tmp_path, digests={}) as dl:
+            dl.append(b"123")
+            p1 = Process(target=_download_directory_subproc, args=(tmp_path, results))
+            p1.start()
+            p1.join()
+        assert len(results) == 1
+        assert results[0] == f"Could not acquire download lock for {tmp_path}"
+    assert tmp_path.read_bytes() == b"123"
+
+
+# needs to be a top-level function for pickling
+def _download_directory_subproc(path, results):
+    try:
+        with DownloadDirectory(path, digests={}):
+            results.append("re-entered fine")
+    except Exception as exc:
+        results.append(str(exc))
+
+
+def test_DownloadDirectory_exc(
+    tmp_path: Path, caplog: pytest.LogCaptureFixture
+) -> None:
+    caplog.set_level(logging.DEBUG, logger="dandi")
+    # and now let's exit with exception
+    with pytest.raises(RuntimeError):
+        with DownloadDirectory(tmp_path, digests={}) as dl:
+            dl.append(b"456")
+            raise RuntimeError("Boom")
+    assert (
+        "dandi",
+        10,
+        f"{dl.dirpath} - entered __exit__ with position 3 with exception: "
+        "<class 'RuntimeError'>, Boom",
+    ) == caplog.record_tuples[-1]
+    # and we left without cleanup but closed things up after ourselves
+    assert tmp_path.exists()
+    assert tmp_path.is_dir()
+    assert dl.dirpath.exists()
+    assert dl.fp is None
+    assert dl.writefile.read_bytes() == b"456"
diff --git a/setup.cfg b/setup.cfg
index 28ca0318e..d51ac6fd2 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -55,6 +55,8 @@ install_requires =
     ruamel.yaml >=0.15, <1
     semantic-version
     tenacity
+    # urllib3 < 2 could result in silently incomplete downloads: https://github.com/dandi/dandi-cli/issues/1500
+    urllib3 >= 2.0.0
     yarl ~= 1.9
     zarr ~= 2.10
     zarr_checksum ~= 0.4.0
@@ -76,7 +78,7 @@ include = dandi*
 extensions =
     allensdk
 extras =
-    duecredit
+    duecredit >= 0.6.0
     fsspec[http]
 style =
     flake8
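
Illustrative note (not part of the patch): the retry policy implemented in the
`_download_file` hunk above can be summarized by the following minimal
standalone sketch. It is a simplification: it restarts each attempt from
scratch instead of resuming via `DownloadDirectory`, the `RETRY_STATUSES`
tuple is a stand-in for `dandi.consts.RETRY_STATUSES` (the real code also
retries 400), and the helper name `fetch_with_retries` is hypothetical.

    import os
    import random
    import time

    import requests

    RETRY_STATUSES = (500, 502, 503, 504)  # stand-in for dandi.consts.RETRY_STATUSES

    def fetch_with_retries(url: str, nattempts: int = 3) -> bytes:
        attempt = 0
        while True:
            attempt += 1
            chunks: list[bytes] = []
            try:
                with requests.get(url, stream=True, timeout=30) as r:
                    r.raise_for_status()
                    for chunk in r.iter_content(chunk_size=65536):
                        if chunk:  # skip keep-alive chunks
                            chunks.append(chunk)
                return b"".join(chunks)
            except requests.RequestException as exc:
                downloaded_in_attempt = sum(len(c) for c in chunks)
                if (
                    os.environ.get("DANDI_DOWNLOAD_AGGRESSIVE_RETRY")
                    and downloaded_in_attempt > 0
                ):
                    # the failed attempt still made progress: allow one more try
                    nattempts += 1
                if (
                    exc.response is not None
                    and exc.response.status_code not in RETRY_STATUSES
                ):
                    raise  # not a retryable HTTP status
                if attempt >= nattempts:
                    raise  # attempts exhausted
                # growing randomized backoff, overridden by an integer Retry-After
                sleep_amount = random.random() * 5 * attempt
                if exc.response is not None:
                    retry_after = exc.response.headers.get("Retry-After")
                    if retry_after is not None and str(retry_after).isdigit():
                        sleep_amount = int(retry_after)
                time.sleep(sleep_amount)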