From 0b3ba003578a88da472f0588ed2abb092c310d0f Mon Sep 17 00:00:00 2001 From: George Boukeas Date: Fri, 16 Aug 2024 13:01:44 +0100 Subject: [PATCH] refactor(agent): unify treatment of output and other events --- agent/testflinger_agent/handlers.py | 9 +-- agent/testflinger_agent/job.py | 10 ++- agent/testflinger_agent/runner.py | 65 +++++++++++-------- .../stop_condition_checkers.py | 3 +- agent/testflinger_agent/tests/test_job.py | 6 +- .../tests/test_stop_condition_checkers.py | 3 +- 6 files changed, 55 insertions(+), 41 deletions(-) diff --git a/agent/testflinger_agent/handlers.py b/agent/testflinger_agent/handlers.py index 3877d15a..d20fb4ee 100644 --- a/agent/testflinger_agent/handlers.py +++ b/agent/testflinger_agent/handlers.py @@ -13,6 +13,7 @@ # along with this program. If not, see from .client import TestflingerClient +from .runner import OutputEvent class LiveOutputHandler: @@ -20,14 +21,14 @@ def __init__(self, client: TestflingerClient, job_id: str): self.client = client self.job_id = job_id - def __call__(self, data: str): - self.client.post_live_output(self.job_id, data) + def __call__(self, event: OutputEvent): + self.client.post_live_output(self.job_id, event.output) class LogUpdateHandler: def __init__(self, log_file: str): self.log_file = log_file - def __call__(self, data: str): + def __call__(self, event: OutputEvent): with open(self.log_file, "a") as log: - log.write(data) + log.write(event.output) diff --git a/agent/testflinger_agent/job.py b/agent/testflinger_agent/job.py index 6b7d9855..f8b8d510 100644 --- a/agent/testflinger_agent/job.py +++ b/agent/testflinger_agent/job.py @@ -19,7 +19,7 @@ import time from testflinger_agent.errors import TFServerError -from .runner import CommandRunner, RunnerEvents +from .runner import CommandRunner, OutputEvent from .handlers import LiveOutputHandler, LogUpdateHandler from .stop_condition_checkers import ( JobCancelledChecker, @@ -85,8 +85,8 @@ def run_test_phase(self, phase, rundir): runner = CommandRunner(cwd=rundir, env=self.client.config) output_log_handler = LogUpdateHandler(output_log) live_output_handler = LiveOutputHandler(self.client, self.job_id) - runner.register_output_handler(output_log_handler) - runner.register_output_handler(live_output_handler) + runner.subscribe_event(OutputEvent, output_log_handler) + runner.subscribe_event(OutputEvent, live_output_handler) # Reserve phase uses a separate timeout handler if phase != "reserve": @@ -101,9 +101,7 @@ def run_test_phase(self, phase, rundir): self.get_output_timeout() ) runner.register_stop_condition_checker(output_timeout_checker) - runner.subscribe_event( - RunnerEvents.OUTPUT_RECEIVED, output_timeout_checker.update - ) + runner.subscribe_event(OutputEvent, output_timeout_checker.update) # Do not allow cancellation during provision for safety reasons if phase != "provision": diff --git a/agent/testflinger_agent/runner.py b/agent/testflinger_agent/runner.py index 3fc067de..07ffe2a8 100644 --- a/agent/testflinger_agent/runner.py +++ b/agent/testflinger_agent/runner.py @@ -22,60 +22,72 @@ import time from collections import defaultdict -from enum import Enum -from typing import Callable, List, Optional, Tuple +from typing import Callable, List, Optional, Tuple, Type from testflinger_common.enums import TestEvent logger = logging.getLogger(__name__) -OutputHandlerType = Callable[[str], None] StopConditionType = Callable[[], Optional[str]] -class RunnerEvents(Enum): +class RunnerEvent: """ Runner events that can be subscribed to. """ - OUTPUT_RECEIVED = "output_received" + @classmethod + def event_name(cls): + """ + Return a unique event name (the fully qualified class name) to use + as a key in dictionaries. + """ + return f"{cls.__module__}.{cls.__qualname__}" + + +# Any event handler expects to be called with the event as an argument +RunnerEventHandlerType = Callable[[RunnerEvent], None] + + +class OutputEvent(RunnerEvent): + """ + A type of event corresponding to the generation of output during a test. + The output is stored in the corresponding `output` attribute. + """ + + def __init__(self, output: str): + self.output = output class CommandRunner: """ - Run a command and handle output and stop conditions. + Run a command and handle events and stop conditions. - There are also events that can be subscribed to for notifications. The - known event types are defined in RunnerEvents. + There are also events that can be subscribed to for notifications. """ def __init__(self, cwd: Optional[str], env: Optional[dict]): - self.output_handlers: List[OutputHandlerType] = [] self.stop_condition_checkers: List[StopConditionType] = [] self.process: Optional[subprocess.Popen] = None self.cwd = cwd self.env = os.environ.copy() + # a mapping of event names to lists of registered event handlers self.events = defaultdict(list) if env: self.env.update( {k: str(v) for k, v in env.items() if isinstance(v, str)} ) - def register_output_handler(self, handler: OutputHandlerType): - self.output_handlers.append(handler) - - def subscribe_event(self, event_name: RunnerEvents, handler: Callable): + def subscribe_event( + self, event_cls: Type[RunnerEvent], handler: RunnerEventHandlerType + ): """Set a callback for an event that we want to be notified of""" - self.events[event_name].append(handler) + self.events[event_cls.event_name()].append(handler) - def post_event(self, event_name: RunnerEvents): + def post_event(self, event: RunnerEvent): """Post an event for subscribers to be notified of""" - for handler in self.events[event_name]: - handler() - - def post_output(self, data: str): - for handler in self.output_handlers: - handler(data) + for handler in self.events[type(event).event_name()]: + handler(event) def register_stop_condition_checker(self, checker: StopConditionType): self.stop_condition_checkers.append(checker) @@ -95,10 +107,11 @@ def check_and_post_output(self): raw_output = self.process.stdout.read() if not raw_output: return - self.post_event(RunnerEvents.OUTPUT_RECEIVED) - - output = raw_output.decode(sys.stdout.encoding, errors="replace") - self.post_output(output) + self.post_event( + OutputEvent( + raw_output.decode(sys.stdout.encoding, errors="replace") + ) + ) def run_command_thread(self, cmd: str): self.process = subprocess.Popen( @@ -141,7 +154,7 @@ def run(self, cmd: str) -> Tuple[int, Optional[TestEvent], str]: stop_event, stop_reason = self.check_stop_conditions() if stop_event is not None: - self.post_output(f"\n{stop_reason}\n") + self.post_event(OutputEvent(f"\n{stop_reason}\n")) self.cleanup() break diff --git a/agent/testflinger_agent/stop_condition_checkers.py b/agent/testflinger_agent/stop_condition_checkers.py index 7bedb668..7fc0bb34 100644 --- a/agent/testflinger_agent/stop_condition_checkers.py +++ b/agent/testflinger_agent/stop_condition_checkers.py @@ -16,6 +16,7 @@ from typing import Optional, Tuple from testflinger_common.enums import JobState, TestEvent from .client import TestflingerClient +from .runner import RunnerEvent class JobCancelledChecker: @@ -59,6 +60,6 @@ def __call__(self) -> Tuple[Optional[TestEvent], str]: ) return None, "" - def update(self): + def update(self, _: RunnerEvent): """Update the last output time to the current time.""" self.last_output_time = time.time() diff --git a/agent/testflinger_agent/tests/test_job.py b/agent/testflinger_agent/tests/test_job.py index 83fb8e67..9219bac3 100644 --- a/agent/testflinger_agent/tests/test_job.py +++ b/agent/testflinger_agent/tests/test_job.py @@ -9,7 +9,7 @@ import testflinger_agent from testflinger_agent.client import TestflingerClient as _TestflingerClient from testflinger_agent.job import TestflingerJob as _TestflingerJob -from testflinger_agent.runner import CommandRunner +from testflinger_agent.runner import CommandRunner, OutputEvent from testflinger_agent.handlers import LogUpdateHandler from testflinger_agent.stop_condition_checkers import ( GlobalTimeoutChecker, @@ -73,7 +73,7 @@ def test_job_global_timeout(self, tmp_path): logfile = tmp_path / "testlog" runner = CommandRunner(tmp_path, env={}) log_handler = LogUpdateHandler(logfile) - runner.register_output_handler(log_handler) + runner.subscribe_event(OutputEvent, log_handler) global_timeout_checker = GlobalTimeoutChecker(1) runner.register_stop_condition_checker(global_timeout_checker) exit_code, exit_event, exit_reason = runner.run("sleep 12") @@ -98,7 +98,7 @@ def test_job_output_timeout(self, tmp_path): logfile = tmp_path / "testlog" runner = CommandRunner(tmp_path, env={}) log_handler = LogUpdateHandler(logfile) - runner.register_output_handler(log_handler) + runner.subscribe_event(OutputEvent, log_handler) output_timeout_checker = OutputTimeoutChecker(1) runner.register_stop_condition_checker(output_timeout_checker) # unfortunately, we need to sleep for longer that 10 seconds here diff --git a/agent/testflinger_agent/tests/test_stop_condition_checkers.py b/agent/testflinger_agent/tests/test_stop_condition_checkers.py index baba8327..3bcb01c0 100644 --- a/agent/testflinger_agent/tests/test_stop_condition_checkers.py +++ b/agent/testflinger_agent/tests/test_stop_condition_checkers.py @@ -22,6 +22,7 @@ ) from testflinger_common.enums import TestEvent +from testflinger_agent.runner import OutputEvent class TestStopConditionCheckers: @@ -68,5 +69,5 @@ def test_output_timeout_update(self): checker = OutputTimeoutChecker(0.3) for _ in range(5): time.sleep(0.1) - checker.update() + checker.update(OutputEvent("")) assert checker() == (None, "")