From 25843a6ea15bc9c22c078eb8d4ea85475b760e88 Mon Sep 17 00:00:00 2001 From: BOsteen Date: Thu, 3 Aug 2023 10:32:42 -0700 Subject: [PATCH 1/2] RDF PATCH logic implemented --- source/web-service/flaskapp/__init__.py | 63 +++- source/web-service/flaskapp/errors.py | 15 +- source/web-service/flaskapp/routes/records.py | 337 +++++++++++++++++- .../flaskapp/storage_utilities/graph.py | 3 +- .../flaskapp/storage_utilities/record.py | 4 +- source/web-service/flaskapp/utilities.py | 6 + 6 files changed, 417 insertions(+), 11 deletions(-) diff --git a/source/web-service/flaskapp/__init__.py b/source/web-service/flaskapp/__init__.py index af33c1f5..9630e522 100644 --- a/source/web-service/flaskapp/__init__.py +++ b/source/web-service/flaskapp/__init__.py @@ -4,7 +4,7 @@ from flaskapp.logging_configuration import get_logging_config import json -from datetime import datetime +from datetime import datetime, timedelta import sqlite3 from flask import Flask, Response @@ -62,7 +62,7 @@ def create_app(): app.config["DEBUG_LEVEL"] = getenv("DEBUG_LEVEL", "INFO") app.config["FLASK_ENV"] = getenv("FLASK_ENV", "production") - app.logger.info(f"LOD Gateway logging INFO at level {app.config['DEBUG_LEVEL']}") + app.logger.info(f"LOD Gateway logging at level {app.config['DEBUG_LEVEL']}") CORS(app, send_wildcard=True) # Setup global configuration @@ -87,6 +87,7 @@ def create_app(): app.config["PROCESS_RDF"] = False if environ.get("PROCESS_RDF", "False").lower() == "true": app.config["PROCESS_RDF"] = True + app.logger.info(f"RDF Processing functionality is enabled.") app.config["SPARQL_QUERY_ENDPOINT"] = environ["SPARQL_QUERY_ENDPOINT"] app.config["SPARQL_UPDATE_ENDPOINT"] = environ["SPARQL_UPDATE_ENDPOINT"] @@ -99,6 +100,12 @@ def create_app(): environ.get("USE_PYLD_REFORMAT", "true").lower() == "true" ) + app.logger.info( + "Using PyLD for parsing/reserialization" + if app.config["USE_PYLD_REFORMAT"] + else "Using RDFLib for parsing/reserialization" + ) + # set up a default RDF context cache? doccache_default_expiry = int(environ.get("RDF_CONTEXT_CACHE_EXPIRES", 30)) app.config["RDF_DOCLOADER"] = document_loader( @@ -120,6 +127,8 @@ def create_app(): app.logger.error( f"The data in ENV: 'RDF_CONTEXT_CACHE' is not JSON! Will not load presets." 
)
+    else:
+        app.logger.info(f"RDF Processing functionality is disabled.")
 
     # Setting the limit on number of records returned due to a glob browse request
     try:
@@ -127,6 +136,23 @@ def create_app():
     except (ValueError, TypeError) as e:
         app.config["BROWSE_PAGE_SIZE"] = 200
 
+    # PATCH_UPDATE_THRESHOLD
+    app.config["PATCH_UPDATE_THRESHOLD"] = None
+    if patch_threshold := environ.get("PATCH_UPDATE_THRESHOLD"):
+        try:
+            patch_threshold = int(patch_threshold)
+            app.config["PATCH_UPDATE_THRESHOLD"] = timedelta(minutes=patch_threshold)
+        except (ValueError, TypeError) as e:
+            app.logger.warning(
+                f"Value in the PATCH_UPDATE_THRESHOLD env var was not understood as a number"
+            )
+
+    app.logger.info(
+        f"RDF GRAPH PATCH enabled, edit threshold is set to a duration of {app.config['PATCH_UPDATE_THRESHOLD']}"
+        if app.config["PATCH_UPDATE_THRESHOLD"]
+        else "RDF GRAPH PATCH is disabled"
+    )
+
     app.config["JSON_AS_ASCII"] = False
     app.config["FLASK_GZIP_COMPRESSION"] = environ["FLASK_GZIP_COMPRESSION"]
     app.config["PREFIX_RECORD_IDS"] = getenv("PREFIX_RECORD_IDS", default="RECURSIVE")
@@ -147,19 +173,43 @@ def create_app():
     if environ.get("LINK_HEADER_PREV_VERSION", "False").lower() == "true":
         app.config["LINK_HEADER_PREV_VERSION"] = True
 
+    app.logger.info(
+        "Versioning enabled"
+        if app.config["KEEP_LAST_VERSION"]
+        else "Versioning is disabled"
+    )
+
     app.config["SUBADDRESSING"] = False
     if environ.get("SUBADDRESSING", "False").lower() == "true":
         app.config["SUBADDRESSING"] = True
-        if environ.get("SUBADDRESSING_DEPTH") is not None:
+    app.config["SUBADDRESSING_MAX_PARTS"] = 4
+    app.config["SUBADDRESSING_MIN_PARTS"] = 4
+
+    if environ.get("SUBADDRESSING_MAX_PARTS") is not None:
+        try:
+            app.config["SUBADDRESSING_MAX_PARTS"] = int(
+                environ.get("SUBADDRESSING_MAX_PARTS")
+            )
+        except (ValueError, TypeError) as e:
+            app.logger.error(
+                f"Value for SUBADDRESSING_MAX_PARTS could not be interpreted as an integer. Ignoring."
+            )
+    if environ.get("SUBADDRESSING_MIN_PARTS") is not None:
         try:
-            app.config["SUBADDRESSING_DEPTH"] = int(
-                environ.get("SUBADDRESSING_DEPTH")
+            app.config["SUBADDRESSING_MIN_PARTS"] = int(
+                environ.get("SUBADDRESSING_MIN_PARTS")
             )
         except (ValueError, TypeError) as e:
             app.logger.error(
-                f"Value for SUBADDRESSING_DEPTH could not be interpreted as an integer. Ignoring."
+                f"Value for SUBADDRESSING_MIN_PARTS could not be interpreted as an integer. Ignoring."
             )
 
+    app.logger.info(
+        f"Subaddressing enabled - search depth up to {app.config['SUBADDRESSING_MAX_PARTS']}"
+        if app.config["SUBADDRESSING"]
+        else "Subaddressing disabled"
+    )
+
     if app.config["FLASK_ENV"].lower() == "development":
         app.config["SQLALCHEMY_ECHO"] = True
 
@@ -196,6 +246,7 @@ def create_app():
             ("FULL_BASE_GRAPH", "Base Graph"),
             ("SUBADDRESSING", "Subaddressing"),
             ("KEEP_LAST_VERSION", "Versioning"),
+            ("PATCH_UPDATE_THRESHOLD", "RDF Graph PATCH"),
         ]
         if app.config.get(k)
     ]
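Reviewer note: the PATCH_UPDATE_THRESHOLD knob above is worth a quick illustration. It is read as a number of minutes and gates whether a PATCH produces a new version and activity-stream event, or is folded silently into the current record. A minimal sketch of the intended semantics, with made-up values; the comparison mirrors the `(now - then) > dt` test in `_update_beyond_threshold()` later in this patch:

```python
# Sketch only: how PATCH_UPDATE_THRESHOLD=30 is interpreted (values are illustrative).
from datetime import datetime, timedelta

threshold = timedelta(minutes=int("30"))  # parsed exactly as in create_app()

last_activity = datetime(2023, 8, 3, 10, 0)   # newest activity row for the record
now = datetime(2023, 8, 3, 10, 15)            # DB time at the moment of the PATCH

# False here: the edit is within the threshold, so the record is updated
# without a new version or activity-stream entry.
log_new_activity = (now - last_activity) > threshold
```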
diff --git a/source/web-service/flaskapp/errors.py b/source/web-service/flaskapp/errors.py
index a7d8fa61..4f471481 100644
--- a/source/web-service/flaskapp/errors.py
+++ b/source/web-service/flaskapp/errors.py
@@ -10,6 +10,9 @@
 status_nt = namedtuple("name", "code title detail")
 
 status_ok = status_nt(200, "OK", "OK")
 
+status_not_modified = status_nt(
+    304, "Not Modified", "No change was made to the resource."
+)
 
 status_bad_auth_header = status_nt(
     400, "Bad Authorization Header", "Syntax of Authorization header is invalid"
 )
@@ -31,6 +34,17 @@
     405, "Forbidden Method", "For the requested URL only 'POST' method is allowed"
 )
 
+status_patch_method_not_allowed = status_nt(
+    405, "Method Not Allowed", "This is not a valid target for a PATCH method"
+)
+
+status_patch_request_unparsable = status_nt(
+    400,
+    "Bad Request",
+    "This is not a valid request for the PATCH method. It requires a JSON body"
+    " with 'add' and/or 'delete' keys, and a 'format' key indicating the RDF serialization type",
+)
+
 status_wrong_syntax = status_nt(422, "Invalid JSON", "Could not parse JSON record")
 
 status_id_missing = status_nt(422, "ID Missing", "ID for the JSON record not found")
@@ -52,7 +66,6 @@
 
 # Construct 'error response' object
 def construct_error_response(status, source=None):
-
     err = {}
 
     err["status"] = status.code
diff --git a/source/web-service/flaskapp/routes/records.py b/source/web-service/flaskapp/routes/records.py
index 56000175..2b346270 100644
--- a/source/web-service/flaskapp/routes/records.py
+++ b/source/web-service/flaskapp/routes/records.py
@@ -16,18 +16,36 @@
 from flaskapp.models import db
 from flaskapp.models.record import Record, Version
 from flaskapp.models.activity import Activity
+from flaskapp.storage_utilities.record import (
+    validate_record_set,
+    get_record,
+    record_delete,
+    record_create,
+    record_update,
+    process_activity,
+)
+from flaskapp.storage_utilities.graph import (
+    graph_expand,
+    graph_replace,
+)
 from flaskapp.utilities import (
     format_datetime,
+    Event,
     containerRecursiveCallback,
     idPrefixer,
+    idPrefixRemover,
     is_ntriples,
     triples_to_quads,
 )
+from flaskapp.base_graph_utils import DEFAULT_BASE_GRAPH
 from flaskapp.errors import (
     construct_error_response,
     status_record_not_found,
     status_page_not_found,
+    status_patch_method_not_allowed,
+    status_patch_request_unparsable,
     status_ok,
+    status_not_modified,
 )
 from flaskapp.utilities import checksum_json
@@ -222,7 +240,324 @@ def subaddressing_search(entity_id):
     return (None, None)
 
 
-@records.route("/<path:entity_id>")
+def _prefix_json_ld(json_ld):
+    hostPrefix = current_app.config["BASE_URL"]
+    idPrefix = hostPrefix + "/" + current_app.config["NAMESPACE"]
+
+    attr = "@id" if "@id" in json_ld["@graph"] else "id"
+
+    return containerRecursiveCallback(
+        data=json_ld,
+        attr=attr,
+        callback=idPrefixer,
+        prefix=idPrefix,
+        recursive=False if current_app.config["PREFIX_RECORD_IDS"] == "TOP" else True,
+    )
+
+
+def _remove_prefix_json_ld(json_ld):
+    hostPrefix = current_app.config["BASE_URL"]
+    idPrefix = hostPrefix + "/" + current_app.config["NAMESPACE"]
+    if not idPrefix.endswith("/"):
+        idPrefix += "/"
+
+    attr = "@id" if "@id" in json_ld["@graph"] else "id"
+
+    return containerRecursiveCallback(
+        data=json_ld,
+        attr=attr,
+        callback=idPrefixRemover,
+        prefix=idPrefix,
+        recursive=False if current_app.config["PREFIX_RECORD_IDS"] == "TOP" else True,
+    )
+
+
+def _parse_to_nt(json_obj, rdf_format):
+    if rdf_format in ["jsonld", "json-ld"]:
+        # prefix to make FQDN
+        if "@graph" not in json_obj.keys():
+            current_app.logger.error(
+                f"The JSON-LD formatted PATCH request did not express its data as a @graph - rejecting"
+            )
+            return
+        data = _prefix_json_ld(json_obj)
+        g = ConjunctiveGraph()
+        g.parse(data=json.dumps(data), format="json-ld")
+        if len(g) == 0:
+            current_app.logger.error(
+                f"PATCH JSON-LD did not render down to any triples - rejecting"
+            )
+            return None
+        return g.serialize(format="nt11")
+    elif rdf_format in ["nt", "nt11"]:
+        # skip roundtripping this
+        if len(json_obj.strip()) > 0:
+            return json_obj
+        else:
+            current_app.logger.error(
+                f"PATCH n-triples was empty or non-existent - rejecting"
+            )
+            return None
+    else:
+        g = ConjunctiveGraph()
+        g.parse(data=json_obj, format=rdf_format)
+        if len(g) == 0:
+            current_app.logger.error(
+                f"PATCH {rdf_format} RDF did not render down to any triples - rejecting"
+            )
+            return None
+        return g.serialize(format="nt11")
+
+
+def _generate_patch_update(json_obj):
+    # Expecting a JSON object with at least one of "add" or "delete" keys, with a "format" indicator
+    # create a valid SPARQL Update request using it and return the string
+    # If the request cannot be parsed, return None
+    if "format" not in json_obj:
+        current_app.logger.debug(f"No 'format' value was present in the PATCH request")
+        return
+
+    match json_obj:
+        case {"add": add, "delete": delete, "format": rdf_format} | {
+            "add": add,
+            "delete": delete,
+            "format": rdf_format,
+            "update_graphstore": _,
+        }:
+            # add AND delete: must be matched first, since mapping patterns also match dicts with extra keys
+            current_app.logger.debug(
+                f"Attempting to parse 'delete' portion of delete->add PATCH request data in {rdf_format} format"
+            )
+            delete_nt = _parse_to_nt(delete, rdf_format)
+
+            current_app.logger.debug(
+                f"Attempting to parse 'add' portion of delete->add PATCH request data in {rdf_format} format"
+            )
+            add_nt = _parse_to_nt(add, rdf_format)
+
+            if delete_nt is not None and add_nt is not None:
+                return (
+                    "DELETE DATA {\n"
+                    + delete_nt
+                    + "\n};\nINSERT DATA {\n"
+                    + add_nt
+                    + "\n};"
+                )
+            else:
+                return
+        case {"add": add, "format": rdf_format} | {
+            "add": add,
+            "format": rdf_format,
+            "update_graphstore": _,
+        }:
+            # add only:
+            current_app.logger.debug(
+                f"Attempting to parse 'add' PATCH request data in {rdf_format} format"
+            )
+            if add_nt := _parse_to_nt(add, rdf_format):
+                return "INSERT DATA {\n" + add_nt + "\n};"
+            else:
+                return
+        case {"delete": delete, "format": rdf_format} | {
+            "delete": delete,
+            "format": rdf_format,
+            "update_graphstore": _,
+        }:
+            # delete only:
+            current_app.logger.debug(
+                f"Attempting to parse 'delete' PATCH request data in {rdf_format} format"
+            )
+            if delete_nt := _parse_to_nt(delete, rdf_format):
+                return "DELETE DATA {\n" + delete_nt + "\n};"
+            else:
+                return
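Reviewer note: since the n-triples path returns the payload untouched, the shape of the generated SPARQL Update is easy to pin down. A sketch of the expected input and output for the combined case, using a made-up triple and hypothetical absolute URIs:

```python
# Illustrative only: what _generate_patch_update() returns for a combined
# add+delete request in "nt" format (subject, predicate and values are made up).
request_body = {
    "delete": '<https://lod.example.org/museum/collection/object/1> '
              '<http://purl.org/dc/terms/title> "Old Title" .',
    "add": '<https://lod.example.org/museum/collection/object/1> '
           '<http://purl.org/dc/terms/title> "New Title" .',
    "format": "nt",
}

# Expected return value (deletes are applied before inserts):
#
# DELETE DATA {
# <https://lod.example.org/museum/collection/object/1> <http://purl.org/dc/terms/title> "Old Title" .
# };
# INSERT DATA {
# <https://lod.example.org/museum/collection/object/1> <http://purl.org/dc/terms/title> "New Title" .
# };
```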
+
+
+def _update_beyond_threshold(record):
+    # Get the newest activity-stream created date, and see if it is more than
+    # PATCH_UPDATE_THRESHOLD timedelta in the past. Returns True if so.
+    # We need the DB TIME, not the local server time!
+    if dt := current_app.config["PATCH_UPDATE_THRESHOLD"]:
+        results = db.session.execute(
+            f"SELECT now()::timestamp, activities.datetime_created FROM activities "
+            f"WHERE activities.record_id = {record.id} "
+            f"ORDER BY activities.id DESC "
+            f"LIMIT 1;"
+        )
+        if results.rowcount != 1:
+            current_app.logger.debug(
+                f"Couldn't find a latest activity record for {record.entity_id} - should make one!"
+            )
+            return True
+        else:
+            now, then = next(results)
+            # Is the difference between now and the latest event more than the patch threshold?
+            return (now - then) > dt
+    else:
+        current_app.logger.warning(
+            "PATCH_UPDATE_THRESHOLD is disabled - all PATCH changes will be logged"
+        )
+        return True
+
+
+@records.route("/<path:entity_id>", methods=["PATCH"])
+def entity_record_patch(entity_id):
+    hostPrefix = current_app.config["BASE_URL"]
+    idPrefix = hostPrefix + "/" + current_app.config["NAMESPACE"]
+    graph_uri = idPrefix + entity_id
+
+    current_app.logger.debug(f"{entity_id} PATCH - Profiling started 0.0000000")
+    profile_time = time.perf_counter()
+
+    if entity_id.endswith("*") or "-VERSION-" in entity_id:
+        response = construct_error_response(status_patch_method_not_allowed)
+        return abort(response)
+
+    # Validate request:
+    if not request.is_json:
+        current_app.logger.error(
+            f"No JSON body to the PATCH endpoint for {entity_id} - rejecting"
+        )
+        response = construct_error_response(status_patch_request_unparsable)
+        return abort(response)
+
+    patch_request = request.get_json()
+
+    # Update graphstore?
+    update_graphstore = patch_request.get("update_graphstore") is True
+
+    current_app.logger.debug(
+        f"{entity_id} - STARTED PATCH Update string at {time.perf_counter() - profile_time}"
+    )
+    patch_update = _generate_patch_update(patch_request)
+    current_app.logger.debug(
+        f"{entity_id} - FINISHED PATCH Update string at {time.perf_counter() - profile_time}"
+    )
+
+    if patch_update is None:
+        response = construct_error_response(status_patch_request_unparsable)
+        return abort(response)
+
+    with db.session.no_autoflush:
+        try:
+            current_app.logger.debug(
+                f"{entity_id} - STARTED db look up at {time.perf_counter() - profile_time}"
+            )
+
+            record = (
+                db.session.query(Record)
+                .filter(Record.entity_id == entity_id)
+                .options(defer(Record.data))
+                .limit(1)
+                .first()
+            )
+            current_app.logger.debug(
+                f"{entity_id} - FINISHED db look up at {time.perf_counter() - profile_time}"
+            )
+            new_record = False
+            EMPTY = {
+                "@id": None,
+                "@graph": [],
+            }
+            if record is None:
+                current_app.logger.warning(
+                    f"{entity_id} not found in db - creating a blank graph resource"
+                )
+                base = EMPTY.copy()
+                base["@id"] = entity_id
+                record = Record.query.get(record_create(base, commit=True))
+                new_record = True
+            elif record.data is None:
+                current_app.logger.warning(
+                    f"{entity_id} was deleted - creating a blank graph resource in its place"
+                )
+                base = EMPTY.copy()
+                base["@id"] = entity_id
+                record_update(record, base)
+                new_record = True
+
+            # inflate relative URIs
+            data = _prefix_json_ld(record.data)
+            original_checksum = record.checksum
+
+            # Perform parse
+            current_app.logger.debug(
+                f"{entity_id} - STARTED graph parse at {time.perf_counter() - profile_time}"
+            )
+            g = get_bound_graph(data["@id"])
+            g.parse(data=json.dumps(data), format="json-ld")
+            orig_len = len(g)
+            # Perform update
+            current_app.logger.info(
+                f"{entity_id} - STARTED graph update at {time.perf_counter() - profile_time}"
+            )
+            g.update(patch_update)
+            current_app.logger.info(
+                f"{entity_id} - FINISHED graph parse + update at {time.perf_counter() - profile_time}"
+            )
+            new_len = len(g)
+            current_app.logger.info(
+                f"{entity_id} - PATCH Δtriples: {new_len - orig_len}, total: {new_len}"
+            )
+
+            data = json.loads(g.serialize(format="json-ld", auto_compact=True))
+
+            # Deflate uri prefixes!
+            data = _remove_prefix_json_ld(data)
+            data["@id"] = entity_id
+
+            checksum = checksum_json(data)
+            # any change?
+            if original_checksum != checksum:
+                if new_record is True:
+                    # add activity regardless
+                    record_update(record, data, version=False)
+                    process_activity(record.id, Event.Create)
+                else:
+                    # Update existing record with optional activity-stream update based on time
+                    if _update_beyond_threshold(record) is True:
+                        record_update(record, data)
+                        process_activity(record.id, Event.Update)
+                    else:
+                        # Too soon after the last event/version.
+                        # No activity event, and do not create a new version either:
+                        record_update(record, data, version=False)
+
+                # update triplestore:
+                if update_graphstore is True:
+                    current_app.logger.debug(
+                        f"{entity_id} - STARTED GRAPHSTORE SYNC {time.perf_counter() - profile_time}"
+                    )
+                    graph_replace(graph_uri, g.serialize(format="nt11"), current_app.config["SPARQL_UPDATE_ENDPOINT"])
+                    current_app.logger.debug(
+                        f"{entity_id} - FINISHED GRAPHSTORE SYNC {time.perf_counter() - profile_time}"
+                    )
+
+                db.session.commit()
+                current_app.logger.info(
+                    f"{entity_id} - FINISHED graph RESERIALIZATION and storage at {time.perf_counter() - profile_time}"
+                )
+
+                return (
+                    jsonify(
+                        {
+                            "entity_id": entity_id,
+                            "updated": new_len - orig_len,
+                            "total_triples": new_len,
+                        }
+                    ),
+                    201 if new_record else 200,
+                )
+            else:
+                # no-op...
+                return jsonify({"code": 304, "title": "Not Modified"}), 304
+
+        except:
+            db.session.rollback()
+            raise
+
+
+@records.route("/<path:entity_id>", methods=["GET", "POST", "OPTIONS"])
 def entity_record(entity_id):
     """GET the record that exactly matches the entity_id, or if the entity_id ends
     with a '*', treat it as a wildcard search for items in the LOD Gateway"""
diff --git a/source/web-service/flaskapp/storage_utilities/graph.py b/source/web-service/flaskapp/storage_utilities/graph.py
index e2427bd1..89c091f8 100644
--- a/source/web-service/flaskapp/storage_utilities/graph.py
+++ b/source/web-service/flaskapp/storage_utilities/graph.py
@@ -165,7 +165,8 @@ def graph_expand(data, proc=None):
             f"No suitable quads or triples were parsed from the supplied JSON-LD. Is {json_ld_id} actually JSON-LD?"
         )
         return False
-    return g.serialize(format="nquads")
+    # export as ntriples
+    return g.serialize(format="nt11")
 
 
 def graph_replace(graph_name, serialized_nt, update_endpoint):
diff --git a/source/web-service/flaskapp/storage_utilities/record.py b/source/web-service/flaskapp/storage_utilities/record.py
index 7da0a3e1..da251a67 100644
--- a/source/web-service/flaskapp/storage_utilities/record.py
+++ b/source/web-service/flaskapp/storage_utilities/record.py
@@ -70,8 +70,8 @@ def record_create(input_rec, commit=False):
 
 
 # Do not return anything. Calling function has all the info
-def record_update(db_rec, input_rec, commit=False):
-    if current_app.config["KEEP_LAST_VERSION"] is True:
+def record_update(db_rec, input_rec, version=True, commit=False):
+    if current_app.config["KEEP_LAST_VERSION"] is True and version is True:
         # Versioning
         current_app.logger.info(
             f"Versioning enabled: archiving a copy of {db_rec.entity_id} and replacing current with new data."
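Reviewer note: taken together, records.py now accepts a PATCH verb alongside the existing methods. A minimal client-side sketch of the new contract follows; the host, namespace, and record id are hypothetical, while the body keys and response codes come from the route above:

```python
# Sketch only: exercising the new PATCH endpoint.
# "add"/"delete"/"format"/"update_graphstore" are the keys that
# _generate_patch_update() and entity_record_patch() look for.
import requests

subject = "<https://lod.example.org/museum/collection/object/1>"
body = {
    "delete": f'{subject} <http://purl.org/dc/terms/title> "Old Title" .',
    "add": f'{subject} <http://purl.org/dc/terms/title> "New Title" .',
    "format": "nt",             # or "nt11", "json-ld", or another rdflib format
    "update_graphstore": True,  # also push the change to the SPARQL endpoint
}

resp = requests.patch("https://lod.example.org/museum/collection/object/1", json=body)

# 200: record patched; 201: a blank record was created first;
# 304: the patch produced no change; 400/405: unparsable request or bad target.
print(resp.status_code)
```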
diff --git a/source/web-service/flaskapp/utilities.py b/source/web-service/flaskapp/utilities.py index 1d7b2baf..070784d9 100644 --- a/source/web-service/flaskapp/utilities.py +++ b/source/web-service/flaskapp/utilities.py @@ -210,6 +210,12 @@ def generalModify(key, value, find=None, replace=None, prefix=None, suffix=None) return data +def idPrefixRemover(attr, value, prefix=None, **kwargs): + """Helper callback method to remove prefixes from JSON-LD document 'id' attributes""" + + return value.removeprefix(prefix) if prefix else value + + def idPrefixer(attr, value, prefix=None, **kwargs): """Helper callback method to prefix non-prefixed JSON-LD document 'id' attributes""" From a9634950d1690a5d9ed848498308fea24ffc7409 Mon Sep 17 00:00:00 2001 From: BOsteen Date: Thu, 3 Aug 2023 10:55:17 -0700 Subject: [PATCH 2/2] Typo on the subaddressing defaults --- source/web-service/flaskapp/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/web-service/flaskapp/__init__.py b/source/web-service/flaskapp/__init__.py index 9630e522..f5676d0c 100644 --- a/source/web-service/flaskapp/__init__.py +++ b/source/web-service/flaskapp/__init__.py @@ -183,7 +183,7 @@ def create_app(): if environ.get("SUBADDRESSING", "False").lower() == "true": app.config["SUBADDRESSING"] = True app.config["SUBADDRESSING_MAX_PARTS"] = 4 - app.config["SUBADDRESSING_MIN_PARTS"] = 4 + app.config["SUBADDRESSING_MIN_PARTS"] = 1 if environ.get("SUBADDRESSING_MAX_PARTS") is not None: try:
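A footnote on the utilities.py change in the first commit: idPrefixRemover is the inverse of the existing idPrefixer callback, and the PATCH handler relies on that round-trip (_prefix_json_ld inflates ids before parsing, _remove_prefix_json_ld deflates them before storage). A standalone sketch with an illustrative prefix; note that str.removeprefix requires Python 3.9+:

```python
# Sketch of the id prefix round-trip; the prefix value is illustrative only.
def idPrefixRemover(attr, value, prefix=None, **kwargs):
    # identical to the helper added in utilities.py; removeprefix is Python 3.9+
    return value.removeprefix(prefix) if prefix else value

idPrefix = "https://lod.example.org/museum/collection/"

assert idPrefixRemover("@id", idPrefix + "object/1", prefix=idPrefix) == "object/1"
# ids that never carried the prefix pass through unchanged:
assert idPrefixRemover("@id", "object/2", prefix=idPrefix) == "object/2"
```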