diff --git a/CHANGELOG.md b/CHANGELOG.md index a5d9fcf13..8eac9dd78 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -42,6 +42,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - add message broker publishing to workflow - add rabbitmq + default config to integration tests - File operator that does database operations for files #148 + - introduced `/v1/submissions/{submissionId}/files` to update and remove files in a submission #633 + - file flagged for deletion also removed from submission and check files have the status ready when being read from the submission #633 + - prevent publish if files have in submission have status added (added but no metadata object) or failed (failed in ingestion, completion, or for any other reason) #633 - Mongo indexes for `file` schema #148 - `/files` endpoint to retrieve files attached to a project #148 #627 - option to add additional members to `application/problem+json` #642 diff --git a/docs/specification.yml b/docs/specification.yml index 105922249..3e5f9a284 100644 --- a/docs/specification.yml +++ b/docs/specification.yml @@ -1409,6 +1409,179 @@ paths: application/json: schema: $ref: "#/components/schemas/405MethodNotAllowed" + /v1/submissions/{submissionId}/files: + post: + tags: + - Submission + summary: Add new files to a submission. + parameters: + - name: submissionId + in: path + description: ID of the object submission. 
+ schema: + type: string + required: true + requestBody: + content: + application/json: + schema: + type: array + items: + type: object + properties: + accessionId: + type: string + title: Accession Id for file + version: + type: integer + title: Version of the file + responses: + 204: + description: No Content + 400: + description: Bad Request + content: + application/json: + schema: + $ref: "#/components/schemas/400BadRequest" + 401: + description: Unauthorized + content: + application/json: + schema: + $ref: "#/components/schemas/401Unauthorized" + 403: + description: Forbidden + content: + application/json: + schema: + $ref: "#/components/schemas/403Forbidden" + get: + tags: + - Query + summary: Retrieve all files with detailed info specific to a submission. + parameters: + - name: submissionId + in: path + description: ID of the object submission. + schema: + type: string + required: true + responses: + 200: + description: OK + content: + application/json: + schema: + type: array + items: + $ref: "#/components/schemas/File" + 400: + description: Bad Request + content: + application/json: + schema: + $ref: "#/components/schemas/400BadRequest" + 401: + description: Unauthorized + content: + application/json: + schema: + $ref: "#/components/schemas/401Unauthorized" + 403: + description: Forbidden + content: + application/json: + schema: + $ref: "#/components/schemas/403Forbidden" + put: + tags: + - Manage + summary: Update Files for the submission with a specified submission ID. + parameters: + - name: submissionId + in: path + description: ID of the object submission. 
+ schema: + type: string + required: true + requestBody: + content: + application/json: + schema: + type: array + items: + $ref: "#/components/schemas/SubmissionFile" + responses: + 200: + description: OK + content: + application/json: + schema: + $ref: "#/components/schemas/SubmissionCreated" + 400: + description: Bad Request + content: + application/json: + schema: + $ref: "#/components/schemas/400BadRequest" + 401: + description: Unauthorized + content: + application/json: + schema: + $ref: "#/components/schemas/401Unauthorized" + 403: + description: Forbidden + content: + application/json: + schema: + $ref: "#/components/schemas/403Forbidden" + 405: + description: Method Not Allowed + content: + application/json: + schema: + $ref: "#/components/schemas/405MethodNotAllowed" + /v1/submissions/{submissionId}/files/{fileId}: + delete: + tags: + - Manage + summary: Remove file from a submission. + parameters: + - name: submissionId + in: path + description: ID of the object submission. + schema: + type: string + required: true + - name: fileId + in: path + description: ID of the object submission. 
+ schema: + type: string + required: true + responses: + 204: + description: No Content + 400: + description: Bad Request + content: + application/json: + schema: + $ref: "#/components/schemas/400BadRequest" + 401: + description: Unauthorized + content: + application/json: + schema: + $ref: "#/components/schemas/401Unauthorized" + 403: + description: Forbidden + content: + application/json: + schema: + $ref: "#/components/schemas/403Forbidden" /v1/publish/{submissionId}: patch: tags: @@ -1603,7 +1776,7 @@ paths: schema: type: array items: - $ref: "#/components/schemas/REMS" + $ref: "#/components/schemas/File" 400: description: Bad Request content: @@ -1747,7 +1920,7 @@ components: - dateUpdated additionalProperties: true properties: - accessioniId: + accessionId: type: string description: accession id generated to identify an object publishedDate: @@ -2956,3 +3129,31 @@ components: $ref: "#/components/schemas/Checksum" unencrypted_checksums: $ref: "#/components/schemas/Checksum" + SubmissionFile: + type: object + description: Describes a submission file information that can be attached to a submission + properties: + accessionId: + type: string + title: Accession Id for file + version: + type: integer + title: Version of the file + status: + type: string + title: Status of the file + enum: ["added","ready","verified","completed","failed"] + objectId: + type: object + title: File size in bytes + properties: + accessionId: + type: string + description: accession id generated to identify an object + schema: + type: string + description: type of schema this Accession ID relates to and was added in submit + error: + type: string + title: Error for file if status is failed + description: required only if status is failed diff --git a/metadata_backend/api/handlers/publish.py b/metadata_backend/api/handlers/publish.py index 08f8aa0ab..180ab022d 100644 --- a/metadata_backend/api/handlers/publish.py +++ b/metadata_backend/api/handlers/publish.py @@ -516,9 +516,12 @@ async 
def publish_submission(self, req: Request) -> Response: # Publish to external services - must already have DOI and Metax ID publish_status = {} datacite_study = {} + # check first if all the files are ready, if not return HTTPBadRequest + await file_operator.check_submission_files_ready(submission_id) if "messageBroker" in workflow.endpoints: - files = await file_operator.read_submission_files(submission_id) + # we will only publish the files which are ready + files = await file_operator.read_submission_files(submission_id, ["ready"]) for file in files: ingest_msg = { "type": "ingest", diff --git a/metadata_backend/api/handlers/submission.py b/metadata_backend/api/handlers/submission.py index 632c79fd3..715a58ede 100644 --- a/metadata_backend/api/handlers/submission.py +++ b/metadata_backend/api/handlers/submission.py @@ -297,19 +297,37 @@ async def put_submission_path(self, req: Request) -> Response: elif req.path.endswith("rems"): schema = "rems" await self.check_rems_ok({"rems": data}) + elif req.path.endswith("files"): + schema = "files" else: raise web.HTTPNotFound(reason=f"'{req.path}' does not exist") - submission[schema] = data - JSONValidator(submission, "submission").validate + if schema == "files": + file_operator = FileOperator(db_client) + # we expect to get a list of dict for files + # that matches the json schema when added to submission object + submission[schema] = data + JSONValidator(submission, "submission").validate + for file in data: + if "accessionId" not in file: + reason = f"Updating {submission_id} failed. Files require an accessionId." 
    async def delete_submission_files(self, req: Request) -> Response:
        """Remove a single file from a submission.

        The file is identified by the ``fileId`` path parameter; no request
        body is read. The submission must exist, must not be published, and
        must be owned by the requester.

        :param req: DELETE request with ``submissionId`` and ``fileId`` in the path
        :returns: HTTP No Content response
        """
        submission_id = req.match_info["submissionId"]
        file_accession_id = req.match_info["fileId"]
        db_client = req.app["db_client"]
        submission_operator = SubmissionOperator(db_client)

        # Check submission exists and is not already published
        await submission_operator.check_submission_exists(submission_id)
        await submission_operator.check_submission_published(submission_id, req.method)

        await self._handle_check_ownership(req, "submission", submission_id)

        file_operator = FileOperator(db_client)

        await file_operator.remove_file_submission(file_accession_id, "accessionId", submission_id)
        LOG.info("Removing file: %r from submission with ID: %r was successful.", file_accession_id, submission_id)
        return web.HTTPNoContent()
