Skip to content

Commit

Permalink
Handle existing, locked machines at start up. Use thread-agnostic mac…
Browse files Browse the repository at this point in the history
…hine deletion on reimage failures.
  • Loading branch information
ChrisThibodeaux committed Sep 22, 2024
1 parent a14027a commit ae69060
Showing 1 changed file with 37 additions and 12 deletions.
49 changes: 37 additions & 12 deletions modules/machinery/az.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@
# This is a hard cap of 4, given the maximum preemption chain length of 4.
MAX_CONCURRENT_VMSS_OPERATIONS = 4

# These global lists track machines that failed during reimaging. Entries are
# machine labels of the form "<vmss name>_<instance id>".
# - vms_absent_from_vmss: filled by the reimage list reader; release() deletes
#   these machines with delete_from_vmss=False and then drops the label.
# - vms_timed_out_being_reimaged: filled by the reimage list reader when it marks
#   a batch for deletion; release() deletes these machines and then drops the label.
vms_absent_from_vmss = []
vms_timed_out_being_reimaged = []

# These global lists track ongoing operations on specific machines, using the
# same "<vmss name>_<instance id>" labels.
vms_currently_being_reimaged = []
vms_currently_being_deleted = []
Expand Down Expand Up @@ -104,10 +108,6 @@ class Azure(Machinery):
WINDOWS_PLATFORM = "windows"
LINUX_PLATFORM = "linux"

# Statuses for machines
ABSENT = "absent_vm"
REIMAGE_FAILING = "failed_reimaging"

def set_options(self, options: dict) -> None:
"""Set machine manager options.
@param options: machine manager options dict.
Expand All @@ -118,11 +118,20 @@ def set_options(self, options: dict) -> None:
if not isinstance(mmanager_opts["scale_sets"], list):
mmanager_opts["scale_sets"] = str(mmanager_opts["scale_sets"]).strip().split(",")

def initialize(self):
    """
    Entry point for start-up: read the configuration via _initialize(), then
    run the checks in _initialize_check().
    Presumably overrides initialize() from abstracts.py - TODO confirm.
    """
    # Load.
    self._initialize()

    # Run initialization checks. Must come after the load step above.
    self._initialize_check()

def _initialize(self):
"""
Overloading abstracts.py:_initialize()
Read configuration.
@param module_name: module name
@raise CuckooDependencyError: if there is a problem with the dependencies call
"""
mmanager_opts = self.options.get(self.module_name)
Expand Down Expand Up @@ -285,7 +294,8 @@ def _set_vmss_stage(self):

self._process_pre_existing_vmsss()
self._check_cpu_cores()
self._get_or_upsert_vmsss(self.required_vmsss)
self._update_or_create_vmsss(self.required_vmsss)
self._check_locked_machines()
self._create_batch_threads()

def _process_pre_existing_vmsss(self):
Expand Down Expand Up @@ -403,7 +413,7 @@ def _check_cpu_cores(self):
else:
self.instance_type_cpus = self.options.az.instance_type_cores

def _get_or_upsert_vmsss(self, vmsss_dict):
def _update_or_create_vmsss(self, vmsss_dict):
"""
Reimage or scale up existing VMSSs. Create non-existent required VMSSs.
"""
Expand Down Expand Up @@ -432,6 +442,17 @@ def _get_or_upsert_vmsss(self, vmsss_dict):
for thr in vmss_reimage_threads + vmss_creation_threads:
thr.join()

def _check_locked_machines(self):
    """
    Unlock every machine that is still reported as running at start-up.

    In the case of CAPE unexpectedly restarting, machines can be left locked.
    Per the original author's note, they will have been reimaged and their
    tasks rescheduled before reaching this code, so the locks are stale.

    Reads the locked machines from self.running() and releases each one
    through self.db.unlock_machine(). Does nothing when none are locked.
    """
    locked_machines = self.running()
    if not locked_machines:
        # Nothing was left locked; skip the log line and the unlock loop.
        return

    log.info("%d machines found locked on initialize, unlocking.", len(locked_machines))
    for machine in locked_machines:
        self.db.unlock_machine(machine)

def _create_batch_threads(self):
"""
Create batch reimage and delete threads.
Expand Down Expand Up @@ -491,9 +512,13 @@ def release(self, machine: Machine):
@param label: machine label.
"""
vmss_name = machine.label.split("_")[0]
if machine.status == Azure.ABSENT:
if machine.label in vms_absent_from_vmss:
self.delete_machine(machine.label, delete_from_vmss=False)
elif machine.status == Azure.REIMAGE_FAILING or machine_pools[vmss_name]["is_scaling_down"]:
vms_absent_from_vmss.remove(machine.label)
elif machine.label in vms_timed_out_being_reimaged:
self.delete_machine(machine.label)
vms_timed_out_being_reimaged.remove(machine.label)
elif machine_pools[vmss_name]["is_scaling_down"]:
self.delete_machine(machine.label)
else:
_ = super(Azure, self).release(machine)
Expand Down Expand Up @@ -640,7 +665,7 @@ def _add_machines_to_db(self, vmss_name):
):
# VMs not deleted from VMSS yet.
continue
self._get_or_upsert_vmsss(vmsss_dict={vmss_name: self.required_vmsss[vmss_name]})
self._update_or_create_vmsss(vmsss_dict={vmss_name: self.required_vmsss[vmss_name]})
return
log.debug(f"{vmss_name} initialize retry failed. Timed out waiting for VMs to be deleted.")

Expand Down Expand Up @@ -1288,7 +1313,7 @@ def _thr_reimage_list_reader(self):
vms_currently_being_deleted.append(f"{vmss_to_reimage}_{instance_id}")
with delete_lock:
delete_vm_list.append({"vmss": vmss_to_reimage, "id": instance_id, "time_added": time.time()})
self.set_status(f"{vmss_to_reimage}_{instance_id}", Azure.ABSENT)
vms_absent_from_vmss.append(f"{vmss_to_reimage}_{instance_id}")
vms_currently_being_reimaged.remove(f"{vmss_to_reimage}_{instance_id}")
instance_ids.remove(instance_id)

Expand All @@ -1309,7 +1334,7 @@ def _thr_reimage_list_reader(self):
)
# That sucks, now we have to mark each one for deletion
for instance_id in instance_ids:
self.set_status(f"{vmss_to_reimage}_{instance_id}", Azure.REIMAGE_FAILING)
vms_timed_out_being_reimaged.append(f"{vmss_to_reimage}_{instance_id}")
break
time.sleep(2)

Expand Down

0 comments on commit ae69060

Please sign in to comment.