From 920b40d8e5a46e434bc948438ad59cb3a7b09493 Mon Sep 17 00:00:00 2001 From: "John T. Wodder II" Date: Wed, 13 Jul 2022 10:45:20 -0400 Subject: [PATCH 01/17] Split up files.py --- dandi/files.py | 1324 --------------------------------------- dandi/files/__init__.py | 161 +++++ dandi/files/bases.py | 614 ++++++++++++++++++ dandi/files/zarr.py | 593 ++++++++++++++++++ setup.cfg | 1 - 5 files changed, 1368 insertions(+), 1325 deletions(-) delete mode 100644 dandi/files.py create mode 100644 dandi/files/__init__.py create mode 100644 dandi/files/bases.py create mode 100644 dandi/files/zarr.py diff --git a/dandi/files.py b/dandi/files.py deleted file mode 100644 index e7695c5fe..000000000 --- a/dandi/files.py +++ /dev/null @@ -1,1324 +0,0 @@ -""" -.. versionadded:: 0.36.0 - -This module defines functionality for working with local files & directories -(as opposed to remote resources on a DANDI Archive server) that are of interest -to DANDI. The classes for such files & directories all inherit from -`DandiFile`, which has two immediate subclasses: `DandisetMetadataFile`, for -representing :file:`dandiset.yaml` files, and `LocalAsset`, for representing -files that can be uploaded as assets to DANDI Archive. -""" - -from __future__ import annotations - -from abc import ABC, abstractmethod -import atexit -from collections import deque -from concurrent.futures import Future, ThreadPoolExecutor, as_completed -from contextlib import closing -from dataclasses import dataclass, field, replace -from datetime import datetime -import os -from pathlib import Path -import re -from threading import Lock -from time import sleep -from typing import ( - Any, - BinaryIO, - ClassVar, - Dict, - Generator, - Generic, - Iterator, - List, - Optional, - Tuple, - Union, - cast, -) -from xml.etree.ElementTree import fromstring - -from dandischema.digests.dandietag import DandiETag -from dandischema.digests.zarr import get_checksum -from dandischema.models import BareAsset, CommonModel -from dandischema.models import Dandiset as DandisetMeta -from dandischema.models import DigestType, get_schema_version -from nwbinspector import Importance, inspect_nwb, load_config -from pydantic import ValidationError -import requests -import zarr - -from . import get_logger -from .consts import ( - MAX_ZARR_DEPTH, - VIDEO_FILE_EXTENSIONS, - ZARR_MIME_TYPE, - ZARR_UPLOAD_BATCH_SIZE, - EmbargoStatus, - dandiset_metadata_file, -) -from .dandiapi import ( - RemoteAsset, - RemoteDandiset, - RemoteZarrAsset, - RemoteZarrEntry, - RESTFullAPIClient, -) -from .exceptions import UnknownAssetError -from .metadata import get_default_metadata, nwb2asset -from .misctypes import DUMMY_DIGEST, BasePath, Digest, P -from .pynwb_utils import validate as pynwb_validate -from .support.digests import ( - get_dandietag, - get_digest, - get_zarr_checksum, - md5file_nocache, -) -from .utils import chunked, pluralize, yaml_load - -lgr = get_logger() - -# TODO -- should come from schema. 
This is just a simplistic example for now -_required_dandiset_metadata_fields = ["identifier", "name", "description"] - - -@dataclass # type: ignore[misc] # -class DandiFile(ABC): - """Abstract base class for local files & directories of interest to DANDI""" - - #: The path to the actual file or directory on disk - filepath: Path - - @property - def size(self) -> int: - """The size of the file""" - return os.path.getsize(self.filepath) - - @property - def modified(self) -> datetime: - """The time at which the file was last modified""" - # TODO: Should this be overridden for LocalDirectoryAsset? - return datetime.fromtimestamp(self.filepath.stat().st_mtime).astimezone() - - @abstractmethod - def get_metadata( - self, - digest: Optional[Digest] = None, - ignore_errors: bool = True, - ) -> CommonModel: - """Return the Dandi metadata for the file""" - ... - - @abstractmethod - def get_validation_errors( - self, - schema_version: Optional[str] = None, - devel_debug: bool = False, - ) -> List[str]: - """ - Attempt to validate the file and return a list of errors encountered - """ - ... - - -class DandisetMetadataFile(DandiFile): - """Representation of a :file:`dandiset.yaml` file""" - - def get_metadata( - self, - digest: Optional[Digest] = None, - ignore_errors: bool = True, - ) -> DandisetMeta: - """Return the Dandiset metadata inside the file""" - with open(self.filepath) as f: - meta = yaml_load(f, typ="safe") - return DandisetMeta.unvalidated(**meta) - - # TODO: @validate_cache.memoize_path - def get_validation_errors( - self, - schema_version: Optional[str] = None, - devel_debug: bool = False, - ) -> List[str]: - with open(self.filepath) as f: - meta = yaml_load(f, typ="safe") - if schema_version is None: - schema_version = meta.get("schemaVersion") - if schema_version is None: - return _check_required_fields(meta, _required_dandiset_metadata_fields) - else: - current_version = get_schema_version() - if schema_version != current_version: - raise ValueError( - f"Unsupported schema version: {schema_version}; expected {current_version}" - ) - try: - DandisetMeta(**meta) - except ValidationError as e: - if devel_debug: - raise - lgr.warning( - "Validation error for %s: %s", - self.filepath, - e, - extra={"validating": True}, - ) - return [str(e)] - except Exception as e: - if devel_debug: - raise - lgr.warning( - "Unexpected validation error for %s: %s", - self.filepath, - e, - extra={"validating": True}, - ) - return [f"Failed to initialize Dandiset meta: {e}"] - return [] - - -@dataclass # type: ignore[misc] # -class LocalAsset(DandiFile): - """ - Representation of a file or directory that can be uploaded to a DANDI - Archive as an asset of a Dandiset - """ - - #: The forward-slash-separated path to the asset within its local Dandiset - #: (i.e., relative to the Dandiset's root) - path: str - - @abstractmethod - def get_digest(self) -> Digest: - """ - Calculate a DANDI etag digest for the asset using the appropriate - algorithm for its type - """ - ... - - @abstractmethod - def get_metadata( - self, - digest: Optional[Digest] = None, - ignore_errors: bool = True, - ) -> BareAsset: - """Return the Dandi metadata for the asset""" - ... 
- - # TODO: @validate_cache.memoize_path - def get_validation_errors( - self, - schema_version: Optional[str] = None, - devel_debug: bool = False, - ) -> List[str]: - if schema_version is not None: - current_version = get_schema_version() - if schema_version != current_version: - raise ValueError( - f"Unsupported schema version: {schema_version}; expected {current_version}" - ) - try: - asset = self.get_metadata(digest=DUMMY_DIGEST) - BareAsset(**asset.dict()) - except ValidationError as e: - if devel_debug: - raise - lgr.warning( - "Validation error for %s: %s", - self.filepath, - e, - extra={"validating": True}, - ) - return [str(e)] - except Exception as e: - if devel_debug: - raise - lgr.warning( - "Unexpected validation error for %s: %s", - self.filepath, - e, - extra={"validating": True}, - ) - return [f"Failed to read metadata: {e}"] - return [] - else: - # TODO: Do something else? - return [] - - def upload( - self, - dandiset: RemoteDandiset, - metadata: Dict[str, Any], - jobs: Optional[int] = None, - replacing: Optional[RemoteAsset] = None, - ) -> RemoteAsset: - """ - Upload the file as an asset with the given metadata to the given - Dandiset and return the resulting asset. Blocks until the upload is - complete. - - :param RemoteDandiset dandiset: - the Dandiset to which the file will be uploaded - :param dict metadata: - Metadata for the uploaded asset. The "path" field will be set to - the value of the instance's ``path`` attribute if no such field is - already present. - :param int jobs: Number of threads to use for uploading; defaults to 5 - :param RemoteAsset replacing: - If set, replace the given asset, which must have the same path as - the new asset - :rtype: RemoteAsset - """ - for status in self.iter_upload( - dandiset, metadata, jobs=jobs, replacing=replacing - ): - if status["status"] == "done": - a = status["asset"] - assert isinstance(a, RemoteAsset) - return a - raise AssertionError("iter_upload() finished without returning 'done'") - - @abstractmethod - def iter_upload( - self, - dandiset: RemoteDandiset, - metadata: Dict[str, Any], - jobs: Optional[int] = None, - replacing: Optional[RemoteAsset] = None, - ) -> Iterator[dict]: - """ - Upload the asset with the given metadata to the given Dandiset, - returning a generator of status `dict`\\s. - - :param RemoteDandiset dandiset: - the Dandiset to which the asset will be uploaded - :param dict metadata: - Metadata for the uploaded asset. The "path" field will be set to - the value of the instance's ``path`` attribute if no such field is - already present. - :param int jobs: Number of threads to use for uploading; defaults to 5 - :param RemoteAsset replacing: - If set, replace the given asset, which must have the same path as - the new asset - :returns: - A generator of `dict`\\s containing at least a ``"status"`` key. - Upon successful upload, the last `dict` will have a status of - ``"done"`` and an ``"asset"`` key containing the resulting - `RemoteAsset`. - """ - ... 
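
For orientation while reading the rest of this patch: the `upload()` / `iter_upload()` pair defined above is the consumer-facing API that the split preserves unchanged. A minimal usage sketch follows; the Dandiset ID "000123", the token placeholder, and the local paths are hypothetical and not part of this diff, and `DandiAPIClient.for_dandi_instance()` / `json_dict()` come from dandi.dandiapi and dandischema rather than from the code moved here:

    from dandi.dandiapi import DandiAPIClient
    from dandi.files import dandi_file

    # Hypothetical Dandiset ID, token, and file paths, for illustration only
    with DandiAPIClient.for_dandi_instance("dandi", token="<token>") as client:
        dandiset = client.get_dandiset("000123", "draft")
        asset = dandi_file("000123/sub-01/sub-01.nwb", dandiset_path="000123")
        metadata = asset.get_metadata(digest=asset.get_digest()).json_dict()
        for status in asset.iter_upload(dandiset, metadata):
            if status["status"] == "uploading":
                print(f"{status['upload']:.1f}% uploaded")
            elif status["status"] == "done":
                print("Uploaded as asset", status["asset"].identifier)
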
- - -class LocalFileAsset(LocalAsset): - """ - Representation of a regular file that can be uploaded to a DANDI Archive as - an asset of a Dandiset - """ - - EXTENSIONS: ClassVar[List[str]] = [] - - def get_metadata( - self, - digest: Optional[Digest] = None, - ignore_errors: bool = True, - ) -> BareAsset: - metadata = get_default_metadata(self.filepath, digest=digest) - metadata.path = self.path - return metadata - - def get_digest(self) -> Digest: - """Calculate a dandi-etag digest for the asset""" - value = get_digest(self.filepath, digest="dandi-etag") - return Digest.dandi_etag(value) - - def iter_upload( - self, - dandiset: RemoteDandiset, - metadata: Dict[str, Any], - jobs: Optional[int] = None, - replacing: Optional[RemoteAsset] = None, - ) -> Iterator[dict]: - """ - Upload the file as an asset with the given metadata to the given - Dandiset, returning a generator of status `dict`\\s. - - :param RemoteDandiset dandiset: - the Dandiset to which the file will be uploaded - :param dict metadata: - Metadata for the uploaded asset. The "path" field will be set to - the value of the instance's ``path`` attribute if no such field is - already present. - :param int jobs: Number of threads to use for uploading; defaults to 5 - :param RemoteAsset replacing: - If set, replace the given asset, which must have the same path as - the new asset - :returns: - A generator of `dict`\\s containing at least a ``"status"`` key. - Upon successful upload, the last `dict` will have a status of - ``"done"`` and an ``"asset"`` key containing the resulting - `RemoteAsset`. - """ - asset_path = metadata.setdefault("path", self.path) - client = dandiset.client - yield {"status": "calculating etag"} - etagger = get_dandietag(self.filepath) - filetag = etagger.as_str() - lgr.debug("Calculated dandi-etag of %s for %s", filetag, self.filepath) - digest = metadata.get("digest", {}) - if "dandi:dandi-etag" in digest: - if digest["dandi:dandi-etag"] != filetag: - raise RuntimeError( - f"{self.filepath}: File etag changed; was originally" - f" {digest['dandi:dandi-etag']} but is now {filetag}" - ) - yield {"status": "initiating upload"} - lgr.debug("%s: Beginning upload", asset_path) - total_size = self.size - try: - resp = client.post( - "/uploads/initialize/", - json={ - "contentSize": total_size, - "digest": { - "algorithm": "dandi:dandi-etag", - "value": filetag, - }, - "dandiset": dandiset.identifier, - }, - ) - except requests.HTTPError as e: - if e.response.status_code == 409: - lgr.debug("%s: Blob already exists on server", asset_path) - blob_id = e.response.headers["Location"] - else: - raise - else: - upload_id = resp["upload_id"] - parts = resp["parts"] - if len(parts) != etagger.part_qty: - raise RuntimeError( - f"Server and client disagree on number of parts for upload;" - f" server says {len(parts)}, client says {etagger.part_qty}" - ) - parts_out = [] - bytes_uploaded = 0 - lgr.debug("Uploading %s in %d parts", self.filepath, len(parts)) - with RESTFullAPIClient("http://nil.nil") as storage: - with self.filepath.open("rb") as fp: - with ThreadPoolExecutor(max_workers=jobs or 5) as executor: - lock = Lock() - futures = [ - executor.submit( - _upload_blob_part, - storage_session=storage, - fp=fp, - lock=lock, - etagger=etagger, - asset_path=asset_path, - part=part, - ) - for part in parts - ] - for fut in as_completed(futures): - out_part = fut.result() - bytes_uploaded += out_part["size"] - yield { - "status": "uploading", - "upload": 100 * bytes_uploaded / total_size, - "current": bytes_uploaded, - } - 
parts_out.append(out_part) - lgr.debug("%s: Completing upload", asset_path) - resp = client.post( - f"/uploads/{upload_id}/complete/", - json={"parts": parts_out}, - ) - lgr.debug( - "%s: Announcing completion to %s", - asset_path, - resp["complete_url"], - ) - r = storage.post( - resp["complete_url"], data=resp["body"], json_resp=False - ) - lgr.debug( - "%s: Upload completed. Response content: %s", - asset_path, - r.content, - ) - rxml = fromstring(r.text) - m = re.match(r"\{.+?\}", rxml.tag) - ns = m.group(0) if m else "" - final_etag = rxml.findtext(f"{ns}ETag") - if final_etag is not None: - final_etag = final_etag.strip('"') - if final_etag != filetag: - raise RuntimeError( - "Server and client disagree on final ETag of uploaded file;" - f" server says {final_etag}, client says {filetag}" - ) - # else: Error? Warning? - resp = client.post(f"/uploads/{upload_id}/validate/") - blob_id = resp["blob_id"] - lgr.debug("%s: Assigning asset blob to dandiset & version", asset_path) - yield {"status": "producing asset"} - if replacing is not None: - lgr.debug("%s: Replacing pre-existing asset", asset_path) - r = client.put( - replacing.api_path, - json={"metadata": metadata, "blob_id": blob_id}, - ) - else: - r = client.post( - f"{dandiset.version_api_path}assets/", - json={"metadata": metadata, "blob_id": blob_id}, - ) - a = RemoteAsset.from_data(dandiset, r) - lgr.info("%s: Asset successfully uploaded", asset_path) - yield {"status": "done", "asset": a} - - -class NWBAsset(LocalFileAsset): - """Representation of a local NWB file""" - - EXTENSIONS: ClassVar[List[str]] = [".nwb"] - - def get_metadata( - self, - digest: Optional[Digest] = None, - ignore_errors: bool = True, - ) -> BareAsset: - try: - metadata = nwb2asset(self.filepath, digest=digest) - except Exception as e: - lgr.warning( - "Failed to extract NWB metadata from %s: %s: %s", - self.filepath, - type(e).__name__, - str(e), - ) - if ignore_errors: - metadata = get_default_metadata(self.filepath, digest=digest) - else: - raise - metadata.path = self.path - return metadata - - # TODO: @validate_cache.memoize_path - def get_validation_errors( - self, - schema_version: Optional[str] = None, - devel_debug: bool = False, - ) -> List[str]: - errors: List[str] = pynwb_validate(self.filepath, devel_debug=devel_debug) - if schema_version is not None: - errors.extend( - super().get_validation_errors( - schema_version=schema_version, devel_debug=devel_debug - ) - ) - else: - # make sure that we have some basic metadata fields we require - try: - errors.extend( - [ - error.message - for error in inspect_nwb( - nwbfile_path=self.filepath, - skip_validate=True, - config=load_config(filepath_or_keyword="dandi"), - importance_threshold=Importance.CRITICAL, - ) - ] - ) - except Exception as e: - if devel_debug: - raise - lgr.warning( - "Failed to inspect NWBFile in %s: %s", - self.filepath, - e, - extra={"validating": True}, - ) - errors.append(f"Failed to inspect NWBFile: {e}") - return errors - - -class VideoAsset(LocalFileAsset): - EXTENSIONS: ClassVar[List[str]] = VIDEO_FILE_EXTENSIONS - - -class GenericAsset(LocalFileAsset): - """ - Representation of a generic regular file, one that is not of any known type - """ - - EXTENSIONS: ClassVar[List[str]] = [] - - -class LocalDirectoryAsset(LocalAsset, Generic[P]): - """ - Representation of a directory that can be uploaded to a DANDI Archive as - a single asset of a Dandiset. It is generic in ``P``, bound to - `dandi.misctypes.BasePath`. 
- """ - - EXTENSIONS: ClassVar[List[str]] = [] - - @property - @abstractmethod - def filetree(self) -> P: - """ - The path object for the root of the hierarchy of files within the - directory - """ - ... - - def iterfiles(self, include_dirs: bool = False) -> Iterator[P]: - """Yield all files within the directory""" - dirs = deque([self.filetree]) - while dirs: - for p in dirs.popleft().iterdir(): - if p.is_dir(): - dirs.append(p) - if include_dirs: - yield p - else: - yield p - - @property - def size(self) -> int: - """The total size of the files in the directory""" - return sum(p.size for p in self.iterfiles()) - - -@dataclass -class LocalZarrEntry(BasePath): - """A file or directory within a `ZarrAsset`""" - - #: The path to the actual file or directory on disk - filepath: Path - #: The path to the root of the Zarr file tree - zarr_basepath: Path - - def _get_subpath(self, name: str) -> LocalZarrEntry: - if not name or "/" in name: - raise ValueError(f"Invalid path component: {name!r}") - elif name == ".": - return self - elif name == "..": - return self.parent - else: - return replace( - self, filepath=self.filepath / name, parts=self.parts + (name,) - ) - - @property - def parent(self) -> LocalZarrEntry: - if self.is_root(): - return self - else: - return replace(self, filepath=self.filepath.parent, parts=self.parts[:-1]) - - def exists(self) -> bool: - return self.filepath.exists() - - def is_file(self) -> bool: - return self.filepath.is_file() - - def is_dir(self) -> bool: - return self.filepath.is_dir() - - def iterdir(self) -> Iterator[LocalZarrEntry]: - for p in self.filepath.iterdir(): - if p.is_dir() and not any(p.iterdir()): - # Ignore empty directories - continue - yield self._get_subpath(p.name) - - def get_digest(self) -> Digest: - """ - Calculate the DANDI etag digest for the entry. If the entry is a - directory, the algorithm will be the Dandi Zarr checksum algorithm; if - it is a file, it will be MD5. - """ - if self.is_dir(): - return Digest.dandi_zarr(get_zarr_checksum(self.filepath)) - else: - return Digest( - algorithm=DigestType.md5, value=get_digest(self.filepath, "md5") - ) - - @property - def size(self) -> int: - """ - The size of the entry. For a directory, this is the total size of all - entries within it. - """ - if self.is_dir(): - return sum(p.size for p in self.iterdir()) - else: - return os.path.getsize(self.filepath) - - @property - def modified(self) -> datetime: - """The time at which the entry was last modified""" - # TODO: Should this be overridden for directories? 
- return datetime.fromtimestamp(self.filepath.stat().st_mtime).astimezone() - - -@dataclass -class ZarrStat: - """Details about a Zarr asset""" - - #: The total size of the asset - size: int - #: The Dandi Zarr checksum of the asset - digest: Digest - #: A list of all files in the asset in unspecified order - files: List[LocalZarrEntry] - - -class ZarrAsset(LocalDirectoryAsset[LocalZarrEntry]): - """Representation of a local Zarr directory""" - - EXTENSIONS: ClassVar[List[str]] = [".ngff", ".zarr"] - - @property - def filetree(self) -> LocalZarrEntry: - """ - The `LocalZarrEntry` for the root of the hierarchy of files within the - Zarr asset - """ - return LocalZarrEntry( - filepath=self.filepath, zarr_basepath=self.filepath, parts=() - ) - - def stat(self) -> ZarrStat: - """Return various details about the Zarr asset""" - - def dirstat(dirpath: LocalZarrEntry) -> ZarrStat: - size = 0 - dir_md5s = {} - file_md5s = {} - files = [] - for p in dirpath.iterdir(): - if p.is_dir(): - st = dirstat(p) - size += st.size - dir_md5s[str(p)] = (st.digest.value, st.size) - files.extend(st.files) - else: - size += p.size - file_md5s[str(p)] = (md5file_nocache(p.filepath), p.size) - files.append(p) - return ZarrStat( - size=size, - digest=Digest.dandi_zarr(get_checksum(file_md5s, dir_md5s)), - files=files, - ) - - return dirstat(self.filetree) - - def get_digest(self) -> Digest: - """Calculate a dandi-zarr-checksum digest for the asset""" - return Digest.dandi_zarr(get_zarr_checksum(self.filepath)) - - def get_metadata( - self, - digest: Optional[Digest] = None, - ignore_errors: bool = True, - ) -> BareAsset: - metadata = get_default_metadata(self.filepath, digest=digest) - metadata.encodingFormat = ZARR_MIME_TYPE - metadata.path = self.path - return metadata - - def get_validation_errors( - self, - schema_version: Optional[str] = None, - devel_debug: bool = False, - ) -> List[str]: - try: - data = zarr.open(self.filepath) - except Exception as e: - if devel_debug: - raise - lgr.warning( - "Error opening %s: %s: %s", - self.filepath, - type(e).__name__, - e, - extra={"validating": True}, - ) - return [str(e)] - if isinstance(data, zarr.Group) and not data: - msg = "Zarr group is empty" - if devel_debug: - raise ValueError(msg) - lgr.warning("%s: %s", self.filepath, msg, extra={"validating": True}) - return [msg] - try: - next(self.filepath.glob(f"*{os.sep}" + os.sep.join(["*"] * MAX_ZARR_DEPTH))) - except StopIteration: - pass - else: - msg = f"Zarr directory tree more than {MAX_ZARR_DEPTH} directories deep" - if devel_debug: - raise ValueError(msg) - lgr.warning("%s: %s", self.filepath, msg, extra={"validating": True}) - return [msg] - # TODO: Should this be appended to the above errors? - return super().get_validation_errors( - schema_version=schema_version, devel_debug=devel_debug - ) - - def iter_upload( - self, - dandiset: RemoteDandiset, - metadata: Dict[str, Any], - jobs: Optional[int] = None, - replacing: Optional[RemoteAsset] = None, - ) -> Iterator[dict]: - """ - Upload the Zarr directory as an asset with the given metadata to the - given Dandiset, returning a generator of status `dict`\\s. - - :param RemoteDandiset dandiset: - the Dandiset to which the Zarr will be uploaded - :param dict metadata: - Metadata for the uploaded asset. The "path" field will be set to - the value of the instance's ``path`` attribute if no such field is - already present. 
- :param int jobs: Number of threads to use for uploading; defaults to 5 - :param RemoteAsset replacing: - If set, replace the given asset, which must have the same path as - the new asset; if the old asset is a Zarr, the Zarr will be updated - & reused for the new asset - :returns: - A generator of `dict`\\s containing at least a ``"status"`` key. - Upon successful upload, the last `dict` will have a status of - ``"done"`` and an ``"asset"`` key containing the resulting - `RemoteAsset`. - """ - # So that older clients don't get away with doing the wrong thing once - # Zarr upload to embargoed Dandisets is implemented in the API: - if dandiset.embargo_status is EmbargoStatus.EMBARGOED: - raise NotImplementedError( - "Uploading Zarr assets to embargoed Dandisets is currently not implemented" - ) - asset_path = metadata.setdefault("path", self.path) - client = dandiset.client - lgr.debug("%s: Producing asset", asset_path) - yield {"status": "producing asset"} - old_zarr_entries: Dict[str, RemoteZarrEntry] = {} - - def mkzarr() -> str: - nonlocal old_zarr_entries - try: - r = client.post( - "/zarr/", - json={"name": asset_path, "dandiset": dandiset.identifier}, - ) - except requests.HTTPError as e: - if "Zarr already exists" in e.response.text: - lgr.warning( - "%s: Found pre-existing Zarr at same path not" - " associated with any asset; reusing", - asset_path, - ) - (old_zarr,) = client.paginate( - "/zarr/", - params={ - "dandiset": dandiset.identifier, - "name": asset_path, - }, - ) - zarr_id = old_zarr["zarr_id"] - filetree = RemoteZarrEntry( - client=client, - zarr_id=zarr_id, - parts=(), - _known_dir=True, - ) - old_zarr_entries = { - str(e): e for e in filetree.iterfiles(include_dirs=True) - } - else: - raise - else: - zarr_id = r["zarr_id"] - assert isinstance(zarr_id, str) - return zarr_id - - if replacing is not None: - lgr.debug("%s: Replacing pre-existing asset", asset_path) - if isinstance(replacing, RemoteZarrAsset): - lgr.debug( - "%s: Pre-existing asset is a Zarr; reusing & updating", asset_path - ) - zarr_id = replacing.zarr - old_zarr_entries = { - str(e): e for e in replacing.iterfiles(include_dirs=True) - } - else: - lgr.debug( - "%s: Pre-existing asset is not a Zarr; minting new Zarr", asset_path - ) - zarr_id = mkzarr() - r = client.put( - replacing.api_path, - json={"metadata": metadata, "zarr_id": zarr_id}, - ) - else: - lgr.debug("%s: Minting new Zarr", asset_path) - zarr_id = mkzarr() - r = client.post( - f"{dandiset.version_api_path}assets/", - json={"metadata": metadata, "zarr_id": zarr_id}, - ) - a = RemoteAsset.from_data(dandiset, r) - assert isinstance(a, RemoteZarrAsset) - - total_size = 0 - to_upload = EntryUploadTracker() - if old_zarr_entries: - to_delete: List[RemoteZarrEntry] = [] - digesting: List[Future[Optional[Tuple[LocalZarrEntry, str]]]] = [] - yield {"status": "comparing against remote Zarr"} - with ThreadPoolExecutor(max_workers=jobs or 5) as executor: - for local_entry in self.iterfiles(): - total_size += local_entry.size - try: - remote_entry = old_zarr_entries.pop(str(local_entry)) - except KeyError: - for pp in local_entry.parents: - pps = str(pp) - if pps in old_zarr_entries: - if old_zarr_entries[pps].is_file(): - lgr.debug( - "%s: Parent path %s of file %s" - " corresponds to a remote file;" - " deleting remote", - asset_path, - pps, - local_entry, - ) - to_delete.append(old_zarr_entries.pop(pps)) - break - lgr.debug( - "%s: Path %s not present in remote Zarr; uploading", - asset_path, - local_entry, - ) - to_upload.register(local_entry) - 
else: - if remote_entry.is_dir(): - lgr.debug( - "%s: Path %s of local file is a directory in" - " remote Zarr; deleting remote & re-uploading", - asset_path, - local_entry, - ) - eprefix = str(remote_entry) + "/" - sub_e = [ - (k, v) - for k, v in old_zarr_entries.items() - if k.startswith(eprefix) - ] - for k, v in sub_e: - old_zarr_entries.pop(k) - to_delete.append(v) - to_upload.register(local_entry) - else: - digesting.append( - executor.submit( - _cmp_digests, - asset_path, - local_entry, - remote_entry.get_digest().value, - ) - ) - for dgstfut in as_completed(digesting): - try: - item = dgstfut.result() - except Exception: - for d in digesting: - d.cancel() - raise - else: - if item is not None: - local_entry, local_digest = item - to_upload.register(local_entry, local_digest) - if to_delete: - a.rmfiles(to_delete, reingest=False) - else: - yield {"status": "traversing local Zarr"} - for local_entry in self.iterfiles(): - total_size += local_entry.size - to_upload.register(local_entry) - yield {"status": "initiating upload", "size": total_size} - lgr.debug("%s: Beginning upload", asset_path) - bytes_uploaded = 0 - need_ingest = False - upload_data = ( - zarr_id, - client.get_url(f"/zarr/{zarr_id}/upload"), - cast(Optional[str], client.session.headers.get("Authorization")), - ) - with RESTFullAPIClient( - "http://nil.nil", - headers={"X-Amz-ACL": "bucket-owner-full-control"}, - ) as storage, closing(to_upload.get_items()) as upload_items: - for i, upload_body in enumerate( - chunked(upload_items, ZARR_UPLOAD_BATCH_SIZE), start=1 - ): - lgr.debug( - "%s: Uploading Zarr file batch #%d (%s)", - asset_path, - i, - pluralize(len(upload_body), "file"), - ) - r = client.post(f"/zarr/{zarr_id}/upload/", json=upload_body) - ZARR_UPLOADS_IN_PROGRESS.add(upload_data) - with ThreadPoolExecutor(max_workers=jobs or 5) as executor: - futures = [ - executor.submit( - _upload_zarr_file, - storage_session=storage, - path=self.filepath / upspec["path"], - upload_url=upspec["upload_url"], - ) - for upspec in r - ] - need_ingest = True - for fut in as_completed(futures): - try: - size = fut.result() - except Exception as e: - lgr.debug( - "Error uploading zarr: %s: %s", type(e).__name__, e - ) - lgr.debug("Cancelling upload") - for f in futures: - f.cancel() - executor.shutdown() - client.delete(f"/zarr/{zarr_id}/upload/") - raise - else: - bytes_uploaded += size - yield { - "status": "uploading", - "upload": 100 * bytes_uploaded / to_upload.total_size, - "current": bytes_uploaded, - } - lgr.debug("%s: Completing upload of batch #%d", asset_path, i) - client.post(f"/zarr/{zarr_id}/upload/complete/") - ZARR_UPLOADS_IN_PROGRESS.discard(upload_data) - lgr.debug("%s: All files uploaded", asset_path) - old_zarr_files = [e for e in old_zarr_entries.values() if e.is_file()] - if old_zarr_files: - yield {"status": "deleting extra remote files"} - lgr.debug( - "%s: Deleting %s in remote Zarr not present locally", - asset_path, - pluralize(len(old_zarr_files), "file"), - ) - a.rmfiles(old_zarr_files, reingest=False) - need_ingest = True - if need_ingest: - lgr.debug("%s: Waiting for server to calculate Zarr checksum", asset_path) - yield {"status": "server calculating checksum"} - client.post(f"/zarr/{zarr_id}/ingest/") - while True: - sleep(2) - r = client.get(f"/zarr/{zarr_id}/") - if r["status"] == "Complete": - break - lgr.info("%s: Asset successfully uploaded", asset_path) - else: - lgr.info("%s: No changes made to Zarr", asset_path) - yield {"status": "done", "asset": a} - - -def find_dandi_files( - *paths: 
Union[str, Path], - dandiset_path: Optional[Union[str, Path]] = None, - allow_all: bool = False, - include_metadata: bool = False, -) -> Iterator[DandiFile]: - """ - Yield all DANDI files at or under the paths in ``paths`` (which may be - either files or directories). Files & directories whose names start with a - period are ignored. Directories are only included in the return value if - they are of a type represented by a `LocalDirectoryAsset` subclass, in - which case they are not recursed into. - - :param dandiset_path: - The path to the root of the Dandiset in which the paths are located. - All paths in ``paths`` must be equal to or subpaths of - ``dandiset_path``. If `None`, then the Dandiset path for each asset - found is implicitly set to the parent directory. - :param allow_all: - If true, unrecognized assets and the Dandiset's :file:`dandiset.yaml` - file are returned as `GenericAsset` and `DandisetMetadataFile` - instances, respectively. If false, they are not returned at all. - :param include_metadata: - If true, the Dandiset's :file:`dandiset.yaml` file is returned as a - `DandisetMetadataFile` instance. If false, it is not returned at all - (unless ``allow_all`` is true). - """ - - path_queue: deque[Path] = deque() - for p in map(Path, paths): - if dandiset_path is not None: - try: - p.relative_to(dandiset_path) - except ValueError: - raise ValueError( - "Path {str(p)!r} is not inside Dandiset path {str(dandiset_path)!r}" - ) - path_queue.append(p) - while path_queue: - p = path_queue.popleft() - if p.name.startswith("."): - continue - if p.is_dir(): - if p.is_symlink(): - lgr.warning("%s: Ignoring unsupported symbolic link to directory", p) - elif dandiset_path is not None and p == Path(dandiset_path): - path_queue.extend(p.iterdir()) - elif any(p.iterdir()): - try: - df = dandi_file(p, dandiset_path) - except UnknownAssetError: - path_queue.extend(p.iterdir()) - else: - yield df - else: - df = dandi_file(p, dandiset_path) - if isinstance(df, GenericAsset) and not allow_all: - pass - elif isinstance(df, DandisetMetadataFile) and not ( - allow_all or include_metadata - ): - pass - else: - yield df - - -def dandi_file( - filepath: Union[str, Path], dandiset_path: Optional[Union[str, Path]] = None -) -> DandiFile: - """ - Return a `DandiFile` instance of the appropriate type for the file at - ``filepath`` inside the Dandiset rooted at ``dandiset_path``. If - ``dandiset_path`` is not set, it will default to ``filepath``'s parent - directory. - - If ``filepath`` is a directory, it must be of a type represented by a - `LocalDirectoryAsset` subclass; otherwise, an `UnknownAssetError` exception - will be raised. - - A regular file named :file:`dandiset.yaml` will only be represented by a - `DandisetMetadataFile` instance if it is at the root of the Dandiset. - - A regular file that is not of a known type will be represented by a - `GenericAsset` instance. 
- """ - filepath = Path(filepath) - if dandiset_path is not None: - path = filepath.relative_to(dandiset_path).as_posix() - if path == ".": - raise ValueError("Dandi file path cannot equal Dandiset path") - else: - path = filepath.name - if filepath.is_dir(): - if not any(filepath.iterdir()): - raise UnknownAssetError("Empty directories cannot be assets") - for dirclass in LocalDirectoryAsset.__subclasses__(): - if filepath.suffix in dirclass.EXTENSIONS: - return dirclass(filepath=filepath, path=path) # type: ignore[abstract] - raise UnknownAssetError( - f"Directory has unrecognized suffix {filepath.suffix!r}" - ) - elif path == dandiset_metadata_file: - return DandisetMetadataFile(filepath=filepath) - else: - for fileclass in LocalFileAsset.__subclasses__(): - if filepath.suffix in fileclass.EXTENSIONS: - return fileclass(filepath=filepath, path=path) # type: ignore[abstract] - return GenericAsset(filepath=filepath, path=path) - - -def _upload_blob_part( - storage_session: RESTFullAPIClient, - fp: BinaryIO, - lock: Lock, - etagger: DandiETag, - asset_path: str, - part: dict, -) -> dict: - etag_part = etagger.get_part(part["part_number"]) - if part["size"] != etag_part.size: - raise RuntimeError( - f"Server and client disagree on size of upload part" - f" {part['part_number']}; server says {part['size']}," - f" client says {etag_part.size}" - ) - with lock: - fp.seek(etag_part.offset) - chunk = fp.read(part["size"]) - if len(chunk) != part["size"]: - raise RuntimeError( - f"End of file {fp.name} reached unexpectedly early:" - f" read {len(chunk)} bytes of out of an expected {part['size']}" - ) - lgr.debug( - "%s: Uploading part %d/%d (%d bytes)", - asset_path, - part["part_number"], - etagger.part_qty, - part["size"], - ) - r = storage_session.put( - part["upload_url"], - data=chunk, - json_resp=False, - retry_statuses=[500], - ) - server_etag = r.headers["ETag"].strip('"') - lgr.debug( - "%s: Part upload finished ETag=%s Content-Length=%s", - asset_path, - server_etag, - r.headers.get("Content-Length"), - ) - client_etag = etagger.get_part_etag(etag_part) - if server_etag != client_etag: - raise RuntimeError( - f"Server and client disagree on ETag of upload part" - f" {part['part_number']}; server says" - f" {server_etag}, client says {client_etag}" - ) - return { - "part_number": part["part_number"], - "size": part["size"], - "etag": server_etag, - } - - -def _upload_zarr_file( - storage_session: RESTFullAPIClient, path: Path, upload_url: str -) -> int: - with path.open("rb") as fp: - storage_session.put( - upload_url, data=fp, json_resp=False, retry_if=_retry_zarr_file - ) - return path.stat().st_size - - -def _retry_zarr_file(r: requests.Response) -> bool: - # Some sort of filesystem hiccup can cause requests to be unable to get the - # filesize, leading to it falling back to "chunked" transfer encoding, - # which S3 doesn't support. 
- return ( - r.status_code == 501 - and "header you provided implies functionality that is not implemented" - in r.text - ) - - -def _check_required_fields(d: dict, required: List[str]) -> List[str]: - errors: List[str] = [] - for f in required: - v = d.get(f, None) - if not v or (isinstance(v, str) and not v.strip()): - errors += [f"Required field {f!r} has no value"] - if v in ("REQUIRED", "PLACEHOLDER"): - errors += [f"Required field {f!r} has value {v!r}"] - return errors - - -@dataclass -class EntryUploadTracker: - """ - Class for keeping track of `LocalZarrEntry` instances to upload - - :meta private: - """ - - total_size: int = 0 - digested_entries: List[Tuple[LocalZarrEntry, str]] = field(default_factory=list) - fresh_entries: List[LocalZarrEntry] = field(default_factory=list) - - def register(self, e: LocalZarrEntry, digest: Optional[str] = None) -> None: - if digest is not None: - self.digested_entries.append((e, digest)) - else: - self.fresh_entries.append(e) - self.total_size += e.size - - @staticmethod - def _mkitem(e: LocalZarrEntry) -> dict: - digest = md5file_nocache(e.filepath) - return {"path": str(e), "etag": digest} - - def get_items(self, jobs: int = 5) -> Generator[dict, None, None]: - # Note: In order for the ThreadPoolExecutor to be closed if an error - # occurs during upload, the method must be used like this: - # - # with contextlib.closing(to_upload.get_items()) as upload_items: - # for item in upload_items: - # ... - for e, digest in self.digested_entries: - yield {"path": str(e), "etag": digest} - if not self.fresh_entries: - return - with ThreadPoolExecutor(max_workers=jobs) as executor: - futures = [executor.submit(self._mkitem, e) for e in self.fresh_entries] - for fut in as_completed(futures): - try: - yield fut.result() - # Use BaseException to also catch GeneratorExit thrown by - # closing() - except BaseException: - for f in futures: - f.cancel() - raise - - -def _cmp_digests( - asset_path: str, local_entry: LocalZarrEntry, remote_digest: str -) -> Optional[Tuple[LocalZarrEntry, str]]: - local_digest = md5file_nocache(local_entry.filepath) - if local_digest != remote_digest: - lgr.debug( - "%s: Path %s in Zarr differs from local file; re-uploading", - asset_path, - local_entry, - ) - return (local_entry, local_digest) - else: - lgr.debug("%s: File %s already on server; skipping", asset_path, local_entry) - return None - - -# Collection of (zarr ID, upload endpoint URL, auth header value) tuples -ZARR_UPLOADS_IN_PROGRESS: set[tuple[str, str, Optional[str]]] = set() - - -@atexit.register -def cancel_zarr_uploads() -> None: - for zarr_id, url, auth in ZARR_UPLOADS_IN_PROGRESS: - lgr.debug("Cancelling upload for Zarr %s", zarr_id) - headers = {"Authorization": auth} if auth is not None else {} - r = requests.delete(url, headers=headers) - if not r.ok: - lgr.warning( - "Upload cancellation failed with %d: %s: %s", - r.status_code, - r.reason, - r.text, - ) diff --git a/dandi/files/__init__.py b/dandi/files/__init__.py new file mode 100644 index 000000000..06bd7cd0a --- /dev/null +++ b/dandi/files/__init__.py @@ -0,0 +1,161 @@ +""" +.. versionadded:: 0.36.0 + +This module defines functionality for working with local files & directories +(as opposed to remote resources on a DANDI Archive server) that are of interest +to DANDI. 
The classes for such files & directories all inherit from +`DandiFile`, which has two immediate subclasses: `DandisetMetadataFile`, for +representing :file:`dandiset.yaml` files, and `LocalAsset`, for representing +files that can be uploaded as assets to DANDI Archive. +""" + +from __future__ import annotations + +from collections import deque +from collections.abc import Iterator +from pathlib import Path +from typing import Optional + +from dandi import get_logger +from dandi.consts import dandiset_metadata_file +from dandi.exceptions import UnknownAssetError + +from .bases import ( + DandiFile, + DandisetMetadataFile, + GenericAsset, + LocalAsset, + LocalDirectoryAsset, + LocalFileAsset, + NWBAsset, + VideoAsset, +) +from .zarr import LocalZarrEntry, ZarrAsset, ZarrStat + +__all__ = [ + "DandiFile", + "DandisetMetadataFile", + "LocalAsset", + "LocalFileAsset", + "NWBAsset", + "VideoAsset", + "GenericAsset", + "LocalDirectoryAsset", + "LocalZarrEntry", + "ZarrStat", + "ZarrAsset", + "find_dandi_files", + "dandi_file", +] + +lgr = get_logger() + + +def find_dandi_files( + *paths: str | Path, + dandiset_path: Optional[str | Path] = None, + allow_all: bool = False, + include_metadata: bool = False, +) -> Iterator[DandiFile]: + """ + Yield all DANDI files at or under the paths in ``paths`` (which may be + either files or directories). Files & directories whose names start with a + period are ignored. Directories are only included in the return value if + they are of a type represented by a `LocalDirectoryAsset` subclass, in + which case they are not recursed into. + + :param dandiset_path: + The path to the root of the Dandiset in which the paths are located. + All paths in ``paths`` must be equal to or subpaths of + ``dandiset_path``. If `None`, then the Dandiset path for each asset + found is implicitly set to the parent directory. + :param allow_all: + If true, unrecognized assets and the Dandiset's :file:`dandiset.yaml` + file are returned as `GenericAsset` and `DandisetMetadataFile` + instances, respectively. If false, they are not returned at all. + :param include_metadata: + If true, the Dandiset's :file:`dandiset.yaml` file is returned as a + `DandisetMetadataFile` instance. If false, it is not returned at all + (unless ``allow_all`` is true). + """ + + path_queue: deque[Path] = deque() + for p in map(Path, paths): + if dandiset_path is not None: + try: + p.relative_to(dandiset_path) + except ValueError: + raise ValueError( + "Path {str(p)!r} is not inside Dandiset path {str(dandiset_path)!r}" + ) + path_queue.append(p) + while path_queue: + p = path_queue.popleft() + if p.name.startswith("."): + continue + if p.is_dir(): + if p.is_symlink(): + lgr.warning("%s: Ignoring unsupported symbolic link to directory", p) + elif dandiset_path is not None and p == Path(dandiset_path): + path_queue.extend(p.iterdir()) + elif any(p.iterdir()): + try: + df = dandi_file(p, dandiset_path) + except UnknownAssetError: + path_queue.extend(p.iterdir()) + else: + yield df + else: + df = dandi_file(p, dandiset_path) + if isinstance(df, GenericAsset) and not allow_all: + pass + elif isinstance(df, DandisetMetadataFile) and not ( + allow_all or include_metadata + ): + pass + else: + yield df + + +def dandi_file( + filepath: str | Path, dandiset_path: Optional[str | Path] = None +) -> DandiFile: + """ + Return a `DandiFile` instance of the appropriate type for the file at + ``filepath`` inside the Dandiset rooted at ``dandiset_path``. 
If + ``dandiset_path`` is not set, it will default to ``filepath``'s parent + directory. + + If ``filepath`` is a directory, it must be of a type represented by a + `LocalDirectoryAsset` subclass; otherwise, an `UnknownAssetError` exception + will be raised. + + A regular file named :file:`dandiset.yaml` will only be represented by a + `DandisetMetadataFile` instance if it is at the root of the Dandiset. + + A regular file that is not of a known type will be represented by a + `GenericAsset` instance. + """ + filepath = Path(filepath) + if dandiset_path is not None: + path = filepath.relative_to(dandiset_path).as_posix() + if path == ".": + raise ValueError("Dandi file path cannot equal Dandiset path") + else: + path = filepath.name + if filepath.is_dir(): + if not any(filepath.iterdir()): + raise UnknownAssetError("Empty directories cannot be assets") + for dirclass in LocalDirectoryAsset.__subclasses__(): + if filepath.suffix in dirclass.EXTENSIONS: + return dirclass(filepath=filepath, path=path) # type: ignore[abstract] + raise UnknownAssetError( + f"Directory has unrecognized suffix {filepath.suffix!r}" + ) + elif path == dandiset_metadata_file: + return DandisetMetadataFile(filepath=filepath) + else: + for fileclass in LocalFileAsset.__subclasses__(): + if filepath.suffix in fileclass.EXTENSIONS: + return fileclass(filepath=filepath, path=path) # type: ignore[abstract] + return GenericAsset(filepath=filepath, path=path) diff --git a/dandi/files/bases.py b/dandi/files/bases.py new file mode 100644 index 000000000..98be9d402 --- /dev/null +++ b/dandi/files/bases.py @@ -0,0 +1,614 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from collections import deque +from collections.abc import Iterator +from concurrent.futures import ThreadPoolExecutor, as_completed +from dataclasses import dataclass +from datetime import datetime +import os +from pathlib import Path +import re +from threading import Lock +from typing import Any, BinaryIO, ClassVar, Generic, Optional +from xml.etree.ElementTree import fromstring + +from dandischema.digests.dandietag import DandiETag +from dandischema.models import BareAsset, CommonModel +from dandischema.models import Dandiset as DandisetMeta +from dandischema.models import get_schema_version +from nwbinspector import Importance, inspect_nwb, load_config +from pydantic import ValidationError +import requests + +from dandi import get_logger +from dandi.consts import VIDEO_FILE_EXTENSIONS +from dandi.dandiapi import RemoteAsset, RemoteDandiset, RESTFullAPIClient +from dandi.metadata import get_default_metadata, nwb2asset +from dandi.misctypes import DUMMY_DIGEST, Digest, P +from dandi.pynwb_utils import validate as pynwb_validate +from dandi.support.digests import get_dandietag, get_digest +from dandi.utils import yaml_load + +lgr = get_logger() + +# TODO -- should come from schema. This is just a simplistic example for now +_required_dandiset_metadata_fields = ["identifier", "name", "description"] + + +@dataclass # type: ignore[misc] # +class DandiFile(ABC): + """Abstract base class for local files & directories of interest to DANDI""" + + #: The path to the actual file or directory on disk + filepath: Path + + @property + def size(self) -> int: + """The size of the file""" + return os.path.getsize(self.filepath) + + @property + def modified(self) -> datetime: + """The time at which the file was last modified""" + # TODO: Should this be overridden for LocalDirectoryAsset? 
+ return datetime.fromtimestamp(self.filepath.stat().st_mtime).astimezone() + + @abstractmethod + def get_metadata( + self, + digest: Optional[Digest] = None, + ignore_errors: bool = True, + ) -> CommonModel: + """Return the Dandi metadata for the file""" + ... + + @abstractmethod + def get_validation_errors( + self, + schema_version: Optional[str] = None, + devel_debug: bool = False, + ) -> list[str]: + """ + Attempt to validate the file and return a list of errors encountered + """ + ... + + +class DandisetMetadataFile(DandiFile): + """Representation of a :file:`dandiset.yaml` file""" + + def get_metadata( + self, + digest: Optional[Digest] = None, + ignore_errors: bool = True, + ) -> DandisetMeta: + """Return the Dandiset metadata inside the file""" + with open(self.filepath) as f: + meta = yaml_load(f, typ="safe") + return DandisetMeta.unvalidated(**meta) + + # TODO: @validate_cache.memoize_path + def get_validation_errors( + self, + schema_version: Optional[str] = None, + devel_debug: bool = False, + ) -> list[str]: + with open(self.filepath) as f: + meta = yaml_load(f, typ="safe") + if schema_version is None: + schema_version = meta.get("schemaVersion") + if schema_version is None: + return _check_required_fields(meta, _required_dandiset_metadata_fields) + else: + current_version = get_schema_version() + if schema_version != current_version: + raise ValueError( + f"Unsupported schema version: {schema_version}; expected {current_version}" + ) + try: + DandisetMeta(**meta) + except ValidationError as e: + if devel_debug: + raise + lgr.warning( + "Validation error for %s: %s", + self.filepath, + e, + extra={"validating": True}, + ) + return [str(e)] + except Exception as e: + if devel_debug: + raise + lgr.warning( + "Unexpected validation error for %s: %s", + self.filepath, + e, + extra={"validating": True}, + ) + return [f"Failed to initialize Dandiset meta: {e}"] + return [] + + +@dataclass # type: ignore[misc] # +class LocalAsset(DandiFile): + """ + Representation of a file or directory that can be uploaded to a DANDI + Archive as an asset of a Dandiset + """ + + #: The forward-slash-separated path to the asset within its local Dandiset + #: (i.e., relative to the Dandiset's root) + path: str + + @abstractmethod + def get_digest(self) -> Digest: + """ + Calculate a DANDI etag digest for the asset using the appropriate + algorithm for its type + """ + ... + + @abstractmethod + def get_metadata( + self, + digest: Optional[Digest] = None, + ignore_errors: bool = True, + ) -> BareAsset: + """Return the Dandi metadata for the asset""" + ... + + # TODO: @validate_cache.memoize_path + def get_validation_errors( + self, + schema_version: Optional[str] = None, + devel_debug: bool = False, + ) -> list[str]: + if schema_version is not None: + current_version = get_schema_version() + if schema_version != current_version: + raise ValueError( + f"Unsupported schema version: {schema_version}; expected {current_version}" + ) + try: + asset = self.get_metadata(digest=DUMMY_DIGEST) + BareAsset(**asset.dict()) + except ValidationError as e: + if devel_debug: + raise + lgr.warning( + "Validation error for %s: %s", + self.filepath, + e, + extra={"validating": True}, + ) + return [str(e)] + except Exception as e: + if devel_debug: + raise + lgr.warning( + "Unexpected validation error for %s: %s", + self.filepath, + e, + extra={"validating": True}, + ) + return [f"Failed to read metadata: {e}"] + return [] + else: + # TODO: Do something else? 
+ return [] + + def upload( + self, + dandiset: RemoteDandiset, + metadata: dict[str, Any], + jobs: Optional[int] = None, + replacing: Optional[RemoteAsset] = None, + ) -> RemoteAsset: + """ + Upload the file as an asset with the given metadata to the given + Dandiset and return the resulting asset. Blocks until the upload is + complete. + + :param RemoteDandiset dandiset: + the Dandiset to which the file will be uploaded + :param dict metadata: + Metadata for the uploaded asset. The "path" field will be set to + the value of the instance's ``path`` attribute if no such field is + already present. + :param int jobs: Number of threads to use for uploading; defaults to 5 + :param RemoteAsset replacing: + If set, replace the given asset, which must have the same path as + the new asset + :rtype: RemoteAsset + """ + for status in self.iter_upload( + dandiset, metadata, jobs=jobs, replacing=replacing + ): + if status["status"] == "done": + a = status["asset"] + assert isinstance(a, RemoteAsset) + return a + raise AssertionError("iter_upload() finished without returning 'done'") + + @abstractmethod + def iter_upload( + self, + dandiset: RemoteDandiset, + metadata: dict[str, Any], + jobs: Optional[int] = None, + replacing: Optional[RemoteAsset] = None, + ) -> Iterator[dict]: + """ + Upload the asset with the given metadata to the given Dandiset, + returning a generator of status `dict`\\s. + + :param RemoteDandiset dandiset: + the Dandiset to which the asset will be uploaded + :param dict metadata: + Metadata for the uploaded asset. The "path" field will be set to + the value of the instance's ``path`` attribute if no such field is + already present. + :param int jobs: Number of threads to use for uploading; defaults to 5 + :param RemoteAsset replacing: + If set, replace the given asset, which must have the same path as + the new asset + :returns: + A generator of `dict`\\s containing at least a ``"status"`` key. + Upon successful upload, the last `dict` will have a status of + ``"done"`` and an ``"asset"`` key containing the resulting + `RemoteAsset`. + """ + ... + + +class LocalFileAsset(LocalAsset): + """ + Representation of a regular file that can be uploaded to a DANDI Archive as + an asset of a Dandiset + """ + + EXTENSIONS: ClassVar[list[str]] = [] + + def get_metadata( + self, + digest: Optional[Digest] = None, + ignore_errors: bool = True, + ) -> BareAsset: + metadata = get_default_metadata(self.filepath, digest=digest) + metadata.path = self.path + return metadata + + def get_digest(self) -> Digest: + """Calculate a dandi-etag digest for the asset""" + value = get_digest(self.filepath, digest="dandi-etag") + return Digest.dandi_etag(value) + + def iter_upload( + self, + dandiset: RemoteDandiset, + metadata: dict[str, Any], + jobs: Optional[int] = None, + replacing: Optional[RemoteAsset] = None, + ) -> Iterator[dict]: + """ + Upload the file as an asset with the given metadata to the given + Dandiset, returning a generator of status `dict`\\s. + + :param RemoteDandiset dandiset: + the Dandiset to which the file will be uploaded + :param dict metadata: + Metadata for the uploaded asset. The "path" field will be set to + the value of the instance's ``path`` attribute if no such field is + already present. + :param int jobs: Number of threads to use for uploading; defaults to 5 + :param RemoteAsset replacing: + If set, replace the given asset, which must have the same path as + the new asset + :returns: + A generator of `dict`\\s containing at least a ``"status"`` key. 
+ Upon successful upload, the last `dict` will have a status of + ``"done"`` and an ``"asset"`` key containing the resulting + `RemoteAsset`. + """ + asset_path = metadata.setdefault("path", self.path) + client = dandiset.client + yield {"status": "calculating etag"} + etagger = get_dandietag(self.filepath) + filetag = etagger.as_str() + lgr.debug("Calculated dandi-etag of %s for %s", filetag, self.filepath) + digest = metadata.get("digest", {}) + if "dandi:dandi-etag" in digest: + if digest["dandi:dandi-etag"] != filetag: + raise RuntimeError( + f"{self.filepath}: File etag changed; was originally" + f" {digest['dandi:dandi-etag']} but is now {filetag}" + ) + yield {"status": "initiating upload"} + lgr.debug("%s: Beginning upload", asset_path) + total_size = self.size + try: + resp = client.post( + "/uploads/initialize/", + json={ + "contentSize": total_size, + "digest": { + "algorithm": "dandi:dandi-etag", + "value": filetag, + }, + "dandiset": dandiset.identifier, + }, + ) + except requests.HTTPError as e: + if e.response.status_code == 409: + lgr.debug("%s: Blob already exists on server", asset_path) + blob_id = e.response.headers["Location"] + else: + raise + else: + upload_id = resp["upload_id"] + parts = resp["parts"] + if len(parts) != etagger.part_qty: + raise RuntimeError( + f"Server and client disagree on number of parts for upload;" + f" server says {len(parts)}, client says {etagger.part_qty}" + ) + parts_out = [] + bytes_uploaded = 0 + lgr.debug("Uploading %s in %d parts", self.filepath, len(parts)) + with RESTFullAPIClient("http://nil.nil") as storage: + with self.filepath.open("rb") as fp: + with ThreadPoolExecutor(max_workers=jobs or 5) as executor: + lock = Lock() + futures = [ + executor.submit( + _upload_blob_part, + storage_session=storage, + fp=fp, + lock=lock, + etagger=etagger, + asset_path=asset_path, + part=part, + ) + for part in parts + ] + for fut in as_completed(futures): + out_part = fut.result() + bytes_uploaded += out_part["size"] + yield { + "status": "uploading", + "upload": 100 * bytes_uploaded / total_size, + "current": bytes_uploaded, + } + parts_out.append(out_part) + lgr.debug("%s: Completing upload", asset_path) + resp = client.post( + f"/uploads/{upload_id}/complete/", + json={"parts": parts_out}, + ) + lgr.debug( + "%s: Announcing completion to %s", + asset_path, + resp["complete_url"], + ) + r = storage.post( + resp["complete_url"], data=resp["body"], json_resp=False + ) + lgr.debug( + "%s: Upload completed. Response content: %s", + asset_path, + r.content, + ) + rxml = fromstring(r.text) + m = re.match(r"\{.+?\}", rxml.tag) + ns = m.group(0) if m else "" + final_etag = rxml.findtext(f"{ns}ETag") + if final_etag is not None: + final_etag = final_etag.strip('"') + if final_etag != filetag: + raise RuntimeError( + "Server and client disagree on final ETag of uploaded file;" + f" server says {final_etag}, client says {filetag}" + ) + # else: Error? Warning? 
+ resp = client.post(f"/uploads/{upload_id}/validate/") + blob_id = resp["blob_id"] + lgr.debug("%s: Assigning asset blob to dandiset & version", asset_path) + yield {"status": "producing asset"} + if replacing is not None: + lgr.debug("%s: Replacing pre-existing asset", asset_path) + r = client.put( + replacing.api_path, + json={"metadata": metadata, "blob_id": blob_id}, + ) + else: + r = client.post( + f"{dandiset.version_api_path}assets/", + json={"metadata": metadata, "blob_id": blob_id}, + ) + a = RemoteAsset.from_data(dandiset, r) + lgr.info("%s: Asset successfully uploaded", asset_path) + yield {"status": "done", "asset": a} + + +class NWBAsset(LocalFileAsset): + """Representation of a local NWB file""" + + EXTENSIONS: ClassVar[list[str]] = [".nwb"] + + def get_metadata( + self, + digest: Optional[Digest] = None, + ignore_errors: bool = True, + ) -> BareAsset: + try: + metadata = nwb2asset(self.filepath, digest=digest) + except Exception as e: + lgr.warning( + "Failed to extract NWB metadata from %s: %s: %s", + self.filepath, + type(e).__name__, + str(e), + ) + if ignore_errors: + metadata = get_default_metadata(self.filepath, digest=digest) + else: + raise + metadata.path = self.path + return metadata + + # TODO: @validate_cache.memoize_path + def get_validation_errors( + self, + schema_version: Optional[str] = None, + devel_debug: bool = False, + ) -> list[str]: + errors: list[str] = pynwb_validate(self.filepath, devel_debug=devel_debug) + if schema_version is not None: + errors.extend( + super().get_validation_errors( + schema_version=schema_version, devel_debug=devel_debug + ) + ) + else: + # make sure that we have some basic metadata fields we require + try: + errors.extend( + [ + error.message + for error in inspect_nwb( + nwbfile_path=self.filepath, + skip_validate=True, + config=load_config(filepath_or_keyword="dandi"), + importance_threshold=Importance.CRITICAL, + ) + ] + ) + except Exception as e: + if devel_debug: + raise + lgr.warning( + "Failed to inspect NWBFile in %s: %s", + self.filepath, + e, + extra={"validating": True}, + ) + errors.append(f"Failed to inspect NWBFile: {e}") + return errors + + +class VideoAsset(LocalFileAsset): + EXTENSIONS: ClassVar[list[str]] = VIDEO_FILE_EXTENSIONS + + +class GenericAsset(LocalFileAsset): + """ + Representation of a generic regular file, one that is not of any known type + """ + + EXTENSIONS: ClassVar[list[str]] = [] + + +class LocalDirectoryAsset(LocalAsset, Generic[P]): + """ + Representation of a directory that can be uploaded to a DANDI Archive as + a single asset of a Dandiset. It is generic in ``P``, bound to + `dandi.misctypes.BasePath`. + """ + + EXTENSIONS: ClassVar[list[str]] = [] + + @property + @abstractmethod + def filetree(self) -> P: + """ + The path object for the root of the hierarchy of files within the + directory + """ + ... 
+ + def iterfiles(self, include_dirs: bool = False) -> Iterator[P]: + """Yield all files within the directory""" + dirs = deque([self.filetree]) + while dirs: + for p in dirs.popleft().iterdir(): + if p.is_dir(): + dirs.append(p) + if include_dirs: + yield p + else: + yield p + + @property + def size(self) -> int: + """The total size of the files in the directory""" + return sum(p.size for p in self.iterfiles()) + + +def _upload_blob_part( + storage_session: RESTFullAPIClient, + fp: BinaryIO, + lock: Lock, + etagger: DandiETag, + asset_path: str, + part: dict, +) -> dict: + etag_part = etagger.get_part(part["part_number"]) + if part["size"] != etag_part.size: + raise RuntimeError( + f"Server and client disagree on size of upload part" + f" {part['part_number']}; server says {part['size']}," + f" client says {etag_part.size}" + ) + with lock: + fp.seek(etag_part.offset) + chunk = fp.read(part["size"]) + if len(chunk) != part["size"]: + raise RuntimeError( + f"End of file {fp.name} reached unexpectedly early:" + f" read {len(chunk)} bytes of out of an expected {part['size']}" + ) + lgr.debug( + "%s: Uploading part %d/%d (%d bytes)", + asset_path, + part["part_number"], + etagger.part_qty, + part["size"], + ) + r = storage_session.put( + part["upload_url"], + data=chunk, + json_resp=False, + retry_statuses=[500], + ) + server_etag = r.headers["ETag"].strip('"') + lgr.debug( + "%s: Part upload finished ETag=%s Content-Length=%s", + asset_path, + server_etag, + r.headers.get("Content-Length"), + ) + client_etag = etagger.get_part_etag(etag_part) + if server_etag != client_etag: + raise RuntimeError( + f"Server and client disagree on ETag of upload part" + f" {part['part_number']}; server says" + f" {server_etag}, client says {client_etag}" + ) + return { + "part_number": part["part_number"], + "size": part["size"], + "etag": server_etag, + } + + +def _check_required_fields(d: dict, required: list[str]) -> list[str]: + errors: list[str] = [] + for f in required: + v = d.get(f, None) + if not v or (isinstance(v, str) and not v.strip()): + errors += [f"Required field {f!r} has no value"] + if v in ("REQUIRED", "PLACEHOLDER"): + errors += [f"Required field {f!r} has value {v!r}"] + return errors diff --git a/dandi/files/zarr.py b/dandi/files/zarr.py new file mode 100644 index 000000000..2a769b64d --- /dev/null +++ b/dandi/files/zarr.py @@ -0,0 +1,593 @@ +from __future__ import annotations + +import atexit +from collections.abc import Generator, Iterator +from concurrent.futures import Future, ThreadPoolExecutor, as_completed +from contextlib import closing +from dataclasses import dataclass, field, replace +from datetime import datetime +import os +from pathlib import Path +from time import sleep +from typing import Any, ClassVar, Optional, cast + +from dandischema.digests.zarr import get_checksum +from dandischema.models import BareAsset, DigestType +import requests +import zarr + +from dandi import get_logger +from dandi.consts import ( + MAX_ZARR_DEPTH, + ZARR_MIME_TYPE, + ZARR_UPLOAD_BATCH_SIZE, + EmbargoStatus, +) +from dandi.dandiapi import ( + RemoteAsset, + RemoteDandiset, + RemoteZarrAsset, + RemoteZarrEntry, + RESTFullAPIClient, +) +from dandi.metadata import get_default_metadata +from dandi.misctypes import BasePath, Digest +from dandi.support.digests import get_digest, get_zarr_checksum, md5file_nocache +from dandi.utils import chunked, pluralize + +from .bases import LocalDirectoryAsset + +lgr = get_logger() + + +@dataclass +class LocalZarrEntry(BasePath): + """A file or directory 
within a `ZarrAsset`""" + + #: The path to the actual file or directory on disk + filepath: Path + #: The path to the root of the Zarr file tree + zarr_basepath: Path + + def _get_subpath(self, name: str) -> LocalZarrEntry: + if not name or "/" in name: + raise ValueError(f"Invalid path component: {name!r}") + elif name == ".": + return self + elif name == "..": + return self.parent + else: + return replace( + self, filepath=self.filepath / name, parts=self.parts + (name,) + ) + + @property + def parent(self) -> LocalZarrEntry: + if self.is_root(): + return self + else: + return replace(self, filepath=self.filepath.parent, parts=self.parts[:-1]) + + def exists(self) -> bool: + return self.filepath.exists() + + def is_file(self) -> bool: + return self.filepath.is_file() + + def is_dir(self) -> bool: + return self.filepath.is_dir() + + def iterdir(self) -> Iterator[LocalZarrEntry]: + for p in self.filepath.iterdir(): + if p.is_dir() and not any(p.iterdir()): + # Ignore empty directories + continue + yield self._get_subpath(p.name) + + def get_digest(self) -> Digest: + """ + Calculate the DANDI etag digest for the entry. If the entry is a + directory, the algorithm will be the Dandi Zarr checksum algorithm; if + it is a file, it will be MD5. + """ + if self.is_dir(): + return Digest.dandi_zarr(get_zarr_checksum(self.filepath)) + else: + return Digest( + algorithm=DigestType.md5, value=get_digest(self.filepath, "md5") + ) + + @property + def size(self) -> int: + """ + The size of the entry. For a directory, this is the total size of all + entries within it. + """ + if self.is_dir(): + return sum(p.size for p in self.iterdir()) + else: + return os.path.getsize(self.filepath) + + @property + def modified(self) -> datetime: + """The time at which the entry was last modified""" + # TODO: Should this be overridden for directories? 
+ return datetime.fromtimestamp(self.filepath.stat().st_mtime).astimezone() + + +@dataclass +class ZarrStat: + """Details about a Zarr asset""" + + #: The total size of the asset + size: int + #: The Dandi Zarr checksum of the asset + digest: Digest + #: A list of all files in the asset in unspecified order + files: list[LocalZarrEntry] + + +class ZarrAsset(LocalDirectoryAsset[LocalZarrEntry]): + """Representation of a local Zarr directory""" + + EXTENSIONS: ClassVar[list[str]] = [".ngff", ".zarr"] + + @property + def filetree(self) -> LocalZarrEntry: + """ + The `LocalZarrEntry` for the root of the hierarchy of files within the + Zarr asset + """ + return LocalZarrEntry( + filepath=self.filepath, zarr_basepath=self.filepath, parts=() + ) + + def stat(self) -> ZarrStat: + """Return various details about the Zarr asset""" + + def dirstat(dirpath: LocalZarrEntry) -> ZarrStat: + size = 0 + dir_md5s = {} + file_md5s = {} + files = [] + for p in dirpath.iterdir(): + if p.is_dir(): + st = dirstat(p) + size += st.size + dir_md5s[str(p)] = (st.digest.value, st.size) + files.extend(st.files) + else: + size += p.size + file_md5s[str(p)] = (md5file_nocache(p.filepath), p.size) + files.append(p) + return ZarrStat( + size=size, + digest=Digest.dandi_zarr(get_checksum(file_md5s, dir_md5s)), + files=files, + ) + + return dirstat(self.filetree) + + def get_digest(self) -> Digest: + """Calculate a dandi-zarr-checksum digest for the asset""" + return Digest.dandi_zarr(get_zarr_checksum(self.filepath)) + + def get_metadata( + self, + digest: Optional[Digest] = None, + ignore_errors: bool = True, + ) -> BareAsset: + metadata = get_default_metadata(self.filepath, digest=digest) + metadata.encodingFormat = ZARR_MIME_TYPE + metadata.path = self.path + return metadata + + def get_validation_errors( + self, + schema_version: Optional[str] = None, + devel_debug: bool = False, + ) -> list[str]: + try: + data = zarr.open(self.filepath) + except Exception as e: + if devel_debug: + raise + lgr.warning( + "Error opening %s: %s: %s", + self.filepath, + type(e).__name__, + e, + extra={"validating": True}, + ) + return [str(e)] + if isinstance(data, zarr.Group) and not data: + msg = "Zarr group is empty" + if devel_debug: + raise ValueError(msg) + lgr.warning("%s: %s", self.filepath, msg, extra={"validating": True}) + return [msg] + try: + next(self.filepath.glob(f"*{os.sep}" + os.sep.join(["*"] * MAX_ZARR_DEPTH))) + except StopIteration: + pass + else: + msg = f"Zarr directory tree more than {MAX_ZARR_DEPTH} directories deep" + if devel_debug: + raise ValueError(msg) + lgr.warning("%s: %s", self.filepath, msg, extra={"validating": True}) + return [msg] + # TODO: Should this be appended to the above errors? + return super().get_validation_errors( + schema_version=schema_version, devel_debug=devel_debug + ) + + def iter_upload( + self, + dandiset: RemoteDandiset, + metadata: dict[str, Any], + jobs: Optional[int] = None, + replacing: Optional[RemoteAsset] = None, + ) -> Iterator[dict]: + """ + Upload the Zarr directory as an asset with the given metadata to the + given Dandiset, returning a generator of status `dict`\\s. + + :param RemoteDandiset dandiset: + the Dandiset to which the Zarr will be uploaded + :param dict metadata: + Metadata for the uploaded asset. The "path" field will be set to + the value of the instance's ``path`` attribute if no such field is + already present. 
+ :param int jobs: Number of threads to use for uploading; defaults to 5 + :param RemoteAsset replacing: + If set, replace the given asset, which must have the same path as + the new asset; if the old asset is a Zarr, the Zarr will be updated + & reused for the new asset + :returns: + A generator of `dict`\\s containing at least a ``"status"`` key. + Upon successful upload, the last `dict` will have a status of + ``"done"`` and an ``"asset"`` key containing the resulting + `RemoteAsset`. + """ + # So that older clients don't get away with doing the wrong thing once + # Zarr upload to embargoed Dandisets is implemented in the API: + if dandiset.embargo_status is EmbargoStatus.EMBARGOED: + raise NotImplementedError( + "Uploading Zarr assets to embargoed Dandisets is currently not implemented" + ) + asset_path = metadata.setdefault("path", self.path) + client = dandiset.client + lgr.debug("%s: Producing asset", asset_path) + yield {"status": "producing asset"} + old_zarr_entries: dict[str, RemoteZarrEntry] = {} + + def mkzarr() -> str: + nonlocal old_zarr_entries + try: + r = client.post( + "/zarr/", + json={"name": asset_path, "dandiset": dandiset.identifier}, + ) + except requests.HTTPError as e: + if "Zarr already exists" in e.response.text: + lgr.warning( + "%s: Found pre-existing Zarr at same path not" + " associated with any asset; reusing", + asset_path, + ) + (old_zarr,) = client.paginate( + "/zarr/", + params={ + "dandiset": dandiset.identifier, + "name": asset_path, + }, + ) + zarr_id = old_zarr["zarr_id"] + filetree = RemoteZarrEntry( + client=client, + zarr_id=zarr_id, + parts=(), + _known_dir=True, + ) + old_zarr_entries = { + str(e): e for e in filetree.iterfiles(include_dirs=True) + } + else: + raise + else: + zarr_id = r["zarr_id"] + assert isinstance(zarr_id, str) + return zarr_id + + if replacing is not None: + lgr.debug("%s: Replacing pre-existing asset", asset_path) + if isinstance(replacing, RemoteZarrAsset): + lgr.debug( + "%s: Pre-existing asset is a Zarr; reusing & updating", asset_path + ) + zarr_id = replacing.zarr + old_zarr_entries = { + str(e): e for e in replacing.iterfiles(include_dirs=True) + } + else: + lgr.debug( + "%s: Pre-existing asset is not a Zarr; minting new Zarr", asset_path + ) + zarr_id = mkzarr() + r = client.put( + replacing.api_path, + json={"metadata": metadata, "zarr_id": zarr_id}, + ) + else: + lgr.debug("%s: Minting new Zarr", asset_path) + zarr_id = mkzarr() + r = client.post( + f"{dandiset.version_api_path}assets/", + json={"metadata": metadata, "zarr_id": zarr_id}, + ) + a = RemoteAsset.from_data(dandiset, r) + assert isinstance(a, RemoteZarrAsset) + + total_size = 0 + to_upload = EntryUploadTracker() + if old_zarr_entries: + to_delete: list[RemoteZarrEntry] = [] + digesting: list[Future[Optional[tuple[LocalZarrEntry, str]]]] = [] + yield {"status": "comparing against remote Zarr"} + with ThreadPoolExecutor(max_workers=jobs or 5) as executor: + for local_entry in self.iterfiles(): + total_size += local_entry.size + try: + remote_entry = old_zarr_entries.pop(str(local_entry)) + except KeyError: + for pp in local_entry.parents: + pps = str(pp) + if pps in old_zarr_entries: + if old_zarr_entries[pps].is_file(): + lgr.debug( + "%s: Parent path %s of file %s" + " corresponds to a remote file;" + " deleting remote", + asset_path, + pps, + local_entry, + ) + to_delete.append(old_zarr_entries.pop(pps)) + break + lgr.debug( + "%s: Path %s not present in remote Zarr; uploading", + asset_path, + local_entry, + ) + to_upload.register(local_entry) + 
else: + if remote_entry.is_dir(): + lgr.debug( + "%s: Path %s of local file is a directory in" + " remote Zarr; deleting remote & re-uploading", + asset_path, + local_entry, + ) + eprefix = str(remote_entry) + "/" + sub_e = [ + (k, v) + for k, v in old_zarr_entries.items() + if k.startswith(eprefix) + ] + for k, v in sub_e: + old_zarr_entries.pop(k) + to_delete.append(v) + to_upload.register(local_entry) + else: + digesting.append( + executor.submit( + _cmp_digests, + asset_path, + local_entry, + remote_entry.get_digest().value, + ) + ) + for dgstfut in as_completed(digesting): + try: + item = dgstfut.result() + except Exception: + for d in digesting: + d.cancel() + raise + else: + if item is not None: + local_entry, local_digest = item + to_upload.register(local_entry, local_digest) + if to_delete: + a.rmfiles(to_delete, reingest=False) + else: + yield {"status": "traversing local Zarr"} + for local_entry in self.iterfiles(): + total_size += local_entry.size + to_upload.register(local_entry) + yield {"status": "initiating upload", "size": total_size} + lgr.debug("%s: Beginning upload", asset_path) + bytes_uploaded = 0 + need_ingest = False + upload_data = ( + zarr_id, + client.get_url(f"/zarr/{zarr_id}/upload"), + cast(Optional[str], client.session.headers.get("Authorization")), + ) + with RESTFullAPIClient( + "http://nil.nil", + headers={"X-Amz-ACL": "bucket-owner-full-control"}, + ) as storage, closing(to_upload.get_items()) as upload_items: + for i, upload_body in enumerate( + chunked(upload_items, ZARR_UPLOAD_BATCH_SIZE), start=1 + ): + lgr.debug( + "%s: Uploading Zarr file batch #%d (%s)", + asset_path, + i, + pluralize(len(upload_body), "file"), + ) + r = client.post(f"/zarr/{zarr_id}/upload/", json=upload_body) + ZARR_UPLOADS_IN_PROGRESS.add(upload_data) + with ThreadPoolExecutor(max_workers=jobs or 5) as executor: + futures = [ + executor.submit( + _upload_zarr_file, + storage_session=storage, + path=self.filepath / upspec["path"], + upload_url=upspec["upload_url"], + ) + for upspec in r + ] + need_ingest = True + for fut in as_completed(futures): + try: + size = fut.result() + except Exception as e: + lgr.debug( + "Error uploading zarr: %s: %s", type(e).__name__, e + ) + lgr.debug("Cancelling upload") + for f in futures: + f.cancel() + executor.shutdown() + client.delete(f"/zarr/{zarr_id}/upload/") + raise + else: + bytes_uploaded += size + yield { + "status": "uploading", + "upload": 100 * bytes_uploaded / to_upload.total_size, + "current": bytes_uploaded, + } + lgr.debug("%s: Completing upload of batch #%d", asset_path, i) + client.post(f"/zarr/{zarr_id}/upload/complete/") + ZARR_UPLOADS_IN_PROGRESS.discard(upload_data) + lgr.debug("%s: All files uploaded", asset_path) + old_zarr_files = [e for e in old_zarr_entries.values() if e.is_file()] + if old_zarr_files: + yield {"status": "deleting extra remote files"} + lgr.debug( + "%s: Deleting %s in remote Zarr not present locally", + asset_path, + pluralize(len(old_zarr_files), "file"), + ) + a.rmfiles(old_zarr_files, reingest=False) + need_ingest = True + if need_ingest: + lgr.debug("%s: Waiting for server to calculate Zarr checksum", asset_path) + yield {"status": "server calculating checksum"} + client.post(f"/zarr/{zarr_id}/ingest/") + while True: + sleep(2) + r = client.get(f"/zarr/{zarr_id}/") + if r["status"] == "Complete": + break + lgr.info("%s: Asset successfully uploaded", asset_path) + else: + lgr.info("%s: No changes made to Zarr", asset_path) + yield {"status": "done", "asset": a} + + +def _upload_zarr_file( + 
storage_session: RESTFullAPIClient, path: Path, upload_url: str +) -> int: + with path.open("rb") as fp: + storage_session.put( + upload_url, data=fp, json_resp=False, retry_if=_retry_zarr_file + ) + return path.stat().st_size + + +def _retry_zarr_file(r: requests.Response) -> bool: + # Some sort of filesystem hiccup can cause requests to be unable to get the + # filesize, leading to it falling back to "chunked" transfer encoding, + # which S3 doesn't support. + return ( + r.status_code == 501 + and "header you provided implies functionality that is not implemented" + in r.text + ) + + +@dataclass +class EntryUploadTracker: + """ + Class for keeping track of `LocalZarrEntry` instances to upload + + :meta private: + """ + + total_size: int = 0 + digested_entries: list[tuple[LocalZarrEntry, str]] = field(default_factory=list) + fresh_entries: list[LocalZarrEntry] = field(default_factory=list) + + def register(self, e: LocalZarrEntry, digest: Optional[str] = None) -> None: + if digest is not None: + self.digested_entries.append((e, digest)) + else: + self.fresh_entries.append(e) + self.total_size += e.size + + @staticmethod + def _mkitem(e: LocalZarrEntry) -> dict: + digest = md5file_nocache(e.filepath) + return {"path": str(e), "etag": digest} + + def get_items(self, jobs: int = 5) -> Generator[dict, None, None]: + # Note: In order for the ThreadPoolExecutor to be closed if an error + # occurs during upload, the method must be used like this: + # + # with contextlib.closing(to_upload.get_items()) as upload_items: + # for item in upload_items: + # ... + for e, digest in self.digested_entries: + yield {"path": str(e), "etag": digest} + if not self.fresh_entries: + return + with ThreadPoolExecutor(max_workers=jobs) as executor: + futures = [executor.submit(self._mkitem, e) for e in self.fresh_entries] + for fut in as_completed(futures): + try: + yield fut.result() + # Use BaseException to also catch GeneratorExit thrown by + # closing() + except BaseException: + for f in futures: + f.cancel() + raise + + +def _cmp_digests( + asset_path: str, local_entry: LocalZarrEntry, remote_digest: str +) -> Optional[tuple[LocalZarrEntry, str]]: + local_digest = md5file_nocache(local_entry.filepath) + if local_digest != remote_digest: + lgr.debug( + "%s: Path %s in Zarr differs from local file; re-uploading", + asset_path, + local_entry, + ) + return (local_entry, local_digest) + else: + lgr.debug("%s: File %s already on server; skipping", asset_path, local_entry) + return None + + +# Collection of (zarr ID, upload endpoint URL, auth header value) tuples +ZARR_UPLOADS_IN_PROGRESS: set[tuple[str, str, Optional[str]]] = set() + + +@atexit.register +def cancel_zarr_uploads() -> None: + for zarr_id, url, auth in ZARR_UPLOADS_IN_PROGRESS: + lgr.debug("Cancelling upload for Zarr %s", zarr_id) + headers = {"Authorization": auth} if auth is not None else {} + r = requests.delete(url, headers=headers) + if not r.ok: + lgr.warning( + "Upload cancellation failed with %d: %s: %s", + r.status_code, + r.reason, + r.text, + ) diff --git a/setup.cfg b/setup.cfg index 025ee28b8..052721432 100644 --- a/setup.cfg +++ b/setup.cfg @@ -109,7 +109,6 @@ ignore = E203,W503 exclude = *sphinx* dandi/externals/* - */__init__.py .tox/ _version.py versioneer.py From a3c4a2773eec98f4474317bb30790beb13c6502c Mon Sep 17 00:00:00 2001 From: "John T. 
Wodder II" Date: Tue, 19 Jul 2022 10:53:02 -0400 Subject: [PATCH 02/17] Add classes for BIDS assets --- dandi/consts.py | 4 + dandi/delete.py | 5 +- dandi/files/__init__.py | 161 ++++++++++++++++++++++++++++++------- dandi/files/bases.py | 13 +-- dandi/files/bids.py | 47 +++++++++++ dandi/files/zarr.py | 4 +- dandi/tests/test_files.py | 162 +++++++++++++++++++++++++++++++++----- 7 files changed, 332 insertions(+), 64 deletions(-) create mode 100644 dandi/files/bids.py diff --git a/dandi/consts.py b/dandi/consts.py index 0239bb91b..8689db660 100644 --- a/dandi/consts.py +++ b/dandi/consts.py @@ -155,6 +155,8 @@ class DandiInstance(NamedTuple): VIDEO_FILE_EXTENSIONS = [".mp4", ".avi", ".wmv", ".mov", ".flv", ".mkv"] VIDEO_FILE_MODULES = ["processing", "acquisition"] +ZARR_EXTENSIONS = [".ngff", ".zarr"] + #: Maximum allowed depth of a Zarr directory tree MAX_ZARR_DEPTH = 7 @@ -166,3 +168,5 @@ class DandiInstance(NamedTuple): #: Maximum number of Zarr directory entries to delete at once ZARR_DELETE_BATCH_SIZE = 100 + +BIDS_DATASET_DESCRIPTION = "dataset_description.json" diff --git a/dandi/delete.py b/dandi/delete.py index 3a6833908..518ec7e1b 100644 --- a/dandi/delete.py +++ b/dandi/delete.py @@ -5,11 +5,10 @@ import click -from .consts import DRAFT, dandiset_metadata_file +from .consts import DRAFT, ZARR_EXTENSIONS, dandiset_metadata_file from .dandiapi import DandiAPIClient, RemoteAsset, RemoteDandiset from .dandiarchive import BaseAssetIDURL, DandisetURL, ParsedDandiURL, parse_dandi_url from .exceptions import NotFoundError -from .files import ZarrAsset from .utils import get_instance, is_url @@ -231,7 +230,7 @@ def find_local_asset(filepath: str) -> Tuple[str, str]: "Use 'dandi download' or 'organize' first" ) relpath = path.relative_to(dandiset.path).as_posix() - if path.is_dir() and path.suffix not in ZarrAsset.EXTENSIONS: + if path.is_dir() and path.suffix not in ZARR_EXTENSIONS: relpath += "/" return (dandiset.identifier, relpath) diff --git a/dandi/files/__init__.py b/dandi/files/__init__.py index 06bd7cd0a..61c6fd678 100644 --- a/dandi/files/__init__.py +++ b/dandi/files/__init__.py @@ -13,11 +13,19 @@ from collections import deque from collections.abc import Iterator +from dataclasses import dataclass +from enum import Enum from pathlib import Path from typing import Optional +import weakref from dandi import get_logger -from dandi.consts import dandiset_metadata_file +from dandi.consts import ( + BIDS_DATASET_DESCRIPTION, + VIDEO_FILE_EXTENSIONS, + ZARR_EXTENSIONS, + dandiset_metadata_file, +) from dandi.exceptions import UnknownAssetError from .bases import ( @@ -30,22 +38,34 @@ NWBAsset, VideoAsset, ) +from .bids import ( + BIDSAsset, + BIDSDatasetDescriptionAsset, + GenericBIDSAsset, + NWBBIDSAsset, + ZarrBIDSAsset, +) from .zarr import LocalZarrEntry, ZarrAsset, ZarrStat __all__ = [ + "BIDSAsset", + "BIDSDatasetDescriptionAsset", "DandiFile", "DandisetMetadataFile", + "GenericAsset", + "GenericBIDSAsset", "LocalAsset", + "LocalDirectoryAsset", "LocalFileAsset", + "LocalZarrEntry", "NWBAsset", + "NWBBIDSAsset", "VideoAsset", - "GenericAsset", - "LocalDirectoryAsset", - "LocalZarrEntry", - "ZarrStat", "ZarrAsset", - "find_dandi_files", + "ZarrBIDSAsset", + "ZarrStat", "dandi_file", + "find_dandi_files", ] lgr = get_logger() @@ -79,7 +99,10 @@ def find_dandi_files( (unless ``allow_all`` is true). 
""" - path_queue: deque[Path] = deque() + # A pair of each file or directory being considered plus the most recent + # BIDS dataset_description.json file at the path (if a directory) or in a + # parent path + path_queue: deque[tuple[Path, Optional[BIDSDatasetDescriptionAsset]]] = deque() for p in map(Path, paths): if dandiset_path is not None: try: @@ -88,26 +111,36 @@ def find_dandi_files( raise ValueError( "Path {str(p)!r} is not inside Dandiset path {str(dandiset_path)!r}" ) - path_queue.append(p) + path_queue.append((p, None)) while path_queue: - p = path_queue.popleft() + p, bidsdd = path_queue.popleft() if p.name.startswith("."): continue if p.is_dir(): if p.is_symlink(): lgr.warning("%s: Ignoring unsupported symbolic link to directory", p) elif dandiset_path is not None and p == Path(dandiset_path): - path_queue.extend(p.iterdir()) + if (p / BIDS_DATASET_DESCRIPTION).exists(): + bids2 = dandi_file(p / BIDS_DATASET_DESCRIPTION, dandiset_path) + assert isinstance(bids2, BIDSDatasetDescriptionAsset) + bidsdd = bids2 + path_queue.extend((q, bidsdd) for q in p.iterdir()) elif any(p.iterdir()): try: - df = dandi_file(p, dandiset_path) + df = dandi_file(p, dandiset_path, bids_dataset_description=bidsdd) except UnknownAssetError: - path_queue.extend(p.iterdir()) + if (p / BIDS_DATASET_DESCRIPTION).exists(): + bids2 = dandi_file(p / BIDS_DATASET_DESCRIPTION, dandiset_path) + assert isinstance(bids2, BIDSDatasetDescriptionAsset) + bidsdd = bids2 + path_queue.extend((q, bidsdd) for q in p.iterdir()) else: yield df else: - df = dandi_file(p, dandiset_path) - if isinstance(df, GenericAsset) and not allow_all: + df = dandi_file(p, dandiset_path, bids_dataset_description=bidsdd) + # Don't use isinstance() here, as GenericBIDSAsset's should still + # be returned + if type(df) is GenericAsset and not allow_all: pass elif isinstance(df, DandisetMetadataFile) and not ( allow_all or include_metadata @@ -118,7 +151,9 @@ def find_dandi_files( def dandi_file( - filepath: str | Path, dandiset_path: Optional[str | Path] = None + filepath: str | Path, + dandiset_path: Optional[str | Path] = None, + bids_dataset_description: Optional[BIDSDatasetDescriptionAsset] = None, ) -> DandiFile: """ Return a `DandiFile` instance of the appropriate type for the file at @@ -126,6 +161,10 @@ def dandi_file( ``dandiset_path`` is not set, it will default to ``filepath``'s parent directory. + If ``bids_dataset_description`` is set, the file will be assumed to lie + within the BIDS dataset with the given :file:`dataset_description.json` + file at its root, resulting in a `BIDSAsset`. + If ``filepath`` is a directory, it must be of a type represented by a `LocalDirectoryAsset` subclass; otherwise, an `UnknownAssetError` exception will be raised. 
@@ -143,19 +182,83 @@ def dandi_file( raise ValueError("Dandi file path cannot equal Dandiset path") else: path = filepath.name - if filepath.is_dir(): - if not any(filepath.iterdir()): - raise UnknownAssetError("Empty directories cannot be assets") - for dirclass in LocalDirectoryAsset.__subclasses__(): - if filepath.suffix in dirclass.EXTENSIONS: - return dirclass(filepath=filepath, path=path) # type: ignore[abstract] - raise UnknownAssetError( - f"Directory has unrecognized suffix {filepath.suffix!r}" - ) - elif path == dandiset_metadata_file: + if filepath.is_file() and path == dandiset_metadata_file: return DandisetMetadataFile(filepath=filepath) + if bids_dataset_description is None: + factory = DandiFileFactory() else: - for fileclass in LocalFileAsset.__subclasses__(): - if filepath.suffix in fileclass.EXTENSIONS: - return fileclass(filepath=filepath, path=path) # type: ignore[abstract] - return GenericAsset(filepath=filepath, path=path) + factory = BIDSFileFactory(bids_dataset_description) + return factory(filepath, path) + + +class DandiFileType(Enum): + """:meta private:""" + + NWB = 1 + ZARR = 2 + VIDEO = 3 + GENERIC = 4 + BIDS_DATASET_DESCRIPTION = 5 + + @staticmethod + def classify(path: Path) -> DandiFileType: + if path.is_dir(): + if not any(path.iterdir()): + raise UnknownAssetError("Empty directories cannot be assets") + if path.suffix in ZARR_EXTENSIONS: + return DandiFileType.ZARR + raise UnknownAssetError( + f"Directory has unrecognized suffix {path.suffix!r}" + ) + elif path.name == BIDS_DATASET_DESCRIPTION: + return DandiFileType.BIDS_DATASET_DESCRIPTION + elif path.suffix == ".nwb": + return DandiFileType.NWB + elif path.suffix in VIDEO_FILE_EXTENSIONS: + return DandiFileType.VIDEO + else: + return DandiFileType.GENERIC + + +class DandiFileFactory: + """:meta private:""" + + CLASSES: dict[DandiFileType, type[LocalAsset]] = { + DandiFileType.NWB: NWBAsset, + DandiFileType.ZARR: ZarrAsset, + DandiFileType.VIDEO: VideoAsset, + DandiFileType.GENERIC: GenericAsset, + DandiFileType.BIDS_DATASET_DESCRIPTION: BIDSDatasetDescriptionAsset, + } + + def __call__(self, filepath: Path, path: str) -> DandiFile: + return self.CLASSES[DandiFileType.classify(filepath)]( + filepath=filepath, path=path + ) + + +@dataclass +class BIDSFileFactory(DandiFileFactory): + bids_dataset_description: BIDSDatasetDescriptionAsset + + CLASSES = { + DandiFileType.NWB: NWBBIDSAsset, + DandiFileType.ZARR: ZarrBIDSAsset, + DandiFileType.VIDEO: GenericBIDSAsset, + DandiFileType.GENERIC: GenericBIDSAsset, + } + + def __call__(self, filepath: Path, path: str) -> DandiFile: + ftype = DandiFileType.classify(filepath) + if ftype is DandiFileType.BIDS_DATASET_DESCRIPTION: + if filepath == self.bids_dataset_description.filepath: + return self.bids_dataset_description + else: + return BIDSDatasetDescriptionAsset(filepath=filepath, path=path) + df = self.CLASSES[ftype]( + filepath=filepath, + path=path, + bids_dataset_description_ref=weakref.ref(self.bids_dataset_description), + ) + self.bids_dataset_description.dataset_files.append(df) + return df diff --git a/dandi/files/bases.py b/dandi/files/bases.py index 98be9d402..f77c6a3f7 100644 --- a/dandi/files/bases.py +++ b/dandi/files/bases.py @@ -10,7 +10,7 @@ from pathlib import Path import re from threading import Lock -from typing import Any, BinaryIO, ClassVar, Generic, Optional +from typing import Any, BinaryIO, Generic, Optional from xml.etree.ElementTree import fromstring from dandischema.digests.dandietag import DandiETag @@ -22,7 +22,6 @@ import requests 
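The two factories above keep the type dispatch in one place: ``DandiFileType.classify()`` looks only at the path, and the chosen factory decides between the plain and BIDS-flavoured classes. A small sketch of how the pieces compose when calling ``dandi_file()`` directly; the file names here are invented:

    from pathlib import Path
    from dandi.files import BIDSDatasetDescriptionAsset, NWBBIDSAsset, dandi_file

    root = Path("/data/000123/rawdata")  # hypothetical BIDS subtree of a Dandiset
    ddjson = dandi_file(root / "dataset_description.json", dandiset_path=root.parent)
    assert isinstance(ddjson, BIDSDatasetDescriptionAsset)

    nwb = dandi_file(
        root / "sub-01" / "sub-01_ecephys.nwb",
        dandiset_path=root.parent,
        bids_dataset_description=ddjson,
    )
    assert isinstance(nwb, NWBBIDSAsset)
    # The BIDS factory registers the asset with its dataset_description.json
    # and stores a weak back-reference:
    assert nwb.bids_dataset_description is ddjson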
from dandi import get_logger -from dandi.consts import VIDEO_FILE_EXTENSIONS from dandi.dandiapi import RemoteAsset, RemoteDandiset, RESTFullAPIClient from dandi.metadata import get_default_metadata, nwb2asset from dandi.misctypes import DUMMY_DIGEST, Digest, P @@ -269,8 +268,6 @@ class LocalFileAsset(LocalAsset): an asset of a Dandiset """ - EXTENSIONS: ClassVar[list[str]] = [] - def get_metadata( self, digest: Optional[Digest] = None, @@ -435,8 +432,6 @@ def iter_upload( class NWBAsset(LocalFileAsset): """Representation of a local NWB file""" - EXTENSIONS: ClassVar[list[str]] = [".nwb"] - def get_metadata( self, digest: Optional[Digest] = None, @@ -499,7 +494,7 @@ def get_validation_errors( class VideoAsset(LocalFileAsset): - EXTENSIONS: ClassVar[list[str]] = VIDEO_FILE_EXTENSIONS + pass class GenericAsset(LocalFileAsset): @@ -507,7 +502,7 @@ class GenericAsset(LocalFileAsset): Representation of a generic regular file, one that is not of any known type """ - EXTENSIONS: ClassVar[list[str]] = [] + pass class LocalDirectoryAsset(LocalAsset, Generic[P]): @@ -517,8 +512,6 @@ class LocalDirectoryAsset(LocalAsset, Generic[P]): `dandi.misctypes.BasePath`. """ - EXTENSIONS: ClassVar[list[str]] = [] - @property @abstractmethod def filetree(self) -> P: diff --git a/dandi/files/bids.py b/dandi/files/bids.py new file mode 100644 index 000000000..85b688b6c --- /dev/null +++ b/dandi/files/bids.py @@ -0,0 +1,47 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from pathlib import Path +import weakref + +from .bases import GenericAsset, LocalFileAsset, NWBAsset +from .zarr import ZarrAsset + + +@dataclass +class BIDSDatasetDescriptionAsset(LocalFileAsset): + dataset_files: list[BIDSAsset] = field(default_factory=list) + + +@dataclass +class BIDSAsset(LocalFileAsset): + bids_dataset_description_ref: weakref.ref[BIDSDatasetDescriptionAsset] + + @property + def bids_dataset_description(self) -> BIDSDatasetDescriptionAsset: + bdd = self.bids_dataset_description_ref() + assert bdd is not None + return bdd + + @property + def bids_root(self) -> Path: + return self.bids_dataset_description.filepath.parent + + @property + def bids_path(self) -> str: + """ + ``/``-separated path to the asset from the root of the BIDS dataset + """ + return self.filepath.relative_to(self.bids_root).as_posix() + + +class NWBBIDSAsset(BIDSAsset, NWBAsset): + pass + + +class ZarrBIDSAsset(BIDSAsset, ZarrAsset): + pass + + +class GenericBIDSAsset(BIDSAsset, GenericAsset): + pass diff --git a/dandi/files/zarr.py b/dandi/files/zarr.py index 2a769b64d..d54a9aa10 100644 --- a/dandi/files/zarr.py +++ b/dandi/files/zarr.py @@ -9,7 +9,7 @@ import os from pathlib import Path from time import sleep -from typing import Any, ClassVar, Optional, cast +from typing import Any, Optional, cast from dandischema.digests.zarr import get_checksum from dandischema.models import BareAsset, DigestType @@ -130,8 +130,6 @@ class ZarrStat: class ZarrAsset(LocalDirectoryAsset[LocalZarrEntry]): """Representation of a local Zarr directory""" - EXTENSIONS: ClassVar[list[str]] = [".ngff", ".zarr"] - @property def filetree(self) -> LocalZarrEntry: """ diff --git a/dandi/tests/test_files.py b/dandi/tests/test_files.py index ef798c141..ddfa375c3 100644 --- a/dandi/tests/test_files.py +++ b/dandi/tests/test_files.py @@ -1,5 +1,7 @@ from operator import attrgetter from pathlib import Path +from typing import cast +from unittest.mock import ANY from dandischema.models import get_schema_version import numpy as np @@ -9,11 +11,15 @@ from 
..consts import ZARR_MIME_TYPE, dandiset_metadata_file from ..dandiapi import AssetType, RemoteZarrAsset from ..files import ( + BIDSDatasetDescriptionAsset, DandisetMetadataFile, GenericAsset, + GenericBIDSAsset, NWBAsset, + NWBBIDSAsset, VideoAsset, ZarrAsset, + ZarrBIDSAsset, dandi_file, find_dandi_files, ) @@ -21,26 +27,35 @@ lgr = get_logger() +def mkpaths(root: Path, *paths: str) -> None: + for p in paths: + pp = root / p + pp.parent.mkdir(parents=True, exist_ok=True) + if p.endswith("/"): + pp.mkdir() + else: + pp.touch() + + def test_find_dandi_files(tmp_path: Path) -> None: - (tmp_path / dandiset_metadata_file).touch() - (tmp_path / "sample01.zarr").mkdir() - (tmp_path / "sample01.zarr" / "inner.nwb").touch() - (tmp_path / "sample01.zarr" / "foo").touch() - (tmp_path / "sample02.nwb").touch() - (tmp_path / "foo").touch() - (tmp_path / "bar.txt").touch() - (tmp_path / "subdir").mkdir() - (tmp_path / "subdir" / "sample03.nwb").touch() - (tmp_path / "subdir" / "sample04.zarr").mkdir() - (tmp_path / "subdir" / "sample04.zarr" / "inner2.nwb").touch() - (tmp_path / "subdir" / "sample04.zarr" / "baz").touch() - (tmp_path / "subdir" / "gnusto").touch() - (tmp_path / "subdir" / "cleesh.txt").touch() - (tmp_path / "empty.zarr").mkdir() - (tmp_path / "glarch.mp4").touch() - (tmp_path / ".ignored").touch() - (tmp_path / ".ignored.dir").mkdir() - (tmp_path / ".ignored.dir" / "ignored.nwb").touch() + mkpaths( + tmp_path, + dandiset_metadata_file, + "sample01.zarr/inner.nwb", + "sample01.zarr/foo", + "sample02.nwb", + "foo", + "bar.txt", + "subdir/sample03.nwb", + "subdir/sample04.zarr/inner2.nwb", + "subdir/sample04.zarr/baz", + "subdir/gnusto", + "subdir/cleesh.txt", + "empty.zarr/", + "glarch.mp4", + ".ignored", + ".ignored.dir/ignored.nwb", + ) files = sorted( find_dandi_files(tmp_path, dandiset_path=tmp_path), key=attrgetter("filepath") @@ -98,6 +113,115 @@ def test_find_dandi_files(tmp_path: Path) -> None: ] +def test_find_dandi_files_with_bids(tmp_path: Path) -> None: + mkpaths( + tmp_path, + dandiset_metadata_file, + "foo.txt", + "bar.nwb", + "bids1/dataset_description.json", + "bids1/file.txt", + "bids1/subdir/quux.nwb", + "bids1/subdir/glarch.zarr/dataset_description.json", + "bids2/dataset_description.json", + "bids2/movie.mp4", + "bids2/subbids/dataset_description.json", + "bids2/subbids/data.json", + ) + + files = sorted( + find_dandi_files(tmp_path, dandiset_path=tmp_path, allow_all=False), + key=attrgetter("filepath"), + ) + + assert files == [ + NWBAsset(filepath=tmp_path / "bar.nwb", path="bar.nwb"), + BIDSDatasetDescriptionAsset( + filepath=tmp_path / "bids1" / "dataset_description.json", + path="bids1/dataset_description.json", + dataset_files=ANY, + ), + GenericBIDSAsset( + filepath=tmp_path / "bids1" / "file.txt", + path="bids1/file.txt", + bids_dataset_description_ref=ANY, + ), + ZarrBIDSAsset( + filepath=tmp_path / "bids1" / "subdir" / "glarch.zarr", + path="bids1/subdir/glarch.zarr", + bids_dataset_description_ref=ANY, + ), + NWBBIDSAsset( + filepath=tmp_path / "bids1" / "subdir" / "quux.nwb", + path="bids1/subdir/quux.nwb", + bids_dataset_description_ref=ANY, + ), + BIDSDatasetDescriptionAsset( + filepath=tmp_path / "bids2" / "dataset_description.json", + path="bids2/dataset_description.json", + dataset_files=ANY, + ), + GenericBIDSAsset( + filepath=tmp_path / "bids2" / "movie.mp4", + path="bids2/movie.mp4", + bids_dataset_description_ref=ANY, + ), + GenericBIDSAsset( + filepath=tmp_path / "bids2" / "subbids" / "data.json", + path="bids2/subbids/data.json", + 
bids_dataset_description_ref=ANY, + ), + BIDSDatasetDescriptionAsset( + filepath=tmp_path / "bids2" / "subbids" / "dataset_description.json", + path="bids2/subbids/dataset_description.json", + dataset_files=ANY, + ), + ] + + bidsdd = cast(BIDSDatasetDescriptionAsset, files[1]) + assert sorted(bidsdd.dataset_files, key=attrgetter("filepath")) == [ + GenericBIDSAsset( + filepath=tmp_path / "bids1" / "file.txt", + path="bids1/file.txt", + bids_dataset_description_ref=ANY, + ), + ZarrBIDSAsset( + filepath=tmp_path / "bids1" / "subdir" / "glarch.zarr", + path="bids1/subdir/glarch.zarr", + bids_dataset_description_ref=ANY, + ), + NWBBIDSAsset( + filepath=tmp_path / "bids1" / "subdir" / "quux.nwb", + path="bids1/subdir/quux.nwb", + bids_dataset_description_ref=ANY, + ), + ] + for asset in bidsdd.dataset_files: + assert asset.bids_dataset_description is bidsdd + + bidsdd = cast(BIDSDatasetDescriptionAsset, files[5]) + assert bidsdd.dataset_files == [ + GenericBIDSAsset( + filepath=tmp_path / "bids2" / "movie.mp4", + path="bids2/movie.mp4", + bids_dataset_description_ref=ANY, + ), + ] + for asset in bidsdd.dataset_files: + assert asset.bids_dataset_description is bidsdd + + bidsdd = cast(BIDSDatasetDescriptionAsset, files[8]) + assert bidsdd.dataset_files == [ + GenericBIDSAsset( + filepath=tmp_path / "bids2" / "subbids" / "data.json", + path="bids2/subbids/data.json", + bids_dataset_description_ref=ANY, + ), + ] + for asset in bidsdd.dataset_files: + assert asset.bids_dataset_description is bidsdd + + def test_validate_simple1(simple1_nwb): # this file should be ok as long as schema_version is specified errors = dandi_file(simple1_nwb).get_validation_errors( From 6777183b6a007bced9e33a5bd8b196a067deb1a9 Mon Sep 17 00:00:00 2001 From: "John T. Wodder II" Date: Tue, 19 Jul 2022 21:59:01 -0400 Subject: [PATCH 03/17] Give BIDSDatasetDescriptionAsset a bids_root property --- dandi/files/bids.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/dandi/files/bids.py b/dandi/files/bids.py index 85b688b6c..155e04638 100644 --- a/dandi/files/bids.py +++ b/dandi/files/bids.py @@ -12,6 +12,10 @@ class BIDSDatasetDescriptionAsset(LocalFileAsset): dataset_files: list[BIDSAsset] = field(default_factory=list) + @property + def bids_root(self) -> Path: + return self.filepath.parent + @dataclass class BIDSAsset(LocalFileAsset): From c93192309b606173e24d70aad8804a3507a02646 Mon Sep 17 00:00:00 2001 From: "John T. Wodder II" Date: Thu, 21 Jul 2022 10:04:39 -0400 Subject: [PATCH 04/17] Add docstrings to BIDS classes --- dandi/files/bids.py | 36 +++++++++++++++++++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/dandi/files/bids.py b/dandi/files/bids.py index 155e04638..5dcc2775d 100644 --- a/dandi/files/bids.py +++ b/dandi/files/bids.py @@ -10,26 +10,50 @@ @dataclass class BIDSDatasetDescriptionAsset(LocalFileAsset): + """ + The :file:`dataset_description.json` file for a BIDS dataset, used to + perform operations on the dataset as a whole + """ + + #: A list of all other assets in the dataset dataset_files: list[BIDSAsset] = field(default_factory=list) @property def bids_root(self) -> Path: + """ + The directory on the filesystem in which the BIDS dataset is located + """ return self.filepath.parent @dataclass class BIDSAsset(LocalFileAsset): + """ + Base class for non-:file:`dataset_description.json` assets in BIDS datasets + """ + + #: A weak reference to the :file:`dataset_description.json` file for the + #: containing dataset. 
+ #: + #: Users are advised to use `bids_dataset_description` to access the + #: :file:`dataset_description.json` file instead. bids_dataset_description_ref: weakref.ref[BIDSDatasetDescriptionAsset] @property def bids_dataset_description(self) -> BIDSDatasetDescriptionAsset: + """ + The :file:`dataset_description.json` file for the containing dataset + """ bdd = self.bids_dataset_description_ref() assert bdd is not None return bdd @property def bids_root(self) -> Path: - return self.bids_dataset_description.filepath.parent + """ + The directory on the filesystem in which the BIDS dataset is located + """ + return self.bids_dataset_description.bids_root @property def bids_path(self) -> str: @@ -40,12 +64,22 @@ def bids_path(self) -> str: class NWBBIDSAsset(BIDSAsset, NWBAsset): + """An NWB file in a BIDS dataset""" + pass class ZarrBIDSAsset(BIDSAsset, ZarrAsset): + """A Zarr directory in a BIDS dataset""" + pass class GenericBIDSAsset(BIDSAsset, GenericAsset): + """ + An asset in a BIDS dataset that is not an NWB file, a Zarr directory, or a + :file:`dataset_description.json` file. Note that, unlike the non-BIDS + classes, this includes video files. + """ + pass From bce33d108a295bdeaa331191b5eaf2125c89f626 Mon Sep 17 00:00:00 2001 From: "John T. Wodder II" Date: Wed, 27 Jul 2022 15:12:21 -0400 Subject: [PATCH 05/17] BIDS validation --- dandi/files/bids.py | 95 +++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 92 insertions(+), 3 deletions(-) diff --git a/dandi/files/bids.py b/dandi/files/bids.py index 5dcc2775d..7ee2f28c8 100644 --- a/dandi/files/bids.py +++ b/dandi/files/bids.py @@ -1,9 +1,14 @@ from __future__ import annotations +from collections import defaultdict from dataclasses import dataclass, field from pathlib import Path +from threading import Lock +from typing import Optional import weakref +from dandi.validate import validate_bids + from .bases import GenericAsset, LocalFileAsset, NWBAsset from .zarr import ZarrAsset @@ -18,6 +23,18 @@ class BIDSDatasetDescriptionAsset(LocalFileAsset): #: A list of all other assets in the dataset dataset_files: list[BIDSAsset] = field(default_factory=list) + #: A list of validation error messages pertaining to the dataset as a + #: whole, populated by `_validate()` + _dataset_errors: Optional[list[str]] = None + + #: A list of validation error messages for individual assets in the + #: dataset, keyed by `bids_path` properties; populated by `_validate()` + _asset_errors: Optional[dict[str, list[str]]] = None + + #: Threading lock needed in case multiple assets are validated in parallel + #: during upload + _lock: Lock = field(init=False, default_factory=Lock, repr=False, compare=False) + @property def bids_root(self) -> Path: """ @@ -25,6 +42,50 @@ def bids_root(self) -> Path: """ return self.filepath.parent + def _validate(self) -> None: + with self._lock: + if self._dataset_errors is None: + bids_paths = [str(self.filepath)] + [ + str(asset.filepath) for asset in self.dataset_files + ] + results = validate_bids(*bids_paths) + self._dataset_errors: list[str] = [] + if len(results["path_listing"]) == len(results["path_tracking"]): + self._dataset_errors.append("No valid BIDS files were found") + for entry in results["schema_tracking"]: + if entry["mandatory"]: + self._dataset_errors.append( + f"The `{entry['regex']}` regex pattern file" + " required by BIDS was not found." 
+ ) + self._asset_errors = defaultdict(list) + for path in results["path_tracking"]: + bids_path = Path(path).relative_to(self.bids_root).as_posix() + self._dataset_errors.append( + f"The `{bids_path}` file was not matched by any regex schema entry." + ) + self._asset_errors[bids_path].append( + "File not matched by any regex schema entry" + ) + + def get_asset_errors(self, asset: BIDSAsset) -> list[str]: + self._validate() + errors: list[str] = [] + if self._dataset_errors: + errors.append("BIDS dataset is invalid") + assert self._asset_errors is not None + errors.extend(self._asset_errors[asset.bids_path]) + return errors + + def get_validation_errors( + self, + schema_version: Optional[str] = None, + devel_debug: bool = False, + ) -> list[str]: + self._validate() + assert self._dataset_errors is not None + return list(self._dataset_errors) + @dataclass class BIDSAsset(LocalFileAsset): @@ -62,17 +123,38 @@ def bids_path(self) -> str: """ return self.filepath.relative_to(self.bids_root).as_posix() + def get_validation_errors( + self, + schema_version: Optional[str] = None, + devel_debug: bool = False, + ) -> list[str]: + return self.bids_dataset_description.get_asset_errors(self) + class NWBBIDSAsset(BIDSAsset, NWBAsset): """An NWB file in a BIDS dataset""" - pass + def get_validation_errors( + self, + schema_version: Optional[str] = None, + devel_debug: bool = False, + ) -> list[str]: + return NWBAsset.get_validation_errors( + self, schema_version, devel_debug + ) + BIDSAsset.get_validation_errors(self) class ZarrBIDSAsset(BIDSAsset, ZarrAsset): """A Zarr directory in a BIDS dataset""" - pass + def get_validation_errors( + self, + schema_version: Optional[str] = None, + devel_debug: bool = False, + ) -> list[str]: + return ZarrBIDSAsset.get_validation_errors( + self, schema_version, devel_debug + ) + BIDSAsset.get_validation_errors(self) class GenericBIDSAsset(BIDSAsset, GenericAsset): @@ -82,4 +164,11 @@ class GenericBIDSAsset(BIDSAsset, GenericAsset): classes, this includes video files. """ - pass + def get_validation_errors( + self, + schema_version: Optional[str] = None, + devel_debug: bool = False, + ) -> list[str]: + return GenericAsset.get_validation_errors( + self, schema_version, devel_debug + ) + BIDSAsset.get_validation_errors(self) From 5d30d496e159bb448a8f16f4cbf1ea683b606ba6 Mon Sep 17 00:00:00 2001 From: "John T. Wodder II" Date: Wed, 27 Jul 2022 15:21:05 -0400 Subject: [PATCH 06/17] Fix circular import --- dandi/files/bids.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/dandi/files/bids.py b/dandi/files/bids.py index 7ee2f28c8..13f45a00c 100644 --- a/dandi/files/bids.py +++ b/dandi/files/bids.py @@ -7,8 +7,6 @@ from typing import Optional import weakref -from dandi.validate import validate_bids - from .bases import GenericAsset, LocalFileAsset, NWBAsset from .zarr import ZarrAsset @@ -45,6 +43,9 @@ def bids_root(self) -> Path: def _validate(self) -> None: with self._lock: if self._dataset_errors is None: + # Import here to avoid circular import + from dandi.validate import validate_bids + bids_paths = [str(self.filepath)] + [ str(asset.filepath) for asset in self.dataset_files ] From ab4733f093bdfbac6c73ec5799f8d22d9d203796 Mon Sep 17 00:00:00 2001 From: "John T. 
Wodder II" Date: Wed, 3 Aug 2022 15:05:21 -0400 Subject: [PATCH 07/17] Metadata for BIDS assets --- dandi/files/bids.py | 65 ++++++++++++++++++++++++++++++++++++++++++++- dandi/metadata.py | 55 -------------------------------------- 2 files changed, 64 insertions(+), 56 deletions(-) diff --git a/dandi/files/bids.py b/dandi/files/bids.py index 13f45a00c..2044b6d2c 100644 --- a/dandi/files/bids.py +++ b/dandi/files/bids.py @@ -2,13 +2,23 @@ from collections import defaultdict from dataclasses import dataclass, field +from datetime import datetime from pathlib import Path from threading import Lock -from typing import Optional +from typing import Any, Optional import weakref +from dandischema.models import BareAsset + from .bases import GenericAsset, LocalFileAsset, NWBAsset from .zarr import ZarrAsset +from ..metadata import add_common_metadata, prepare_metadata +from ..misctypes import Digest + +BIDS_TO_DANDI = { + "subject": "subject_id", + "session": "session_id", +} @dataclass @@ -29,6 +39,11 @@ class BIDSDatasetDescriptionAsset(LocalFileAsset): #: dataset, keyed by `bids_path` properties; populated by `_validate()` _asset_errors: Optional[dict[str, list[str]]] = None + #: Asset metadata (in the form of a `dict` of BareAsset fields) for + #: individual assets in the dataset, keyed by `bids_path` properties; + #: populated by `_validate()` + _asset_metadata: Optional[dict[str, dict[str, Any]]] = None + #: Threading lock needed in case multiple assets are validated in parallel #: during upload _lock: Lock = field(init=False, default_factory=Lock, repr=False, compare=False) @@ -68,8 +83,21 @@ def _validate(self) -> None: self._asset_errors[bids_path].append( "File not matched by any regex schema entry" ) + self._asset_metadata = defaultdict(dict) + for meta in results["match_listing"]: + bids_path = ( + Path(meta.pop("path")).relative_to(self.bids_root).as_posix() + ) + meta = { + BIDS_TO_DANDI[k]: v + for k, v in meta.items() + if k in BIDS_TO_DANDI + } + # meta["bids_schema_version"] = results["bids_schema_version"] + self._asset_metadata[bids_path] = prepare_metadata(meta) def get_asset_errors(self, asset: BIDSAsset) -> list[str]: + """:meta private:""" self._validate() errors: list[str] = [] if self._dataset_errors: @@ -78,6 +106,12 @@ def get_asset_errors(self, asset: BIDSAsset) -> list[str]: errors.extend(self._asset_errors[asset.bids_path]) return errors + def get_asset_metadata(self, asset: BIDSAsset) -> dict[str, Any]: + """:meta private:""" + self._validate() + assert self._asset_metadata is not None + return self._asset_metadata[asset.bids_path] + def get_validation_errors( self, schema_version: Optional[str] = None, @@ -87,6 +121,13 @@ def get_validation_errors( assert self._dataset_errors is not None return list(self._dataset_errors) + def get_metadata( + self, + digest: Optional[Digest] = None, + ignore_errors: bool = True, + ) -> BareAsset: + raise NotImplementedError + @dataclass class BIDSAsset(LocalFileAsset): @@ -131,6 +172,17 @@ def get_validation_errors( ) -> list[str]: return self.bids_dataset_description.get_asset_errors(self) + def get_metadata( + self, + digest: Optional[Digest] = None, + ignore_errors: bool = True, + ) -> BareAsset: + metadata = self.bids_dataset_description.get_asset_metadata(self) + start_time = end_time = datetime.now().astimezone() + add_common_metadata(metadata, self.filepath, start_time, end_time, digest) + metadata["path"] = self.path + return BareAsset(**metadata) + class NWBBIDSAsset(BIDSAsset, NWBAsset): """An NWB file in a BIDS 
dataset""" @@ -144,6 +196,17 @@ def get_validation_errors( self, schema_version, devel_debug ) + BIDSAsset.get_validation_errors(self) + def get_metadata( + self, + digest: Optional[Digest] = None, + ignore_errors: bool = True, + ) -> BareAsset: + bids_metadata = BIDSAsset.get_metadata(self) + nwb_metadata = NWBAsset.get_metadata(self, digest, ignore_errors) + return BareAsset( + **{**bids_metadata.dict(), **nwb_metadata.dict(exclude_none=True)} + ) + class ZarrBIDSAsset(BIDSAsset, ZarrAsset): """A Zarr directory in a BIDS dataset""" diff --git a/dandi/metadata.py b/dandi/metadata.py index c1a13a60c..2449179ac 100644 --- a/dandi/metadata.py +++ b/dandi/metadata.py @@ -2,7 +2,6 @@ from datetime import datetime, timedelta from functools import lru_cache -import itertools import os import os.path as op from pathlib import Path @@ -44,48 +43,6 @@ lgr = get_logger() -# Remove hard-coding when current version fallback is merged. - -BIDS_TO_DANDI = { - "subject": "subject_id", - "session": "session_id", -} - - -def _rename_bids_keys(bids_metadata, mapping=BIDS_TO_DANDI): - """Standardize BIDS metadata field naming to match DANDI.""" - return {mapping.get(k, k): v for k, v in bids_metadata.items()} - - -def _path_in_bids( - check_path, bids_marker="dataset_description.json", end_marker="dandiset.yaml" -): - """Determine whether a path is a member of a BIDS dataset. - - Parameters - ---------- - check_path: str or Path - bids_marker: str, optional - String giving a filename, the existence of which in a directory will mark it as a - BIDS dataset root directory. - end_marker: str, optional - String giving a filename, the existence of which in a directory will end the - search. - - Returns - ------- - bool - """ - check_path = Path(check_path) - for dir_level in itertools.chain([check_path], check_path.parents): - bids_marker_candidate = dir_level / bids_marker - end_marker_candidate = dir_level / end_marker - if bids_marker_candidate.is_file() or bids_marker_candidate.is_symlink(): - return True - if end_marker_candidate.is_file() or end_marker_candidate.is_symlink(): - return False - return False - # Disable this for clean hacking @metadata_cache.memoize_path @@ -115,18 +72,6 @@ def get_metadata(path: Union[str, Path]) -> Optional[dict]: lgr.debug("Failed to get metadata for %s: %s", path, exc) return None - # Somewhat less fragile search than previous proposals, - # could still be augmented with `_is_nwb` to disambiguate both cases - # at the detection level. - if _path_in_bids(path): - from .validate import validate_bids - - _meta = validate_bids(path) - meta = _meta["match_listing"][0] - meta["bids_schema_version"] = _meta["bids_schema_version"] - meta = _rename_bids_keys(meta) - return meta - if nwb_has_external_links(path): raise NotImplementedError( f"NWB files with external links are not supported: {path}" From 962eaf16098bb63423fad59af0a78d3c6411ddd1 Mon Sep 17 00:00:00 2001 From: "John T. 
Wodder II" Date: Thu, 4 Aug 2022 09:06:44 -0400 Subject: [PATCH 08/17] Make dataset_description.json files have default metadata --- dandi/files/bids.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/dandi/files/bids.py b/dandi/files/bids.py index 2044b6d2c..b6d35d4ac 100644 --- a/dandi/files/bids.py +++ b/dandi/files/bids.py @@ -121,12 +121,7 @@ def get_validation_errors( assert self._dataset_errors is not None return list(self._dataset_errors) - def get_metadata( - self, - digest: Optional[Digest] = None, - ignore_errors: bool = True, - ) -> BareAsset: - raise NotImplementedError + # get_metadata(): inherit use of default metadata from LocalFileAsset @dataclass From 6f23007c201275b10cc3aae03adaaec7ef55c09e Mon Sep 17 00:00:00 2001 From: "John T. Wodder II" Date: Thu, 4 Aug 2022 10:05:32 -0400 Subject: [PATCH 09/17] This will probably be useful later. --- dandi/files/__init__.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/dandi/files/__init__.py b/dandi/files/__init__.py index 61c6fd678..cbf6d9b89 100644 --- a/dandi/files/__init__.py +++ b/dandi/files/__init__.py @@ -66,6 +66,7 @@ "ZarrStat", "dandi_file", "find_dandi_files", + "find_bids_dataset_description", ] lgr = get_logger() @@ -262,3 +263,26 @@ def __call__(self, filepath: Path, path: str) -> DandiFile: ) self.bids_dataset_description.dataset_files.append(df) return df + + +def find_bids_dataset_description( + dirpath: str | Path, dandiset_path: Optional[str | Path] = None +) -> Optional[BIDSDatasetDescriptionAsset]: + """ + Look for a :file:`dataset_description.json` file in the directory + ``dirpath`` and each of its parents, stopping when a :file:`dandiset.yaml` + file is found or ``dandiset_path`` is reached. + """ + dirpath = Path(dirpath) + for d in (dirpath, *dirpath.parents): + bids_marker = d / BIDS_DATASET_DESCRIPTION + dandi_end = d / dandiset_metadata_file + if bids_marker.is_file() or bids_marker.is_symlink(): + f = dandi_file(bids_marker, dandiset_path) + assert isinstance(f, BIDSDatasetDescriptionAsset) + return f + elif dandi_end.is_file() or dandi_end.is_symlink(): + return None + elif dandiset_path is not None and d == Path(dandiset_path): + return None + return None From be789047a44628eba9383e33090e46bbad54902e Mon Sep 17 00:00:00 2001 From: "John T. Wodder II" Date: Fri, 5 Aug 2022 13:49:54 -0400 Subject: [PATCH 10/17] Mark test_ls_bids_file as xfailing --- dandi/cli/tests/test_ls.py | 1 + 1 file changed, 1 insertion(+) diff --git a/dandi/cli/tests/test_ls.py b/dandi/cli/tests/test_ls.py index b04c8e175..27a43367e 100644 --- a/dandi/cli/tests/test_ls.py +++ b/dandi/cli/tests/test_ls.py @@ -50,6 +50,7 @@ def load(s): @mark.skipif_no_network +@pytest.mark.xfail(reason="https://github.com/dandi/dandi-cli/issues/1097") def test_ls_bids_file(bids_examples): bids_file_path = "asl003/sub-Sub1/anat/sub-Sub1_T1w.nii.gz" bids_file_path = os.path.join(bids_examples, bids_file_path) From 3c26f3807b1d3c5ff11d1292fabe5f49f5f3faac Mon Sep 17 00:00:00 2001 From: "John T. 
Wodder II" Date: Fri, 5 Aug 2022 14:09:02 -0400 Subject: [PATCH 11/17] Describe some structures in terms of "flatdata" and "schemadata" --- dandi/metadata.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/dandi/metadata.py b/dandi/metadata.py index 2449179ac..dd9203ed0 100644 --- a/dandi/metadata.py +++ b/dandi/metadata.py @@ -47,7 +47,8 @@ # Disable this for clean hacking @metadata_cache.memoize_path def get_metadata(path: Union[str, Path]) -> Optional[dict]: - """Get selected metadata from a .nwb file or a dandiset directory + """ + Get "flatdata" from a .nwb file or a Dandiset directory If a directory given and it is not a Dandiset, None is returned @@ -871,6 +872,10 @@ def add_common_metadata( end_time: datetime, digest: Optional[Digest] = None, ) -> None: + """ + Update a `dict` of raw "schemadata" with the fields that are common to both + NWB assets and non-NWB assets + """ if digest is not None: metadata["digest"] = digest.asdict() else: @@ -908,4 +913,14 @@ def get_generator(start_time: datetime, end_time: datetime) -> models.Activity: def prepare_metadata(metadata: dict) -> Dict[str, Any]: + """ + Convert "flatdata" [1]_ for an asset into raw [2]_ "schemadata" [3]_ + + .. [1] a flat `dict` mapping strings to strings & other primitive types; + returned by `get_metadata()` + + .. [2] i.e, a `dict` rather than a `BareAsset` + + .. [3] metadata in the form used by the ``dandischema`` library + """ return cast(Dict[str, Any], extract_model(models.BareAsset, metadata).json_dict()) From 8e15843b9977a8b5ac3c461eede5484a73a72198 Mon Sep 17 00:00:00 2001 From: "John T. Wodder II" Date: Fri, 5 Aug 2022 14:12:16 -0400 Subject: [PATCH 12/17] Adjust opening of `dandi.files` docstring --- dandi/files/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dandi/files/__init__.py b/dandi/files/__init__.py index cbf6d9b89..69f84f1d1 100644 --- a/dandi/files/__init__.py +++ b/dandi/files/__init__.py @@ -1,7 +1,7 @@ """ .. versionadded:: 0.36.0 -This module defines functionality for working with local files & directories +`dandi.files` defines functionality for working with local files & directories (as opposed to remote resources on a DANDI Archive server) that are of interest to DANDI. The classes for such files & directories all inherit from `DandiFile`, which has two immediate subclasses: `DandisetMetadataFile`, for From a654c8b6a24ef18fe4a4424119c88e1dd45efd8c Mon Sep 17 00:00:00 2001 From: "John T. Wodder II" Date: Fri, 5 Aug 2022 14:19:14 -0400 Subject: [PATCH 13/17] Add a comment explaining when an exception can happen --- dandi/files/__init__.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/dandi/files/__init__.py b/dandi/files/__init__.py index 69f84f1d1..22ce7dfd4 100644 --- a/dandi/files/__init__.py +++ b/dandi/files/__init__.py @@ -130,6 +130,10 @@ def find_dandi_files( try: df = dandi_file(p, dandiset_path, bids_dataset_description=bidsdd) except UnknownAssetError: + # The directory does not have a recognized file extension + # (ie., it's not a Zarr or any other directory asset type + # we may add later), so traverse through it as a regular + # directory. if (p / BIDS_DATASET_DESCRIPTION).exists(): bids2 = dandi_file(p / BIDS_DATASET_DESCRIPTION, dandiset_path) assert isinstance(bids2, BIDSDatasetDescriptionAsset) From b43612d1a144548f5b5aed27970f23aa58d2a53c Mon Sep 17 00:00:00 2001 From: "John T. 
Wodder II" Date: Fri, 5 Aug 2022 15:10:58 -0400 Subject: [PATCH 14/17] Test properties of local Zarr assets and their entries --- dandi/files/zarr.py | 4 ++-- dandi/tests/test_files.py | 28 ++++++++++++++++++++++++++++ 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/dandi/files/zarr.py b/dandi/files/zarr.py index d54a9aa10..6982df8cd 100644 --- a/dandi/files/zarr.py +++ b/dandi/files/zarr.py @@ -152,11 +152,11 @@ def dirstat(dirpath: LocalZarrEntry) -> ZarrStat: if p.is_dir(): st = dirstat(p) size += st.size - dir_md5s[str(p)] = (st.digest.value, st.size) + dir_md5s[p.name] = (st.digest.value, st.size) files.extend(st.files) else: size += p.size - file_md5s[str(p)] = (md5file_nocache(p.filepath), p.size) + file_md5s[p.name] = (md5file_nocache(p.filepath), p.size) files.append(p) return ZarrStat( size=size, diff --git a/dandi/tests/test_files.py b/dandi/tests/test_files.py index ddfa375c3..2a333aca8 100644 --- a/dandi/tests/test_files.py +++ b/dandi/tests/test_files.py @@ -313,3 +313,31 @@ def test_upload_zarr(new_dandiset, tmp_path): assert not (file_src.filetree / "arr_2" / "0").exists() assert not (file_src.filetree / "arr_2" / "0").is_file() assert not (file_src.filetree / "arr_2" / "0").is_dir() + + +def test_zarr_properties(tmp_path: Path) -> None: + # This test assumes that the Zarr serialization format never changes + filepath = tmp_path / "example.zarr" + dt = np.dtype(" Date: Mon, 8 Aug 2022 10:42:51 -0400 Subject: [PATCH 15/17] Remove previous BIDS code from upload.py --- dandi/tests/test_upload.py | 10 ++--- dandi/upload.py | 89 -------------------------------------- 2 files changed, 4 insertions(+), 95 deletions(-) diff --git a/dandi/tests/test_upload.py b/dandi/tests/test_upload.py index 39cd611bb..d8af7871d 100644 --- a/dandi/tests/test_upload.py +++ b/dandi/tests/test_upload.py @@ -185,12 +185,10 @@ def test_upload_bids_invalid( mocker: MockerFixture, bids_dandiset_invalid: SampleDandiset ) -> None: iter_upload_spy = mocker.spy(LocalFileAsset, "iter_upload") - # Does it fail when it should fail? - with pytest.raises(RuntimeError): - bids_dandiset_invalid.upload(existing="forced") + bids_dandiset_invalid.upload(existing="force") iter_upload_spy.assert_not_called() # Does validation ignoring work? - bids_dandiset_invalid.upload(existing="forced", validation="ignore") + bids_dandiset_invalid.upload(existing="force", validation="ignore") iter_upload_spy.assert_called() # Check existence of assets: dandiset = bids_dandiset_invalid.dandiset @@ -201,7 +199,7 @@ def test_upload_bids_validation_ignore( mocker: MockerFixture, bids_dandiset: SampleDandiset ) -> None: iter_upload_spy = mocker.spy(LocalFileAsset, "iter_upload") - bids_dandiset.upload(existing="forced", validation="ignore") + bids_dandiset.upload(existing="force", validation="ignore") # Check whether upload was run iter_upload_spy.assert_called() # Check existence of assets: @@ -216,7 +214,7 @@ def test_upload_bids_validation_ignore( def test_upload_bids(mocker: MockerFixture, bids_dandiset: SampleDandiset) -> None: iter_upload_spy = mocker.spy(LocalFileAsset, "iter_upload") - bids_dandiset.upload(existing="forced") + bids_dandiset.upload(existing="force") # Check whether upload was run iter_upload_spy.assert_called() # Check existence of assets: diff --git a/dandi/upload.py b/dandi/upload.py index 6737d6d2e..56ccef1ae 100644 --- a/dandi/upload.py +++ b/dandi/upload.py @@ -10,7 +10,6 @@ import click from . 
import lgr -from .bids_utils import is_valid from .consts import DRAFT, dandiset_identifier_regex, dandiset_metadata_file from .dandiapi import RemoteAsset from .exceptions import NotFoundError @@ -62,20 +61,6 @@ def upload( " paths. Use 'dandi download' or 'organize' first." ) - # Pre-validate BIDS datasets before going for individual files. - bids_datasets = _bids_discover_and_validate(dandiset_.path, validation) - - if bids_datasets: - _bids_datasets = [str(i) for i in bids_datasets] - if not allow_any_path: - lgr.info( - "Enabling --allow-any-path since we detected %s under the following " - "paths: %s", - pluralize(len(_bids_datasets), "BIDS dataset"), - ", ".join(_bids_datasets), - ) - allow_any_path = True - instance = get_instance(dandi_instance) assert instance.api is not None api_url = instance.api @@ -398,77 +383,3 @@ def check_replace_asset( def skip_file(msg: Any) -> Dict[str, str]: return {"status": "skipped", "message": str(msg)} - - -def _bids_discover_and_validate( - dandiset_path: str, - validation: Optional[str] = "require", -) -> List[Path]: - """Temporary implementation for discovery and validation of BIDS datasets - - References: - - unification of validation records: https://github.com/dandi/dandi-cli/issues/943 - - validation "design doc": https://github.com/dandi/dandi-cli/pull/663 - """ - from .utils import find_files - from .validate import validate_bids - - lgr.debug("Discovering root directories of BIDS datasets") - bids_descriptions = map( - Path, - find_files( - r"(^|[/\x5C])dataset_description\.json$", - dandiset_path, - # for cases like sub-MITU01h3_..._chunk-4_SPIM.ngff.source - dirs_avoid=r"\.(ngff|zarr)(\..*)?$", - ), - ) - bids_datasets = [bd.parent for bd in bids_descriptions] - if bids_datasets: - lgr.debug( - "Detected %s under following paths: %s", - pluralize(len(bids_datasets), "BIDS dataset"), - ", ".join(str(i) for i in bids_datasets), - ) - - if validation != "skip": - if bids_datasets: - bids_datasets_to_validate = list() - for p in bids_datasets: - for bd in bids_datasets: - try: - p.relative_to(bd) - except ValueError: - try: - bd.relative_to(p) - except ValueError: - pass - else: - bids_datasets_to_validate.append(bd) - else: - bids_datasets_to_validate.append(bd) - else: - bids_datasets_to_validate = bids_datasets - bids_datasets_to_validate.sort() - valid_datasets: List[Path] = [] - invalid_datasets: List[Path] = [] - for bd in bids_datasets_to_validate: - validator_result = validate_bids(bd) - valid = is_valid( - validator_result, - allow_missing_files=validation == "ignore", - allow_invalid_filenames=validation == "ignore", - ) - (valid_datasets if valid else invalid_datasets).append(bd) - if invalid_datasets: - raise RuntimeError( - f"Found {pluralize(len(invalid_datasets), 'BIDS dataset')}, which did not " - "pass validation:\n * " - + "\n * ".join([str(i) for i in invalid_datasets]) - + "\nTo resolve " - "this, perform the required changes or set the validation parameter to " - '"skip" or "ignore".' - ) - return valid_datasets - else: - return bids_datasets From 550cc7c21ed3e2b719f91352a2a6e32d84c1cfdd Mon Sep 17 00:00:00 2001 From: "John T. 
Wodder II" Date: Mon, 8 Aug 2022 10:44:54 -0400 Subject: [PATCH 16/17] Add versionadded:: directives --- dandi/files/__init__.py | 4 ++++ dandi/files/bids.py | 18 ++++++++++++++++-- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/dandi/files/__init__.py b/dandi/files/__init__.py index 22ce7dfd4..dcb008c96 100644 --- a/dandi/files/__init__.py +++ b/dandi/files/__init__.py @@ -244,6 +244,8 @@ def __call__(self, filepath: Path, path: str) -> DandiFile: @dataclass class BIDSFileFactory(DandiFileFactory): + """:meta private:""" + bids_dataset_description: BIDSDatasetDescriptionAsset CLASSES = { @@ -273,6 +275,8 @@ def find_bids_dataset_description( dirpath: str | Path, dandiset_path: Optional[str | Path] = None ) -> Optional[BIDSDatasetDescriptionAsset]: """ + .. versionadded:: 0.46.0 + Look for a :file:`dataset_description.json` file in the directory ``dirpath`` and each of its parents, stopping when a :file:`dandiset.yaml` file is found or ``dandiset_path`` is reached. diff --git a/dandi/files/bids.py b/dandi/files/bids.py index b6d35d4ac..9207c98f3 100644 --- a/dandi/files/bids.py +++ b/dandi/files/bids.py @@ -24,6 +24,8 @@ @dataclass class BIDSDatasetDescriptionAsset(LocalFileAsset): """ + .. versionadded:: 0.46.0 + The :file:`dataset_description.json` file for a BIDS dataset, used to perform operations on the dataset as a whole """ @@ -127,6 +129,8 @@ def get_validation_errors( @dataclass class BIDSAsset(LocalFileAsset): """ + .. versionadded:: 0.46.0 + Base class for non-:file:`dataset_description.json` assets in BIDS datasets """ @@ -180,7 +184,11 @@ def get_metadata( class NWBBIDSAsset(BIDSAsset, NWBAsset): - """An NWB file in a BIDS dataset""" + """ + .. versionadded:: 0.46.0 + + An NWB file in a BIDS dataset + """ def get_validation_errors( self, @@ -204,7 +212,11 @@ def get_metadata( class ZarrBIDSAsset(BIDSAsset, ZarrAsset): - """A Zarr directory in a BIDS dataset""" + """ + .. versionadded:: 0.46.0 + + A Zarr directory in a BIDS dataset + """ def get_validation_errors( self, @@ -218,6 +230,8 @@ def get_validation_errors( class GenericBIDSAsset(BIDSAsset, GenericAsset): """ + .. versionadded:: 0.46.0 + An asset in a BIDS dataset that is not an NWB file, a Zarr directory, or a :file:`dataset_description.json` file. Note that, unlike the non-BIDS classes, this includes video files. From e66abfddfe3a605b23b340ed923ba2a59dc8aac0 Mon Sep 17 00:00:00 2001 From: "John T. 
Wodder II" Date: Mon, 8 Aug 2022 12:16:18 -0400 Subject: [PATCH 17/17] Move private dandi.files classes to _private.py --- dandi/files/__init__.py | 86 +----------------------------------- dandi/files/_private.py | 97 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 99 insertions(+), 84 deletions(-) create mode 100644 dandi/files/_private.py diff --git a/dandi/files/__init__.py b/dandi/files/__init__.py index dcb008c96..67f8b7e38 100644 --- a/dandi/files/__init__.py +++ b/dandi/files/__init__.py @@ -13,21 +13,14 @@ from collections import deque from collections.abc import Iterator -from dataclasses import dataclass -from enum import Enum from pathlib import Path from typing import Optional -import weakref from dandi import get_logger -from dandi.consts import ( - BIDS_DATASET_DESCRIPTION, - VIDEO_FILE_EXTENSIONS, - ZARR_EXTENSIONS, - dandiset_metadata_file, -) +from dandi.consts import BIDS_DATASET_DESCRIPTION, dandiset_metadata_file from dandi.exceptions import UnknownAssetError +from ._private import BIDSFileFactory, DandiFileFactory from .bases import ( DandiFile, DandisetMetadataFile, @@ -196,81 +189,6 @@ def dandi_file( return factory(filepath, path) -class DandiFileType(Enum): - """:meta private:""" - - NWB = 1 - ZARR = 2 - VIDEO = 3 - GENERIC = 4 - BIDS_DATASET_DESCRIPTION = 5 - - @staticmethod - def classify(path: Path) -> DandiFileType: - if path.is_dir(): - if not any(path.iterdir()): - raise UnknownAssetError("Empty directories cannot be assets") - if path.suffix in ZARR_EXTENSIONS: - return DandiFileType.ZARR - raise UnknownAssetError( - f"Directory has unrecognized suffix {path.suffix!r}" - ) - elif path.name == BIDS_DATASET_DESCRIPTION: - return DandiFileType.BIDS_DATASET_DESCRIPTION - elif path.suffix == ".nwb": - return DandiFileType.NWB - elif path.suffix in VIDEO_FILE_EXTENSIONS: - return DandiFileType.VIDEO - else: - return DandiFileType.GENERIC - - -class DandiFileFactory: - """:meta private:""" - - CLASSES: dict[DandiFileType, type[LocalAsset]] = { - DandiFileType.NWB: NWBAsset, - DandiFileType.ZARR: ZarrAsset, - DandiFileType.VIDEO: VideoAsset, - DandiFileType.GENERIC: GenericAsset, - DandiFileType.BIDS_DATASET_DESCRIPTION: BIDSDatasetDescriptionAsset, - } - - def __call__(self, filepath: Path, path: str) -> DandiFile: - return self.CLASSES[DandiFileType.classify(filepath)]( - filepath=filepath, path=path - ) - - -@dataclass -class BIDSFileFactory(DandiFileFactory): - """:meta private:""" - - bids_dataset_description: BIDSDatasetDescriptionAsset - - CLASSES = { - DandiFileType.NWB: NWBBIDSAsset, - DandiFileType.ZARR: ZarrBIDSAsset, - DandiFileType.VIDEO: GenericBIDSAsset, - DandiFileType.GENERIC: GenericBIDSAsset, - } - - def __call__(self, filepath: Path, path: str) -> DandiFile: - ftype = DandiFileType.classify(filepath) - if ftype is DandiFileType.BIDS_DATASET_DESCRIPTION: - if filepath == self.bids_dataset_description.filepath: - return self.bids_dataset_description - else: - return BIDSDatasetDescriptionAsset(filepath=filepath, path=path) - df = self.CLASSES[ftype]( - filepath=filepath, - path=path, - bids_dataset_description_ref=weakref.ref(self.bids_dataset_description), - ) - self.bids_dataset_description.dataset_files.append(df) - return df - - def find_bids_dataset_description( dirpath: str | Path, dandiset_path: Optional[str | Path] = None ) -> Optional[BIDSDatasetDescriptionAsset]: diff --git a/dandi/files/_private.py b/dandi/files/_private.py new file mode 100644 index 000000000..3d0681ba2 --- /dev/null +++ b/dandi/files/_private.py @@ -0,0 
+1,97 @@ +from __future__ import annotations + +from dataclasses import dataclass +from enum import Enum +from pathlib import Path +import weakref + +from dandi.consts import ( + BIDS_DATASET_DESCRIPTION, + VIDEO_FILE_EXTENSIONS, + ZARR_EXTENSIONS, +) +from dandi.exceptions import UnknownAssetError + +from .bases import DandiFile, GenericAsset, LocalAsset, NWBAsset, VideoAsset +from .bids import ( + BIDSDatasetDescriptionAsset, + GenericBIDSAsset, + NWBBIDSAsset, + ZarrBIDSAsset, +) +from .zarr import ZarrAsset + + +class DandiFileType(Enum): + """:meta private:""" + + NWB = 1 + ZARR = 2 + VIDEO = 3 + GENERIC = 4 + BIDS_DATASET_DESCRIPTION = 5 + + @staticmethod + def classify(path: Path) -> DandiFileType: + if path.is_dir(): + if not any(path.iterdir()): + raise UnknownAssetError("Empty directories cannot be assets") + if path.suffix in ZARR_EXTENSIONS: + return DandiFileType.ZARR + raise UnknownAssetError( + f"Directory has unrecognized suffix {path.suffix!r}" + ) + elif path.name == BIDS_DATASET_DESCRIPTION: + return DandiFileType.BIDS_DATASET_DESCRIPTION + elif path.suffix == ".nwb": + return DandiFileType.NWB + elif path.suffix in VIDEO_FILE_EXTENSIONS: + return DandiFileType.VIDEO + else: + return DandiFileType.GENERIC + + +class DandiFileFactory: + """:meta private:""" + + CLASSES: dict[DandiFileType, type[LocalAsset]] = { + DandiFileType.NWB: NWBAsset, + DandiFileType.ZARR: ZarrAsset, + DandiFileType.VIDEO: VideoAsset, + DandiFileType.GENERIC: GenericAsset, + DandiFileType.BIDS_DATASET_DESCRIPTION: BIDSDatasetDescriptionAsset, + } + + def __call__(self, filepath: Path, path: str) -> DandiFile: + return self.CLASSES[DandiFileType.classify(filepath)]( + filepath=filepath, path=path + ) + + +@dataclass +class BIDSFileFactory(DandiFileFactory): + """:meta private:""" + + bids_dataset_description: BIDSDatasetDescriptionAsset + + CLASSES = { + DandiFileType.NWB: NWBBIDSAsset, + DandiFileType.ZARR: ZarrBIDSAsset, + DandiFileType.VIDEO: GenericBIDSAsset, + DandiFileType.GENERIC: GenericBIDSAsset, + } + + def __call__(self, filepath: Path, path: str) -> DandiFile: + ftype = DandiFileType.classify(filepath) + if ftype is DandiFileType.BIDS_DATASET_DESCRIPTION: + if filepath == self.bids_dataset_description.filepath: + return self.bids_dataset_description + else: + return BIDSDatasetDescriptionAsset(filepath=filepath, path=path) + df = self.CLASSES[ftype]( + filepath=filepath, + path=path, + bids_dataset_description_ref=weakref.ref(self.bids_dataset_description), + ) + self.bids_dataset_description.dataset_files.append(df) + return df
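
With the factory classes now tucked away in dandi/files/_private.py, the public entry points for classifying local paths remain dandi_file() and find_bids_dataset_description() in dandi/files/__init__.py. The following is a minimal usage sketch of how the pieces from patches 09 and 17 are meant to compose; the Dandiset layout and every path in it are invented for illustration and assumed to exist on disk:

    from pathlib import Path

    from dandi.files import dandi_file, find_bids_dataset_description

    # Assumed (hypothetical) layout:
    #   /data/000123/dandiset.yaml
    #   /data/000123/rawdata/dataset_description.json        <- BIDS dataset root
    #   /data/000123/rawdata/sub-01/anat/sub-01_T1w.nii.gz
    dandiset_path = Path("/data/000123")
    nifti = dandiset_path / "rawdata" / "sub-01" / "anat" / "sub-01_T1w.nii.gz"

    # Walk upward from the file's directory until a dataset_description.json
    # is found, stopping at a dandiset.yaml or at the Dandiset root (patch 09).
    bidsdd = find_bids_dataset_description(nifti.parent, dandiset_path)

    # Passing the BIDSDatasetDescriptionAsset makes dandi_file() dispatch
    # through BIDSFileFactory (patch 17), so the NIfTI file comes back as a
    # GenericBIDSAsset tied to its dataset rather than a plain GenericAsset.
    asset = dandi_file(nifti, dandiset_path, bids_dataset_description=bidsdd)
    print(type(asset).__name__, asset.path)

This mirrors what find_dandi_files() does internally: it builds the BIDSDatasetDescriptionAsset once per dataset_description.json it encounters and threads it through to every file underneath, so each BIDS asset carries a weak back-reference to its dataset and is recorded in that dataset's dataset_files list.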
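
Patch 11's "flatdata"/"schemadata" terminology names the two halves of the metadata pipeline in dandi/metadata.py: get_metadata() flattens an NWB file into a dict of primitives, and prepare_metadata() rearranges that dict into the raw form of dandischema's BareAsset. A small sketch, assuming a readable local .nwb file (the path below is made up):

    from dandi.metadata import get_metadata, prepare_metadata

    # Hypothetical path; any locally readable NWB file would do.
    nwb_path = "/data/000123/sub-01/sub-01_ses-01_ecephys.nwb"

    # "flatdata": a flat dict mapping strings to primitive values (session_id,
    # subject_id, species, ...); None is returned for a non-Dandiset directory.
    flat = get_metadata(nwb_path)

    if flat is not None:
        # Raw "schemadata": the same information reshaped into the dict form
        # that dandischema's BareAsset model expects.
        raw = prepare_metadata(flat)
        print(sorted(raw))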