Skip to content

Commit

Permalink
Merge pull request #25 from TranslatorSRI/better_better_query_runner
Browse files: browse the repository at this point in the history
Better better query runner
  • Loading branch information
maximusunc authored Aug 16, 2024
2 parents 2736c64 + 7b43ed2 commit 399b00b
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 61 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

setup(
name="sri-test-harness",
version="0.2.4",
version="0.2.5",
author="Max Wang",
author_email="[email protected]",
url="https://github.com/TranslatorSRI/TestHarness",
Expand Down
2 changes: 1 addition & 1 deletion test_harness/result_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def collect_result(
for agent in self.agents
)
pk_url = (
f"https://arax.ncats.io/?r={parent_pk}" if parent_pk is not None else ""
f"https://arax.ci.ncats.io/?r={parent_pk}" if parent_pk is not None else ""
)
self.csv += (
f""""{asset.name}",{url},{pk_url},{test.id},{asset.id},{agent_results}\n"""
Expand Down
63 changes: 40 additions & 23 deletions test_harness/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ async def run_tests(
) -> Dict:
"""Send tests through the Test Runners."""
start_time = time.time()
logger.info(f"Running {len(tests)} tests...")
logger.info(f"Running {len(tests)} queries...")
full_report = {
"PASSED": 0,
"FAILED": 0,
Expand All @@ -38,7 +38,7 @@ async def run_tests(
env = "None"
await slacker.post_notification(
messages=[
f"Running {args['suite']} ({sum([len(test.test_assets) for test in tests.values()])} tests)...\n<{reporter.base_path}/test-runs/{reporter.test_run_id}|View in the Information Radiator>"
f"Running {args['suite']} ({sum([len(test.test_assets) for test in tests.values()])} tests, {len(tests.values())} queries)...\n<{reporter.base_path}/test-runs/{reporter.test_run_id}|View in the Information Radiator>"
]
)
query_runner = QueryRunner(logger)
Expand All @@ -54,7 +54,7 @@ async def run_tests(
logger.warning(f"Test has missing required fields: {test.id}")
continue

query_responses = await query_runner.run_queries(test)
query_responses, normalized_curies = await query_runner.run_queries(test)
if test.test_case_objective == "AcceptanceTest":
test_ids = []

Expand All @@ -72,6 +72,7 @@ async def run_tests(
test_ids.append(test_id)
except Exception:
logger.error(f"Failed to create test: {test.id}")
continue

test_asset_hash = hash_test_asset(asset)
test_query = query_responses.get(test_asset_hash)
Expand All @@ -92,28 +93,39 @@ async def run_tests(
for agent, response in test_query["responses"].items():
report["result"][agent] = {}
agent_report = report["result"][agent]
if response["status_code"] > 299:
agent_report["status"] = "FAILED"
if response["status_code"] == "598":
agent_report["message"] = "Timed out"
try:
if response["status_code"] > 299:
agent_report["status"] = "FAILED"
if response["status_code"] == "598":
agent_report["message"] = "Timed out"
else:
agent_report["message"] = (
f"Status code: {response['status_code']}"
)
elif (
"response" not in response
or "message" not in response["response"]
):
agent_report["status"] = "FAILED"
agent_report["message"] = "Test Error"
elif (
response["response"]["message"].get("results") is None
or len(response["response"]["message"]["results"]) == 0
):
agent_report["status"] = "DONE"
agent_report["message"] = "No results"
else:
agent_report["message"] = (
f"Status code: {response['status_code']}"
await pass_fail_analysis(
report["result"],
agent,
response["response"]["message"]["results"],
normalized_curies[asset.output_id],
asset.expected_output,
)
elif (
response["response"]["message"].get("results") is None
or len(response["response"]["message"]["results"]) == 0
):
agent_report["status"] = "DONE"
agent_report["message"] = "No results"
else:
await pass_fail_analysis(
report["result"],
agent,
response["response"]["message"]["results"],
query_runner.normalized_curies[asset.output_id],
asset.expected_output,
)
except Exception as e:
logger.error(f"Failed to run analysis on {agent}: {e}")
agent_report["status"] = "FAILED"
agent_report["message"] = "Test Error"

status = "PASSED"
# grab only ars result if it exists, otherwise default to failed
Expand Down Expand Up @@ -147,6 +159,8 @@ async def run_tests(
await reporter.upload_log(test_id, json.dumps(report, indent=4))
except Exception:
logger.error(f"[{test.id}] failed to upload logs.")
else:
status = "SKIPPED"

try:
await reporter.finish_test(test_id, status)
Expand Down Expand Up @@ -205,6 +219,9 @@ async def run_tests(
except Exception:
logger.error(f"Failed to report errors with: {test.id}")

# delete this big object to help out the garbage collector
del query_responses

await slacker.post_notification(
messages=[
"""Test Suite: {test_suite}\nDuration: {duration} | Environment: {env}\n<{ir_url}|View in the Information Radiator>\n> Test Results:\n> Passed: {num_passed}, Failed: {num_failed}, Skipped: {num_skipped}""".format(
Expand Down
97 changes: 63 additions & 34 deletions test_harness/runner/query_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ class QueryRunner:

def __init__(self, logger: logging.Logger):
self.registry = {}
self.normalized_curies = {}
self.logger = logger

async def retrieve_registry(self, trapi_version: str):
Expand Down Expand Up @@ -84,15 +83,15 @@ async def run_queries(
self,
test_case: TestCase,
concurrency: int = 1, # for performance testing
) -> Dict[int, dict]:
) -> Tuple[Dict[int, dict], Dict[str, str]]:
"""Run all queries specified in a Test Case."""
# normalize all the curies in a test case
self.normalized_curies.update(await normalize_curies(test_case, self.logger))
normalized_curies = await normalize_curies(test_case, self.logger)
# TODO: figure out the right way to handle input category wrt normalization

queries: Dict[int, dict] = {}
for test_asset in test_case.test_assets:
test_asset.input_id = self.normalized_curies[test_asset.input_id]
test_asset.input_id = normalized_curies[test_asset.input_id]
# TODO: make this better
asset_hash = hash_test_asset(test_asset)
if asset_hash not in queries:
Expand All @@ -104,7 +103,6 @@ async def run_queries(
"pks": {},
}

self.logger.debug(queries)
# send queries to a single type of component at a time
for component in test_case.components:
# component = "ara"
Expand Down Expand Up @@ -134,36 +132,23 @@ async def run_queries(
except Exception as e:
self.logger.error(f"Something went wrong with the queries: {e}")

return queries
return queries, normalized_curies

async def get_ars_responses(
self, parent_pk: str, base_url: str
) -> Tuple[Dict[str, dict], Dict[str, str]]:
"""Given a parent pk, get responses for all ARS things."""
responses = {}
pks = {
"parent_pk": parent_pk,
}
async with httpx.AsyncClient(timeout=30) as client:
# retain this response for testing
res = await client.post(f"{base_url}/ars/api/retain/{parent_pk}")
res.raise_for_status()
# Get all children queries
res = await client.get(f"{base_url}/ars/api/messages/{parent_pk}?trace=y")
res.raise_for_status()
response = res.json()
async def get_ars_child_response(
self,
child_pk: str,
base_url: str,
infores: str,
start_time: float,
):
"""Given a child pk, get response from ARS."""
self.logger.info(f"Getting response for {infores}...")

start_time = time.time()
for child in response.get("children", []):
child_pk = child["message"]
infores = child["actor"]["inforesid"].split("infores:")[1]
self.logger.info(f"Getting response for {infores}...")
# add child pk
pks[infores] = child_pk
current_time = time.time()
current_time = time.time()

response = None
status = 500
response = None
status = 500
try:
# while we stay within the query max time
while current_time - start_time <= MAX_ARA_TIME:
# get query status of child query
Expand Down Expand Up @@ -195,18 +180,62 @@ async def get_ars_responses(
self.logger.info(
f"Got reponse for {infores} with status code {status_code}."
)
responses[infores] = {
response = {
"response": response.get("fields", {}).get(
"data", {"message": {"results": []}}
),
"status_code": status_code,
}
else:
self.logger.warning(f"Got error from {infores}")
responses[infores] = {
response = {
"response": {"message": {"results": []}},
"status_code": status,
}
except Exception as e:
self.logger.error(
f"Getting ARS child response ({infores}) failed with: {e}"
)
response = {
"response": {"message": {"results": []}},
"status_code": status,
}

return infores, response

async def get_ars_responses(
self, parent_pk: str, base_url: str
) -> Tuple[Dict[str, dict], Dict[str, str]]:
"""Given a parent pk, get responses for all ARS things."""
responses = {}
pks = {
"parent_pk": parent_pk,
}
async with httpx.AsyncClient(timeout=30) as client:
# retain this response for testing
# res = await client.post(f"{base_url}/ars/api/retain/{parent_pk}")
# res.raise_for_status()
# Get all children queries
res = await client.get(f"{base_url}/ars/api/messages/{parent_pk}?trace=y")
res.raise_for_status()
response = res.json()

start_time = time.time()
child_tasks = []
for child in response.get("children", []):
child_pk = child["message"]
infores = child["actor"]["inforesid"].split("infores:")[1]
# add child pk
pks[infores] = child_pk
child_tasks.append(
self.get_ars_child_response(child_pk, base_url, infores, start_time)
)

child_responses = await asyncio.gather(*child_tasks)

for child_response in child_responses:
infores, response = child_response
responses[infores] = response

# After getting all individual ARA responses, get and save the merged version
current_time = time.time()
Expand Down
4 changes: 2 additions & 2 deletions test_harness/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ async def normalize_curies(
curies.update([asset.input_id for asset in test.test_assets])
curies.add(test.test_case_input_id)

normalized_curies = {}
async with httpx.AsyncClient() as client:
normalized_curies = {}
try:
response = await client.post(
node_norm + "/get_normalized_nodes",
Expand All @@ -49,7 +49,7 @@ async def normalize_curies(
logger.error("Using original curies.")
for curie in curies:
normalized_curies[curie] = curie
return normalized_curies
return normalized_curies


def get_tag(result):
Expand Down

0 comments on commit 399b00b

Please sign in to comment.