diff --git a/VERSION.in b/VERSION.in index c068b2447..c239c60cb 100644 --- a/VERSION.in +++ b/VERSION.in @@ -1 +1 @@ -1.4 +1.5 diff --git a/proto/rqd.proto b/proto/rqd.proto index 327233bb0..e02d1c636 100644 --- a/proto/rqd.proto +++ b/proto/rqd.proto @@ -117,6 +117,7 @@ message RunFrame { string os = 25; int64 soft_memory_limit = 26; int64 hard_memory_limit = 27; + int32 pid = 28; } message RunFrameSeq { diff --git a/rqd/rqd/rqconstants.py b/rqd/rqd/rqconstants.py index 64116710e..34ea71d0a 100644 --- a/rqd/rqd/rqconstants.py +++ b/rqd/rqd/rqconstants.py @@ -162,6 +162,11 @@ DOCKER_MOUNTS = [] DOCKER_SHELL_PATH = "/bin/sh" +# Backup running frames cache. Backup cache is turned off if this path is set to +# None or "" +BACKUP_CACHE_PATH = "" +BACKUP_CACHE_TIME_TO_LIVE_SECONDS = 60 + try: if os.path.isfile(CONFIG_FILE): # Hostname can come from here: rqutil.getHostname() @@ -243,6 +248,12 @@ if config.has_section(__host_env_var_section): RQD_HOST_ENV_VARS = config.options(__host_env_var_section) + if config.has_option(__override_section, "BACKUP_CACHE_PATH"): + BACKUP_CACHE_PATH = config.get(__override_section, "BACKUP_CACHE_PATH") + if config.has_option(__override_section, "BACKUP_CACHE_TIME_TO_LIVE_SECONDS"): + BACKUP_CACHE_TIME_TO_LIVE_SECONDS = config.getint( + __override_section, "BACKUP_CACHE_TIME_TO_LIVE_SECONDS") + __docker_mounts = "docker.mounts" __docker_config = "docker.config" __docker_images = "docker.images" diff --git a/rqd/rqd/rqcore.py b/rqd/rqd/rqcore.py index 4c3a04199..a0ab06d78 100644 --- a/rqd/rqd/rqcore.py +++ b/rqd/rqd/rqcore.py @@ -41,6 +41,7 @@ import rqd.compiled_proto.host_pb2 import rqd.compiled_proto.report_pb2 +import rqd.compiled_proto.rqd_pb2 import rqd.rqconstants import rqd.rqexceptions import rqd.rqmachine @@ -83,7 +84,7 @@ def __init__(self, optNimbyoff=False): self.__cache = {} self.updateRssThread = None self.onIntervalThread = None - self.intervalStartTime = None + self.intervalStartTime = 0 self.intervalSleepTime = rqd.rqconstants.RQD_MIN_PING_INTERVAL_SEC # pylint: disable=unused-private-member @@ -103,6 +104,17 @@ def __init__(self, optNimbyoff=False): self.docker_mounts = rqd.rqconstants.DOCKER_MOUNTS self.handleFrameImages() + self.backup_cache_path = None + if rqd.rqconstants.BACKUP_CACHE_PATH: + if not rqd.rqconstants.RUN_ON_DOCKER: + log.warning("Cache backup is currently only available " + "when RUN_ON_DOCKER mode") + else: + self.backup_cache_path = rqd.rqconstants.BACKUP_CACHE_PATH + if not os.path.exists(os.path.dirname(self.backup_cache_path)): + os.makedirs(os.path.dirname(self.backup_cache_path)) + self.recoverCache() + signal.signal(signal.SIGINT, self.handleExit) signal.signal(signal.SIGTERM, self.handleExit) @@ -178,11 +190,65 @@ def updateRss(self): if self.__cache: try: self.machine.rssUpdate(self.__cache) + if self.backup_cache_path: + self.backupCache() finally: self.updateRssThread = threading.Timer( rqd.rqconstants.RSS_UPDATE_INTERVAL, self.updateRss) self.updateRssThread.start() + def backupCache(self): + """Backs up a copy of the running frames cache for a possible recovery. + each backup is destructive and erases the last known state""" + if not self.backup_cache_path: + return + with open(self.backup_cache_path, "wb") as f: + for item in list(self.__cache.values()): + serialized = item.runFrame.SerializeToString() + f.write(len(serialized).to_bytes(4, byteorder="big")) + f.write(serialized) + + def recoverCache(self): + """Reload the running frames from the latest backup. The backup file + will be rejected if it hasn't been updated recently + (rqconstants.BACKUP_CACHE_TIME_TO_LIVE_SECONDS) + """ + if not self.backup_cache_path or \ + not os.path.exists(self.backup_cache_path) or \ + (time.time() - os.path.getmtime(self.backup_cache_path) > \ + rqd.rqconstants.BACKUP_CACHE_TIME_TO_LIVE_SECONDS): + return + with open(self.backup_cache_path, "rb") as f: + while True: + # Read length (4 bytes) + length_bytes = f.read(4) + if not length_bytes: + break # End of file + + length = int.from_bytes(length_bytes, byteorder='big') + # Read the message data + message_data = f.read(length) + + run_frame = rqd.compiled_proto.rqd_pb2.RunFrame() + # Ignore frames that failed to be parsed + try: + run_frame.ParseFromString(message_data) + log.warning("Recovered frame %s.%s", run_frame.job_name, run_frame.frame_name) + running_frame = rqd.rqnetwork.RunningFrame(self, run_frame) + running_frame.frameAttendantThread = FrameAttendantThread( + self, run_frame, running_frame, recovery_mode=True) + # Make sure cores are accounted for + # pylint: disable=no-member + self.cores.idle_cores -= run_frame.num_cores + self.cores.booked_cores += run_frame.num_cores + # pylint: enable=no-member + + running_frame.frameAttendantThread.start() + # pylint: disable=broad-except + except Exception: + pass + # Ignore frames that got corrupted + def getFrame(self, frameId): """Gets a frame from the cache based on frameId @type frameId: string @@ -654,7 +720,7 @@ class FrameAttendantThread(threading.Thread): """Once a frame has been received and checked by RQD, this class handles the launching, waiting on, and cleanup work related to running the frame.""" - def __init__(self, rqCore: RqCore, runFrame, frameInfo): + def __init__(self, rqCore: RqCore, runFrame, frameInfo, recovery_mode=False): """FrameAttendantThread class initialization @type rqCore: RqCore @param rqCore: Main RQD Object @@ -662,6 +728,8 @@ def __init__(self, rqCore: RqCore, runFrame, frameInfo): @param runFrame: rqd_pb2.RunFrame @type frameInfo: rqd.rqnetwork.RunningFrame @param frameInfo: Servant for running frame + @type recovery_mode: bool + @param recovery_mode: Run in frame recovery mode """ threading.Thread.__init__(self) self.rqCore = rqCore @@ -672,6 +740,7 @@ def __init__(self, rqCore: RqCore, runFrame, frameInfo): self.frameInfo = frameInfo self._tempLocations = [] self.rqlog = None + self.recovery_mode = recovery_mode def __createEnvVariables(self): """Define the environmental variables for the frame""" @@ -921,7 +990,7 @@ def runLinux(self): finally: rqd.rqutil.permissionsLow() - frameInfo.pid = frameInfo.forkedCommand.pid + frameInfo.pid = runFrame.pid = frameInfo.forkedCommand.pid if not self.rqCore.updateRssThread.is_alive(): self.rqCore.updateRssThread = threading.Timer(rqd.rqconstants.RSS_UPDATE_INTERVAL, @@ -1090,7 +1159,7 @@ def runDocker(self): # Docker SDK type hint states that `top` returns an str # when in reality it returns a Dict {"Processes": [[]], "Columns": [[]]} container_top: dict = container.top() - frameInfo.pid = int(container_top["Processes"][0][1]) + frameInfo.pid = runFrame.pid = int(container_top["Processes"][0][1]) except (APIError, TypeError): for first_line in log_stream: frameInfo.pid = int(first_line) @@ -1113,6 +1182,8 @@ def runDocker(self): self.rqCore.updateRss) self.rqCore.updateRssThread.start() + # Store container id in case this frame needs to be restored from the backup + runFrame.attributes["container_id"] = container.short_id # Atatch to the job and follow the logs for line in log_stream: self.rqlog.write(line, prependTimestamp=rqd.rqconstants.RQD_PREPEND_TIMESTAMP) @@ -1227,7 +1298,7 @@ def runWindows(self): "Failed subprocess.Popen: Due to: \n%s", ''.join(traceback.format_exception(*sys.exc_info()))) - frameInfo.pid = frameInfo.forkedCommand.pid + frameInfo.pid = runFrame.pid = frameInfo.forkedCommand.pid if not self.rqCore.updateRssThread.is_alive(): self.rqCore.updateRssThread = threading.Timer(rqd.rqconstants.RSS_UPDATE_INTERVAL, @@ -1310,11 +1381,50 @@ def runDarwin(self): self.__writeFooter() self.__cleanup() + def setup(self): + """Setup for running or recovering a frame""" + runFrame = self.runFrame + run_on_docker = self.rqCore.docker is not None + + runFrame.job_temp_dir = os.path.join(self.rqCore.machine.getTempPath(), + runFrame.job_name) + runFrame.frame_temp_dir = os.path.join(runFrame.job_temp_dir, + runFrame.frame_name) + runFrame.log_file = "%s.%s.rqlog" % (runFrame.job_name, + runFrame.frame_name) + runFrame.log_dir_file = os.path.join(runFrame.log_dir, runFrame.log_file) + + # Ensure permissions return to Low after this block + try: + if rqd.rqconstants.RQD_CREATE_USER_IF_NOT_EXISTS and runFrame.HasField("uid"): + rqd.rqutil.checkAndCreateUser(runFrame.user_name, + runFrame.uid, + runFrame.gid) + if not run_on_docker: + # Do everything as launching user: + runFrame.gid = rqd.rqconstants.LAUNCH_FRAME_USER_GID + rqd.rqutil.permissionsUser(runFrame.uid, runFrame.gid) + + # Setup frame logging + self.rqlog = rqd.rqlogging.RqdLogger(runFrame.log_dir_file) + self.rqlog.waitForFile() + # pylint: disable=broad-except + except Exception as e: + err = "Unable to write to %s due to %s" % (runFrame.log_dir_file, e) + raise RuntimeError(err) + finally: + rqd.rqutil.permissionsLow() + + def runUnknown(self): """The steps required to handle a frame under an unknown OS.""" def run(self): """Thread initialization""" + if self.recovery_mode: + self.runRecovery() + return + log.info("Monitor frame started for frameId=%s", self.frameId) runFrame = self.runFrame @@ -1322,74 +1432,215 @@ def run(self): # pylint: disable=too-many-nested-blocks try: - runFrame.job_temp_dir = os.path.join(self.rqCore.machine.getTempPath(), - runFrame.job_name) - runFrame.frame_temp_dir = os.path.join(runFrame.job_temp_dir, - runFrame.frame_name) - runFrame.log_file = "%s.%s.rqlog" % (runFrame.job_name, - runFrame.frame_name) - runFrame.log_dir_file = os.path.join(runFrame.log_dir, runFrame.log_file) - - try: # Exception block for all exceptions - # Ensure permissions return to Low after this block - try: - if rqd.rqconstants.RQD_CREATE_USER_IF_NOT_EXISTS and runFrame.HasField("uid"): - rqd.rqutil.checkAndCreateUser(runFrame.user_name, - runFrame.uid, - runFrame.gid) - if not run_on_docker: - # Do everything as launching user: - runFrame.gid = rqd.rqconstants.LAUNCH_FRAME_USER_GID - rqd.rqutil.permissionsUser(runFrame.uid, runFrame.gid) - - # Setup frame logging - try: - self.rqlog = rqd.rqlogging.RqdLogger(runFrame.log_dir_file) - self.rqlog.waitForFile() - # pylint: disable=broad-except - except Exception as e: - err = "Unable to write to %s due to %s" % (runFrame.log_dir_file, e) - raise RuntimeError(err) - - finally: - rqd.rqutil.permissionsLow() - - # Store frame in cache and register servant - self.rqCore.storeFrame(runFrame.frame_id, self.frameInfo) - - if run_on_docker: - self.runDocker() - elif platform.system() == "Linux": - self.runLinux() - elif platform.system() == "Windows": - self.runWindows() - elif platform.system() == "Darwin": - self.runDarwin() - else: - self.runUnknown() + self.setup() + # Store frame in cache and register servant + self.rqCore.storeFrame(runFrame.frame_id, self.frameInfo) + + if run_on_docker: + self.runDocker() + elif platform.system() == "Linux": + self.runLinux() + elif platform.system() == "Windows": + self.runWindows() + elif platform.system() == "Darwin": + self.runDarwin() + else: + self.runUnknown() - # pylint: disable=broad-except - except Exception: - log.critical( - "Failed launchFrame: For %s due to: \n%s", - runFrame.frame_id, ''.join(traceback.format_exception(*sys.exc_info()))) - # Notifies the cuebot that there was an error launching - self.frameInfo.exitStatus = rqd.rqconstants.EXITSTATUS_FOR_FAILED_LAUNCH - # Delay keeps the cuebot from spamming failing booking requests - time.sleep(10) + # pylint: disable=broad-except + except Exception: + log.critical( + "Failed launchFrame: For %s due to: \n%s", + runFrame.frame_id, ''.join(traceback.format_exception(*sys.exc_info()))) + # Notifies the cuebot that there was an error launching + self.frameInfo.exitStatus = rqd.rqconstants.EXITSTATUS_FOR_FAILED_LAUNCH + # Delay keeps the cuebot from spamming failing booking requests + time.sleep(10) finally: - self.rqCore.releaseCores(self.runFrame.num_cores, runFrame.attributes.get('CPU_LIST'), - runFrame.attributes.get('GPU_LIST') + self.postFrameAction() + + def postFrameAction(self): + """Action to be executed after a frame completes its execution""" + self.rqCore.releaseCores(self.runFrame.num_cores, + self.runFrame.attributes.get('CPU_LIST'), + self.runFrame.attributes.get('GPU_LIST') if 'GPU_LIST' in self.runFrame.attributes else None) - self.rqCore.deleteFrame(self.runFrame.frame_id) + self.rqCore.deleteFrame(self.runFrame.frame_id) - self.rqCore.sendFrameCompleteReport(self.frameInfo) - time_till_next = ( - (self.rqCore.intervalStartTime + self.rqCore.intervalSleepTime) - time.time()) - if time_till_next > (2 * rqd.rqconstants.RQD_MIN_PING_INTERVAL_SEC): - self.rqCore.onIntervalThread.cancel() - self.rqCore.onInterval(rqd.rqconstants.RQD_MIN_PING_INTERVAL_SEC) + self.rqCore.sendFrameCompleteReport(self.frameInfo) + time_till_next = ( + (self.rqCore.intervalStartTime + self.rqCore.intervalSleepTime) - time.time()) + if time_till_next > (2 * rqd.rqconstants.RQD_MIN_PING_INTERVAL_SEC): + self.rqCore.onIntervalThread.cancel() + self.rqCore.onInterval(rqd.rqconstants.RQD_MIN_PING_INTERVAL_SEC) - log.info("Monitor frame ended for frameId=%s", - self.runFrame.frame_id) + log.info("Monitor frame ended for frameId=%s", + self.runFrame.frame_id) + + def recoverDocker(self): + """The steps required to handle a frame under a docker container""" + frameInfo = self.frameInfo + runFrame = self.runFrame + container = None + docker_client = self.rqCore.docker.from_env() + + # Ensure Nullable attributes have been initialized + if not self.rqlog: + raise RuntimeError("Invalid state. rqlog has not been initialized") + if not self.rqCore.docker: + raise RuntimeError("Invalid state: docker_client must have been initialized.") + if not runFrame.attributes.get("container_id"): + raise RuntimeError("Invalid state: recovered frame does't contain a container id") + container_id = runFrame.attributes.get("container_id") + + # Recovered frame will stream back logs into a new file, therefore write a new header + self.__createEnvVariables() + self.__writeHeader() + + tempStatFile = "%srqd-stat-%s-%s" % (self.rqCore.machine.getTempPath(), + frameInfo.frameId, + time.time()) + self._tempLocations.append(tempStatFile) + + try: + log_stream = None + with self.rqCore.docker_lock: + container = docker_client.containers.get(container_id) + log_stream = container.logs(stream=True) + + if not container or not log_stream: + raise RuntimeError("Failed to recover container for %s.%s(%s)" % ( + runFrame.job_name, + runFrame.frame_name, + frameInfo.frameId)) + + # Log frame start info + msg = "Container %s recovered for %s.%s(%s) with pid %s" % ( + container.short_id, + runFrame.job_name, + runFrame.frame_name, + frameInfo.frameId, + frameInfo.pid) + + log.info(msg) + self.rqlog.write(msg, prependTimestamp=rqd.rqconstants.RQD_PREPEND_TIMESTAMP) + + # Ping rss thread on rqCore + if self.rqCore.updateRssThread and not self.rqCore.updateRssThread.is_alive(): + self.rqCore.updateRssThread = threading.Timer(rqd.rqconstants.RSS_UPDATE_INTERVAL, + self.rqCore.updateRss) + self.rqCore.updateRssThread.start() + + # Attach to the job and follow the logs + for line in log_stream: + self.rqlog.write(line, prependTimestamp=rqd.rqconstants.RQD_PREPEND_TIMESTAMP) + + output = container.wait() + returncode = output["StatusCode"] + except StopIteration: + # This exception can happen when a container is interrupted + # If frame pid is set it means the container has started successfully + if frameInfo.pid and container: + output = container.wait() + returncode = output["StatusCode"] + else: + returncode = -1 + container_id = container.short_id if container else -1 + msg = "Failed to read frame container logs on %s for %s.%s(%s)" % ( + container_id, + runFrame.job_name, + runFrame.frame_name, + frameInfo.frameId) + logging.error(msg) + self.rqlog.write(msg, prependTimestamp=rqd.rqconstants.RQD_PREPEND_TIMESTAMP) + # pylint: disable=broad-except + except Exception as e: + returncode = -1 + msg = "Failed to recover frame container" + logging.warning(msg) + self.rqlog.write("%s - The frame might have finishes during rqd's reinitialization " + "- %s" % (msg, e), + prependTimestamp=rqd.rqconstants.RQD_PREPEND_TIMESTAMP) + finally: + # Clear up container after if finishes + if container: + container_id = container.short_id + container.remove() + docker_client.close() + + if container: + # Find exitStatus and exitSignal + if returncode < 0: + # Exited with a signal + frameInfo.exitStatus = 1 + frameInfo.exitSignal = -returncode + else: + frameInfo.exitStatus = returncode + frameInfo.exitSignal = 0 + + # Log frame start info + log.warning( + "Frame %s.%s(%s) with pid %s finished on container %s with exitStatus %s %s", + runFrame.job_name, + runFrame.frame_name, + frameInfo.frameId, + frameInfo.pid, + container_id, + frameInfo.exitStatus, + "" if frameInfo.exitStatus == 0 else " - " + runFrame.log_dir_file) + + try: + with open(tempStatFile, "r", encoding='utf-8') as statFile: + frameInfo.realtime = statFile.readline().split()[1] + frameInfo.utime = statFile.readline().split()[1] + frameInfo.stime = statFile.readline().split()[1] + statFile.close() + # pylint: disable=broad-except + except Exception: + pass # This happens when frames are killed + + self.__writeFooter() + self.__cleanup() + + def runRecovery(self): + """Recover a frame that was running before this instance started""" + if not self.recovery_mode: + return + + log.info("Monitor recovered frame started for frameId=%s", self.frameId) + + runFrame = self.runFrame + run_on_docker = self.rqCore.docker is not None + + # pylint: disable=too-many-nested-blocks + try: + self.setup() + # Store frame in cache and register servant + self.rqCore.storeFrame(runFrame.frame_id, self.frameInfo) + + if run_on_docker: + self.recoverDocker() + elif platform.system() == "Linux": + # TODO + pass + elif platform.system() == "Windows": + # TODO + pass + elif platform.system() == "Darwin": + # TODO + pass + else: + self.runUnknown() + + # pylint: disable=broad-except + except Exception: + log.critical( + "Failed launchFrame: For %s due to: \n%s", + runFrame.frame_id, ''.join(traceback.format_exception(*sys.exc_info()))) + # Notifies the cuebot that there was an error launching + self.frameInfo.exitStatus = rqd.rqconstants.EXITSTATUS_FOR_FAILED_LAUNCH + # Delay keeps the cuebot from spamming failing booking requests + time.sleep(10) + finally: + self.postFrameAction() diff --git a/rqd/rqd/rqmachine.py b/rqd/rqd/rqmachine.py index 8694afee4..50c19720b 100644 --- a/rqd/rqd/rqmachine.py +++ b/rqd/rqd/rqmachine.py @@ -355,7 +355,8 @@ def rssUpdate(self, frames): pidPcpu = totalTime / seconds pcpu += pidPcpu pidData[pid] = totalTime, seconds, pidPcpu - # only keep the highest recorded rss value + # If children was already accounted for, only keep the highest + # recorded rss value if pid in frame.childrenProcs: childRss = (int(data["rss"]) * resource.getpagesize()) // 1024 if childRss > frame.childrenProcs[pid]['rss']: diff --git a/rqd/rqd/rqnetwork.py b/rqd/rqd/rqnetwork.py index b4b955b10..948a94f68 100644 --- a/rqd/rqd/rqnetwork.py +++ b/rqd/rqd/rqnetwork.py @@ -57,7 +57,7 @@ def __init__(self, rqCore, runFrame): self.killMessage = "" - self.pid = None + self.pid = runFrame.pid self.exitStatus = None self.frameAttendantThread = None self.exitSignal = 0 diff --git a/rqd/tests/rqcore_test.py b/rqd/tests/rqcore_test.py index 16ea9426b..61b323ad1 100644 --- a/rqd/tests/rqcore_test.py +++ b/rqd/tests/rqcore_test.py @@ -159,6 +159,7 @@ def test_updateRss(self, timerMock): self.rqcore.storeFrame( "frame-id", mock.MagicMock(spec=rqd.rqnetwork.RunningFrame) ) + self.rqcore.backup_cache_path = None self.rqcore.updateRss() @@ -587,6 +588,103 @@ def test_sendFrameCompleteReport(self): ) +class RqCoreBackupTests(pyfakefs.fake_filesystem_unittest.TestCase): + def setUp(self): + self.rqcore = rqd.rqcore.RqCore() + self.setUpPyfakefs() + + @mock.patch('builtins.open', new_callable=mock.mock_open) + def test_backupCache_withPath(self, mockOpen): + """Test backupCache writes frame data when backup path is configured""" + self.rqcore.backup_cache_path = '/tmp/rqd/cache.dat' + frameId = 'frame123' + runningFrame = mock.MagicMock() + runningFrame.runFrame = mock.MagicMock() + runningFrame.runFrame.SerializeToString.return_value = b'serialized_frame_data' + self.rqcore.storeFrame(frameId, runningFrame) + + self.rqcore.backupCache() + + mockOpen.assert_called_once_with('/tmp/rqd/cache.dat', 'wb') + handle = mockOpen() + handle.write.assert_called_with(b'serialized_frame_data') + + def test_backupCache_noPath(self): + """Test backupCache does nothing when no backup path configured""" + self.rqcore.backup_cache_path = None + frameId = 'frame123' + runFrame = mock.MagicMock() + self.rqcore.storeFrame(frameId, runFrame) + + self.rqcore.backupCache() + + runFrame.SerializeToString.assert_not_called() + + def test_recoverCache_noPath(self): + """Test recoverCache does nothing when no backup path configured""" + self.rqcore.backup_cache_path = None + + self.rqcore.recoverCache() + + self.assertEqual(len(self.rqcore._RqCore__cache), 0) + + @mock.patch('os.path.exists') + def test_recoverCache_noFile(self, mockExists): + """Test recoverCache does nothing when backup file doesn't exist""" + self.rqcore.backup_cache_path = '/tmp/rqd/cache.dat' + mockExists.return_value = False + + self.rqcore.recoverCache() + + self.assertEqual(len(self.rqcore._RqCore__cache), 0) + + @mock.patch('os.path.getmtime') + @mock.patch('time.time') + @mock.patch('os.path.exists') + def test_recoverCache_expiredFile(self, mockExists, mockTime, mockGetmtime): + """Test recoverCache does nothing when backup file is too old""" + self.rqcore.backup_cache_path = '/tmp/rqd/cache.dat' + mockExists.return_value = True + mockTime.return_value = 1000 + mockGetmtime.return_value = 1 # Very old file + + self.rqcore.recoverCache() + + self.assertEqual(len(self.rqcore._RqCore__cache), 0) + + @mock.patch("rqd.rqcore.FrameAttendantThread", autospec=True) + def test_recoverCache_validBackup(self, attendant_patch): + """Test recoverCache skips frames that fail to parse""" + self.rqcore.backup_cache_path = 'cache.dat' + + frameId = 'frame123' + frame = rqd.compiled_proto.rqd_pb2.RunFrame( + job_id = "job_id", + job_name = "job_name", + frame_id = frameId, + frame_name = "frame_name", + num_cores = 4 + ) + running_frame = rqd.rqnetwork.RunningFrame(self.rqcore, frame) + self.rqcore.storeFrame(frameId, running_frame) + self.rqcore.cores.idle_cores = 8 + self.rqcore.cores.booked_cores = 0 + self.rqcore.backupCache() + self.rqcore._RqCore__cache = {} + self.rqcore.recoverCache() + self.assertEqual(4, self.rqcore.cores.idle_cores) + self.assertEqual(4, self.rqcore.cores.booked_cores) + + def test_recoverCache_invalidFrame(self): + """Test recoverCache loads frame data from valid backup file""" + self.rqcore.backup_cache_path = 'cache.dat' + with open(self.rqcore.backup_cache_path, "w", encoding='utf-8') as f: + f.write("this is not a run frame") + + self.rqcore.recoverCache() + + self.assertNotIn('frame123', self.rqcore._RqCore__cache) + @mock.patch("rqd.rqutil.checkAndCreateUser", new=mock.MagicMock()) @mock.patch("rqd.rqutil.permissionsHigh", new=mock.MagicMock()) @mock.patch("rqd.rqutil.permissionsLow", new=mock.MagicMock())