Skip to content

Commit

Permalink
Merge pull request #284 from NASA-IMPACT/multithreading
Browse files: browse the repository at this point in the history
Implement Multithreading for Enhanced Performance in Custom Check Processing
  • Loading branch information
xhagrg authored May 6, 2024
2 parents 292bae7 + dfd681c commit a0fbe59
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 45 deletions.
93 changes: 66 additions & 27 deletions pyQuARC/code/checker.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json

from xmltodict import parse
from concurrent.futures import ThreadPoolExecutor, as_completed

from .custom_checker import CustomChecker
from .schema_validator import SchemaValidator
Expand Down Expand Up @@ -154,43 +155,81 @@ def _check_dependencies_validity(self, dependencies, field_dict):
return False
return True

def _process_field(
    self,
    func,
    check,
    rule_id,
    metadata_content,
    field_dict,
    result_dict,
    rule_mapping,
):
    """
    Apply one rule to one field entry and record the outcome in `result_dict`

    Args:
        func: The check function handed to `self.custom_checker.run`.
        check: The check definition, forwarded to the scheduler when
            resolving dependencies.
        rule_id: Identifier of the rule being applied.
        metadata_content: The metadata content to validate.
        field_dict: One entry of the rule's `fields_to_apply` list; its
            first `fields` item is used as the key in `result_dict`.
        result_dict: Mapping of main field -> {rule_id: result}; updated
            in place.
        rule_mapping: The rule's configuration; supplies the default
            `data` and the `relation`.

    Returns:
        None. An (initially empty) entry for the main field is always
        created in `result_dict`, even when the rule ends up not applied.
    """
    default_data = rule_mapping.get("data", [])
    relation = rule_mapping.get("relation")
    dependencies = self.scheduler.get_all_dependencies(
        rule_mapping, check, field_dict
    )
    main_field = field_dict["fields"][0]
    # Field-level "data" takes precedence over the rule-level default
    external_data = field_dict.get("data", default_data)
    field_results = result_dict.setdefault(main_field, {})

    if self._check_dependencies_validity(dependencies, field_dict):
        result = self.custom_checker.run(
            func, metadata_content, field_dict, external_data, relation
        )
        validity = result["valid"]

        # The tracker is told about every outcome, including "not applied"
        self.tracker.update_data(rule_id, main_field, validity)

        # A None validity means the rule was not applied; keep such
        # results out of the output
        if validity is not None:
            field_results[rule_id] = result

            message = self.build_message(result, rule_id)
            if message:
                result["message"] = message
                result["remediation"] = self.message(rule_id, "remediation")

def _run_func(self, func, check, rule_id, metadata_content, result_dict):
    """
    Run the check function for `rule_id` and update `result_dict`

    Every entry of the rule's `fields_to_apply` list for the current
    metadata format is handed to `self._process_field` on a small thread
    pool, exactly once.

    Args:
        func: The check function to run.
        check: The check definition, forwarded to `_process_field`.
        rule_id: Identifier of the rule; looked up in `self.rules_override`
            first, then in `self.rule_mapping`.
        metadata_content: The metadata content to validate.
        result_dict: Mapping of main field -> {rule_id: result}; updated
            in place.

    Raises:
        Any exception raised while processing a field is re-raised here.
        When several fields fail, the error of the earliest field in
        `fields_to_apply` order is the one that propagates.
    """
    rule_mapping = self.rules_override.get(rule_id) or self.rule_mapping.get(
        rule_id
    )
    list_of_fields_to_apply = rule_mapping.get("fields_to_apply").get(
        self.metadata_format, {}
    )
    if not list_of_fields_to_apply:
        return

    # Create the per-field entries up front, in configuration order, so the
    # key order of `result_dict` does not depend on thread scheduling.
    for field_dict in list_of_fields_to_apply:
        result_dict.setdefault(field_dict["fields"][0], {})

    # NOTE(review): `result_dict` (and whatever `_process_field` touches on
    # `self`) is shared between the worker threads without locking --
    # confirm the checks and the tracker tolerate concurrent use.
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [
            executor.submit(
                self._process_field,
                func,
                check,
                rule_id,
                metadata_content,
                field_dict,
                result_dict,
                rule_mapping,
            )
            for field_dict in list_of_fields_to_apply
        ]

        # Wait in submission order; `result()` re-raises any exception
        # raised inside the worker thread.
        for future in futures:
            future.result()

def perform_custom_checks(self, metadata_content):
"""
Expand Down
75 changes: 57 additions & 18 deletions pyQuARC/code/custom_checker.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from urllib.parse import urlparse
from concurrent.futures import ThreadPoolExecutor, as_completed


class CustomChecker:
Expand Down Expand Up @@ -103,6 +104,33 @@ def _get_path_value(content_to_validate, path_string):
)
return container

@staticmethod
def _process_argument(arg, func, relation, external_data, external_relation):
    """
    Call `func` for a single argument set and hand back its result.

    The call receives the items of `arg` first, followed by `relation`,
    each item of `external_data` and `external_relation` in that order.
    Any of those extra values that is falsy (None, empty string, 0, ...)
    is left out of the call.

    Args:
        arg: Iterable of the positional values for this invocation.
        func: The function to be called.
        relation: The relation argument.
        external_data: Iterable of external data arguments.
        external_relation: The external relation argument.

    Returns:
        Whatever `func` returns, unmodified.
    """
    positional = list(arg)
    extras = (relation, *external_data, external_relation)
    # filter(None, ...) keeps only the truthy extras
    positional += filter(None, extras)
    return func(*positional)

def run(
self, func, content_to_validate, field_dict, external_data, external_relation
):
Expand Down Expand Up @@ -137,24 +165,35 @@ def run(

invalid_values = []
validity = None
for arg in args:
function_args = [*arg]
function_args.extend(
[
extra_arg
for extra_arg in [relation, *external_data, external_relation]
if extra_arg
]
)
func_return = func(*function_args)
valid = func_return["valid"] # can be True, False or None
if valid is not None:
if valid:
validity = validity or (validity is None)
else:
if "value" in func_return:
invalid_values.append(func_return["value"])
validity = False

# Process arguments using multithreading
with ThreadPoolExecutor() as executor:
future_results = []
for arg in args:
future = executor.submit(
self._process_argument,
arg,
func,
relation,
external_data,
external_relation,
)
future_results.append(future)

# Retrieve results from futures
for future in as_completed(future_results):
try:
func_return = future.result()
valid = func_return["valid"] # can be True, False or None
if valid is not None:
if valid:
validity = validity or (validity is None)
else:
if "value" in func_return:
invalid_values.append(func_return["value"])
validity = False
except Exception as e:
raise e
result["valid"] = validity
result["value"] = invalid_values
return result

0 comments on commit a0fbe59

Please sign in to comment.