Merge pull request #95 from TranslatorSRI/trapi-1.4
plater 1.4
EvanDietzMorris authored May 26, 2023
2 parents 5fcecff + 4c55a9a commit 048961c
Showing 10 changed files with 193 additions and 164 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/release.yaml
@@ -25,7 +25,7 @@ jobs:
          ghcr.io/${{ github.repository }}
    - name: Extract metadata (tags, labels) for Docker
      id: meta_clustered_image
-      uses: docker/metadata-action@98669ae865ea3cffbcbaa878cf57c20bbf1c6c38
+      uses: docker/metadata-action@9ec57ed1fcdbf14dcef7dfbe97b2010124a938b7
      with:
        images:
          ghcr.io/${{ github.repository }}-clustered
8 changes: 4 additions & 4 deletions PLATER/neo4j-docker/Dockerfile
@@ -1,8 +1,8 @@
-FROM neo4j:4.4.10
+FROM neo4j:5.3.0

-ARG GDS_VERSION=2.1.9
-ARG APOC_VERSION=4.4.0.8
-ENV APOC_URI https://github.com/neo4j-contrib/neo4j-apoc-procedures/releases/download/${APOC_VERSION}/apoc-${APOC_VERSION}-all.jar
+ARG GDS_VERSION=2.2.6
+ARG APOC_VERSION=5.3.0
+ENV APOC_URI https://github.com/neo4j-contrib/neo4j-apoc-procedures/releases/download/${APOC_VERSION}/apoc-${APOC_VERSION}-extended.jar
ENV GDS_URL https://graphdatascience.ninja/neo4j-graph-data-science-${GDS_VERSION}.zip

RUN apt-get update
5 changes: 3 additions & 2 deletions PLATER/requirements.txt
@@ -5,8 +5,8 @@ pytest==6.2.4
pytest-asyncio==0.15.1
starlette==0.20.4
uvicorn==0.14.0
-reasoner-transpiler==1.11.11
-reasoner-pydantic==3.0.1
+reasoner-transpiler==2.0.1
+reasoner-pydantic==4.0.5
httpx==0.18.2
pytest-httpx==0.12.0
jsonasobj==1.3.1
@@ -15,3 +15,4 @@ pydantic>=1.8
opentelemetry-sdk
opentelemetry-exporter-jaeger
opentelemetry-instrumentation-fastapi
+opentelemetry-instrumentation-httpx
PLATER/services/{app_trapi_1_3.py → app_trapi_1_4.py}
@@ -10,8 +10,8 @@
from PLATER.services.util.overlay import Overlay
from PLATER.services.util.api_utils import get_graph_interface, get_graph_metadata, construct_open_api_schema, get_example

-# Mount open api at /1.2/openapi.json
-APP_TRAPI_1_3 = FastAPI(openapi_url="/openapi.json", docs_url="/docs", root_path='/1.3')
+# Mount open api at /1.4/openapi.json
+APP_TRAPI_1_4 = FastAPI(openapi_url="/openapi.json", docs_url="/docs", root_path='/1.4')


async def get_meta_knowledge_graph(
@@ -58,7 +58,7 @@ async def reasoner_api(
    return request_json


-APP_TRAPI_1_3.add_api_route(
+APP_TRAPI_1_4.add_api_route(
    "/meta_knowledge_graph",
    get_meta_knowledge_graph,
    methods=["GET"],
@@ -68,7 +68,7 @@ async def reasoner_api(
    tags=["trapi"]
)

-APP_TRAPI_1_3.add_api_route(
+APP_TRAPI_1_4.add_api_route(
    "/sri_testing_data",
    get_sri_testing_data,
    methods=["GET"],
@@ -79,7 +79,7 @@ async def reasoner_api(
    tags=["trapi"]
)

-APP_TRAPI_1_3.add_api_route(
+APP_TRAPI_1_4.add_api_route(
    "/query",
    reasoner_api,
    methods=["POST"],
@@ -89,4 +89,4 @@ async def reasoner_api(
    tags=["trapi"]
)

-APP_TRAPI_1_3.openapi_schema = construct_open_api_schema(app=APP_TRAPI_1_3, trapi_version="1.3.0", prefix='/1.3')
+APP_TRAPI_1_4.openapi_schema = construct_open_api_schema(app=APP_TRAPI_1_4, trapi_version="1.4.0", prefix='/1.4')
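
The effect of the rename is easiest to see from the client side: the TRAPI 1.4 sub-app and its OpenAPI document are now served under the /1.4 prefix. The following smoke test is a minimal sketch, not part of this commit, and assumes a Plater instance listening on localhost:8080:

import httpx

base = "http://localhost:8080"

# the TRAPI 1.4 sub-app now answers under the /1.4 prefix
meta_kg = httpx.get(f"{base}/1.4/meta_knowledge_graph").json()
print(len(meta_kg.get("nodes", {})), "node categories advertised")

# each mounted sub-app publishes its own OpenAPI document
spec = httpx.get(f"{base}/1.4/openapi.json").json()
print(spec["info"]["title"], spec["info"]["version"])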
33 changes: 28 additions & 5 deletions PLATER/services/server.py
@@ -1,12 +1,12 @@
"""FastAPI app."""
-import os
+import logging, warnings, os, json

from fastapi import FastAPI
from starlette.middleware.cors import CORSMiddleware
from PLATER.services.config import config
from PLATER.services.util.logutil import LoggingUtil
from PLATER.services.app_common import APP_COMMON
-from PLATER.services.app_trapi_1_3 import APP_TRAPI_1_3
+from PLATER.services.app_trapi_1_4 import APP_TRAPI_1_4
from PLATER.services.util.api_utils import construct_open_api_schema

TITLE = config.get('PLATER_TITLE', 'Plater API')
@@ -22,13 +22,13 @@

APP = FastAPI()

-# Mount 1.2 app at /1.2
-APP.mount('/1.3', APP_TRAPI_1_3, 'Trapi 1.3')
+# Mount 1.4 app at /1.4
+APP.mount('/1.4', APP_TRAPI_1_4, 'Trapi 1.4')
# Mount default app at /
APP.mount('/', APP_COMMON, '')
# Add all routes of each app for open api generation at /openapi.json
# This will create an aggregate openapi spec.
-APP.include_router(APP_TRAPI_1_3.router, prefix='/1.3')
+APP.include_router(APP_TRAPI_1_4.router, prefix='/1.4')
APP.include_router(APP_COMMON.router)
# Construct app /openapi.json # Note this is not to be registered on smart api . Instead /1.1/openapi.json
# or /1.2/openapi.json should be used.
@@ -50,6 +50,13 @@
from opentelemetry.sdk.resources import SERVICE_NAME as telemetery_service_name_key, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
+from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
+
+# httpx connections need to be kept open a little longer by the otel decorators,
+# but some libraries warn about the resource being left unclosed.
+# This suppresses those warnings.
+logging.captureWarnings(capture=True)
+warnings.filterwarnings("ignore", category=ResourceWarning)
service_name = os.environ.get('PLATER_TITLE', 'PLATER')
assert service_name and isinstance(service_name, str)
trace.set_tracer_provider(
@@ -67,6 +74,22 @@
tracer = trace.get_tracer(__name__)
FastAPIInstrumentor.instrument_app(APP, tracer_provider=trace, excluded_urls=
"docs,openapi.json") #,*cypher,*1.3/sri_testing_data")
+async def request_hook(span, request):
+    # log cypher queries sent to neo4j
+    # check the url
+    if span.attributes.get('http.url').endswith('/db/data/transaction/commit'):
+        # if the url matches, try to json-load the query body
+        try:
+            neo4j_query = json.loads(
+                request.stream._stream.decode('utf-8')
+            )['statements'][0]['statement']
+            span.set_attribute('cypher', neo4j_query)
+        except Exception as ex:
+            logger.error(f"error logging neo4j query when sending to OTEL: {ex}")
+            neo4j_query = ""
+HTTPXClientInstrumentor().instrument(request_hook=request_hook)


if __name__ == '__main__':
    import uvicorn
    uvicorn.run(APP, host='0.0.0.0', port=8080)
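
The request-hook pattern above can be tried in isolation. The sketch below is illustrative only, not Plater code: it swaps in a console exporter for Jaeger, uses a made-up attribute name (example.is_neo4j_commit), and assumes the opentelemetry-sdk and opentelemetry-instrumentation-httpx packages are installed:

import httpx
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor

trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(ConsoleSpanExporter())
)

def request_hook(span, request):
    # called for every outgoing httpx request while the span is still recording;
    # the instrumentation has already set the http.url attribute by this point
    url = span.attributes.get("http.url", "")
    span.set_attribute("example.is_neo4j_commit", url.endswith("/db/data/transaction/commit"))

HTTPXClientInstrumentor().instrument(request_hook=request_hook)
httpx.get("https://example.com")  # this span is exported with the custom attribute set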
4 changes: 2 additions & 2 deletions PLATER/services/util/api_utils.py
@@ -34,7 +34,7 @@ def get_bl_helper():

def construct_open_api_schema(app, trapi_version, prefix=""):
    plater_title = config.get('PLATER_TITLE', 'Plater API')
-    plater_version = os.environ.get('PLATER_VERSION', '1.3.0-13')
+    plater_version = os.environ.get('PLATER_VERSION', '1.4.0-0')
    server_url = os.environ.get('PUBLIC_URL', '')
    if app.openapi_schema:
        return app.openapi_schema
@@ -53,7 +53,7 @@ def construct_open_api_schema(app, trapi_version, prefix=""):
    terms_of_service = open_api_extended_spec.get("termsOfService")
    servers_conf = open_api_extended_spec.get("servers")
    tags = open_api_extended_spec.get("tags")
-    title_override = (open_api_extended_spec.get("title") or plater_title) + f' (trapi v-{trapi_version})'
+    title_override = (open_api_extended_spec.get("title") or plater_title)
    description = open_api_extended_spec.get("description")
    x_trapi_extension = open_api_extended_spec.get("x-trapi", {"version": trapi_version, "operations": ["lookup"]})
    if tags:
98 changes: 60 additions & 38 deletions PLATER/services/util/question.py
@@ -59,7 +59,45 @@ def compile_cypher(self, **kwargs):
                item['qualifier_type_id'] = item['qualifier_type_id'].replace('biolink:', '')
        return get_query(query_graph, **kwargs)

-    # @staticmethod
+
+    def _construct_sources_tree(self, sources):
+        # if a primary source and aggregator sources are specified in the graph, the upstream_resource_ids
+        # of every aggregator knowledge source will be that primary source
+
+        # if aggregator knowledge sources come from the db, plater adds itself as an aggregator and uses the
+        # other aggregator ids as its upstream resources; if no aggregators are found and only a primary
+        # knowledge source is provided, that is used as the upstream for the plater entry
+        formatted_sources = []
+        # keep only the source entries that actually have values
+        temp = {}
+        for source in sources:
+            if not source['resource_id']:
+                continue
+            temp[source['resource_role']] = temp.get(source['resource_role'], set())
+            if isinstance(source["resource_id"], str):
+                temp[source["resource_role"]].add(source["resource_id"])
+            elif isinstance(source["resource_id"], list):
+                for resource_id in source["resource_id"]:
+                    temp[source["resource_role"]].add(resource_id)
+
+        for resource_role in temp:
+            upstreams = None
+            if resource_role == "biolink:aggregator_knowledge_source":
+                upstreams = temp.get("biolink:primary_knowledge_source", None)
+
+            formatted_sources += [
+                {"resource_id": resource_id, "resource_role": resource_role.replace('biolink:', ''), "upstream_resource_ids": upstreams}
+                for resource_id in temp[resource_role]
+            ]
+        upstreams_for_plater_entry = temp.get("biolink:aggregator_knowledge_source") or temp.get("biolink:primary_knowledge_source")
+        formatted_sources.append({
+            "resource_id": self.provenance,
+            "resource_role": "aggregator_knowledge_source",
+            "upstream_resource_ids": upstreams_for_plater_entry
+        })
+        return formatted_sources
+
+
    def format_attribute_trapi(self, kg_items, node=False):
        for identifier in kg_items:
            # get the properties for the record
@@ -104,32 +142,9 @@ def format_attribute_trapi(self, kg_items, node=False):
                if attribute_data:
                    attr.update(attribute_data)

-            # update edge provenance with automat infores
+            # update edge provenance with automat infores, filter empty ones, expand list type resource ids
            if not node:
-                found_previous_aggregator = False
-                for attribute in new_attribs:
-                    if attribute.get('attribute_type_id') == "biolink:primary_knowledge_source":
-                        # setting this to self provenance (eg. infores:automat-biolink).
-                        attribute['attribute_source'] = self.provenance
-                    elif attribute.get('attribute_type_id') == "biolink:aggregator_knowledge_source":
-                        found_previous_aggregator = True
-                        attribute['attribute_source'] = self.provenance
-                        if isinstance(attribute['value'], str):
-                            attribute['value'] = [attribute['value']]
-                        attribute['value'].append(self.provenance)  # add automat infores
-                        attribute['value'] = list(set(attribute['value']))  # force uniqueness
-
-                # create aggregator provenance attribute for plater if not present
-                if not found_previous_aggregator:
-                    provenance_attrib = {
-                        "attribute_type_id": "biolink:aggregator_knowledge_source",
-                        "attribute_source": self.provenance,
-                        "value": [self.provenance],
-                        "value_type_id": "biolink:InformationResource",
-                        "original_attribute_name": "biolink:aggregator_knowledge_source"
-                    }
-                    new_attribs.append(provenance_attrib)
-
+                kg_items[identifier]["sources"] = self._construct_sources_tree(kg_items[identifier].get("sources", []))
            # assign these attribs back to the original attrib list without the core properties
            props['attributes'] = new_attribs

@@ -138,6 +153,10 @@ def format_attribute_trapi(self, kg_items, node=False):
    def transform_attributes(self, trapi_message, graph_interface: GraphInterface):
        self.format_attribute_trapi(trapi_message.get('knowledge_graph', {}).get('nodes', {}), node=True)
        self.format_attribute_trapi(trapi_message.get('knowledge_graph', {}).get('edges', {}))
+        for r in trapi_message.get("results", []):
+            # add this plater's resource id to each analysis
+            for analyses in r["analyses"]:
+                analyses["resource_id"] = self.provenance
        return trapi_message

async def answer(self, graph_interface: GraphInterface):
Expand All @@ -146,7 +165,7 @@ async def answer(self, graph_interface: GraphInterface):
:param graph_interface: interface for neo4j
:return: None
"""
cypher = self.compile_cypher(**{"use_hints": True, "relationship_id": "internal"})
cypher = self.compile_cypher(**{"use_hints": True, "relationship_id": "internal", "primary_ks_required": True})
logger.info(f"answering query_graph: {json.dumps(self._question_json)}")
logger.debug(f"cypher: {cypher}")
s = time.time()
@@ -181,8 +200,9 @@ def apply_attribute_constraints(message):
                for node in r['node_bindings'][q_id]:
                    constrained_node_ids[node['id']] = node_constraints[q_id]
            for q_id in edge_constraints.keys():
-                for edge in r['edge_bindings'][q_id]:
-                    constrained_edge_ids[edge['id']] = edge_constraints[q_id]
+                for analyses in r['analyses']:
+                    for edge in analyses.get('edge_bindings', {}).get(q_id, []):
+                        constrained_edge_ids[edge['id']] = edge_constraints[q_id]
        # mark nodes for deletion
        nodes_to_filter = set()
        for node_id in constrained_node_ids:
@@ -227,20 +247,22 @@ def apply_attribute_constraints(message):
            # if node bindings are empty for a q_id skip the whole result
            if skip_result:
                continue
-            new_edge_bindings = {}
-            for q_id, binding in result['edge_bindings'].items():
-                binding_new = [x for x in binding if x['id'] not in edges_to_filter]
-                # if this list is empty, skip the whole result
-                if not binding_new:
-                    skip_result = True
-                    break
-                new_edge_bindings[q_id] = binding_new
+            for analysis in result["analyses"]:
+                new_edge_bindings = {}
+                for q_id, binding in analysis["edge_bindings"].items():
+                    binding_new = [x for x in binding if x['id'] not in edges_to_filter]
+                    # if this list is empty, skip the whole result
+                    if not binding_new:
+                        skip_result = True
+                        break
+                    new_edge_bindings[q_id] = binding_new
+                analysis["edge_bindings"] = new_edge_bindings
            # if edge bindings are empty for a q_id skip the whole result
            if skip_result:
                continue
            filtered_bindings.append({
                "node_bindings": new_node_bindings,
-                "edge_bindings": new_edge_bindings
+                "analyses": result["analyses"]
            })

        return {
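
To make the new provenance logic concrete, here is a hedged walk-through of what _construct_sources_tree produces for a single edge; the CURIEs and the automat infores identifier are hypothetical:

# hypothetical input: the sources attached to one edge as it comes out of the database
sources = [
    {"resource_id": "infores:ctd", "resource_role": "biolink:primary_knowledge_source"},
    {"resource_id": ["infores:upstream-aggregator"], "resource_role": "biolink:aggregator_knowledge_source"},
    {"resource_id": None, "resource_role": "biolink:supporting_data_source"},  # dropped: no value
]

# with self.provenance == "infores:automat-example", the method returns (ordering aside):
# [
#     {"resource_id": "infores:ctd",
#      "resource_role": "primary_knowledge_source",
#      "upstream_resource_ids": None},
#     {"resource_id": "infores:upstream-aggregator",
#      "resource_role": "aggregator_knowledge_source",
#      "upstream_resource_ids": {"infores:ctd"}},
#     # plater appends itself as an aggregator, downstream of the aggregators it
#     # found (or of the primary source when there are none):
#     {"resource_id": "infores:automat-example",
#      "resource_role": "aggregator_knowledge_source",
#      "upstream_resource_ids": {"infores:upstream-aggregator"}},
# ]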