:param file: file data as in the `file.json` schema + :raises: HTTPInternalServerError if db operation failed because of connection + or other db issue :returns: Tuple of file accession id and file version """ - _projection = { "_id": 0, "accessionId": 1, @@ -97,7 +102,7 @@ async def create_file_or_version(self, file: File) -> Tuple[str, int]: accession_id = file_in_db["accessionId"] file_in_db = file_in_db["currentVersion"] - # pass in list of versions, which returns the file version with the highest version number + # pass latest versions number _current_version = file_in_db["version"] file_version = _current_version + 1 version_data = self._from_version_template(file, file_version) @@ -124,6 +129,9 @@ async def read_file(self, accession_id: str, version: Optional[int] = None) -> D :param accession_id: Accession ID of the file to read :param version: version number to extract. Defaults to latest :raises: HTTPBadRequest if reading was not successful + :raises: HTTPNotFound if file not found + :raises: HTTPInternalServerError if db operation failed because of connection + or other db issue :returns: File object of the latest file version """ aggregate_query = [ @@ -168,6 +176,8 @@ async def read_project_files(self, project_id: str) -> List[Dict]: The files are read by the latest version, and filtered either by projectId. :param project_id: Project ID to get files for + :raises: HTTPInternalServerError if db operation failed because of connection + or other db issue :returns: List of files """ aggregate_query = [ @@ -193,20 +203,72 @@ async def read_project_files(self, project_id: str) -> List[Dict]: LOG.exception(reason) raise web.HTTPInternalServerError(reason=reason) from error - async def read_submission_files(self, submission_id: str) -> List[Dict]: + async def check_submission_files_ready(self, submission_id: str) -> None: + """Check all files in a submission are marked as ready. 
+ + Files marked as ready in a submission, means an metadata object has been + attached to the file. + + :param submission_id: Submission ID to get associated files status + :raises: HTTPInternalServerError if db operation failed because of connection + or other db issue + """ + aggregate_query = [ + {"$match": {"submissionId": submission_id}}, + {"$unwind": "$files"}, + # check the status is not in failed or added + # failed can occour when an file ingestion/verification/mapping fails + {"$match": {"files.status": {"$in": ["added", "failed"]}}}, + { + "$project": { + "_id": 0, + "accessionId": "$files.accessionId", + "version": "$files.version", + "status": "$files.status", + } + }, + ] + try: + problematic_files = await self.db_service.do_aggregate("submission", aggregate_query) + if len(problematic_files) > 0: + reason = ( + f"There are a problematic files: {','.join([i['accessionId'] for i in problematic_files])} " + f"in the submission with id: {submission_id}" + ) + LOG.error(reason) + raise web.HTTPBadRequest( + reason=reason, + text=ujson.dumps({"problematic-files": problematic_files}), + content_type="application/json", + ) + LOG.debug("All files have been marked as ready") + except (ConnectionFailure, OperationFailure) as error: + reason = f"Error happened while getting submission, err: {error}" + LOG.exception(reason) + raise web.HTTPInternalServerError(reason=reason) from error + + async def read_submission_files(self, submission_id: str, expected_status: Optional[List] = None) -> List[Dict]: """Get files in a submission. The files are identified in a submission by version. 
- :param submission_id: Submission ID to get files for + :param submission_id: Submission ID to read files associated with a submission + :param expected_status: List of expected statuses (can be one or more statuses) + :raises: HTTPInternalServerError if db operation failed because of connection + or other db issue or db aggregate does not return a list :returns: List of files specific to a submission """ - # TO_DO: add check for ready status aggregate_query = [ {"$match": {"submissionId": submission_id}}, {"$unwind": "$files"}, {"$project": {"_id": 0, "accessionId": "$files.accessionId", "version": "$files.version"}}, ] + if expected_status: + # we match only the files that have a specific status + aggregate_query.insert( + 2, + {"$match": {"files.status": {"$in": expected_status}}}, + ) files = [] try: submission_files = await self.db_service.do_aggregate("submission", aggregate_query) @@ -231,15 +293,18 @@ async def flag_file_deleted(self, file_path: str, deleted: bool = True) -> None: :param file_path: Path of the file to flag as deleted :param deleted: Whether file is marked as deleted, set to `False` to mark a file as available again :raises: HTTPBadRequest if deleting was not successful + :raises: HTTPInternalServerError if db operation failed because of connection + or other db issue """ try: delete_success = await self.db_service.update_by_key_value( - "file", "path", file_path, {"flagDeleted": deleted} + "file", {"path": file_path}, {"flagDeleted": deleted} ) + await self.remove_file_submission(file_path, id_type="path") except (ConnectionFailure, OperationFailure) as error: reason = f"Error happened while flagging file as deleted, err: {error}" LOG.exception(reason) - raise web.HTTPBadRequest(reason=reason) from error + raise web.HTTPInternalServerError(reason=reason) from error if not delete_success: reason = f"Flagging file with '{file_path}' as deleted failed." 
LOG.error(reason) @@ -247,6 +312,85 @@ async def flag_file_deleted(self, file_path: str, deleted: bool = True) -> None: LOG.info("Flagging file with file_path: %r as Deleted succeeded.", file_path) + async def remove_file_submission( + self, accession_id: str, id_type: Optional[str] = None, submission_id: Optional[str] = None + ) -> None: + """Flag file as deleted. + + If we flag the file as deleted we should remove it from any submission it has been attached to + + :param accession_id: Accession ID of the file to read + :param id_type: depending on the file id this can be either ``path`` or ``accessionId``. + :param submission_id: Submission ID to remove file associated with it + :raises: HTTPBadRequest if deleting was not successful + :raises: HTTPInternalServerError if db operation failed because of connection + or other db issue + """ + try: + if id_type == "path": + _fileId = await self.db_service.read_by_key_value("file", {"path": accession_id}, {"accessionId": 1}) + elif id_type == "accessionId": + _fileId["accessionId"] = accession_id + else: + reason = f"Cannot recognize '{id_type}' as a type of id for file deletion from submission." 
+ LOG.error(reason) + raise web.HTTPBadRequest(reason=reason) + if submission_id: + delete_success = await self.db_service.remove( + "submission", submission_id, {"files": {"accession_id": _fileId["accessionId"]}} + ) + LOG.info("Removing file: %r from submission: %r succeeded.", accession_id, submission_id) + else: + delete_success = await self.db_service.remove_many( + "submission", {"files": {"accession_id": _fileId["accessionId"]}} + ) + LOG.info( + "Removing file with path: %r from submissions, by accessionID: %r succeeded.", + accession_id, + _fileId["accessionId"], + ) + except (ConnectionFailure, OperationFailure) as error: + reason = f"Error happened while removing file from submission, err: {error}" + LOG.exception(reason) + raise web.HTTPInternalServerError(reason=reason) from error + if not delete_success: + reason = f"Removing file identified via '{id_type}': '{accession_id}' from submission failed." + LOG.error(reason) + raise web.HTTPBadRequest(reason=reason) + + async def update_file_submission(self, accession_id: str, submission_id: str, update_data: dict) -> None: + """Update file in a submission. 
+ + File should not be deleted from DB, only flagged as not available anymore + + :param accession_id: Accession ID of the file to update + :param submission_id: Submission ID to update file associated with it + :param update_data: Mongodb ``$set`` operation to be performed on the submission + :raises: HTTPBadRequest if deleting was not successful + :raises: HTTPInternalServerError if db operation failed because of connection + or other db issue + """ + try: + update_success = await self.db_service.update_by_key_value( + "submission", + {"submissionId": submission_id, "files": {"$elemMatch": {"accessionId": accession_id}}}, + # this can take the form of: {"files.$.status": "failed"} or + # {"files.$.status": "failed", "files.$.version": 3, + # "files.$.objectId": {"accessionId": 4, "schema": "study"} + # ideally we check before that we don't update the accessionId + update_data, + ) + except (ConnectionFailure, OperationFailure) as error: + reason = f"Error happened while updating file in submission, err: {error}" + LOG.exception(reason) + raise web.HTTPInternalServerError(reason=reason) from error + if not update_success: + reason = f"Updating file with '{accession_id}' in '{submission_id}' failed." + LOG.error(reason) + raise web.HTTPBadRequest(reason=reason) + + LOG.info("Updating file with file ID: %r in submission %r succeeded.", accession_id, submission_id) + async def add_files_submission(self, files: List[dict], submission_id: str) -> bool: """Add files to a submission. 
diff --git a/metadata_backend/database/db_service.py b/metadata_backend/database/db_service.py index 79ebb93b4..143d367b2 100644 --- a/metadata_backend/database/db_service.py +++ b/metadata_backend/database/db_service.py @@ -242,17 +242,15 @@ async def update(self, collection: str, accession_id: str, data_to_be_updated: D return result.acknowledged @auto_reconnect - async def update_by_key_value(self, collection: str, key: str, value: str, data_to_be_updated: Dict) -> bool: + async def update_by_key_value(self, collection: str, find_by_key_value: dict, data_to_be_updated: Dict) -> bool: """Update some elements of object by its accessionId. :param collection: Collection to search in - :param key: document property - :param value: property value + :param find_by_key_value: Key-value of document property as key and value :param data_to_be_updated: JSON representing the data that should be updated to object, can replace previous fields and add new ones. :returns: True if operation was successful """ - find_by_key_value = {key: value} update_op = {"$set": data_to_be_updated} result = await self.database[collection].update_one(find_by_key_value, update_op) LOG.debug( @@ -264,14 +262,14 @@ async def update_by_key_value(self, collection: str, key: str, value: str, data_ return result.acknowledged @auto_reconnect - async def remove(self, collection: str, accession_id: str, data_to_be_removed: Union[str, Dict]) -> bool: + async def remove(self, collection: str, accession_id: str, data_to_be_removed: Union[str, Dict]) -> dict: """Remove element of object by its accessionId. :param collection: Collection where document should be searched from :param accession_id: ID of the object/submission/user to be updated :param data_to_be_removed: str or JSON representing the data that should be updated to removed. 
    @auto_reconnect
    async def remove_many(self, collection: str, data_to_be_removed: Union[str, Dict]) -> bool:
        """Remove a matching element from every document in a collection.

        Unlike ``remove``, this uses an empty filter and so pulls the matching
        element from ALL documents in the collection.

        :param collection: Collection where documents should be searched from
        :param data_to_be_removed: str or JSON representing the data that should be
            removed
        :returns: True if the operation was acknowledged
        """
        remove_op = {"$pull": data_to_be_removed}
        result = await self.database[collection].update_many({}, remove_op)
        LOG.debug(
            "DB doc in collection: %r with data: %r removed.",
            collection,
            data_to_be_removed,
        )
        return result.acknowledged
:param upsert: If the document does not exist add it - :returns: True if operation was successful + :returns: JSON after remove if operation was successful """ id_key = self._get_id_key(collection) find_by_id = {id_key: accession_id} diff --git a/metadata_backend/helpers/schemas/file.json b/metadata_backend/helpers/schemas/file.json index 7ef382b68..dc9264fd2 100644 --- a/metadata_backend/helpers/schemas/file.json +++ b/metadata_backend/helpers/schemas/file.json @@ -126,7 +126,7 @@ }, "submissions": { "type": "array", - "title": "Submissions which linked to this file", + "title": "Submissions which are linked to this version of the file. This will only be updated when a file is published", "items": { "type": "string", "title": "Submission ID" diff --git a/metadata_backend/server.py b/metadata_backend/server.py index 9354e0c7b..c7917212c 100644 --- a/metadata_backend/server.py +++ b/metadata_backend/server.py @@ -140,8 +140,10 @@ async def on_prepare(_: web.Request, response: web.StreamResponse) -> None: web.post("/submissions/{submissionId}/files", _submission.add_submission_files), web.put("/submissions/{submissionId}/doi", _submission.put_submission_path), web.put("/submissions/{submissionId}/rems", _submission.put_submission_path), + web.put("/submissions/{submissionId}/files", _submission.put_submission_path), web.patch("/submissions/{submissionId}", _submission.patch_submission), web.delete("/submissions/{submissionId}", _submission.delete_submission), + web.delete("/submissions/{submissionId}/files/{fileId}", _submission.delete_submission_files), # publish submissions web.patch("/publish/{submissionId}", _publish_submission.publish_submission), # users operations diff --git a/tests/unit/test_handlers.py b/tests/unit/test_handlers.py index 2a79d944c..a2fbf7956 100644 --- a/tests/unit/test_handlers.py +++ b/tests/unit/test_handlers.py @@ -144,6 +144,7 @@ async def setUpAsync(self): } self.fileoperator_config = { "read_submission_files.side_effect": 
self.fake_read_submission_files, + "check_submission_files_ready.side_effect": self.fake_check_submission_files, } RESTAPIHandler._handle_check_ownership = make_mocked_coro(True) @@ -255,10 +256,14 @@ async def fake_useroperator_read_user(self, user_id): """Fake read operation to return mocked user.""" return self.test_user - async def fake_read_submission_files(self, submission_id): + async def fake_read_submission_files(self, submission_id, status_list): """Fake read submission files.""" return [self.projected_file_example] + async def fake_check_submission_files(self, submission_id): + """Fake check submission files.""" + return True, [] + class APIHandlerTestCase(HandlersTestCase): """Schema API endpoint class test cases.""" @@ -1064,8 +1069,6 @@ async def setUpAsync(self): self.patch_xmloperator = patch(class_xmloperator, **self.xmloperator_config, spec=True) self.MockedXMLOperator = self.patch_xmloperator.start() - self._mq_connection = "metadata_backend.api.handlers.publish.MQPublisher" - async def tearDownAsync(self): """Cleanup mocked stuff.""" await super().tearDownAsync() diff --git a/tests/unit/test_server.py b/tests/unit/test_server.py index a1d75cc43..1e40066b5 100644 --- a/tests/unit/test_server.py +++ b/tests/unit/test_server.py @@ -51,7 +51,7 @@ async def test_api_routes_are_set(self): """ server = await self.get_application() - self.assertIs(len(server.router.routes()), 59) + self.assertIs(len(server.router.routes()), 61) async def test_frontend_routes_are_set(self): """Test correct routes are set when frontend folder exists."""