From 3c343cc4ebbf91aa8aa199be641505c1d17bb611 Mon Sep 17 00:00:00 2001 From: George Boukeas Date: Thu, 3 Oct 2024 17:21:52 +0100 Subject: [PATCH 01/12] refactor: Introduce job phases and refactor agent job loop (WIP) --- agent/testflinger_agent/agent.py | 219 ++---- agent/testflinger_agent/job.py | 816 +++++++++++++++++----- agent/testflinger_agent/runner.py | 1 - agent/testflinger_agent/tests/test_job.py | 141 ++-- 4 files changed, 781 insertions(+), 396 deletions(-) diff --git a/agent/testflinger_agent/agent.py b/agent/testflinger_agent/agent.py index 57ba0512..c3ca3a1c 100644 --- a/agent/testflinger_agent/agent.py +++ b/agent/testflinger_agent/agent.py @@ -12,62 +12,25 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see -import json import logging import os -from pathlib import Path import shutil -import tempfile +import signal from testflinger_agent.job import TestflingerJob from testflinger_agent.errors import TFServerError -from testflinger_agent.config import ATTACHMENTS_DIR -from testflinger_agent.event_emitter import EventEmitter -from testflinger_common.enums import JobState, TestPhase, TestEvent - - -try: - # attempt importing a tarfile filter, to check if filtering is supported - from tarfile import data_filter - - del data_filter -except ImportError: - # import a patched version of `tarfile` that supports filtering; - # this conditional import can be removed when all agents run - # versions of Python that support filtering, i.e. at least: - # 3.8.17, 3.9.17, 3.10.12, 3.11.4, 3.12 - from . import tarfile_patch as tarfile -else: - import tarfile +from testflinger_common.enums import JobState, TestPhase logger = logging.getLogger(__name__) -def secure_filter(member, path): - """Combine the `data` filter with custom attachment filtering - - Makes sure that the starting folder for all attachments coincides - with one of the supported phases, i.e. that the attachment archive - has been created properly and no attachment will be extracted to an - unexpected location. 
- """ - try: - resolved = Path(member.name).resolve().relative_to(Path.cwd()) - except ValueError as error: - # essentially trying to extract higher than the attachments folder - raise tarfile.OutsideDestinationError(member, path) from error - if not str(resolved).startswith( - ("provision/", "firmware_update/", "test/") - ): - # trying to extract in an invalid folder, under the attachments folder - raise tarfile.OutsideDestinationError(member, path) - return tarfile.data_filter(member, path) - - class TestflingerAgent: + def __init__(self, client): self.client = client + signal.signal(signal.SIGUSR1, self.restart_signal_handler) + # [TODO] Investigate relation between the agent state and the job state self.set_agent_state("waiting") self._post_initial_agent_data() @@ -153,54 +116,9 @@ def mark_device_offline(self): # Create the offline file, this should work even if it exists open(self.get_offline_files()[0], "w").close() - def unpack_attachments(self, job_data: dict, cwd: Path): - """Download and unpack the attachments associated with a job""" - job_id = job_data["job_id"] - - with tempfile.NamedTemporaryFile(suffix="tar.gz") as archive_tmp: - archive_path = Path(archive_tmp.name) - # download attachment archive - logger.info(f"Downloading attachments for {job_id}") - self.client.get_attachments(job_id, path=archive_path) - # extract archive into the attachments folder - logger.info(f"Unpacking attachments for {job_id}") - with tarfile.open(archive_path, "r:gz") as tar: - tar.extractall(cwd / ATTACHMENTS_DIR, filter=secure_filter) - - # side effect: remove all attachment data from `job_data` - # (so there is no interference with existing processes, especially - # provisioning or firmware update, which are triggered when these - # sections are not empty) - for phase in ( - TestPhase.PROVISION, - TestPhase.FIRMWARE_UPDATE, - TestPhase.TEST, - ): - phase_str = f"{phase}_data" - try: - phase_data = job_data[phase_str] - except KeyError: - pass - else: - # delete attachments, if they exist - phase_data.pop("attachments", None) - # it may be the case that attachments were the only data - # included for this phase, so the phase can now be removed - if not phase_data: - del job_data[phase_str] - def process_jobs(self): """Coordinate checking for new jobs and handling them if they exists""" - TEST_PHASES = [ - TestPhase.SETUP, - TestPhase.PROVISION, - TestPhase.FIRMWARE_UPDATE, - TestPhase.TEST, - TestPhase.ALLOCATE, - TestPhase.RESERVE, - ] - # First, see if we have any old results that we couldn't send last time self.retry_old_results() @@ -208,109 +126,55 @@ def process_jobs(self): job_data = self.client.check_jobs() while job_data: try: + job_id = job_data["job_id"] + + # Create the job job = TestflingerJob(job_data, self.client) - event_emitter = EventEmitter( - job_data.get("job_queue"), - job_data.get("job_status_webhook"), - self.client, - job.job_id, - ) - job_end_reason = TestEvent.NORMAL_EXIT - - logger.info("Starting job %s", job.job_id) - event_emitter.emit_event( - TestEvent.JOB_START, - f"{self.client.server}/jobs/{job.job_id}", - ) - rundir = os.path.join( - self.client.config.get("execution_basedir"), job.job_id - ) - os.makedirs(rundir) - - self.client.post_agent_data({"job_id": job.job_id}) - - # Dump the job data to testflinger.json in our execution dir - with open(os.path.join(rundir, "testflinger.json"), "w") as f: - json.dump(job_data, f) - # Create json outcome file where phases will store their output - with open( - os.path.join(rundir, "testflinger-outcome.json"), "w" - 
) as f: - json.dump({}, f) - - # Handle job attachments, if any. - # - # *Always* place this after creating "testflinger.json": - # - If there is an unpacking error, the file is required - # for reporting - # - The `unpack_attachments` method has a side effect on - # `job_data`: it removes attachment data. However, the - # file will still contain all the data received and - # pass it on to the device container - if job_data.get("attachments_status") == "complete": - self.unpack_attachments(job_data, cwd=Path(rundir)) - - for phase in TEST_PHASES: + + # Let the server know the agent has picked up the job + self.client.post_agent_data({"job_id": job_id}) + job.start() + + if job.check_attachments(): + job.unpack_attachments() + + # Go through the job phases + for phase in job.phase_sequence: + # First make sure the job hasn't been cancelled - if ( - self.client.check_job_state(job.job_id) - == JobState.CANCELLED - ): - logger.info("Job cancellation was requested, exiting.") - event_emitter.emit_event(TestEvent.CANCELLED) + if job.check_cancel(): + job.cancel() break - self.client.post_job_state(job.job_id, phase) - self.set_agent_state(phase) - - event_emitter.emit_event(TestEvent(phase + "_start")) - exit_code, exit_event, exit_reason = job.run_test_phase( - phase, rundir - ) - self.client.post_influx(phase, exit_code) - event_emitter.emit_event(exit_event, exit_reason) - - if exit_code: - # exit code 46 is our indication that recovery failed! - # In this case, we need to mark the device offline - if exit_code == 46: - self.mark_device_offline() - exit_event = TestEvent.RECOVERY_FAIL - else: - exit_event = TestEvent(phase + "_fail") - event_emitter.emit_event(exit_event) - if phase == "provision": - self.client.post_provision_log( - job.job_id, exit_code, exit_event - ) - if phase != "test": - logger.debug( - "Phase %s failed, aborting job" % phase - ) - job_end_reason = exit_event + # Run the phase or skip it + if job.go(phase): + self.set_agent_state(phase) + job.run(phase) + if job.check_end(): break - else: - event_emitter.emit_event(TestEvent(phase + "_success")) + except Exception as e: logger.exception(e) finally: # Always run the cleanup, even if the job was cancelled - event_emitter.emit_event(TestEvent.CLEANUP_START) - job.run_test_phase(TestPhase.CLEANUP, rundir) - event_emitter.emit_event(TestEvent.CLEANUP_SUCCESS) - event_emitter.emit_event(TestEvent.JOB_END, job_end_reason) - # clear job id - self.client.post_agent_data({"job_id": ""}) + job.run(TestPhase.CLEANUP) + + # let the server know the agent is available (clear job id) + job.end() + self.client.post_agent_data({"job_id": ""}) + + if job.phases[TestPhase.PROVISIONING].result.exit_status == 46: + self.mark_device_offline() try: - self.client.transmit_job_outcome(rundir) + self.client.transmit_job_outcome(job.params.rundir) except Exception as e: # TFServerError will happen if we get other-than-good status # Other errors can happen too for things like connection # problems logger.exception(e) results_basedir = self.client.config.get("results_basedir") - shutil.move(rundir, results_basedir) + shutil.move(job.params.rundir, results_basedir) self.set_agent_state(JobState.WAITING) self.check_restart() @@ -337,3 +201,12 @@ def retry_old_results(self): except TFServerError: # Problems still, better luck next time? 
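
The refactored process_jobs() loop in this patch reduces to a small driver pattern around the job object. The sketch below is illustrative only: the method names mirror the patch (start, go, run, check_cancel, check_end, end), but the bodies are stubs, not the real TestflingerJob or agent implementation.

from enum import Enum


class Phase(str, Enum):
    SETUP = "setup"
    PROVISION = "provision"
    TEST = "test"
    CLEANUP = "cleanup"


class Job:
    """Stand-in for TestflingerJob: only the loop-facing methods are stubbed."""

    phase_sequence = (Phase.SETUP, Phase.PROVISION, Phase.TEST)

    def start(self):
        print("job start")

    def end(self):
        print("job end")

    def check_cancel(self) -> bool:
        # the real agent polls the server for a cancellation request here
        return False

    def cancel(self):
        print("job cancelled")

    def go(self, phase) -> bool:
        # the real agent skips phases with no command or no phase data
        return True

    def run(self, phase):
        print(f"running {phase.value}")

    def check_end(self) -> bool:
        # the real agent aborts the job here when a non-test phase fails
        return False


def drive(job: Job):
    """Run the phases in order; cleanup and job end always happen."""
    try:
        job.start()
        for phase in job.phase_sequence:
            if job.check_cancel():
                job.cancel()
                break
            if job.go(phase):
                job.run(phase)
                if job.check_end():
                    break
    finally:
        job.run(Phase.CLEANUP)
        job.end()


drive(Job())
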
pass + + def restart_signal_handler(self, _, __): + """ + If we receive the restart signal, tell the agent to restart safely when + it is not running a job + """ + logger.info("Marked agent for restart") + restart_file = self.get_restart_files()[0] + open(restart_file, "w").close() diff --git a/agent/testflinger_agent/job.py b/agent/testflinger_agent/job.py index 6b7d9855..520d61cf 100644 --- a/agent/testflinger_agent/job.py +++ b/agent/testflinger_agent/job.py @@ -12,13 +12,21 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see +from abc import ABC, abstractmethod import fcntl import json import logging import os +from pathlib import Path +import tempfile import time +from typing import Optional, Tuple, NamedTuple +from testflinger_agent.client import TestflingerClient +from testflinger_agent.config import ATTACHMENTS_DIR from testflinger_agent.errors import TFServerError +from testflinger_agent.event_emitter import EventEmitter +from testflinger_common.enums import TestEvent, TestPhase, JobState from .runner import CommandRunner, RunnerEvents from .handlers import LiveOutputHandler, LogUpdateHandler from .stop_condition_checkers import ( @@ -27,148 +35,444 @@ OutputTimeoutChecker, ) +try: + # attempt importing a tarfile filter, to check if filtering is supported + from tarfile import data_filter + + del data_filter +except ImportError: + # import a patched version of `tarfile` that supports filtering; + # this conditional import can be removed when all agents run + # versions of Python that support filtering, i.e. at least: + # 3.8.17, 3.9.17, 3.10.12, 3.11.4, 3.12 + from . import tarfile_patch as tarfile +else: + import tarfile + + logger = logging.getLogger(__name__) -class TestflingerJob: - def __init__(self, job_data, client): - """ - :param job_data: - Dictionary containing data for the test job_data - :param client: - Testflinger client object for communicating with the server - """ - self.client = client - self.job_data = job_data - self.job_id = job_data.get("job_id") - self.phase = "unknown" +class TestflingerJobParameters(NamedTuple): + """The bundle of data required (and shared) by a job and its phases""" - def run_test_phase(self, phase, rundir): - """Run the specified test phase in rundir + job_data: dict + client: TestflingerClient + rundir: Path - :param phase: - Name of the test phase (setup, provision, test, ...) - :param rundir: - Directory in which to run the command defined for the phase - :return: - Returncode from the command that was executed, 0 will be returned - if there was no command to run - """ - self.phase = phase - cmd = self.client.config.get(phase + "_command") - node = self.client.config.get("agent_id") - if not cmd: - logger.info("No %s_command configured, skipping...", phase) - return 0, None, None - if phase == "provision" and not self.job_data.get("provision_data"): - logger.info("No provision_data defined in job data, skipping...") - return 0, None, None - if phase == "firmware_update" and not self.job_data.get( - "firmware_update_data" - ): - logger.info( - "No firmware_update_data defined in job data, skipping..." 
- ) - return 0, None, None - if phase == "test" and not self.job_data.get("test_data"): - logger.info("No test_data defined in job data, skipping...") - return 0, None, None - if phase == "allocate" and not self.job_data.get("allocate_data"): - return 0, None, None - if phase == "reserve" and not self.job_data.get("reserve_data"): - return 0, None, None - results_file = os.path.join(rundir, "testflinger-outcome.json") - output_log = os.path.join(rundir, phase + ".log") - serial_log = os.path.join(rundir, phase + "-serial.log") - - logger.info("Running %s_command: %s", phase, cmd) - runner = CommandRunner(cwd=rundir, env=self.client.config) - output_log_handler = LogUpdateHandler(output_log) - live_output_handler = LiveOutputHandler(self.client, self.job_id) - runner.register_output_handler(output_log_handler) - runner.register_output_handler(live_output_handler) + def get_global_timeout(self): + """Get the global timeout for the test run in seconds""" + # Default timeout is 4 hours + default_timeout = 4 * 60 * 60 - # Reserve phase uses a separate timeout handler - if phase != "reserve": - global_timeout_checker = GlobalTimeoutChecker( - self.get_global_timeout() - ) - runner.register_stop_condition_checker(global_timeout_checker) + # Don't exceed the maximum timeout configured for the device! + return min( + self.job_data.get("global_timeout", default_timeout), + self.client.config.get("global_timeout", default_timeout), + ) - # We only need to check for output timeouts during the test phase - if phase == "test": - output_timeout_checker = OutputTimeoutChecker( - self.get_output_timeout() - ) - runner.register_stop_condition_checker(output_timeout_checker) - runner.subscribe_event( - RunnerEvents.OUTPUT_RECEIVED, output_timeout_checker.update - ) + def get_output_timeout(self): + """Get the output timeout for the test run in seconds""" + # Default timeout is 15 minutes + default_timeout = 15 * 60 - # Do not allow cancellation during provision for safety reasons - if phase != "provision": - job_cancelled_checker = JobCancelledChecker( - self.client, self.job_id + # Don't exceed the maximum timeout configured for the device! + return min( + self.job_data.get("output_timeout", default_timeout), + self.client.config.get("output_timeout", default_timeout), + ) + + +class JobPhaseResult(NamedTuple): + """The bundle of data holding the results of a phase""" + + exit_code: Optional[int] = None + event: Optional[TestEvent] = None + detail: Optional[str] = None + + +class JobPhase(ABC): + """ + A phase in a Testflinger job + + This is an abstract class that captures the interface and basic + functionality of a phase. 
+ """ + + # Classes derived from JobPhase are associated with a TestPhase id + phase_id: TestPhase = None + + def __init_subclass__(subcls, phase_id: Optional[TestPhase] = None): + if subcls is not None: + subcls.phase_id = phase_id + + def __init__(self, params: TestflingerJobParameters): + self.params = params + # phase-specific files + self.results_file = params.rundir / "testflinger-outcome.json" + self.output_log = params.rundir / f"{self.phase_id}.log" + self.serial_log = params.rundir / f"{self.phase_id}-serial.log" + # phase-specific runner (for executing commands in the rundir) + self.runner = CommandRunner( + cwd=params.rundir, env=params.client.config + ) + # phase result: initially empty, set later by `JobPhase.run` + self.result: Optional[JobPhaseResult] = None + + @abstractmethod + def go(self) -> bool: + """Return True if the phase should run or False if skipping""" + raise NotImplementedError + + @abstractmethod + def register(self): + """Perform all necessary registrations with the phase runner""" + raise NotImplementedError + + @abstractmethod + def run_core(self): + """Execute the "core" of the phase and set self.result""" + raise NotImplementedError + + def process_results(self): + """Execute any possible actions after the "core" of the phase""" + pass + + def run(self) -> Tuple[int, Optional[TestEvent], str]: + """This is the generic structure of every phase""" + + # register the output handlers with the runner + self.runner.register_output_handler(LogUpdateHandler(self.output_log)) + self.runner.register_output_handler( + LiveOutputHandler( + self.params.client, self.params.job_data["job_id"] ) - runner.register_stop_condition_checker(job_cancelled_checker) + ) + # perform additional runner registrations + self.register() + # display phase banner in the Testflinger output for line in self.banner( - "Starting testflinger {} phase on {}".format(phase, node) + f"Starting testflinger {self.phase_id} phase " + f"on {self.params.client.config.get('agent_id')}" ): - runner.run(f"echo '{line}'") - try: - # Set exit_event to fail for this phase in case of an exception - exit_event = f"{phase}_fail" - exitcode, exit_event, exit_reason = runner.run(cmd) - except Exception as exc: - logger.exception(exc) - exitcode = 100 - exit_reason = str(exc) # noqa: F841 - ignore this until it's used - finally: - self._update_phase_results( - results_file, phase, exitcode, output_log, serial_log - ) - if phase == "allocate": - self.allocate_phase(rundir) - return exitcode, exit_event, exit_reason + self.runner.run(f"echo '{line}'") - def _update_phase_results( - self, results_file, phase, exitcode, output_log, serial_log - ): - """Update the results file with the results of the specified phase + # run the "core" of the phase + self.run_core() + self.update_results() - :param results_file: - Path to the results file - :param phase: - Name of the phase - :param exitcode: - Exitcode from the device agent - :param output_log: - Path to the output log file - :param serial_log: - Path to the serial log file - """ - with open(results_file, "r+") as results: + # perform any post-core actions + self.process_results() + + def update_results(self): + """Update the results file with the results of this phase""" + with open(self.results_file, "r+") as results: outcome_data = json.load(results) - if os.path.exists(output_log): - with open(output_log, "r+", encoding="utf-8") as logfile: - self._set_truncate(logfile) - outcome_data[phase + "_output"] = logfile.read() - if os.path.exists(serial_log): - with 
open(serial_log, "r+", encoding="utf-8") as logfile: - self._set_truncate(logfile) - outcome_data[phase + "_serial"] = logfile.read() - outcome_data[phase + "_status"] = exitcode + outcome_data[f"{self.phase_id}_status"] = self.result.exit_code + try: + with open(self.output_log, "r+", encoding="utf-8") as logfile: + set_truncate(logfile) + outcome_data[f"{self.phase_id}_output"] = logfile.read() + except FileNotFoundError: + pass + try: + with open( + self.serial_log, "r+", encoding="utf-8", errors="ignore" + ) as logfile: + set_truncate(logfile) + outcome_data[f"{self.phase_id}_serial"] = logfile.read() + except FileNotFoundError: + pass results.seek(0) json.dump(outcome_data, results) - def allocate_phase(self, rundir): + def banner(self, line): + """Yield text lines to print a banner around a string + + :param line: + Line of text to print a banner around + """ + yield "*" * (len(line) + 4) + yield "* {} *".format(line) + yield "*" * (len(line) + 4) + + +class ExternalCommandPhase(JobPhase): + """ + Phases with a core executing an external command, specified in the config + """ + + def __init__(self, params: TestflingerJobParameters): + super().__init__(params) + # retieve the external command to be executed + self.cmd = self.params.client.config.get(self.phase_id + "_command") + + def go(self) -> bool: + # the phase is "go" if the external command has been specified + if not self.cmd: + logger.info("No %s_command configured, skipping...", self.phase_id) + return False + return True + + def run_core(self) -> Tuple[int, Optional[TestEvent], str]: + # execute the external command + logger.info("Running %s_command: %s", self.phase_id, self.cmd) + try: + # events returned by the runner can only be stop events + exit_code, event, detail = self.runner.run(self.cmd) + if not event: + if exit_code == 0: + # complete, successful run + self.result = JobPhaseResult( + exit_code=exit_code, + event=f"{self.phase_id}_success", + ) + else: + # complete but failed run + self.result = JobPhaseResult( + exit_code=exit_code, + event=f"{self.phase_id}_fail", + ) + else: + # stopped run + self.result = JobPhaseResult( + exit_code=exit_code, event=event, detail=detail + ) + except Exception as error: + # failed phase run due to an exception + logger.exception("%s: %s", type(error).__name__, str(error)) + self.result = JobPhaseResult( + exit_code=100, + event=f"{self.phase_id}_fail", + detail=self.parse_error_logs() + ) + + def parse_error_logs(self): + # [TODO] Move filenames used to pass information across entities to the common module + print("PARSING", str(self.params.rundir / "device-connector-error.json")) + with open(self.params.rundir / "device-connector-error.json", "r") as error_file: + print("OPENED") + error_file_contents = error_file.read() + try: + exception_info = json.loads(error_file_contents)[ + f"{self.phase_id}_exception_info" + ] + if exception_info["exception_cause"] is None: + detail = "%s: %s" % ( + exception_info["exception_name"], + exception_info["exception_message"], + ) + else: + detail = "%s: %s caused by %s" % ( + exception_info["exception_name"], + exception_info["exception_message"], + exception_info["exception_cause"], + ) + return detail + except (json.JSONDecodeError, KeyError): + # [TODO] When do these errors arise? 
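
The JobPhase hierarchy introduced in this patch is essentially a template method: run() is fixed, while go(), register(), run_core() and the optional process_results() hook are supplied per phase, and __init_subclass__ ties each subclass to its phase identifier. The following is a compressed, stand-alone sketch of that structure, with stand-in names and stub bodies rather than the actual agent classes.

from abc import ABC, abstractmethod
from typing import Optional


class PhaseSketch(ABC):
    """Stand-in for JobPhase: run() is fixed, subclasses supply the hooks."""

    phase_id: Optional[str] = None

    def __init_subclass__(cls, phase_id: Optional[str] = None, **kwargs):
        # each concrete phase class declares its identifier as a class
        # keyword argument, e.g. `class Setup(PhaseSketch, phase_id="setup")`
        super().__init_subclass__(**kwargs)
        if phase_id is not None:
            cls.phase_id = phase_id

    @abstractmethod
    def go(self) -> bool:
        """Decide whether this phase should run at all."""

    @abstractmethod
    def register(self) -> None:
        """Attach stop-condition checkers and output handlers."""

    @abstractmethod
    def run_core(self) -> None:
        """Do the phase's actual work and record its result."""

    def process_results(self) -> None:
        """Optional hook for post-core actions (e.g. posting provision logs)."""

    def run(self) -> None:
        # the generic structure shared by every phase
        self.register()
        self.run_core()
        self.process_results()


class SetupSketch(PhaseSketch, phase_id="setup"):
    def go(self) -> bool:
        return True

    def register(self) -> None:
        pass

    def run_core(self) -> None:
        print(f"running {self.phase_id}")


SetupSketch().run()  # prints: running setup
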
+ return "" + + +''' +class UnpackPhase(JobPhase, phase_id=TestPhase.UNPACK): + + # phases for which attachments are allowed + supported_phases = ( + TestPhase.PROVISION, + TestPhase.FIRMWARE_UPDATE, + TestPhase.TEST, + ) + + def register(self): + self.runner.register_stop_condition_checker( + GlobalTimeoutChecker(self.params.get_global_timeout()) + ) + + def go(self) -> bool: + if self.params.job_data.get("attachments_status") != "complete": + # the phase is "go" if attachments have been provided + logger.info("No attachments provided in job data, skipping...") + return False + return True + + def run_core(self) -> Tuple[int, Optional[TestEvent], str]: + try: + self.unpack_attachments() + except Exception as error: + # use the runner to display the error + # (so that the output is also included in the phase results) + for line in f"{type(error).__name__}: {error}".split("\n"): + self.runner.run(f"echo '{line}'") + # propagate the error (`run` uniformly handles fail cases) + raise + return 0, TestEvent.UNPACK_SUCCESS, None + + def secure_filter(self, member, path): + """Combine the `data` filter with custom attachment filtering + + Makes sure that the starting folder for all attachments coincides + with one of the supported phases, i.e. that the attachment archive + has been created properly and no attachment will be extracted to an + unexpected location. + """ + try: + resolved = Path(member.name).resolve().relative_to(Path.cwd()) + except ValueError as error: + # essentially trying to extract higher than the attachments folder + raise tarfile.OutsideDestinationError(member, path) from error + if not str(resolved).startswith( + tuple(f"{phase}/" for phase in self.supported_phases) + ): + # trying to extract in invalid folder under the attachments folder + raise tarfile.OutsideDestinationError(member, path) + return tarfile.data_filter(member, path) + + def unpack_attachments(self): + """Download and unpack the attachments associated with a job""" + job_id = self.params.job_data["job_id"] + + with tempfile.NamedTemporaryFile(suffix=".tar.gz") as archive_tmp: + archive_path = Path(archive_tmp.name) + # download attachment archive + logger.info(f"Downloading attachments for {job_id}") + self.params.client.get_attachments(job_id, path=archive_path) + # extract archive into the attachments folder + logger.info(f"Unpacking attachments for {job_id}") + with tarfile.open(archive_path, "r:gz") as tar: + tar.extractall( + self.params.rundir / ATTACHMENTS_DIR, + filter=self.secure_filter, + ) + + # side effect: remove all attachment data from `self.params.job_data` + # (so there is no interference with existing processes, especially + # provisioning or firmware update, which are triggered when these + # sections are not empty) + for phase in self.supported_phases: + phase_str = f"{phase}_data" + try: + phase_data = self.params.job_data[phase_str] + except KeyError: + pass + else: + # delete attachments, if they exist + phase_data.pop("attachments", None) + # it may be the case that attachments were the only data + # included for this phase, so the phase can now be removed + if not phase_data: + del self.params.job_data[phase_str] +''' + + +class SetupPhase(ExternalCommandPhase, phase_id=TestPhase.SETUP): + + def register(self): + self.runner.register_stop_condition_checker( + GlobalTimeoutChecker(self.params.get_global_timeout()) + ) + + +class FirmwarePhase(ExternalCommandPhase, phase_id=TestPhase.FIRMWARE_UPDATE): + + def go(self) -> bool: + if not super().go(): + return False + # the phase is 
"go" if the phase data has been provided + if not self.params.job_data.get(f"{self.phase_id}_data"): + logger.info("No %s_data defined in job data, skipping...", self.phase_id) + return False + return True + + def register(self): + self.runner.register_stop_condition_checker( + GlobalTimeoutChecker(self.params.get_global_timeout()) + ) + + +class ProvisionPhase(ExternalCommandPhase, phase_id=TestPhase.PROVISION): + + def go(self) -> bool: + if not super().go(): + return False + # the phase is "go" if the phase data has been provided + if not self.params.job_data.get(f"{self.phase_id}_data"): + logger.info("No %s_data defined in job data, skipping...", self.phase_id) + return False + return True + + def register(self): + self.runner.register_stop_condition_checker( + GlobalTimeoutChecker(self.params.get_global_timeout()) + ) + # Do not allow cancellation during provision for safety reasons + self.runner.register_stop_condition_checker( + JobCancelledChecker( + self.params.client, self.params.job_data["job_id"] + ) + ) + + def process_results(self): + if self.result.exit_code == 46: + # exit code 46 is our indication that recovery failed! + # In this case, we need to mark the device offline + self.mark_device_offline() + self.result = self.result.replace(event=TestEvent.RECOVERY_FAIL) + + self.params.client.post_provision_log( + self.params.job_data["job_id"], self.result.exit_code, self.result.event + ) + + +class TestCommandsPhase(ExternalCommandPhase, phase_id=TestPhase.TEST): + + def go(self) -> bool: + if not super().go(): + return False + # the phase is "go" if the phase data has been provided + if not self.params.job_data.get(f"{self.phase_id}_data"): + logger.info("No %s_data defined in job data, skipping...", self.phase_id) + return False + return True + + def register(self): + self.runner.register_stop_condition_checker( + GlobalTimeoutChecker(self.params.get_global_timeout()) + ) + # We only need to check for output timeouts during the test phase + output_timeout_checker = OutputTimeoutChecker( + self.params.get_output_timeout() + ) + self.runner.register_stop_condition_checker(output_timeout_checker) + self.runner.subscribe_event( + RunnerEvents.OUTPUT_RECEIVED, output_timeout_checker.update + ) + + +class AllocatePhase(ExternalCommandPhase, phase_id=TestPhase.ALLOCATE): + + def go(self) -> bool: + if not super().go(): + return False + # the phase is "go" if the phase data has been provided + # (but no message is logged otherwise) + if not self.params.job_data.get(f"{self.phase_id}_data"): + return False + return True + + def register(self): + self.runner.register_stop_condition_checker( + GlobalTimeoutChecker(self.params.get_global_timeout()) + ) + + def process_results(self): """ Read the json dict from "device-info.json" and send it to the server so that the multi-device agent can find the IP addresses of all subordinate jobs """ - device_info_file = os.path.join(rundir, "device-info.json") + device_info_file = self.params.rundir / "device-info.json" with open(device_info_file, "r") as f: device_info = json.load(f) @@ -176,14 +480,17 @@ def allocate_phase(self, rundir): # device job can't continue while True: try: - self.client.post_result(self.job_id, device_info) + self.params.client.post_result( + self.params.job_data["job_id"], device_info + ) break except TFServerError: logger.warning("Failed to post device_info, retrying...") time.sleep(60) - self.client.post_job_state(self.job_id, "allocated") - + self.params.client.post_job_state( + self.params.job_data["job_id"], 
JobState.ALLOCATED + ) self.wait_for_completion() def wait_for_completion(self): @@ -191,73 +498,232 @@ def wait_for_completion(self): while True: try: - this_job_state = self.client.check_job_state(self.job_id) - if this_job_state in ("complete", "completed", "cancelled"): + this_job_state = self.params.client.check_job_state( + self.params.job_data["job_id"] + ) + if this_job_state in ( + "complete", + JobState.COMPLETED, + JobState.CANCELLED, + ): logger.info("This job completed, exiting...") break - parent_job_id = self.job_data.get("parent_job_id") + parent_job_id = self.params.job_data.get("parent_job_id") if not parent_job_id: logger.warning("No parent job ID found while allocated") continue - parent_job_state = self.client.check_job_state( - self.job_data.get("parent_job_id") + parent_job_state = self.params.client.check_job_state( + self.params.job_data.get("parent_job_id") ) - if parent_job_state in ("complete", "completed", "cancelled"): + if parent_job_state in ( + "complete", + JobState.COMPLETED, + JobState.CANCELLED, + ): logger.info("Parent job completed, exiting...") break except TFServerError: logger.warning("Failed to get allocated job status, retrying") time.sleep(60) - def _set_truncate(self, f, size=1024 * 1024): - """Set up an open file so that we don't read more than a specified - size. We want to read from the end of the file rather than the - beginning. Write a warning at the end of the file if it was too big. - :param f: - The file object, which should be opened for read/write - :param size: - Maximum number of bytes we want to allow from reading the file - """ - end = f.seek(0, 2) - if end > size: - f.write("\nWARNING: File has been truncated due to length!") - f.seek(end - size, 0) - else: - f.seek(0, 0) +class ReservePhase(ExternalCommandPhase, phase_id=TestPhase.RESERVE): - def get_global_timeout(self): - """Get the global timeout for the test run in seconds""" - # Default timeout is 4 hours - default_timeout = 4 * 60 * 60 + def go(self) -> bool: + if not super().go(): + return False + # the phase is "go" if the phase data has been provided + # (but no message is logged otherwise) + if not self.params.job_data.get(f"{self.phase_id}_data"): + return False + return True - # Don't exceed the maximum timeout configured for the device! - return min( - self.job_data.get("global_timeout", default_timeout), - self.client.config.get("global_timeout", default_timeout), + def register(self): + # Reserve phase uses a separate timeout handler + pass + + +class CleanupPhase(ExternalCommandPhase, phase_id=TestPhase.CLEANUP): + + def register(self): + self.runner.register_stop_condition_checker( + GlobalTimeoutChecker(self.params.get_global_timeout()) ) - def get_output_timeout(self): - """Get the output timeout for the test run in seconds""" - # Default timeout is 15 minutes - default_timeout = 15 * 60 - # Don't exceed the maximum timeout configured for the device! 
- return min( - self.job_data.get("output_timeout", default_timeout), - self.client.config.get("output_timeout", default_timeout), +class TestflingerJob: + + phase_sequence = ( + TestPhase.SETUP, + TestPhase.PROVISION, + TestPhase.FIRMWARE_UPDATE, + TestPhase.TEST, + TestPhase.ALLOCATE, + TestPhase.RESERVE, + ) + + phase_cls_map = { + TestPhase.SETUP: SetupPhase, + TestPhase.PROVISION: ProvisionPhase, + TestPhase.FIRMWARE_UPDATE: FirmwarePhase, + TestPhase.TEST: TestCommandsPhase, + TestPhase.ALLOCATE: AllocatePhase, + TestPhase.RESERVE: ReservePhase, + TestPhase.CLEANUP: CleanupPhase, + } + + def __init__(self, job_data: dict, client: TestflingerClient): + """ + :param job_data: + Dictionary containing data for the test job_data + :param client: + Testflinger client object for communicating with the server + :param rundir: + Directory in which to run the command defined for the phase + """ + self.job_id = job_data["job_id"] + + rundir = Path(client.config.get("execution_basedir")) / self.job_id + rundir.mkdir() + # specify directories and result files + # Dump the job data to testflinger.json in our execution dir + with open(rundir / "testflinger.json", "w") as f: + json.dump(job_data, f) + # Create json outcome file where phases will store their output + with open(rundir / "testflinger-outcome.json", "w") as f: + json.dump({}, f) + # Clear error log before starting + with open(rundir / "device-connector-error.json", "w") as f: + pass + + # bundle all necessary job parameters into `self.params` so that + # the job phases are only passed a single reference + self.params = TestflingerJobParameters(job_data, client, Path(rundir)) + self.current_phase = None + self.end_phase = None + self.end_reason = TestEvent.NORMAL_EXIT + self.emitter = EventEmitter( + job_data["job_queue"], + job_data.get("job_status_webhook"), + client, + job_data["job_id"], + ) + self.phases = { + phase_id: phase_cls(self.params) + for phase_id, phase_cls in self.phase_cls_map.items() + } + + def start(self): + logger.info("Starting job %s", self.job_id) + self.emitter.emit_event( + TestEvent.JOB_START, + f"{self.params.client.server}/jobs/{self.job_id}", ) - def banner(self, line): - """Yield text lines to print a banner around a sting + def check_end(self): + phase = self.phases[self.current_phase] + if phase.result.exitcode and self.current_phase != TestPhase.TEST: + logger.debug("Phase %s failed, aborting job" % phase) + self.end_reason = phase.result.event - :param line: - Line of text to print a banner around + def end(self): + logger.info("Ending job %s", self.job_id) + self.event_emitter.emit_event(TestEvent.JOB_END, self.end_reason) + + def check_cancel(self): + return ( + self.params.client.check_job_state(self.job_id) + == JobState.CANCELLED + ) + + def cancel(self): + logger.info("Job cancellation was requested, exiting.") + self.emitter.emit_event(TestEvent.CANCELLED) + + def go(self, phase_id: TestPhase) -> bool: + return self.phases[phase_id].go() + + def run(self, phase_id: TestPhase): + """Run the specified test phase in rundir + + :param phase: + Name of the test phase (setup, provision, test, ...) 
+ :return: + Returncode from the command that was executed, 0 will be returned + if there was no command to run """ - yield "*" * (len(line) + 4) - yield "* {} *".format(line) - yield "*" * (len(line) + 4) + self.current_phase = phase_id + self.params.client.post_job_state(self.job_id, phase_id) + self.emitter.emit_event(TestEvent(f"{phase_id}_start")) + phase = self.phases[phase_id] + phase.run() + # self.params.client.post_influx(phase.id, phase.result.exit_code) + self.emitter.emit_event(phase.result.event, phase.result.detail) + + def secure_filter(self, member, path): + """Combine the `data` filter with custom attachment filtering + + Makes sure that the starting folder for all attachments coincides + with one of the supported phases, i.e. that the attachment archive + has been created properly and no attachment will be extracted to an + unexpected location. + """ + try: + resolved = Path(member.name).resolve().relative_to(Path.cwd()) + except ValueError as error: + # essentially trying to extract higher than the attachments folder + raise tarfile.OutsideDestinationError(member, path) from error + if not str(resolved).startswith( + tuple( + f"{phase}/" + for phase in ( + TestPhase.PROVISION, + TestPhase.FIRMWARE_UPDATE, + TestPhase.TEST, + ) + ) + ): + # trying to extract in invalid folder under the attachments folder + raise tarfile.OutsideDestinationError(member, path) + return tarfile.data_filter(member, path) + + def check_attachments(self): + return self.params.job_data.get("attachments_status") == "complete" + + def unpack_attachments(self): + """Download and unpack the attachments associated with a job""" + + with tempfile.NamedTemporaryFile(suffix=".tar.gz") as archive_tmp: + archive_path = Path(archive_tmp.name) + # download attachment archive + logger.info(f"Downloading attachments for {self.job_id}") + self.params.client.get_attachments(self.job_id, path=archive_path) + # extract archive into the attachments folder + logger.info(f"Unpacking attachments for {self.job_id}") + with tarfile.open(archive_path, "r:gz") as tar: + tar.extractall( + self.params.rundir / ATTACHMENTS_DIR, + filter=self.secure_filter, + ) + + # side effect: remove all attachment data from `self.params.job_data` + # (so there is no interference with existing processes, especially + # provisioning or firmware update, which are triggered when these + # sections are not empty) + for phase in self.supported_phases: + phase_str = f"{phase}_data" + try: + phase_data = self.params.job_data[phase_str] + except KeyError: + pass + else: + # delete attachments, if they exist + phase_data.pop("attachments", None) + # it may be the case that attachments were the only data + # included for this phase, so the phase can now be removed + if not phase_data: + del self.params.job_data[phase_str] def set_nonblock(fd): @@ -271,3 +737,21 @@ def set_nonblock(fd): # moving it if it gets wider use in the future fl = fcntl.fcntl(fd, fcntl.F_GETFL) fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) + + +def set_truncate(f, size=1024 * 1024): + """Set up an open file so that we don't read more than a specified + size. We want to read from the end of the file rather than the + beginning. Write a warning at the end of the file if it was too big. 
+ + :param f: + The file object, which should be opened for read/write + :param size: + Maximum number of bytes we want to allow from reading the file + """ + end = f.seek(0, 2) + if end > size: + f.write("\nWARNING: File has been truncated due to length!") + f.seek(end - size, 0) + else: + f.seek(0, 0) diff --git a/agent/testflinger_agent/runner.py b/agent/testflinger_agent/runner.py index 3fc067de..0b638fde 100644 --- a/agent/testflinger_agent/runner.py +++ b/agent/testflinger_agent/runner.py @@ -153,7 +153,6 @@ def run(self, cmd: str) -> Tuple[int, Optional[TestEvent], str]: self.cleanup() if stop_reason == "": stop_reason = get_stop_reason(self.process.returncode, "") - return self.process.returncode, stop_event, stop_reason diff --git a/agent/testflinger_agent/tests/test_job.py b/agent/testflinger_agent/tests/test_job.py index 83fb8e67..29e2b9b8 100644 --- a/agent/testflinger_agent/tests/test_job.py +++ b/agent/testflinger_agent/tests/test_job.py @@ -1,14 +1,17 @@ import os import pytest +import re import shutil import tempfile +import uuid import requests_mock as rmock from unittest.mock import patch import testflinger_agent from testflinger_agent.client import TestflingerClient as _TestflingerClient -from testflinger_agent.job import TestflingerJob as _TestflingerJob +from testflinger_common.enums import TestPhase +from testflinger_agent.job import TestflingerJob as _TestflingerJob, set_truncate from testflinger_agent.runner import CommandRunner from testflinger_agent.handlers import LogUpdateHandler from testflinger_agent.stop_condition_checkers import ( @@ -43,14 +46,24 @@ def test_skip_missing_data(self, client, phase): """ Test that optional phases are skipped when the data is missing """ - fake_job_data = {"global_timeout": 1, "provision_data": ""} + job_id = str(uuid.uuid1()) + fake_job_data = { + "job_id": job_id, + "job_queue": "", + "global_timeout": 1, + "provision_data": "", + } job = _TestflingerJob(fake_job_data, client) - self.config[f"{phase}_command"] = "/bin/true" - return_value, exit_event, exit_reason = job.run_test_phase(phase, None) - assert return_value == 0 - assert exit_event is None - assert exit_reason is None + assert not job.go(phase) + + ''' + with rmock.Mocker() as mocker: + # mock response to result submissions + #mocker.post(re.compile(r"/v1/result/"), status_code=200) + #mocker.post(re.compile(r"/v1/result/[-a-z0-9]+/output"), status_code=200) + #mocker.get(re.compile(r"/v1/result/[-a-z0-9]+"), status_code=200) + ''' @pytest.mark.parametrize( "phase", ["setup", "provision", "test", "allocate", "reserve"] @@ -60,22 +73,23 @@ def test_skip_empty_provision_data(self, client, phase): Test that phases are skipped when there is no command configured """ self.config[f"{phase}_command"] = "" - fake_job_data = {"global_timeout": 1, f"{phase}_data": "foo"} + job_id = str(uuid.uuid1()) + fake_job_data = { + "job_id": job_id, + "job_queue": "", + "global_timeout": 1, + f"{phase}_data": "foo", + } job = _TestflingerJob(fake_job_data, client) - return_value, exit_event, exit_reason = job.run_test_phase(phase, None) - assert return_value == 0 - assert exit_event is None - assert exit_reason is None + assert not job.go(phase) def test_job_global_timeout(self, tmp_path): """Test that timeout from job_data is respected""" timeout_str = "ERROR: Global timeout reached! 
(1s)" logfile = tmp_path / "testlog" runner = CommandRunner(tmp_path, env={}) - log_handler = LogUpdateHandler(logfile) - runner.register_output_handler(log_handler) - global_timeout_checker = GlobalTimeoutChecker(1) - runner.register_stop_condition_checker(global_timeout_checker) + runner.register_output_handler(LogUpdateHandler(logfile)) + runner.register_stop_condition_checker(GlobalTimeoutChecker(1)) exit_code, exit_event, exit_reason = runner.run("sleep 12") with open(logfile) as log: log_data = log.read() @@ -87,9 +101,14 @@ def test_job_global_timeout(self, tmp_path): def test_config_global_timeout(self, client): """Test that timeout from device config is preferred""" self.config["global_timeout"] = 1 - fake_job_data = {"global_timeout": 3} + job_id = str(uuid.uuid1()) + fake_job_data = { + "job_id": job_id, + "job_queue": "", + "global_timeout": 3, + } job = _TestflingerJob(fake_job_data, client) - timeout = job.get_global_timeout() + timeout = job.params.get_global_timeout() assert timeout == 1 def test_job_output_timeout(self, tmp_path): @@ -97,10 +116,8 @@ def test_job_output_timeout(self, tmp_path): timeout_str = "ERROR: Output timeout reached! (1s)" logfile = tmp_path / "testlog" runner = CommandRunner(tmp_path, env={}) - log_handler = LogUpdateHandler(logfile) - runner.register_output_handler(log_handler) - output_timeout_checker = OutputTimeoutChecker(1) - runner.register_stop_condition_checker(output_timeout_checker) + runner.register_output_handler(LogUpdateHandler(logfile)) + runner.register_stop_condition_checker(OutputTimeoutChecker(1)) # unfortunately, we need to sleep for longer that 10 seconds here # or else we fall under the polling time exit_code, exit_event, exit_reason = runner.run("sleep 12") @@ -114,9 +131,14 @@ def test_job_output_timeout(self, tmp_path): def test_config_output_timeout(self, client): """Test that output timeout from device config is preferred""" self.config["output_timeout"] = 1 - fake_job_data = {"output_timeout": 3} + job_id = str(uuid.uuid1()) + fake_job_data = { + "job_id": job_id, + "job_queue": "", + "output_timeout": 3, + } job = _TestflingerJob(fake_job_data, client) - timeout = job.get_output_timeout() + timeout = job.params.get_output_timeout() assert timeout == 1 def test_no_output_timeout_in_provision( @@ -124,24 +146,28 @@ def test_no_output_timeout_in_provision( ): """Test that output timeout is ignored when not in test phase""" timeout_str = "complete\n" - logfile = tmp_path / "provision.log" - fake_job_data = {"output_timeout": 1, "provision_data": {"url": "foo"}} + job_id = str(uuid.uuid1()) + fake_job_data = { + "job_id": job_id, + "job_queue": "", + "output_timeout": 1, + "provision_data": {"url": "foo"}, + } + self.config["execution_basedir"] = tmp_path + # unfortunately, we need to sleep for longer that 10 seconds here + # or else we fall under the polling time self.config["provision_command"] = ( "bash -c 'sleep 12 && echo complete'" ) - requests_mock.post(rmock.ANY, status_code=200) job = _TestflingerJob(fake_job_data, client) - job.phase = "provision" - - # create the outcome file since we bypassed that - with open(tmp_path / "testflinger-outcome.json", "w") as outcome_file: - outcome_file.write("{}") - - # unfortunately, we need to sleep for longer that 10 seconds here - # or else we fall under the polling time - # job.run_with_log("sleep 12 && echo complete", logfile) - job.run_test_phase("provision", tmp_path) - with open(logfile) as log: + with rmock.Mocker() as mocker: + # mock response to result requests + 
mocker.get(re.compile(r"/v1/result/"), status_code=200) + mocker.post(re.compile(r"/v1/result/"), status_code=200) + mocker.post(re.compile(r"/v1/agents/provision_logs/"), status_code=200) + job.run(TestPhase.PROVISION) + + with open(job.phases[TestPhase.PROVISION].output_log) as log: log_data = log.read() assert timeout_str in log_data @@ -152,41 +178,41 @@ def test_run_test_phase_with_run_exception( Test that job.run_test_phase() exits with 100 so that it has some non-zero value if CommandRunner.run() raises an exception """ - - # create the outcome file since we bypassed that - with open(tmp_path / "testflinger-outcome.json", "w") as outcome_file: - outcome_file.write("{}") - self.config["setup_command"] = "fake_setup_command" - requests_mock.post(rmock.ANY, status_code=200) - job = _TestflingerJob({}, client) - job.phase = "setup" + job_id = str(uuid.uuid1()) + job = _TestflingerJob({"job_id": job_id, "job_queue": ""}, client) + # Don't raise the exception on the 3 banner lines with patch( "testflinger_agent.job.CommandRunner.run", side_effect=[None, None, None, Exception("failed")], ): - exit_code, exit_event, exit_reason = job.run_test_phase( - "setup", tmp_path - ) + with rmock.Mocker() as mocker: + # mock response to result requests + mocker.get(re.compile(r"/v1/result/"), status_code=200) + mocker.post(re.compile(r"/v1/result/"), status_code=200) + job.run(TestPhase.SETUP) + + exit_code, exit_event, exit_reason = job.phases[TestPhase.SETUP].result assert exit_code == 100 - assert exit_event == "setup_fail" + assert exit_event == TestEvent.SETUP_FAIL assert exit_reason == "failed" def test_set_truncate(self, client): """Test the _set_truncate method of TestflingerJob""" - job = _TestflingerJob({}, client) + job_id = str(uuid.uuid1()) + job = _TestflingerJob({"job_id": job_id, "job_queue": ""}, client) with tempfile.TemporaryFile(mode="r+") as f: # First check that a small file doesn't get truncated f.write("x" * 100) - job._set_truncate(f, size=100) + set_truncate(f, size=100) contents = f.read() assert len(contents) == 100 assert "WARNING" not in contents # Now check that a larger file does get truncated f.write("x" * 100) - job._set_truncate(f, size=100) + set_truncate(f, size=100) contents = f.read() # It won't be exactly 100 bytes, because a warning is added assert len(contents) < 150 @@ -198,7 +224,10 @@ def test_wait_for_completion(self, client): # Make sure we return "completed" for the parent job state client.check_job_state = lambda _: "completed" - - job = _TestflingerJob({"parent_job_id": "999"}, client) - job.wait_for_completion() + job_id = str(uuid.uuid1()) + parent_job_id = str(uuid.uuid1()) + job = _TestflingerJob( + {"job_id": job_id, "parent_job_id": parent_job_id, "job_queue": ""}, client + ) + job.phases[TestPhase.ALLOCATE].wait_for_completion() # No assertions needed, just make sure we don't timeout From 035bd5c8d61e9ee3558779b9bf3a795eaf536503 Mon Sep 17 00:00:00 2001 From: George Boukeas Date: Fri, 4 Oct 2024 17:35:13 +0100 Subject: [PATCH 02/12] fix: required changes to pass tests --- agent/testflinger_agent/agent.py | 22 +++-- agent/testflinger_agent/job.py | 77 +++++++++------ agent/testflinger_agent/tests/test_agent.py | 101 ++++++++++---------- agent/testflinger_agent/tests/test_job.py | 28 +++--- 4 files changed, 123 insertions(+), 105 deletions(-) diff --git a/agent/testflinger_agent/agent.py b/agent/testflinger_agent/agent.py index 5e0d97af..ec69f46e 100644 --- a/agent/testflinger_agent/agent.py +++ b/agent/testflinger_agent/agent.py @@ -16,6 +16,7 @@ 
import os import shutil import signal +import sys from testflinger_agent.job import TestflingerJob from testflinger_agent.errors import TFServerError @@ -125,9 +126,8 @@ def process_jobs(self): self.check_restart() job_data = self.client.check_jobs() while job_data: + job_id = job_data["job_id"] try: - job_id = job_data["job_id"] - # Create the job job = TestflingerJob(job_data, self.client) @@ -152,22 +152,28 @@ def process_jobs(self): job.run(phase) if job.check_end(): break - - except Exception as e: - logger.exception(e) + except Exception as error: + logger.exception(f"{type(error).__name__}: {error}") finally: + phase = TestPhase.CLEANUP # Always run the cleanup, even if the job was cancelled - job.run(TestPhase.CLEANUP) + if job.go(phase): + self.set_agent_state(phase) + job.run(phase) # let the server know the agent is available (clear job id) job.end() self.client.post_agent_data({"job_id": ""}) - if job.phases[TestPhase.PROVISIONING].result.exit_status == 46: + provision_result = job.phases[TestPhase.PROVISION].result + if ( + provision_result is not None + and provision_result.exit_code == 46 + ): self.mark_device_offline() try: - self.client.transmit_job_outcome(job.params.rundir) + self.client.transmit_job_outcome(str(job.params.rundir)) except Exception as e: # TFServerError will happen if we get other-than-good status # Other errors can happen too for things like connection diff --git a/agent/testflinger_agent/job.py b/agent/testflinger_agent/job.py index 520d61cf..bd13ab9b 100644 --- a/agent/testflinger_agent/job.py +++ b/agent/testflinger_agent/job.py @@ -141,6 +141,8 @@ def process_results(self): def run(self) -> Tuple[int, Optional[TestEvent], str]: """This is the generic structure of every phase""" + assert self.go() + # register the output handlers with the runner self.runner.register_output_handler(LogUpdateHandler(self.output_log)) self.runner.register_output_handler( @@ -215,44 +217,50 @@ def go(self) -> bool: return False return True - def run_core(self) -> Tuple[int, Optional[TestEvent], str]: + def run_core(self): # execute the external command logger.info("Running %s_command: %s", self.phase_id, self.cmd) try: - # events returned by the runner can only be stop events exit_code, event, detail = self.runner.run(self.cmd) - if not event: - if exit_code == 0: - # complete, successful run + if exit_code == 0: + # complete, successful run + self.result = JobPhaseResult( + exit_code=exit_code, + event=f"{self.phase_id}_success", + ) + else: + # [NOTE] This is a deviation from the current approach + # where a separate event is emitted when stop events are + # returned from the runner and then a fail event on top + # Here we *only* emit the stop event + # self.emitter.emit_event(event, detail) + if event: self.result = JobPhaseResult( exit_code=exit_code, - event=f"{self.phase_id}_success", + event=event, + detail=detail, ) else: - # complete but failed run self.result = JobPhaseResult( exit_code=exit_code, event=f"{self.phase_id}_fail", + detail=self.parse_error_logs(), ) - else: - # stopped run - self.result = JobPhaseResult( - exit_code=exit_code, event=event, detail=detail - ) except Exception as error: # failed phase run due to an exception - logger.exception("%s: %s", type(error).__name__, str(error)) + detail = f"{type(error).__name__}: {error}" + logger.exception(detail) self.result = JobPhaseResult( exit_code=100, event=f"{self.phase_id}_fail", - detail=self.parse_error_logs() + detail=detail, ) def parse_error_logs(self): - # [TODO] Move filenames used to pass 
information across entities to the common module - print("PARSING", str(self.params.rundir / "device-connector-error.json")) - with open(self.params.rundir / "device-connector-error.json", "r") as error_file: - print("OPENED") + # [TODO] Move filenames used to pass information to the common module + with open( + self.params.rundir / "device-connector-error.json", "r" + ) as error_file: error_file_contents = error_file.read() try: exception_info = json.loads(error_file_contents)[ @@ -297,7 +305,7 @@ def go(self) -> bool: return False return True - def run_core(self) -> Tuple[int, Optional[TestEvent], str]: + def run_core(self): try: self.unpack_attachments() except Exception as error: @@ -381,7 +389,9 @@ def go(self) -> bool: return False # the phase is "go" if the phase data has been provided if not self.params.job_data.get(f"{self.phase_id}_data"): - logger.info("No %s_data defined in job data, skipping...", self.phase_id) + logger.info( + "No %s_data defined in job data, skipping...", self.phase_id + ) return False return True @@ -398,7 +408,9 @@ def go(self) -> bool: return False # the phase is "go" if the phase data has been provided if not self.params.job_data.get(f"{self.phase_id}_data"): - logger.info("No %s_data defined in job data, skipping...", self.phase_id) + logger.info( + "No %s_data defined in job data, skipping...", self.phase_id + ) return False return True @@ -417,11 +429,12 @@ def process_results(self): if self.result.exit_code == 46: # exit code 46 is our indication that recovery failed! # In this case, we need to mark the device offline - self.mark_device_offline() - self.result = self.result.replace(event=TestEvent.RECOVERY_FAIL) + self.result = self.result._replace(event=TestEvent.RECOVERY_FAIL) self.params.client.post_provision_log( - self.params.job_data["job_id"], self.result.exit_code, self.result.event + self.params.job_data["job_id"], + self.result.exit_code, + self.result.event, ) @@ -432,7 +445,9 @@ def go(self) -> bool: return False # the phase is "go" if the phase data has been provided if not self.params.job_data.get(f"{self.phase_id}_data"): - logger.info("No %s_data defined in job data, skipping...", self.phase_id) + logger.info( + "No %s_data defined in job data, skipping...", self.phase_id + ) return False return True @@ -621,17 +636,19 @@ def start(self): f"{self.params.client.server}/jobs/{self.job_id}", ) - def check_end(self): + def check_end(self) -> bool: phase = self.phases[self.current_phase] - if phase.result.exitcode and self.current_phase != TestPhase.TEST: + if phase.result.exit_code and self.current_phase != TestPhase.TEST: logger.debug("Phase %s failed, aborting job" % phase) self.end_reason = phase.result.event + return True + return False def end(self): logger.info("Ending job %s", self.job_id) - self.event_emitter.emit_event(TestEvent.JOB_END, self.end_reason) + self.emitter.emit_event(TestEvent.JOB_END, self.end_reason) - def check_cancel(self): + def check_cancel(self) -> bool: return ( self.params.client.check_job_state(self.job_id) == JobState.CANCELLED @@ -688,7 +705,7 @@ def secure_filter(self, member, path): raise tarfile.OutsideDestinationError(member, path) return tarfile.data_filter(member, path) - def check_attachments(self): + def check_attachments(self) -> bool: return self.params.job_data.get("attachments_status") == "complete" def unpack_attachments(self): diff --git a/agent/testflinger_agent/tests/test_agent.py b/agent/testflinger_agent/tests/test_agent.py index a0aee422..00287b69 100644 --- 
a/agent/testflinger_agent/tests/test_agent.py +++ b/agent/testflinger_agent/tests/test_agent.py @@ -12,10 +12,11 @@ from mock import patch import testflinger_agent +from testflinger_agent.agent import TestflingerAgent as _TestflingerAgent +from testflinger_agent.client import TestflingerClient as _TestflingerClient from testflinger_agent.config import ATTACHMENTS_DIR from testflinger_agent.errors import TFServerError -from testflinger_agent.client import TestflingerClient as _TestflingerClient -from testflinger_agent.agent import TestflingerAgent as _TestflingerAgent +from testflinger_agent.job import TestflingerJob, JobPhaseResult from testflinger_common.enums import TestPhase, TestEvent @@ -429,8 +430,10 @@ def test_post_agent_status_update(self, agent, requests_mock): event_name_list = [event["event_name"] for event in event_list] expected_event_name_list = [ phase.value + postfix - for phase in TestPhase + for phase in TestflingerJob.phase_sequence for postfix in ["_start", "_success"] + if f"{phase}_data" in fake_job_data + and f"{phase}_command" in self.config ] expected_event_name_list.insert(0, "job_start") expected_event_name_list.append("job_end") @@ -587,12 +590,12 @@ def test_post_provision_log_fail(self, agent, requests_mock): def test_provision_error_in_event_detail(self, agent, requests_mock): """Tests provision log error messages in event log detail field""" - self.config["test_command"] = "echo test1" + self.config["provision_command"] = "echo provision1" job_id = str(uuid.uuid1()) fake_job_data = { "job_id": job_id, "job_queue": "test", - "test_data": {"test_cmds": "foo"}, + "provision_data": {"url": "foo"}, "job_status_webhook": "https://mywebhook", } requests_mock.get( @@ -610,29 +613,25 @@ def test_provision_error_in_event_detail(self, agent, requests_mock): } } - with patch("shutil.rmtree"): - with patch( - "testflinger_agent.agent.TestflingerJob.run_test_phase" - ) as mock_run_test_phase: - - def run_test_phase_side_effect(phase, rundir): - if phase == "provision": - provision_log_path = os.path.join( - rundir, "device-connector-error.json" - ) - with open( - provision_log_path, "w" - ) as provision_log_file: - provision_log_file.write( - json.dumps(provision_exception_info) - ) - provision_log_file.close() - return 99, None, "" - else: - return 0, None, "" - - mock_run_test_phase.side_effect = run_test_phase_side_effect - agent.process_jobs() + # patch + def run_core_provision_patch(self): + provision_log_path = self.params.rundir / "device-connector-error.json" + with open(provision_log_path, "w") as provision_log_file: + provision_log_file.write(json.dumps(provision_exception_info)) + self.result = JobPhaseResult( + exit_code=99, + event=f"{self.phase_id}_fail", + detail=self.parse_error_logs(), + ) + + with ( + patch("shutil.rmtree"), + patch( + "testflinger_agent.job.ProvisionPhase.run", + run_core_provision_patch, + ) + ): + agent.process_jobs() status_update_requests = list( filter( @@ -656,12 +655,12 @@ def run_test_phase_side_effect(phase, rundir): def test_provision_error_no_cause(self, agent, requests_mock): """Tests provision log error messages for exceptions with no cause""" - self.config["test_command"] = "echo test1" + self.config["provision_command"] = "echo provision1" job_id = str(uuid.uuid1()) fake_job_data = { "job_id": job_id, "job_queue": "test", - "test_data": {"test_cmds": "foo"}, + "provision_data": {"url": "foo"}, "job_status_webhook": "https://mywebhook", } requests_mock.get( @@ -679,29 +678,25 @@ def test_provision_error_no_cause(self, 
agent, requests_mock): } } - with patch("shutil.rmtree"): - with patch( - "testflinger_agent.agent.TestflingerJob.run_test_phase" - ) as mock_run_test_phase: - - def run_test_phase_side_effect(phase, rundir): - if phase == "provision": - provision_log_path = os.path.join( - rundir, "device-connector-error.json" - ) - with open( - provision_log_path, "w" - ) as provision_log_file: - provision_log_file.write( - json.dumps(provision_exception_info) - ) - provision_log_file.close() - return 99, None, "" - else: - return 0, None, "" - - mock_run_test_phase.side_effect = run_test_phase_side_effect - agent.process_jobs() + # patch + def run_core_provision_patch(self): + provision_log_path = self.params.rundir / "device-connector-error.json" + with open(provision_log_path, "w") as provision_log_file: + provision_log_file.write(json.dumps(provision_exception_info)) + self.result = JobPhaseResult( + exit_code=99, + event=f"{self.phase_id}_fail", + detail=self.parse_error_logs(), + ) + + with ( + patch("shutil.rmtree"), + patch( + "testflinger_agent.job.ProvisionPhase.run", + run_core_provision_patch, + ) + ): + agent.process_jobs() status_update_requests = list( filter( diff --git a/agent/testflinger_agent/tests/test_job.py b/agent/testflinger_agent/tests/test_job.py index 29e2b9b8..962c6fef 100644 --- a/agent/testflinger_agent/tests/test_job.py +++ b/agent/testflinger_agent/tests/test_job.py @@ -11,7 +11,10 @@ import testflinger_agent from testflinger_agent.client import TestflingerClient as _TestflingerClient from testflinger_common.enums import TestPhase -from testflinger_agent.job import TestflingerJob as _TestflingerJob, set_truncate +from testflinger_agent.job import ( + TestflingerJob as _TestflingerJob, + set_truncate, +) from testflinger_agent.runner import CommandRunner from testflinger_agent.handlers import LogUpdateHandler from testflinger_agent.stop_condition_checkers import ( @@ -57,14 +60,6 @@ def test_skip_missing_data(self, client, phase): self.config[f"{phase}_command"] = "/bin/true" assert not job.go(phase) - ''' - with rmock.Mocker() as mocker: - # mock response to result submissions - #mocker.post(re.compile(r"/v1/result/"), status_code=200) - #mocker.post(re.compile(r"/v1/result/[-a-z0-9]+/output"), status_code=200) - #mocker.get(re.compile(r"/v1/result/[-a-z0-9]+"), status_code=200) - ''' - @pytest.mark.parametrize( "phase", ["setup", "provision", "test", "allocate", "reserve"] ) @@ -164,7 +159,9 @@ def test_no_output_timeout_in_provision( # mock response to result requests mocker.get(re.compile(r"/v1/result/"), status_code=200) mocker.post(re.compile(r"/v1/result/"), status_code=200) - mocker.post(re.compile(r"/v1/agents/provision_logs/"), status_code=200) + mocker.post( + re.compile(r"/v1/agents/provision_logs/"), status_code=200 + ) job.run(TestPhase.PROVISION) with open(job.phases[TestPhase.PROVISION].output_log) as log: @@ -196,12 +193,10 @@ def test_run_test_phase_with_run_exception( exit_code, exit_event, exit_reason = job.phases[TestPhase.SETUP].result assert exit_code == 100 assert exit_event == TestEvent.SETUP_FAIL - assert exit_reason == "failed" + assert exit_reason == "Exception: failed" def test_set_truncate(self, client): """Test the _set_truncate method of TestflingerJob""" - job_id = str(uuid.uuid1()) - job = _TestflingerJob({"job_id": job_id, "job_queue": ""}, client) with tempfile.TemporaryFile(mode="r+") as f: # First check that a small file doesn't get truncated f.write("x" * 100) @@ -227,7 +222,12 @@ def test_wait_for_completion(self, client): job_id = 
str(uuid.uuid1()) parent_job_id = str(uuid.uuid1()) job = _TestflingerJob( - {"job_id": job_id, "parent_job_id": parent_job_id, "job_queue": ""}, client + { + "job_id": job_id, + "parent_job_id": parent_job_id, + "job_queue": "", + }, + client, ) job.phases[TestPhase.ALLOCATE].wait_for_completion() # No assertions needed, just make sure we don't timeout From b343edb2555478bed08b3f8ad59507320005f050 Mon Sep 17 00:00:00 2001 From: George Boukeas Date: Fri, 4 Oct 2024 20:55:22 +0100 Subject: [PATCH 03/12] fix: formatting (black) --- agent/testflinger_agent/agent.py | 4 ++-- agent/testflinger_agent/runner.py | 1 + agent/testflinger_agent/tests/test_agent.py | 14 +++++++++----- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/agent/testflinger_agent/agent.py b/agent/testflinger_agent/agent.py index ec69f46e..7a5dac11 100644 --- a/agent/testflinger_agent/agent.py +++ b/agent/testflinger_agent/agent.py @@ -20,7 +20,7 @@ from testflinger_agent.job import TestflingerJob from testflinger_agent.errors import TFServerError -from testflinger_common.enums import JobState, TestPhase +from testflinger_common.enums import JobState, TestPhase, TestEvent logger = logging.getLogger(__name__) @@ -168,7 +168,7 @@ def process_jobs(self): provision_result = job.phases[TestPhase.PROVISION].result if ( provision_result is not None - and provision_result.exit_code == 46 + and provision_result.event == TestEvent.RECOVERY_FAIL ): self.mark_device_offline() diff --git a/agent/testflinger_agent/runner.py b/agent/testflinger_agent/runner.py index 0b638fde..3fc067de 100644 --- a/agent/testflinger_agent/runner.py +++ b/agent/testflinger_agent/runner.py @@ -153,6 +153,7 @@ def run(self, cmd: str) -> Tuple[int, Optional[TestEvent], str]: self.cleanup() if stop_reason == "": stop_reason = get_stop_reason(self.process.returncode, "") + return self.process.returncode, stop_event, stop_reason diff --git a/agent/testflinger_agent/tests/test_agent.py b/agent/testflinger_agent/tests/test_agent.py index 00287b69..bdfcaa2f 100644 --- a/agent/testflinger_agent/tests/test_agent.py +++ b/agent/testflinger_agent/tests/test_agent.py @@ -17,7 +17,7 @@ from testflinger_agent.config import ATTACHMENTS_DIR from testflinger_agent.errors import TFServerError from testflinger_agent.job import TestflingerJob, JobPhaseResult -from testflinger_common.enums import TestPhase, TestEvent +from testflinger_common.enums import TestEvent class TestClient: @@ -615,7 +615,9 @@ def test_provision_error_in_event_detail(self, agent, requests_mock): # patch def run_core_provision_patch(self): - provision_log_path = self.params.rundir / "device-connector-error.json" + provision_log_path = ( + self.params.rundir / "device-connector-error.json" + ) with open(provision_log_path, "w") as provision_log_file: provision_log_file.write(json.dumps(provision_exception_info)) self.result = JobPhaseResult( @@ -629,7 +631,7 @@ def run_core_provision_patch(self): patch( "testflinger_agent.job.ProvisionPhase.run", run_core_provision_patch, - ) + ), ): agent.process_jobs() @@ -680,7 +682,9 @@ def test_provision_error_no_cause(self, agent, requests_mock): # patch def run_core_provision_patch(self): - provision_log_path = self.params.rundir / "device-connector-error.json" + provision_log_path = ( + self.params.rundir / "device-connector-error.json" + ) with open(provision_log_path, "w") as provision_log_file: provision_log_file.write(json.dumps(provision_exception_info)) self.result = JobPhaseResult( @@ -694,7 +698,7 @@ def run_core_provision_patch(self): patch( 
"testflinger_agent.job.ProvisionPhase.run", run_core_provision_patch, - ) + ), ): agent.process_jobs() From 8f202c5a1503e58bc1fcffbb5d8fde7468a2b253 Mon Sep 17 00:00:00 2001 From: George Boukeas Date: Fri, 4 Oct 2024 21:20:20 +0100 Subject: [PATCH 04/12] fix: Python 3.8 compatibility issues --- agent/testflinger_agent/agent.py | 3 ++- agent/testflinger_agent/tests/test_agent.py | 20 ++++++++------------ 2 files changed, 10 insertions(+), 13 deletions(-) diff --git a/agent/testflinger_agent/agent.py b/agent/testflinger_agent/agent.py index 7a5dac11..e85acc3b 100644 --- a/agent/testflinger_agent/agent.py +++ b/agent/testflinger_agent/agent.py @@ -180,7 +180,8 @@ def process_jobs(self): # problems logger.exception(e) results_basedir = self.client.config.get("results_basedir") - shutil.move(job.params.rundir, results_basedir) + # [NOTE] In Python 3.8 `shutil.move`` still expects strings + shutil.move(str(job.params.rundir), results_basedir) self.set_agent_state(JobState.WAITING) self.check_restart() diff --git a/agent/testflinger_agent/tests/test_agent.py b/agent/testflinger_agent/tests/test_agent.py index bdfcaa2f..4079b906 100644 --- a/agent/testflinger_agent/tests/test_agent.py +++ b/agent/testflinger_agent/tests/test_agent.py @@ -626,14 +626,12 @@ def run_core_provision_patch(self): detail=self.parse_error_logs(), ) - with ( - patch("shutil.rmtree"), - patch( + with patch("shutil.rmtree"): + with patch( "testflinger_agent.job.ProvisionPhase.run", run_core_provision_patch, - ), - ): - agent.process_jobs() + ): + agent.process_jobs() status_update_requests = list( filter( @@ -693,14 +691,12 @@ def run_core_provision_patch(self): detail=self.parse_error_logs(), ) - with ( - patch("shutil.rmtree"), - patch( + with patch("shutil.rmtree"): + with patch( "testflinger_agent.job.ProvisionPhase.run", run_core_provision_patch, - ), - ): - agent.process_jobs() + ): + agent.process_jobs() status_update_requests = list( filter( From 352f4e0829a3b0541b0b8c07246d1f19b5b451fb Mon Sep 17 00:00:00 2001 From: George Boukeas Date: Sun, 6 Oct 2024 15:55:30 +0100 Subject: [PATCH 05/12] refactor: `run_core` returns the result instead of storing it --- agent/testflinger_agent/agent.py | 1 - agent/testflinger_agent/job.py | 36 +++++++++++------------ agent/testflinger_agent/tests/test_job.py | 3 +- 3 files changed, 19 insertions(+), 21 deletions(-) diff --git a/agent/testflinger_agent/agent.py b/agent/testflinger_agent/agent.py index e85acc3b..571d988f 100644 --- a/agent/testflinger_agent/agent.py +++ b/agent/testflinger_agent/agent.py @@ -31,7 +31,6 @@ class TestflingerAgent: def __init__(self, client): self.client = client signal.signal(signal.SIGUSR1, self.restart_signal_handler) - # [TODO] Investigate relation between the agent state and the job state self.set_agent_state("waiting") self._post_initial_agent_data() diff --git a/agent/testflinger_agent/job.py b/agent/testflinger_agent/job.py index bd13ab9b..ee26ef7f 100644 --- a/agent/testflinger_agent/job.py +++ b/agent/testflinger_agent/job.py @@ -20,7 +20,7 @@ from pathlib import Path import tempfile import time -from typing import Optional, Tuple, NamedTuple +from typing import Optional, NamedTuple from testflinger_agent.client import TestflingerClient from testflinger_agent.config import ATTACHMENTS_DIR @@ -130,15 +130,15 @@ def register(self): raise NotImplementedError @abstractmethod - def run_core(self): - """Execute the "core" of the phase and set self.result""" + def run_core(self) -> JobPhaseResult: + """Execute the "core" of the phase""" raise 
NotImplementedError def process_results(self): """Execute any possible actions after the "core" of the phase""" pass - def run(self) -> Tuple[int, Optional[TestEvent], str]: + def run(self): """This is the generic structure of every phase""" assert self.go() @@ -160,14 +160,14 @@ def run(self) -> Tuple[int, Optional[TestEvent], str]: ): self.runner.run(f"echo '{line}'") - # run the "core" of the phase - self.run_core() - self.update_results() + # run the "core" of the phase and store the result + self.result = self.run_core() # perform any post-core actions self.process_results() + self.update_results_file() - def update_results(self): + def update_results_file(self): """Update the results file with the results of this phase""" with open(self.results_file, "r+") as results: outcome_data = json.load(results) @@ -217,14 +217,14 @@ def go(self) -> bool: return False return True - def run_core(self): + def run_core(self) -> JobPhaseResult: # execute the external command logger.info("Running %s_command: %s", self.phase_id, self.cmd) try: exit_code, event, detail = self.runner.run(self.cmd) if exit_code == 0: # complete, successful run - self.result = JobPhaseResult( + return JobPhaseResult( exit_code=exit_code, event=f"{self.phase_id}_success", ) @@ -235,13 +235,13 @@ def run_core(self): # Here we *only* emit the stop event # self.emitter.emit_event(event, detail) if event: - self.result = JobPhaseResult( + return JobPhaseResult( exit_code=exit_code, event=event, detail=detail, ) else: - self.result = JobPhaseResult( + return JobPhaseResult( exit_code=exit_code, event=f"{self.phase_id}_fail", detail=self.parse_error_logs(), @@ -250,7 +250,7 @@ def run_core(self): # failed phase run due to an exception detail = f"{type(error).__name__}: {error}" logger.exception(detail) - self.result = JobPhaseResult( + return JobPhaseResult( exit_code=100, event=f"{self.phase_id}_fail", detail=detail, @@ -640,6 +640,8 @@ def check_end(self) -> bool: phase = self.phases[self.current_phase] if phase.result.exit_code and self.current_phase != TestPhase.TEST: logger.debug("Phase %s failed, aborting job" % phase) + # set these here because self.end() is called after cleanup + self.end_phase = phase.phase_id self.end_reason = phase.result.event return True return False @@ -661,14 +663,11 @@ def cancel(self): def go(self, phase_id: TestPhase) -> bool: return self.phases[phase_id].go() - def run(self, phase_id: TestPhase): - """Run the specified test phase in rundir + def run(self, phase_id: TestPhase) -> JobPhaseResult: + """Run the specified test phase :param phase: Name of the test phase (setup, provision, test, ...) 
- :return: - Returncode from the command that was executed, 0 will be returned - if there was no command to run """ self.current_phase = phase_id self.params.client.post_job_state(self.job_id, phase_id) @@ -677,6 +676,7 @@ def run(self, phase_id: TestPhase): phase.run() # self.params.client.post_influx(phase.id, phase.result.exit_code) self.emitter.emit_event(phase.result.event, phase.result.detail) + return phase.result def secure_filter(self, member, path): """Combine the `data` filter with custom attachment filtering diff --git a/agent/testflinger_agent/tests/test_job.py b/agent/testflinger_agent/tests/test_job.py index 962c6fef..f3e5cc1e 100644 --- a/agent/testflinger_agent/tests/test_job.py +++ b/agent/testflinger_agent/tests/test_job.py @@ -188,9 +188,8 @@ def test_run_test_phase_with_run_exception( # mock response to result requests mocker.get(re.compile(r"/v1/result/"), status_code=200) mocker.post(re.compile(r"/v1/result/"), status_code=200) - job.run(TestPhase.SETUP) + exit_code, exit_event, exit_reason = job.run(TestPhase.SETUP) - exit_code, exit_event, exit_reason = job.phases[TestPhase.SETUP].result assert exit_code == 100 assert exit_event == TestEvent.SETUP_FAIL assert exit_reason == "Exception: failed" From 914c2c25f041f6deb03471306ddb57d8af0f5111 Mon Sep 17 00:00:00 2001 From: George Boukeas Date: Wed, 9 Oct 2024 18:05:54 +0100 Subject: [PATCH 06/12] chore: clean up extraneous comments and commented-out code --- agent/testflinger_agent/job.py | 91 --------------------- agent/testflinger_agent/tests/test_agent.py | 2 - 2 files changed, 93 deletions(-) diff --git a/agent/testflinger_agent/job.py b/agent/testflinger_agent/job.py index ee26ef7f..c65fb781 100644 --- a/agent/testflinger_agent/job.py +++ b/agent/testflinger_agent/job.py @@ -283,97 +283,6 @@ def parse_error_logs(self): return "" -''' -class UnpackPhase(JobPhase, phase_id=TestPhase.UNPACK): - - # phases for which attachments are allowed - supported_phases = ( - TestPhase.PROVISION, - TestPhase.FIRMWARE_UPDATE, - TestPhase.TEST, - ) - - def register(self): - self.runner.register_stop_condition_checker( - GlobalTimeoutChecker(self.params.get_global_timeout()) - ) - - def go(self) -> bool: - if self.params.job_data.get("attachments_status") != "complete": - # the phase is "go" if attachments have been provided - logger.info("No attachments provided in job data, skipping...") - return False - return True - - def run_core(self): - try: - self.unpack_attachments() - except Exception as error: - # use the runner to display the error - # (so that the output is also included in the phase results) - for line in f"{type(error).__name__}: {error}".split("\n"): - self.runner.run(f"echo '{line}'") - # propagate the error (`run` uniformly handles fail cases) - raise - return 0, TestEvent.UNPACK_SUCCESS, None - - def secure_filter(self, member, path): - """Combine the `data` filter with custom attachment filtering - - Makes sure that the starting folder for all attachments coincides - with one of the supported phases, i.e. that the attachment archive - has been created properly and no attachment will be extracted to an - unexpected location. 
- """ - try: - resolved = Path(member.name).resolve().relative_to(Path.cwd()) - except ValueError as error: - # essentially trying to extract higher than the attachments folder - raise tarfile.OutsideDestinationError(member, path) from error - if not str(resolved).startswith( - tuple(f"{phase}/" for phase in self.supported_phases) - ): - # trying to extract in invalid folder under the attachments folder - raise tarfile.OutsideDestinationError(member, path) - return tarfile.data_filter(member, path) - - def unpack_attachments(self): - """Download and unpack the attachments associated with a job""" - job_id = self.params.job_data["job_id"] - - with tempfile.NamedTemporaryFile(suffix=".tar.gz") as archive_tmp: - archive_path = Path(archive_tmp.name) - # download attachment archive - logger.info(f"Downloading attachments for {job_id}") - self.params.client.get_attachments(job_id, path=archive_path) - # extract archive into the attachments folder - logger.info(f"Unpacking attachments for {job_id}") - with tarfile.open(archive_path, "r:gz") as tar: - tar.extractall( - self.params.rundir / ATTACHMENTS_DIR, - filter=self.secure_filter, - ) - - # side effect: remove all attachment data from `self.params.job_data` - # (so there is no interference with existing processes, especially - # provisioning or firmware update, which are triggered when these - # sections are not empty) - for phase in self.supported_phases: - phase_str = f"{phase}_data" - try: - phase_data = self.params.job_data[phase_str] - except KeyError: - pass - else: - # delete attachments, if they exist - phase_data.pop("attachments", None) - # it may be the case that attachments were the only data - # included for this phase, so the phase can now be removed - if not phase_data: - del self.params.job_data[phase_str] -''' - - class SetupPhase(ExternalCommandPhase, phase_id=TestPhase.SETUP): def register(self): diff --git a/agent/testflinger_agent/tests/test_agent.py b/agent/testflinger_agent/tests/test_agent.py index 4079b906..d9610148 100644 --- a/agent/testflinger_agent/tests/test_agent.py +++ b/agent/testflinger_agent/tests/test_agent.py @@ -613,7 +613,6 @@ def test_provision_error_in_event_detail(self, agent, requests_mock): } } - # patch def run_core_provision_patch(self): provision_log_path = ( self.params.rundir / "device-connector-error.json" @@ -678,7 +677,6 @@ def test_provision_error_no_cause(self, agent, requests_mock): } } - # patch def run_core_provision_patch(self): provision_log_path = ( self.params.rundir / "device-connector-error.json" From 41f8a6d2281eda3e505f803f3768cbd024c5ee5b Mon Sep 17 00:00:00 2001 From: George Boukeas Date: Wed, 9 Oct 2024 18:22:23 +0100 Subject: [PATCH 07/12] refactor: minor changes to the way the phase banner is displayed --- agent/testflinger_agent/job.py | 32 ++++++++++++++++++-------------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/agent/testflinger_agent/job.py b/agent/testflinger_agent/job.py index c65fb781..a13389a0 100644 --- a/agent/testflinger_agent/job.py +++ b/agent/testflinger_agent/job.py @@ -154,11 +154,7 @@ def run(self): self.register() # display phase banner in the Testflinger output - for line in self.banner( - f"Starting testflinger {self.phase_id} phase " - f"on {self.params.client.config.get('agent_id')}" - ): - self.runner.run(f"echo '{line}'") + self.output_banner() # run the "core" of the phase and store the result self.result = self.run_core() @@ -189,15 +185,12 @@ def update_results_file(self): results.seek(0) json.dump(outcome_data, results) - 
def banner(self, line): - """Yield text lines to print a banner around a string - - :param line: - Line of text to print a banner around - """ - yield "*" * (len(line) + 4) - yield "* {} *".format(line) - yield "*" * (len(line) + 4) + def output_banner(self): + for line in banner( + f"Starting testflinger {self.phase_id} phase " + f"on {self.params.client.config.get('agent_id')}" + ): + self.runner.run(f"echo '{line}'") class ExternalCommandPhase(JobPhase): @@ -652,6 +645,17 @@ def unpack_attachments(self): del self.params.job_data[phase_str] +def banner(line: str): + """Yield text lines to print a banner around a string + + :param line: + Line of text to print a banner around + """ + yield "*" * (len(line) + 4) + yield f"* {line} *" + yield "*" * (len(line) + 4) + + def set_nonblock(fd): """Set the specified fd to nonblocking output From 51e04032843f6061ba6325cf597eda7ecb4aa902 Mon Sep 17 00:00:00 2001 From: George Boukeas Date: Wed, 9 Oct 2024 19:02:42 +0100 Subject: [PATCH 08/12] fix: set the `end_reason` when cancelling, to be included in the end event --- agent/testflinger_agent/job.py | 1 + 1 file changed, 1 insertion(+) diff --git a/agent/testflinger_agent/job.py b/agent/testflinger_agent/job.py index a13389a0..3ead3713 100644 --- a/agent/testflinger_agent/job.py +++ b/agent/testflinger_agent/job.py @@ -561,6 +561,7 @@ def check_cancel(self) -> bool: def cancel(self): logger.info("Job cancellation was requested, exiting.") self.emitter.emit_event(TestEvent.CANCELLED) + self.end_reason = TestEvent.CANCELLED def go(self, phase_id: TestPhase) -> bool: return self.phases[phase_id].go() From 53498ffba6efa3559161ed0f5b43b59f9da0c612 Mon Sep 17 00:00:00 2001 From: George Boukeas Date: Mon, 28 Oct 2024 15:41:52 +0000 Subject: [PATCH 09/12] fix: make sure the execution rundir is created recursively --- agent/testflinger_agent/job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agent/testflinger_agent/job.py b/agent/testflinger_agent/job.py index 3ead3713..6fd589bc 100644 --- a/agent/testflinger_agent/job.py +++ b/agent/testflinger_agent/job.py @@ -502,7 +502,7 @@ def __init__(self, job_data: dict, client: TestflingerClient): self.job_id = job_data["job_id"] rundir = Path(client.config.get("execution_basedir")) / self.job_id - rundir.mkdir() + rundir.mkdir(parents=True) # specify directories and result files # Dump the job data to testflinger.json in our execution dir with open(rundir / "testflinger.json", "w") as f: From 2134b192405d3b285f9cc7adfef8efd0d43bb674 Mon Sep 17 00:00:00 2001 From: George Boukeas Date: Fri, 1 Nov 2024 09:56:36 +0000 Subject: [PATCH 10/12] refactor: treat attachment unpacking as a phase --- agent/testflinger_agent/agent.py | 3 - agent/testflinger_agent/job.py | 191 ++++++++++---------- agent/testflinger_agent/tests/test_agent.py | 158 +++++++++++++++- common/testflinger_common/enums.py | 5 + server/src/api/schemas.py | 3 + 5 files changed, 256 insertions(+), 104 deletions(-) diff --git a/agent/testflinger_agent/agent.py b/agent/testflinger_agent/agent.py index 571d988f..6780b049 100644 --- a/agent/testflinger_agent/agent.py +++ b/agent/testflinger_agent/agent.py @@ -134,9 +134,6 @@ def process_jobs(self): self.client.post_agent_data({"job_id": job_id}) job.start() - if job.check_attachments(): - job.unpack_attachments() - # Go through the job phases for phase in job.phase_sequence: diff --git a/agent/testflinger_agent/job.py b/agent/testflinger_agent/job.py index a853cb5b..eabff992 100644 --- a/agent/testflinger_agent/job.py +++ 
b/agent/testflinger_agent/job.py @@ -124,10 +124,11 @@ def go(self) -> bool: """Return True if the phase should run or False if skipping""" raise NotImplementedError - @abstractmethod def register(self): """Perform all necessary registrations with the phase runner""" - raise NotImplementedError + self.runner.register_stop_condition_checker( + GlobalTimeoutChecker(self.params.get_global_timeout()) + ) @abstractmethod def run_core(self) -> JobPhaseResult: @@ -217,7 +218,7 @@ def run_core(self) -> JobPhaseResult: exit_code, event, detail = self.runner.run(self.cmd) # make sure the exit code is within the expected 0-255 range # (this also handles negative numbers) - exitcode = exitcode % 256 + exit_code = exit_code % 256 except Exception as error: # failed phase run due to an exception detail = f"{type(error).__name__}: {error}" @@ -227,7 +228,7 @@ def run_core(self) -> JobPhaseResult: event=f"{self.phase_id}_fail", detail=detail, ) - + if exit_code == 0: # complete, successful run return JobPhaseResult( @@ -253,7 +254,6 @@ def run_core(self) -> JobPhaseResult: detail=self.parse_error_logs(), ) - def parse_error_logs(self): # [TODO] Move filenames used to pass information to the common module with open( @@ -281,12 +281,93 @@ def parse_error_logs(self): return "" -class SetupPhase(ExternalCommandPhase, phase_id=TestPhase.SETUP): +class UnpackPhase(JobPhase, phase_id=TestPhase.UNPACK): + + # phases for which attachments are allowed + supported_phases = ( + TestPhase.PROVISION, + TestPhase.FIRMWARE_UPDATE, + TestPhase.TEST, + ) + + def go(self) -> bool: + if self.params.job_data.get("attachments_status") != "complete": + # the phase is "go" if attachments have been provided + logger.info("No attachments provided in job data, skipping...") + return False + return True + + def run_core(self) -> JobPhaseResult: + try: + self.unpack_attachments() + except Exception as error: + error_str = f"{type(error).__name__}: {error}" + # use the runner to display the error + # (so that the output is also included in the phase results) + for line in error_str.split("\n"): + self.runner.run(f"echo '{line}'") + return JobPhaseResult( + exit_code=1, event=TestEvent.UNPACK_FAIL, detail=error_str + ) + return JobPhaseResult(exit_code=0, event=TestEvent.UNPACK_SUCCESS) + + def secure_filter(self, member, path): + """Combine the `data` filter with custom attachment filtering + + Makes sure that the starting folder for all attachments coincides + with one of the supported phases, i.e. that the attachment archive + has been created properly and no attachment will be extracted to an + unexpected location. 
+ """ + try: + resolved = Path(member.name).resolve().relative_to(Path.cwd()) + except ValueError as error: + # essentially trying to extract higher than the attachments folder + raise tarfile.OutsideDestinationError(member, path) from error + if not str(resolved).startswith( + tuple(f"{phase}/" for phase in self.supported_phases) + ): + # extracting in invalid folder, under the attachments folder + raise tarfile.OutsideDestinationError(member, path) + return tarfile.data_filter(member, path) + + def unpack_attachments(self): + """Download and unpack the attachments associated with a job""" + job_id = self.params.job_data["job_id"] + with tempfile.NamedTemporaryFile(suffix=".tar.gz") as archive_tmp: + archive_path = Path(archive_tmp.name) + # download attachment archive + logger.info(f"Downloading attachments for {job_id}") + self.params.client.get_attachments(job_id, path=archive_path) + # extract archive into the attachments folder + logger.info(f"Unpacking attachments for {job_id}") + with tarfile.open(archive_path, "r:gz") as tar: + tar.extractall( + Path(self.params.rundir) / ATTACHMENTS_DIR, + filter=self.secure_filter, + ) + + # side effect: remove all attachment data from `job_data` + # (so there is no interference with existing processes, especially + # provisioning or firmware update, which are triggered when these + # sections are not empty) + for phase in self.supported_phases: + phase_str = f"{phase}_data" + try: + phase_data = self.params.job_data[phase_str] + except KeyError: + pass + else: + # delete attachments, if they exist + phase_data.pop("attachments", None) + # it may be the case that attachments were the only data + # included for this phase, so the phase can now be removed + if not phase_data: + del self.params.job_data[phase_str] - def register(self): - self.runner.register_stop_condition_checker( - GlobalTimeoutChecker(self.params.get_global_timeout()) - ) + +class SetupPhase(ExternalCommandPhase, phase_id=TestPhase.SETUP): + pass class FirmwarePhase(ExternalCommandPhase, phase_id=TestPhase.FIRMWARE_UPDATE): @@ -302,11 +383,6 @@ def go(self) -> bool: return False return True - def register(self): - self.runner.register_stop_condition_checker( - GlobalTimeoutChecker(self.params.get_global_timeout()) - ) - class ProvisionPhase(ExternalCommandPhase, phase_id=TestPhase.PROVISION): @@ -322,9 +398,7 @@ def go(self) -> bool: return True def register(self): - self.runner.register_stop_condition_checker( - GlobalTimeoutChecker(self.params.get_global_timeout()) - ) + super().register() # Do not allow cancellation during provision for safety reasons self.runner.register_stop_condition_checker( JobCancelledChecker( @@ -359,9 +433,7 @@ def go(self) -> bool: return True def register(self): - self.runner.register_stop_condition_checker( - GlobalTimeoutChecker(self.params.get_global_timeout()) - ) + super().register() # We only need to check for output timeouts during the test phase output_timeout_checker = OutputTimeoutChecker( self.params.get_output_timeout() @@ -383,11 +455,6 @@ def go(self) -> bool: return False return True - def register(self): - self.runner.register_stop_condition_checker( - GlobalTimeoutChecker(self.params.get_global_timeout()) - ) - def process_results(self): """ Read the json dict from "device-info.json" and send it to the server @@ -467,16 +534,13 @@ def register(self): class CleanupPhase(ExternalCommandPhase, phase_id=TestPhase.CLEANUP): - - def register(self): - self.runner.register_stop_condition_checker( - 
GlobalTimeoutChecker(self.params.get_global_timeout()) - ) + pass class TestflingerJob: phase_sequence = ( + TestPhase.UNPACK, TestPhase.SETUP, TestPhase.PROVISION, TestPhase.FIRMWARE_UPDATE, @@ -486,6 +550,7 @@ class TestflingerJob: ) phase_cls_map = { + TestPhase.UNPACK: UnpackPhase, TestPhase.SETUP: SetupPhase, TestPhase.PROVISION: ProvisionPhase, TestPhase.FIRMWARE_UPDATE: FirmwarePhase, @@ -586,70 +651,6 @@ def run(self, phase_id: TestPhase) -> JobPhaseResult: self.emitter.emit_event(phase.result.event, phase.result.detail) return phase.result - def secure_filter(self, member, path): - """Combine the `data` filter with custom attachment filtering - - Makes sure that the starting folder for all attachments coincides - with one of the supported phases, i.e. that the attachment archive - has been created properly and no attachment will be extracted to an - unexpected location. - """ - try: - resolved = Path(member.name).resolve().relative_to(Path.cwd()) - except ValueError as error: - # essentially trying to extract higher than the attachments folder - raise tarfile.OutsideDestinationError(member, path) from error - if not str(resolved).startswith( - tuple( - f"{phase}/" - for phase in ( - TestPhase.PROVISION, - TestPhase.FIRMWARE_UPDATE, - TestPhase.TEST, - ) - ) - ): - # trying to extract in invalid folder under the attachments folder - raise tarfile.OutsideDestinationError(member, path) - return tarfile.data_filter(member, path) - - def check_attachments(self) -> bool: - return self.params.job_data.get("attachments_status") == "complete" - - def unpack_attachments(self): - """Download and unpack the attachments associated with a job""" - - with tempfile.NamedTemporaryFile(suffix=".tar.gz") as archive_tmp: - archive_path = Path(archive_tmp.name) - # download attachment archive - logger.info(f"Downloading attachments for {self.job_id}") - self.params.client.get_attachments(self.job_id, path=archive_path) - # extract archive into the attachments folder - logger.info(f"Unpacking attachments for {self.job_id}") - with tarfile.open(archive_path, "r:gz") as tar: - tar.extractall( - self.params.rundir / ATTACHMENTS_DIR, - filter=self.secure_filter, - ) - - # side effect: remove all attachment data from `self.params.job_data` - # (so there is no interference with existing processes, especially - # provisioning or firmware update, which are triggered when these - # sections are not empty) - for phase in self.supported_phases: - phase_str = f"{phase}_data" - try: - phase_data = self.params.job_data[phase_str] - except KeyError: - pass - else: - # delete attachments, if they exist - phase_data.pop("attachments", None) - # it may be the case that attachments were the only data - # included for this phase, so the phase can now be removed - if not phase_data: - del self.params.job_data[phase_str] - def banner(line: str): """Yield text lines to print a banner around a string diff --git a/agent/testflinger_agent/tests/test_agent.py b/agent/testflinger_agent/tests/test_agent.py index 76580319..a415508a 100644 --- a/agent/testflinger_agent/tests/test_agent.py +++ b/agent/testflinger_agent/tests/test_agent.py @@ -146,8 +146,56 @@ def test_attachments(self, agent, tmp_path): # - there is a request to the job retrieval endpoint # - there a request to the attachment retrieval endpoint history = mocker.request_history - assert history[0].path == "/v1/job" - assert history[2].path == f"/v1/job/{job_id}/attachments" + + for entry in history: + print(entry) + + # client.check_jobs() + assert ( + history[0].path 
== "/v1/job" + and history[0].query == "queue=test" + and history[0].method == "GET" + ) + # client.post_agent_data({"job_id": job_id}) + assert ( + history[1].path.startswith("/v1/agents/data") + and history[1].method == "POST" + ) + # client.check_job_state(job_id) + assert ( + history[2].path == f"/v1/result/{job_id}" + and history[2].method == "GET" + ) + # self.set_agent_state(phase) + assert ( + history[3].path.startswith("/v1/agents/data") + and history[3].method == "POST" + ) + # self.client.post_job_state(job_id, phase) + assert ( + history[4].path == f"/v1/result/{job_id}" + and history[4].method == "POST" + ) + # check that attachments have been requested + # client.get_attachments(job_id, path=archive_path) + attachment_requests = list( + filter( + lambda request: request.path + == f"/v1/job/{job_id}/attachments" + and request.method == "GET", + history, + ) + ) + assert len(attachment_requests) == 1 + # check the results to confirm that the unpack phase succeeded + result_request = list( + filter( + lambda request: request.path == f"/v1/result/{job_id}" + and request.method == "POST", + history, + ) + )[-1] + assert result_request.json()["unpack_status"] == 0 # check that the attachment is where it's supposed to be basepath = Path(self.tmpdir) / mock_job_data["job_id"] @@ -203,8 +251,57 @@ def test_attachments_insecure_no_phase(self, agent, tmp_path): # - there is a request to the job retrieval endpoint # - there a request to the attachment retrieval endpoint history = mocker.request_history - assert history[0].path == "/v1/job" - assert history[2].path == f"/v1/job/{job_id}/attachments" + + for entry in history: + print(entry) + + # client.check_jobs() + assert ( + history[0].path == "/v1/job" + and history[0].query == "queue=test" + and history[0].method == "GET" + ) + # client.post_agent_data({"job_id": job_id}) + assert ( + history[1].path.startswith("/v1/agents/data") + and history[1].method == "POST" + ) + # client.check_job_state(job_id) + assert ( + history[2].path == f"/v1/result/{job_id}" + and history[2].method == "GET" + ) + # self.set_agent_state(phase) + assert ( + history[3].path.startswith("/v1/agents/data") + and history[3].method == "POST" + ) + # self.client.post_job_state(job_id, phase) + assert ( + history[4].path == f"/v1/result/{job_id}" + and history[4].method == "POST" + ) + + # check that attachments have been requested + # client.get_attachments(job_id, path=archive_path) + attachment_requests = list( + filter( + lambda request: request.path + == f"/v1/job/{job_id}/attachments" + and request.method == "GET", + history, + ) + ) + assert len(attachment_requests) == 1 + # check the results to confirm that the unpack phase failed + result_request = list( + filter( + lambda request: request.path == f"/v1/result/{job_id}" + and request.method == "POST", + history, + ) + )[-1] + assert result_request.json()["unpack_status"] != 0 # check that the attachment is *not* where it's supposed to be basepath = Path(self.tmpdir) / mock_job_data["job_id"] @@ -260,8 +357,57 @@ def test_attachments_insecure_out_of_hierarchy(self, agent, tmp_path): # - there is a request to the job retrieval endpoint # - there a request to the attachment retrieval endpoint history = mocker.request_history - assert history[0].path == "/v1/job" - assert history[2].path == f"/v1/job/{job_id}/attachments" + + for entry in history: + print(entry) + + # client.check_jobs() + assert ( + history[0].path == "/v1/job" + and history[0].query == "queue=test" + and history[0].method == "GET" + ) + # 
client.post_agent_data({"job_id": job_id}) + assert ( + history[1].path.startswith("/v1/agents/data") + and history[1].method == "POST" + ) + # client.check_job_state(job_id) + assert ( + history[2].path == f"/v1/result/{job_id}" + and history[2].method == "GET" + ) + # self.set_agent_state(phase) + assert ( + history[3].path.startswith("/v1/agents/data") + and history[3].method == "POST" + ) + # self.client.post_job_state(job_id, phase) + assert ( + history[4].path == f"/v1/result/{job_id}" + and history[4].method == "POST" + ) + + # check that attachments have been requested + # client.get_attachments(job_id, path=archive_path) + attachment_requests = list( + filter( + lambda request: request.path + == f"/v1/job/{job_id}/attachments" + and request.method == "GET", + history, + ) + ) + assert len(attachment_requests) == 1 + # check the results to confirm that the unpack phase failed + result_request = list( + filter( + lambda request: request.path == f"/v1/result/{job_id}" + and request.method == "POST", + history, + ) + )[-1] + assert result_request.json()["unpack_status"] != 0 # check that the attachment is *not* where it's supposed to be basepath = Path(self.tmpdir) / mock_job_data["job_id"] diff --git a/common/testflinger_common/enums.py b/common/testflinger_common/enums.py index 4265cb2f..f5a46860 100644 --- a/common/testflinger_common/enums.py +++ b/common/testflinger_common/enums.py @@ -22,6 +22,7 @@ class JobState(StrEnum): WAITING = "waiting" + UNPACK = "unpack" SETUP = "setup" PROVISION = "provision" FIRMWARE_UPDATE = "firmware_update" @@ -35,6 +36,7 @@ class JobState(StrEnum): class TestPhase(StrEnum): + UNPACK = "unpack" SETUP = "setup" PROVISION = "provision" FIRMWARE_UPDATE = "firmware_update" @@ -45,6 +47,7 @@ class TestPhase(StrEnum): class TestEvent(StrEnum): + UNPACK_START = "unpack_start" SETUP_START = "setup_start" PROVISION_START = "provision_start" FIRMWARE_UPDATE_START = "firmware_update_start" @@ -53,6 +56,7 @@ class TestEvent(StrEnum): RESERVE_START = "reserve_start" CLEANUP_START = "cleanup_start" + UNPACK_SUCCESS = "unpack_success" SETUP_SUCCESS = "setup_success" PROVISION_SUCCESS = "provision_success" FIRMWARE_UPDATE_SUCCESS = "firmware_update_success" @@ -61,6 +65,7 @@ class TestEvent(StrEnum): RESERVE_SUCCESS = "reserve_success" CLEANUP_SUCCESS = "cleanup_success" + UNPACK_FAIL = "unpack_fail" SETUP_FAIL = "setup_fail" PROVISION_FAIL = "provision_fail" FIRMWARE_UPDATE_FAIL = "firmware_update_fail" diff --git a/server/src/api/schemas.py b/server/src/api/schemas.py index 13722f76..ac0edd8d 100644 --- a/server/src/api/schemas.py +++ b/server/src/api/schemas.py @@ -147,6 +147,9 @@ class JobSearchResponse(Schema): class Result(Schema): """Result schema""" + unpack_status = fields.Integer(required=False) + unpack_output = fields.String(required=False) + unpack_serial = fields.String(required=False) setup_status = fields.Integer(required=False) setup_output = fields.String(required=False) setup_serial = fields.String(required=False) From cfb87260fd0d9a9794609a2e7b812b3057b5d94b Mon Sep 17 00:00:00 2001 From: George Boukeas Date: Fri, 1 Nov 2024 10:29:01 +0000 Subject: [PATCH 11/12] refactor: handle negative process return codes in the runner --- agent/testflinger_agent/runner.py | 5 ++++- agent/testflinger_agent/tests/test_job.py | 4 ++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/agent/testflinger_agent/runner.py b/agent/testflinger_agent/runner.py index 3fc067de..189fa1ef 100644 --- a/agent/testflinger_agent/runner.py +++ 
b/agent/testflinger_agent/runner.py @@ -154,7 +154,10 @@ def run(self, cmd: str) -> Tuple[int, Optional[TestEvent], str]: if stop_reason == "": stop_reason = get_stop_reason(self.process.returncode, "") - return self.process.returncode, stop_event, stop_reason + # make sure the returned exit code is within the expected 0-255 range + # (handles the negative numbers that may arise from signal handling) + # https://docs.python.org/3.8/library/subprocess.html#subprocess.CompletedProcess.returncode + return self.process.returncode % 256, stop_event, stop_reason def get_stop_reason(returncode: int, stop_reason: str) -> str: diff --git a/agent/testflinger_agent/tests/test_job.py b/agent/testflinger_agent/tests/test_job.py index f3e5cc1e..fe51f5e8 100644 --- a/agent/testflinger_agent/tests/test_job.py +++ b/agent/testflinger_agent/tests/test_job.py @@ -90,7 +90,7 @@ def test_job_global_timeout(self, tmp_path): log_data = log.read() assert timeout_str in log_data assert exit_reason == timeout_str - assert exit_code == -9 + assert exit_code == 247 assert exit_event == TestEvent.GLOBAL_TIMEOUT def test_config_global_timeout(self, client): @@ -120,7 +120,7 @@ def test_job_output_timeout(self, tmp_path): log_data = log.read() assert timeout_str in log_data assert exit_reason == timeout_str - assert exit_code == -9 + assert exit_code == 247 assert exit_event == TestEvent.OUTPUT_TIMEOUT def test_config_output_timeout(self, client): From a9f92ff3d927a6d84e0ddc8eed3cc3d3272c1a76 Mon Sep 17 00:00:00 2001 From: George Boukeas Date: Fri, 1 Nov 2024 10:30:01 +0000 Subject: [PATCH 12/12] chore: minor cleanup and comments --- agent/testflinger_agent/job.py | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/agent/testflinger_agent/job.py b/agent/testflinger_agent/job.py index eabff992..e5c7f225 100644 --- a/agent/testflinger_agent/job.py +++ b/agent/testflinger_agent/job.py @@ -140,9 +140,10 @@ def process_results(self): pass def run(self): - """This is the generic structure of every phase""" + """This is the generic structure of every phase - assert self.go() + This method should only be called if the `go` method has returned True. 
+ """ # register the output handlers with the runner self.runner.register_output_handler(LogUpdateHandler(self.output_log)) @@ -215,10 +216,11 @@ def run_core(self) -> JobPhaseResult: # execute the external command logger.info("Running %s_command: %s", self.phase_id, self.cmd) try: + # a non-zero exit code can arise because: + # - the command run has been interrupted by a condition checker + # (in which case the event and detail will have been set) + # - the command run has completed with a non-zero exit code exit_code, event, detail = self.runner.run(self.cmd) - # make sure the exit code is within the expected 0-255 range - # (this also handles negative numbers) - exit_code = exit_code % 256 except Exception as error: # failed phase run due to an exception detail = f"{type(error).__name__}: {error}" @@ -236,18 +238,16 @@ def run_core(self) -> JobPhaseResult: event=f"{self.phase_id}_success", ) - # [NOTE] This is a deviation from the current approach - # where a separate event is emitted when stop events are - # returned from the runner and then a fail event on top - # Here we *only* emit the stop event - # self.emitter.emit_event(event, detail) if event: + # a condition checker has caused an interruption event + # (attention: no separate/additional fail event is returned) return JobPhaseResult( exit_code=exit_code, event=event, detail=detail, ) + # non-zero exit code with no event results in a fail event return JobPhaseResult( exit_code=exit_code, event=f"{self.phase_id}_fail", @@ -610,6 +610,8 @@ def start(self): def check_end(self) -> bool: phase = self.phases[self.current_phase] + # if any phase has exited with a non-zero exit code the job must end; + # the test phase is excluded: its exit code is open to interpretation if phase.result.exit_code and self.current_phase != TestPhase.TEST: logger.debug("Phase %s failed, aborting job" % phase) # set these here because self.end() is called after cleanup @@ -647,7 +649,6 @@ def run(self, phase_id: TestPhase) -> JobPhaseResult: self.emitter.emit_event(TestEvent(f"{phase_id}_start")) phase = self.phases[phase_id] phase.run() - # self.params.client.post_influx(phase.id, phase.result.exit_code) self.emitter.emit_event(phase.result.event, phase.result.detail) return phase.result
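Note: the class statements in this series pass a `phase_id` keyword (e.g. `class SetupPhase(ExternalCommandPhase, phase_id=TestPhase.SETUP)`), but the hook that consumes it is outside this excerpt. The snippet below is only a hedged sketch of the usual `__init_subclass__` wiring for such a keyword, using placeholder names rather than the real Testflinger classes; the series itself still keeps an explicit `phase_cls_map`, so this is illustrative only.

    from abc import ABC, abstractmethod

    class JobPhase(ABC):
        """Illustrative base class only; not the Testflinger implementation."""

        def __init_subclass__(cls, phase_id=None, **kwargs):
            # consume the keyword from the class statement and expose it as
            # `cls.phase_id`, so subclasses can use `self.phase_id` in log
            # messages and event names
            super().__init_subclass__(**kwargs)
            if phase_id is not None:
                cls.phase_id = phase_id

        @abstractmethod
        def run_core(self):
            """Execute the "core" of the phase and return its result."""

    class SetupPhase(JobPhase, phase_id="setup"):
        def run_core(self):
            return 0

    print(SetupPhase.phase_id)  # 'setup'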
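Note: an earlier hunk replaces `self.result.replace(...)` with `self.result._replace(...)` because `JobPhaseResult` is a `typing.NamedTuple`: it is immutable and only exposes the generated `_replace`, which returns a new tuple rather than mutating the old one. Minimal sketch below; only the field names come from the patch, the defaults are assumed for brevity.

    from typing import NamedTuple, Optional

    class JobPhaseResult(NamedTuple):
        # field names follow the patch; defaults here are assumptions
        exit_code: int
        event: Optional[str] = None
        detail: str = ""

    result = JobPhaseResult(exit_code=46)
    # NamedTuples are immutable: `result.event = ...` would raise
    # AttributeError, so a modified copy is created instead
    result = result._replace(event="recovery_fail")
    print(result)  # JobPhaseResult(exit_code=46, event='recovery_fail', detail='')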
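Note: the `% 256` normalisation added to the runner, and the timeout tests moving their expected exit code from `-9` to `247`, follow from `subprocess` reporting signal-terminated children with a negative return code (`-signum` on POSIX). Quick sketch, assuming a POSIX host with a `sleep` binary available:

    import signal
    import subprocess

    # a child killed by SIGKILL reports returncode == -9 on POSIX
    proc = subprocess.Popen(["sleep", "60"])
    proc.send_signal(signal.SIGKILL)
    proc.wait()

    print(proc.returncode)        # -9
    print(proc.returncode % 256)  # 247, folded back into the 0-255 range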