Skip to content

Commit

Permalink
Merge pull request #28 from TranslatorSRI/trapi_validation
Browse files Browse the repository at this point in the history
Trapi validation
  • Loading branch information
maximusunc authored Aug 29, 2024
2 parents 399b00b + eb267ab commit 78ef0ff
Show file tree
Hide file tree
Showing 9 changed files with 141 additions and 77 deletions.
1 change: 1 addition & 0 deletions requirements-runners.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
ARS_Test_Runner==0.2.3
# benchmarks-runner==0.1.3
# ui-test-runner==0.0.2
graph-validation-test-runners==0.1.5
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

setup(
name="sri-test-harness",
version="0.2.5",
version="0.3.0",
author="Max Wang",
author_email="[email protected]",
url="https://github.com/TranslatorSRI/TestHarness",
Expand Down
2 changes: 1 addition & 1 deletion test_harness/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,5 +83,5 @@ def download_tests(
# test.test_case_type = "acceptance"
# tests = all_tests
# tests = list(filter((lambda x: x for x in all_tests for asset in x.test_assets if asset.output_id), all_tests))
logger.info(f"Passing along {len(test_suite.test_cases)} tests")
logger.info(f"Passing along {len(test_suite.test_cases)} queries")
return test_suite.test_cases
1 change: 1 addition & 0 deletions test_harness/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,4 @@ def setup_logger():
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("matplotlib").setLevel(logging.WARNING)
logging.getLogger("root").setLevel(logging.WARNING)
logging.getLogger("bmt").setLevel(logging.ERROR)
9 changes: 9 additions & 0 deletions test_harness/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from argparse import ArgumentParser
import asyncio
import json
from setproctitle import setproctitle
from urllib.parse import urlparse
from uuid import uuid4

Expand All @@ -12,6 +13,7 @@
from test_harness.reporter import Reporter
from test_harness.slacker import Slacker

setproctitle("TestHarness")
setup_logger()


Expand Down Expand Up @@ -111,6 +113,13 @@ def cli():
help="Have the Test Harness send the test results to the Testing Dashboard",
)

parser.add_argument(
"--trapi_version",
type=str,
default="1.5.0",
help="TRAPI (SemVer) version assumed for testing (1.5.0, if not given)",
)

parser.add_argument(
"--json_output",
action="store_true",
Expand Down
58 changes: 46 additions & 12 deletions test_harness/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from typing import Dict

from ARS_Test_Runner.semantic_test import pass_fail_analysis
from standards_validation_test_runner import StandardsValidationTest

# from benchmarks_runner import run_benchmarks

Expand Down Expand Up @@ -43,7 +44,7 @@ async def run_tests(
)
query_runner = QueryRunner(logger)
logger.info("Runner is getting service registry")
await query_runner.retrieve_registry(trapi_version="1.5.0")
await query_runner.retrieve_registry(trapi_version=args["trapi_version"])
collector = ResultCollector(logger)
# loop over all tests
for test in tqdm(tests.values()):
Expand Down Expand Up @@ -91,7 +92,9 @@ async def run_tests(
"result": {},
}
for agent, response in test_query["responses"].items():
report["result"][agent] = {}
report["result"][agent] = {
"trapi_validation": "NA",
}
agent_report = report["result"][agent]
try:
if response["status_code"] > 299:
Expand All @@ -102,28 +105,59 @@ async def run_tests(
agent_report["message"] = (
f"Status code: {response['status_code']}"
)
continue
elif (
"response" not in response
or "message" not in response["response"]
):
agent_report["status"] = "FAILED"
agent_report["message"] = "Test Error"
elif (
continue
except Exception as e:
logger.warning(
f"Failed to parse basic response fields from {agent}: {e}"
)
try:
svt = StandardsValidationTest(
test_asset=asset,
environment=test.test_env,
component=agent,
trapi_version=args["trapi_version"],
biolink_version="suppress",
runner_settings="Inferred",
)
results = svt.test_case_processor(
trapi_response=response["response"]
)
agent_report["trapi_validation"] = results[
next(iter(results.keys()))
][agent]["status"]
if agent_report["trapi_validation"] == "FAILED":
agent_report["status"] = "FAILED"
agent_report["message"] = "TRAPI Validation Error"
continue
except Exception as e:
logger.warning(f"Failed to run TRAPI validation with {e}")
agent_report["trapi_validation"] = "ERROR"
try:
if (
response["response"]["message"].get("results") is None
or len(response["response"]["message"]["results"]) == 0
):
agent_report["status"] = "DONE"
agent_report["message"] = "No results"
else:
await pass_fail_analysis(
report["result"],
agent,
response["response"]["message"]["results"],
normalized_curies[asset.output_id],
asset.expected_output,
)
continue
await pass_fail_analysis(
report["result"],
agent,
response["response"]["message"]["results"],
normalized_curies[asset.output_id],
asset.expected_output,
)
except Exception as e:
logger.error(f"Failed to run analysis on {agent}: {e}")
logger.error(
f"Failed to run acceptance test analysis on {agent}: {e}"
)
agent_report["status"] = "FAILED"
agent_report["message"] = "Test Error"

Expand Down
8 changes: 6 additions & 2 deletions test_harness/runner/generate_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ def generate_query(test_asset: TestAsset) -> dict:
test_asset.input_id
]
else:
raise Exception("Unsupported input category for MVP1")
raise Exception(
f"Unsupported input category for MVP1: {test_asset.input_category}"
)
# add knowledge_type
if "inferred" in test_asset.test_runner_settings:
query["message"]["query_graph"]["edges"]["t_edge"][
Expand All @@ -88,7 +90,7 @@ def generate_query(test_asset: TestAsset) -> dict:
test_asset.input_id
]
else:
raise Exception("Unsupported input category.")
raise Exception(f"Unsupported input category: {test_asset.input_category}")
# add qualifier constraints
aspect_qualifier, direction_qualifier = get_qualifier_constraints(test_asset)
query["message"]["query_graph"]["edges"]["t_edge"]["qualifier_constraints"][0][
Expand All @@ -102,5 +104,7 @@ def generate_query(test_asset: TestAsset) -> dict:
query["message"]["query_graph"]["edges"]["t_edge"][
"knowledge_type"
] = "inferred"
else:
raise Exception(f"Unsupported predicate: {test_asset.predicate_id}")

return query
134 changes: 74 additions & 60 deletions test_harness/runner/query_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,15 @@ async def run_queries(
asset_hash = hash_test_asset(test_asset)
if asset_hash not in queries:
# generate query
query = generate_query(test_asset)
queries[asset_hash] = {
"query": query,
"responses": {},
"pks": {},
}
try:
query = generate_query(test_asset)
queries[asset_hash] = {
"query": query,
"responses": {},
"pks": {},
}
except Exception as e:
self.logger.warning(e)

# send queries to a single type of component at a time
for component in test_case.components:
Expand Down Expand Up @@ -213,8 +216,8 @@ async def get_ars_responses(
}
async with httpx.AsyncClient(timeout=30) as client:
# retain this response for testing
# res = await client.post(f"{base_url}/ars/api/retain/{parent_pk}")
# res.raise_for_status()
res = await client.post(f"{base_url}/ars/api/retain/{parent_pk}")
res.raise_for_status()
# Get all children queries
res = await client.get(f"{base_url}/ars/api/messages/{parent_pk}?trace=y")
res.raise_for_status()
Expand All @@ -231,64 +234,75 @@ async def get_ars_responses(
self.get_ars_child_response(child_pk, base_url, infores, start_time)
)

child_responses = await asyncio.gather(*child_tasks)
try:
child_responses = await asyncio.gather(*child_tasks)

for child_response in child_responses:
infores, response = child_response
responses[infores] = response
for child_response in child_responses:
infores, response = child_response
responses[infores] = response
except Exception as e:
self.logger.warning(f"Failed to get all child responses: {e}")

# After getting all individual ARA responses, get and save the merged version
current_time = time.time()
while current_time - start_time <= MAX_QUERY_TIME:
async with httpx.AsyncClient(timeout=30) as client:
res = await client.get(
f"{base_url}/ars/api/messages/{parent_pk}?trace=y"
)
res.raise_for_status()
response = res.json()
status = response.get("status")
if status == "Done" or status == "Error":
merged_pk = response.get("merged_version")
if merged_pk is None:
self.logger.error(
f"Failed to get the ARS merged message from pk: {parent_pk}."
)
pks["ars"] = "None"
responses["ars"] = {
"response": {"message": {"results": []}},
"status_code": 410,
}
try:
# After getting all individual ARA responses, get and save the merged version
current_time = time.time()
while current_time - start_time <= MAX_QUERY_TIME:
async with httpx.AsyncClient(timeout=30) as client:
res = await client.get(
f"{base_url}/ars/api/messages/{parent_pk}?trace=y"
)
res.raise_for_status()
response = res.json()
status = response.get("status")
if status == "Done" or status == "Error":
merged_pk = response.get("merged_version")
if merged_pk is None:
self.logger.error(
f"Failed to get the ARS merged message from pk: {parent_pk}."
)
pks["ars"] = "None"
responses["ars"] = {
"response": {"message": {"results": []}},
"status_code": 410,
}
else:
# add final ars pk
pks["ars"] = merged_pk
# get full merged pk
res = await client.get(
f"{base_url}/ars/api/messages/{merged_pk}"
)
res.raise_for_status()
merged_message = res.json()
responses["ars"] = {
"response": merged_message.get("fields", {}).get(
"data", {"message": {"results": []}}
),
"status_code": merged_message.get("fields", {}).get(
"code", 410
),
}
self.logger.info("Got ARS merged message!")
break
else:
# add final ars pk
pks["ars"] = merged_pk
# get full merged pk
res = await client.get(
f"{base_url}/ars/api/messages/{merged_pk}"
)
res.raise_for_status()
merged_message = res.json()
responses["ars"] = {
"response": merged_message.get("fields", {}).get(
"data", {"message": {"results": []}}
),
"status_code": merged_message.get("fields", {}).get(
"code", 410
),
}
self.logger.info("Got ARS merged message!")
break
else:
self.logger.info("ARS merging not done, waiting...")
current_time = time.time()
await asyncio.sleep(10)
else:
self.logger.warning(
f"ARS merging took greater than {MAX_QUERY_TIME / 60} minutes."
)
self.logger.info("ARS merging not done, waiting...")
current_time = time.time()
await asyncio.sleep(10)
else:
self.logger.warning(
f"ARS merging took greater than {MAX_QUERY_TIME / 60} minutes."
)
pks["ars"] = "None"
responses["ars"] = {
"response": {"message": {"results": []}},
"status_code": 598,
}
except Exception as e:
self.logger.warning(f"Failed to get ARS merged message: {e}")
pks["ars"] = "None"
responses["ars"] = {
"response": {"message": {"results": []}},
"status_code": 410,
"status_code": 500,
}

return responses, pks
3 changes: 2 additions & 1 deletion test_harness/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ async def normalize_curies(
response = response.json()
for curie, attrs in response.items():
if attrs is None:
normalized_curies[curie] = "Unknown"
# keep original curie
normalized_curies[curie] = curie
else:
# choose the preferred id
normalized_curies[curie] = attrs["id"]["identifier"]
Expand Down

0 comments on commit 78ef0ff

Please sign in to comment.