diff --git a/acto/__main__.py b/acto/__main__.py
index 0e75cb99fc..6ae5371377 100644
--- a/acto/__main__.py
+++ b/acto/__main__.py
@@ -9,8 +9,6 @@
 import time
 import random
 
-from acto.config import actoConfig
-
 from acto.lib.monkey_patch_loader import load_monkey_patch
 from acto.lib.operator_config import OperatorConfig
 
@@ -95,19 +93,8 @@
 logging.getLogger("kubernetes").setLevel(logging.ERROR)
 logging.getLogger("sh").setLevel(logging.ERROR)
 
-if actoConfig.parallel.executor == 'ray':
-    import ansible_runner
-    import ray
-
-    ansible_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'scripts', 'ansible')
-    ansible_runner.run(inventory=actoConfig.parallel.ansible_inventory, playbook=os.path.join(ansible_dir, 'acto_ray.yaml'))
-    head_result = ansible_runner.run(inventory=actoConfig.parallel.ansible_inventory,
-                                     playbook=os.path.join(ansible_dir, 'ray_head.yaml'))
-    ansible_runner.run(inventory=actoConfig.parallel.ansible_inventory,
-                       playbook=os.path.join(ansible_dir, 'ray_worker.yaml'))
-    if head_result.stats['changed'] != {}:
-        time.sleep(5)
-    ray.init(address='auto')
+import acto.ray_acto as ray
+ray.start_service()
 
 from acto import common
 
@@ -170,7 +157,6 @@
 elif not args.learn:
     acto.run(modes=['normal'])
 normal_finish_time = datetime.now()
-acto.teardown()
 logger.info('Acto normal run finished in %s', normal_finish_time - start_time)
 
 logger.info('Start post processing steps')
diff --git a/acto/checker/impl/kubectl_cli.py b/acto/checker/impl/kubectl_cli.py
index c8ba7285fa..93d8624eec 100644
--- a/acto/checker/impl/kubectl_cli.py
+++ b/acto/checker/impl/kubectl_cli.py
@@ -72,4 +72,4 @@ def _binary_check(self, snapshot: Snapshot, prev_snapshot: Snapshot) -> OracleRe
             logger.log(logging.CRITICAL if actoConfig.strict else logging.ERROR,
                        f'stderr is not empty, but invalid_input_message mark it as valid: {stderr}')
 
-        return KubectlCliResult()
+        return KubectlCliResult(f'Invalid input, field path: None, error {stderr}')
diff --git a/acto/config.py b/acto/config.py
index 4cfedf917a..65b54306ca 100644
--- a/acto/config.py
+++ b/acto/config.py
@@ -11,6 +11,7 @@ def load_config(path=os.path.join(os.path.dirname(os.path.dirname(os.path.abspat
     global actoConfig
     config = yaml.safe_load(open(path))
     actoConfig = Config.parse_obj(config)
+    return actoConfig
 
 
 load_config()
diff --git a/acto/deploy.py b/acto/deploy.py
index e93e7aa7d0..2b948571d6 100644
--- a/acto/deploy.py
+++ b/acto/deploy.py
@@ -1,3 +1,4 @@
+import logging
 from enum import auto, unique
 from functools import wraps, partial
 from typing import TypeVar, Callable
@@ -5,6 +6,8 @@
 import yaml
 from strenum import StrEnum
 
+from acto.monkey_patch import monkey_patch
+
 import acto.utils as utils
 from acto.checker.checker_set import CheckerSet
 from acto.checker.impl.health import HealthChecker
@@ -150,7 +153,6 @@ def deploy(self, runner: Runner) -> str:
         the namespace from the provided object "rabbitmq-system" does not match the namespace "acto-namespace". You must pass '--namespace=rabbitmq-system' to perform this operation.
         """
-        logger = get_thread_logger(with_prefix=True)
 
         print_event('Deploying operator...')
 
         kubectl_client = runner.kubectl_client
@@ -158,7 +160,7 @@ def deploy(self, runner: Runner) -> str:
         namespace = utils.get_yaml_existing_namespace(self.crd_yaml_files) or CONST.ACTO_NAMESPACE
         ret = utils.create_namespace(kubectl_client.api_client, namespace)
         if ret is None:
-            logger.critical('Failed to create namespace')
+            logging.critical('Failed to create namespace')
         # use server side apply to avoid last-applied-configuration
         if self.init_yaml_files:
             kubectl_client.apply(self.init_yaml_files, server_side=None)
diff --git a/acto/engine_new.py b/acto/engine_new.py
index 536d4d086b..68e6964562 100644
--- a/acto/engine_new.py
+++ b/acto/engine_new.py
@@ -34,6 +34,11 @@
 from ssa.analysis import analyze
 
+def task(runner: Runner, data: Tuple[Callable[[Runner, Trial, dict], Snapshot], CheckerSet, int, TrialInputIteratorLike]) -> Trial:
+    collector, checkers, num_of_mutations, iterator = data
+    trial = Trial(iterator, checkers, num_mutation=num_of_mutations)
+    return runner.run.remote(trial, collector)
+
 class Acto:
 
     def __init__(self,
                  workdir_path: str,
@@ -149,7 +154,7 @@ def __init__(self,
         for oracle_modules in operator_config.custom_oracles:
             module = importlib.import_module(oracle_modules)
             for name, obj in inspect.getmembers(module):
-                if inspect.isclass(obj) and issubclass(obj, Checker) and obj != Checker:
+                if inspect.isclass(obj) and issubclass(obj, Checker) and not inspect.isabstract(obj):
                     checker_generators.append(obj)
 
         self.checkers = CheckerSet(self.context, self.input_model, checker_generators)
@@ -207,24 +212,19 @@ def collect_learn_context(namespace_discovered: str, runner: Runner, trial: Tria
             json.dump(self.context, context_fout, cls=ContextEncoder, indent=4, sort_keys=True)
 
     def run_trials(self, iterators: Sequence[TrialInputIteratorLike], _collector: Callable[[Runner, Trial, dict], Snapshot] = None):
-        def task(runner: Runner, iterator: TrialInputIteratorLike) -> Trial:
-            if _collector is None:
-                collector = with_context(CollectorContext(
-                    namespace=self.context['namespace'],
-                    crd_meta_info=self.context['crd'],
-                    collect_coverage=self.collect_coverage,
-                ), snapshot_collector)
-            else:
-                collector = _collector
-            # Inject the deploy step into the collector, to deploy the operator before running the test
-            collector = self.deploy.chain_with(drop_first_parameter(collector))
-            assert isinstance(self.input_model.get_root_schema(), ObjectSchema)
-
-            trial = Trial(iterator, self.checkers, num_mutation=self.num_of_mutations)
-            return runner.run.remote(trial, collector)
-
+        if _collector is None:
+            collector = with_context(CollectorContext(
+                namespace=self.context['namespace'],
+                crd_meta_info=self.context['crd'],
+                collect_coverage=self.collect_coverage,
+            ), snapshot_collector)
+        else:
+            collector = _collector
+        # Inject the deploy step into the collector, to deploy the operator before running the test
+        collector = self.deploy.chain_with(drop_first_parameter(collector))
+        assert isinstance(self.input_model.get_root_schema(), ObjectSchema)
         for it in iterators:
-            self.runners.submit(task, it)
+            self.runners.submit(task, (collector, self.checkers, self.num_of_mutations, it))
 
         while self.runners.has_next():
             # As long as we have remaining test cases
@@ -268,13 +268,6 @@ def run_test_plan(test_case_list: List[Tuple[List[str], TestCase]]):
 
         logger.info('All tests finished')
 
-    def teardown(self):
-        while True:
-            runner = self.runners.pop_idle()
-            if not runner:
-                break
-            runner.teardown_cluster.remote()
-
 
 def do_static_analysis(analysis: AnalysisConfig):
     with tempfile.TemporaryDirectory() as project_src:
diff --git a/acto/lib/monkey_patch_loader.py b/acto/lib/monkey_patch_loader.py
index 0e79a7f932..42e5a04f66 100644
--- a/acto/lib/monkey_patch_loader.py
+++ b/acto/lib/monkey_patch_loader.py
@@ -3,6 +3,16 @@
 from acto.config import actoConfig
 from acto.lib.operator_config import OperatorConfig
 
+def patch_process_pool():
+    import sys
+    import dill
+    # pickle cannot serialize closures, so we use dill instead
+    # https://stackoverflow.com/questions/19984152/what-can-multiprocessing-and-dill-do-together
+    dill.Pickler.dumps, dill.Pickler.loads = dill.dumps, dill.loads
+    from multiprocessing import reduction
+    reduction.ForkingPickler = dill.Pickler
+    reduction.dump = dill.dump
+    assert 'multiprocessing.connection' not in sys.modules
 
 def load_monkey_patch(config: OperatorConfig):
     monkey_patch_load_path = os.path.expanduser('~/.acto_monkey_patch.rc')
@@ -12,6 +22,9 @@ def load_monkey_patch(config: OperatorConfig):
     else:
         open(monkey_patch_load_path, 'w').write('')
 
+    if actoConfig.parallel.executor == 'process':
+        patch_process_pool()
+
     if actoConfig.parallel.executor == 'ray':
         import ansible_runner
 
diff --git a/acto/post_diff_test.py b/acto/post_diff_test.py
index ebed5c640d..5c5d18d9c6 100644
--- a/acto/post_diff_test.py
+++ b/acto/post_diff_test.py
@@ -11,15 +11,13 @@
 
 if __name__ == '__main__':
-    from acto.config import actoConfig
     import acto.config as acto_config
 
-    acto_config.load_config(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
+    actoConfig = acto_config.load_config(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
                                          'config_post_diff_test.yaml'))
 
     from acto.lib.monkey_patch_loader import load_monkey_patch
     from acto.lib.operator_config import OperatorConfig
-    from acto.post_process import PostDiffTest
 
     # for debugging, set random seed to 0
     random.seed(0)
@@ -53,23 +51,13 @@
                         format='%(asctime)s %(levelname)-7s, %(name)s, %(filename)-9s:%(lineno)d, %(message)s')
     logging.getLogger("kubernetes").setLevel(logging.ERROR)
     logging.getLogger("sh").setLevel(logging.ERROR)
-    if actoConfig.parallel.executor == 'ray':
-        import ansible_runner
-        import ray
-
-        ansible_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'scripts', 'ansible')
-        ansible_runner.run(inventory=actoConfig.parallel.ansible_inventory,
-                           playbook=os.path.join(ansible_dir, 'acto_ray.yaml'))
-        head_result = ansible_runner.run(inventory=actoConfig.parallel.ansible_inventory,
-                                         playbook=os.path.join(ansible_dir, 'ray_head.yaml'))
-        ansible_runner.run(inventory=actoConfig.parallel.ansible_inventory,
-                           playbook=os.path.join(ansible_dir, 'ray_worker.yaml'))
-        if head_result.stats['changed'] != {}:
-            time.sleep(5)
-        ray.init(address='auto')
+    import acto.ray_acto as ray
+    ray.start_service()
 
     def main():
+        from acto.post_process import PostDiffTest
+
         post_diff_test_dir = os.path.join(args.workdir_path, 'post_diff_test')
         trials = {}
         trial_paths = glob.glob(os.path.join(args.workdir_path, '**', 'trial.pkl'))
@@ -81,7 +69,6 @@ def main():
         if not args.checkonly:
             p.post_process(post_diff_test_dir)
         p.check(post_diff_test_dir)
-        p.teardown()
 
     main()
diff --git a/acto/post_process/post_diff_test.py b/acto/post_process/post_diff_test.py
index 7ee310c271..b16f170984 100644
--- a/acto/post_process/post_diff_test.py
+++ b/acto/post_process/post_diff_test.py
@@ -18,6 +18,8 @@
 from deepdiff.operator import BaseOperator
 from pandas import DataFrame
 
+from acto.monkey_patch import monkey_patch
+
 from acto.checker.checker import OracleResult, OracleControlFlow
 from acto.checker.impl import recovery
 from acto.checker.impl.recovery import RecoveryResult
@@ -191,18 +193,18 @@ def history(self) -> List[Tuple[dict, dict]]:
         return self.__history
 
 
-def post_process_task(runner: 'PostDiffRunner', data: Tuple[dict, str]) -> Trial:
-    system_input, system_input_hash = data
+def post_process_task(runner: 'PostDiffRunner', data: Tuple[dict, str, dict, Deploy]) -> Trial:
+    system_input, system_input_hash, context, deploy = data
 
     collector = with_context(CollectorContext(
-        namespace=runner.context['namespace'],
-        crd_meta_info=runner.context['crd'],
+        namespace=context['namespace'],
+        crd_meta_info=context['crd'],
     ), snapshot_collector)
 
     # Inject the deploy step into the collector, to deploy the operator before running the test
-    collector = runner.deploy.chain_with(drop_first_parameter(collector))
+    collector = deploy.chain_with(drop_first_parameter(collector))
 
     trial = Trial(TrialSingleInputIterator(system_input, system_input_hash), None, num_mutation=10)
-    return runner.run.remote(trial, collector)
+    return ray.get(runner.run.remote(trial, collector))
 
 
 def post_diff_compare_task(runner: 'PostDiffRunner', data: Tuple[Snapshot, DataFrame, bool]) -> OracleResult:
@@ -211,15 +213,18 @@ def post_diff_compare_task(runner: 'PostDiffRunner', data: Tuple[Snapshot, DataF
 
 @ray.remote(scheduling_strategy="SPREAD", num_cpus=1, resources={"disk": 10})
-class PostDiffRunner(Runner):
+class PostDiffRunner:
     def __init__(self, context: dict, deploy: Deploy, diff_ignore_fields: List[str],
                  engine_class: Type[Engine], engine_version: str, num_nodes: int,
                  preload_images: List[str] = None, preload_images_store: Callable[[str], str] = None):
-        super().__init__(engine_class, engine_version, num_nodes, preload_images, preload_images_store)
+        self.trial_runner = Runner.remote(engine_class, engine_version, num_nodes, preload_images, preload_images_store)
         self.context = context
         self.deploy = deploy
         self.diff_ignore_fields = diff_ignore_fields
 
+    def run(self, trial: Trial, snapshot_collector: Callable[['Runner', Trial, dict], Snapshot]) -> Trial:
+        return self.trial_runner.run.remote(trial, snapshot_collector)
+
     def check_trial(self, diff_snapshot: Snapshot, originals: DataFrame, run_check_indeterministic: bool = False):
         group_errs = []
         diff_system_input = diff_snapshot.input
@@ -239,7 +244,7 @@ def check_trial(self, diff_snapshot: Snapshot, originals: DataFrame, run_check_i
 
         errored = False
         if run_check_indeterministic:
-            additional_trial = post_process_task(self, (diff_system_input, digest))
+            additional_trial = post_process_task(self, (diff_system_input, digest, self.context, self.deploy))
             try:
                 additional_snapshot, _ = unpack_history_iterator_or_raise(additional_trial.history_iterator())
             except RuntimeError as e:
@@ -341,7 +346,7 @@ def post_process(self, workdir: str):
             trial_name = group.iloc[0]['trial']
             if os.path.exists(os.path.join(workdir, trial_name, f'difftest-{digest}.pkl')):
                 continue
-            self._runners.submit(post_process_task, (group.iloc[0]['input'], group.iloc[0]['input_digest']))
+            self._runners.submit(post_process_task, (group.iloc[0]['input'], group.iloc[0]['input_digest'], self._context, self._deploy))
 
         while self._runners.has_next():
             # As long as there are still runners running, or we have remaining test cases
@@ -388,12 +393,6 @@ def check_by_snapshots(self, snapshots: Iterable[Snapshot], run_check_indetermin
             results.extend(result)
         return results
 
-    def teardown(self):
-        while True:
-            runner = self._runners.pop_idle()
-            if not runner:
-                break
-            runner.teardown_cluster.remote()
 
 
 if __name__ == '__main__':
diff --git a/acto/ray_acto/__init__.py b/acto/ray_acto/__init__.py
index 2d43a4a6c1..36ea909fb9 100644
--- a/acto/ray_acto/__init__.py
+++ b/acto/ray_acto/__init__.py
@@ -1 +1,23 @@
+import os
+import time
+
 from .ray import remote, get
+from acto.config import actoConfig
+
+
+def start_service():
+    if actoConfig.parallel.executor == 'ray':
+        import ansible_runner
+        import ray
+
+        ansible_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
+                                   'scripts', 'ansible')
+        ansible_runner.run(inventory=actoConfig.parallel.ansible_inventory,
+                           playbook=os.path.join(ansible_dir, 'acto_ray.yaml'))
+        head_result = ansible_runner.run(inventory=actoConfig.parallel.ansible_inventory,
+                                         playbook=os.path.join(ansible_dir, 'ray_head.yaml'))
+        ansible_runner.run(inventory=actoConfig.parallel.ansible_inventory,
+                           playbook=os.path.join(ansible_dir, 'ray_worker.yaml'))
+        if head_result.stats['changed'] != {}:
+            time.sleep(5)
+        ray.init(address='auto')
diff --git a/acto/reproduce.py b/acto/reproduce.py
index 6a1a184377..76dc4f5cd3 100644
--- a/acto/reproduce.py
+++ b/acto/reproduce.py
@@ -29,7 +29,14 @@
     args = parser.parse_args()
 
     from acto.lib.monkey_patch_loader import load_monkey_patch
-    load_monkey_patch(args.config)
+    from acto.lib.operator_config import OperatorConfig
+    with open(args.config, 'r') as config_file:
+        config = OperatorConfig(**json.load(config_file))
+    load_monkey_patch(config)
+
+    import acto.ray_acto as ray
+
+    ray.start_service()
 
     from acto.engine_new import Acto
     from acto.input import TestCase
@@ -56,6 +63,9 @@ def flush(self):
     def revert(self):
         pass
 
+    def redo(self):
+        pass
+
     def swap_iterator(self, _: Iterator[Tuple[List[str], 'TestCase']]) -> Iterator[Tuple[List[str], 'TestCase']]:
         return iter(())
 
diff --git a/acto/utils/process_with_except.py b/acto/utils/process_with_except.py
deleted file mode 100644
index df125378e4..0000000000
--- a/acto/utils/process_with_except.py
+++ /dev/null
@@ -1,13 +0,0 @@
-import sys
-from multiprocessing import Process
-from sys import excepthook
-
-
-class MyProcess(Process):
-    '''Process class with excepthook'''
-
-    def run(self):
-        try:
-            super().run()
-        except Exception:
-            excepthook(*sys.exc_info())
\ No newline at end of file
diff --git a/acto/utils/thread_logger.py b/acto/utils/thread_logger.py
index 2a507c90f3..d82a3ab383 100644
--- a/acto/utils/thread_logger.py
+++ b/acto/utils/thread_logger.py
@@ -2,30 +2,26 @@
 import threading
 from typing import Tuple
 
-
+# TODO: We need a new logger to handle logs for different executors
 class PrefixLoggerAdapter(logging.LoggerAdapter):
     """ A logger adapter that adds a prefix to every message """
 
     def process(self, msg: str, kwargs: dict) -> Tuple[str, dict]:
         return (f'[{self.extra["prefix"]}] {msg}', kwargs)
 
 
-logger_prefix = threading.local()
 
 
 def set_thread_logger_prefix(prefix: str) -> None:
     '''
     Store the prefix in the thread local storag, invoke get_thread_logger_with_prefix to get the updated logger
     '''
-    logger_prefix.prefix = prefix
+    pass
 
 
 def get_thread_logger(with_prefix: bool) -> logging.LoggerAdapter:
     '''Get the logger with the prefix from the thread local storage'''
     logger = logging.getLogger(threading.current_thread().name)
     logger.setLevel(logging.DEBUG)
 
     # if the prefix is not set, return the original logger
-    if not with_prefix or not hasattr(logger_prefix, 'prefix'):
-        return logger
-
-    return PrefixLoggerAdapter(logger, extra={'prefix': logger_prefix.prefix})
+    return logger
diff --git a/poetry.lock b/poetry.lock
index 4e505af114..21a1929f88 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -619,6 +619,21 @@ ordered-set = ">=4.0.2,<4.2.0"
 cli = ["click (==8.1.3)", "pyyaml (==6.0)"]
 optimize = ["orjson"]
 
+[[package]]
+name = "dill"
+version = "0.3.7"
+description = "serialize all of Python"
+category = "main"
+optional = false
+python-versions = ">=3.7"
+files = [
+    {file = "dill-0.3.7-py3-none-any.whl", hash = "sha256:76b122c08ef4ce2eedcd4d1abd8e641114bfc6c2867f49f3c41facf65bf19f5e"},
+    {file = "dill-0.3.7.tar.gz", hash = "sha256:cc1c8b182eb3013e24bd475ff2e9295af86c1a38eb1aff128dac8962a9ce3c03"},
+]
+
+[package.extras]
+graph = ["objgraph (>=1.7.2)"]
+
 [[package]]
 name = "distlib"
 version = "0.3.7"
@@ -2471,4 +2486,4 @@ ray = ["ansible-core", "ansible-runner", "ray"]
 
 [metadata]
 lock-version = "2.0"
 python-versions = ">=3.8"
-content-hash = "de4fcb5e467db5f4a462f9433c25cdb486a80134b4b0b14943bc1be5e21f3e01"
+content-hash = "9c6a4abfc3736a8a366fe5547e6480eac2a91b5810e9a522d6322350be70e47a"
diff --git a/pyproject.toml b/pyproject.toml
index 94ae67dea9..654aaeeed0 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -23,6 +23,7 @@ ray = { extras = ["default"], version = "^2.5.1", optional = true }
 ansible-runner = { version = "^2.3.3", optional = true }
 ansible-core = { version = "~2.13.0", optional = true }
 pre-commit = "^3.3.3"
+dill = "^0.3.7"
 
 [tool.poetry.group.dev]
 optional = true
diff --git a/requirements.txt b/requirements.txt
index d838b4a69e..90fea31afc 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -357,6 +357,9 @@ cryptography==41.0.2 ; python_version >= "3.8" \
 deepdiff==6.3.1 ; python_version >= "3.8" \
     --hash=sha256:e8c1bb409a2caf1d757799add53b3a490f707dd792ada0eca7cac1328055097a \
     --hash=sha256:eae2825b2e1ea83df5fc32683d9aec5a56e38b756eb2b280e00863ce4def9d33
+dill==0.3.7 ; python_version >= "3.8" \
+    --hash=sha256:76b122c08ef4ce2eedcd4d1abd8e641114bfc6c2867f49f3c41facf65bf19f5e \
+    --hash=sha256:cc1c8b182eb3013e24bd475ff2e9295af86c1a38eb1aff128dac8962a9ce3c03
 distlib==0.3.7 ; python_version >= "3.8" \
     --hash=sha256:2e24928bc811348f0feb63014e97aaae3037f2cf48712d51ae61df7fd6075057 \
     --hash=sha256:9dafe54b34a028eafd95039d5e5d4851a13734540f1331060d31c9916e7147a8
diff --git a/test/test_cassop_bugs.py b/test/test_cassop_bugs.py
index f377a87ddb..6edfbd4589 100644
--- a/test/test_cassop_bugs.py
+++ b/test/test_cassop_bugs.py
@@ -1,5 +1,4 @@
 import json
-import multiprocessing
 import os
 import pathlib
 import unittest
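
Reviewer note: below is a minimal, self-contained sketch (a hypothetical standalone script, not part of this patch) of the technique behind patch_process_pool() in acto/lib/monkey_patch_loader.py: swapping multiprocessing's ForkingPickler for dill so that closures survive the trip to worker processes. As the patch's own assert documents, this must run before multiprocessing.connection is imported, because that module binds the pickler at import time.

import sys

import dill

# Same patch as in acto/lib/monkey_patch_loader.py: route multiprocessing's
# pickling through dill, which (unlike stdlib pickle) handles closures.
dill.Pickler.dumps, dill.Pickler.loads = dill.dumps, dill.loads
from multiprocessing import reduction

reduction.ForkingPickler = dill.Pickler
reduction.dump = dill.dump
# multiprocessing.connection caches the pickler when it is imported, so the
# patch is only effective if it runs first.
assert 'multiprocessing.connection' not in sys.modules


def make_adder(x):
    # Returns a closure; stdlib pickle would raise "Can't pickle local
    # object" when the pool ships it to a worker process.
    return lambda y: x + y


if __name__ == '__main__':
    from multiprocessing import Pool

    with Pool(2) as pool:
        print(pool.map(make_adder(10), [1, 2, 3]))  # prints [11, 12, 13]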
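Similarly, the Ansible/Ray bootstrap that was previously duplicated in acto/__main__.py and acto/post_diff_test.py now lives behind a single entry point. A hedged usage sketch, assuming the default acto configuration shipped with the repository:

import acto.ray_acto as ray

# Runs the acto_ray/ray_head/ray_worker playbooks and connects with
# ray.init(address='auto'), but only when actoConfig.parallel.executor
# is 'ray'; with any other executor this call is a no-op.
ray.start_service()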