Protecting against initial total machine failures with initialization retries.

ChrisThibodeaux committed Sep 21, 2024
1 parent 9e3c1df commit 99114f2
Showing 2 changed files with 99 additions and 48 deletions.
3 changes: 3 additions & 0 deletions conf/default/az.conf.default
@@ -118,6 +118,9 @@ spot_instances = false
# start pulling tasks off of the stack
wait_for_agent_before_starting = true

# How many times a VMSS that does not initialize properly can retry initialization
init_retries = 2

# These are the value(s) of the DNS server(s) that you want the scale sets to use. (E.g. 1.1.1.1,8.8.8.8)
# NOTE: NO SPACES
dns_server_ips = <dns_server_ip>
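
As a rough illustration of how a retry budget like init_retries behaves (the helper name and config plumbing here are hypothetical, not CAPE's actual Config class), the value is read once and one unit is spent per failed initialization:

import configparser

# Hypothetical stand-in for a VMSS initialization attempt; it always fails
# here so the budget is visibly consumed. Not part of this commit.
def try_initialize_vmss(name: str) -> bool:
    return False

parser = configparser.ConfigParser()
parser.read_string("[az]\ninit_retries = 2\n")
retries = parser.getint("az", "init_retries")  # -> 2

while not try_initialize_vmss("win10-vmss") and retries > 0:
    retries -= 1  # one unit of the budget spent per failed initialization
print(f"initialization retries left: {retries}")  # -> 0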
144 changes: 96 additions & 48 deletions modules/machinery/az.py
@@ -31,7 +31,6 @@
CuckooDependencyError,
CuckooGuestCriticalTimeout,
CuckooMachineError,
CuckooOperationalError,
)
from lib.cuckoo.core.database import TASK_PENDING, Machine

@@ -110,6 +109,10 @@ def set_options(self, options: dict) -> None:
@param options: machine manager options dict.
"""
self.options = options
# Using "scale_sets" here instead of "machines" to avoid KeyError
mmanager_opts = self.options.get(self.module_name)
if not isinstance(mmanager_opts["scale_sets"], list):
mmanager_opts["scale_sets"] = str(mmanager_opts["scale_sets"]).strip().split(",")

def _initialize(self):
"""
@@ -118,11 +121,7 @@ def _initialize(self):
@param module_name: module name
@raise CuckooDependencyError: if there is a problem with the dependencies call
"""
# Using "scale_sets" here instead of "machines" to avoid KeyError
mmanager_opts = self.options.get(self.module_name)
if not isinstance(mmanager_opts["scale_sets"], list):
mmanager_opts["scale_sets"] = str(mmanager_opts["scale_sets"]).strip().split(",")

# Replace a list of IDs with dictionary representations
scale_sets = mmanager_opts.pop("scale_sets")
mmanager_opts["scale_sets"] = {}
@@ -150,7 +149,7 @@ def _initialize(self):
# Insert the scale_set_opts into the module.scale_sets attribute
mmanager_opts["scale_sets"][scale_set_id] = scale_set_opts

except (AttributeError, CuckooOperationalError) as e:
except (AttributeError, CuckooCriticalError) as e:
log.warning(f"Configuration details about scale set {scale_set_id.strip()} are missing: {e}")
continue

@@ -168,19 +167,21 @@ def _initialize_check(self):

# We will be using this as a source of truth for the VMSS configs
self.required_vmsss = {
vmss_name: {"exists": False, "image": None, "platform": None, "tag": None, "initial_pool_size": None}
vmss_name: {
"exists": False, "image": None, "platform": None, "tag": None, "initial_pool_size": None, "retries": self.options.az.init_retries
}
for vmss_name in self.options.az.scale_sets
}

# Starting the thread that sets API clients periodically
self._thr_refresh_clients()

# Starting the thread that scales the machine pools periodically
self._thr_machine_pool_monitor()

# Initialize the VMSSs that we will be using and not using
self._set_vmss_stage()

# Starting the thread that scales the machine pools periodically
self._thr_machine_pool_monitor()

# Set the flag that indicates that the system is not initializing
self.initializing = False
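
The two periodic threads started above typically follow the re-arming timer idiom; a minimal self-contained sketch, with a placeholder body and interval rather than CAPE's actual refresh logic:

import threading

def refresh_clients_periodically(interval_s: float = 1800.0) -> None:
    # ... rebuild the Azure API clients here (placeholder body) ...
    timer = threading.Timer(interval_s, refresh_clients_periodically, args=(interval_s,))
    timer.daemon = True  # don't keep the interpreter alive just for refreshes
    timer.start()

refresh_clients_periodically()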

@@ -189,7 +190,6 @@ def _get_credentials(self):
Used to instantiate the Azure ClientSecretCredential object.
@return: an Azure ClientSecretCredential object
"""

credentials = None
if self.options.az.secret and self.options.az.secret != "<secret>":
# Instantiates the ClientSecretCredential object using
@@ -279,6 +279,19 @@ def _set_vmss_stage(self):
elif required_vmss_values["initial_pool_size"] is None:
raise CuckooCriticalError(f"The VMSS '{required_vmss_name}' does not have an initial pool size.")

self._process_pre_existing_vmsss()
self._get_or_upsert_vmsss(vmsss_dict=self.required_vmsss)
self._create_batch_threads()

def _process_pre_existing_vmsss(self):
"""
Delete a VMSS if it does NOT have:
- the expected tag AND has one of the required names for a VMSS we plan to create
- one of the required names AND has the expected tag AND az.config's multiple_capes_in_sandbox_rg is 'false'
Update a VMSS if it:
- does not have the required image reference
- has a capacity (current size) different from its required 'initial_pool_size'
"""
# Get all VMSSs in Resource Group
existing_vmsss = Azure._azure_api_call(
self.options.az.sandbox_resource_group,
@@ -292,7 +305,7 @@ def _set_vmss_stage(self):
# Cuckoo (AUTO_SCALE_CAPE key-value pair), ignore
if not vmss.tags or not vmss.tags.get(Azure.AUTO_SCALE_CAPE_KEY) == Azure.AUTO_SCALE_CAPE_VALUE:

# Unless! They have one of the required names of the VMSSs that we are going to create
# Ignoring... unless! They have one of the required names of the VMSSs that we are going to create
if vmss.name in self.required_vmsss.keys():
async_delete_vmss = Azure._azure_api_call(
self.options.az.sandbox_resource_group,
@@ -352,22 +365,9 @@ def _set_vmss_stage(self):
operation=self.compute_client.virtual_machine_scale_sets.begin_delete,
)

try:
self.subnet_id = Azure._azure_api_call(
self.options.az.vnet_resource_group,
self.options.az.vnet,
self.options.az.subnet,
operation=self.network_client.subnets.get,
).id # note the id attribute here
except CuckooMachineError:
raise CuckooCriticalError(
f"Subnet '{self.options.az.subnet}' does not exist in Virtual Network '{self.options.az.vnet}'"
)

# Initialize the platform scaling state monitor
is_platform_scaling.update({Azure.WINDOWS_PLATFORM: False, Azure.LINUX_PLATFORM: False})

# Let's get the number of CPUs associated with the SKU (instance_type)
# If we want to programmatically determine the number of cores for the sku
if self.options.az.find_number_of_cores_for_sku or self.options.az.instance_type_cores == 0:
resource_skus = Azure._azure_api_call(
@@ -394,10 +394,14 @@ def _set_vmss_stage(self):
else:
self.instance_type_cpus = self.options.az.instance_type_cores

# Create required VMSSs that don't exist yet
def _get_or_upsert_vmsss(self, vmsss_dict):
"""
Reimage or scale up existing VMSSs. Create required VMSSs that do not exist yet.
"""

vmss_creation_threads = []
vmss_reimage_threads = []
for vmss, vals in self.required_vmsss.items():
for vmss, vals in vmsss_dict.items():
if vals["exists"] and not self.options.az.just_start:
if machine_pools[vmss]["size"] == 0:
self._thr_scale_machine_pool(self.options.az.scale_sets[vmss].pool_tag, True if vals["platform"] else False),
@@ -419,6 +423,10 @@ def _set_vmss_stage(self):
for thr in vmss_reimage_threads + vmss_creation_threads:
thr.join()

def _create_batch_threads(self):
"""
Create batch reimage and delete threads.
"""
# Initialize the batch reimage threads. We want at most 4 batch reimaging threads
# so that if no VMSS scaling or batch deleting is taking place (aka we are receiving constant throughput of
# tasks and have the appropriate number of VMs created) then we'll perform batch reimaging at an optimal rate.
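
The cap of four workers described above is a bounded worker-pool idiom; a minimal sketch with a placeholder queue entry and worker body (names illustrative, not CAPE's actual thread targets):

import queue
import threading

reimage_queue: "queue.Queue[dict]" = queue.Queue()

def batch_reimage_worker() -> None:
    while True:
        vm = reimage_queue.get()  # blocks until a VM entry is queued
        try:
            pass  # issue the batched reimage API call for `vm` here (placeholder)
        finally:
            reimage_queue.task_done()

# At most 4 concurrent batch-reimage workers, per the comment above.
for _ in range(4):
    threading.Thread(target=batch_reimage_worker, daemon=True).start()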
@@ -583,8 +591,8 @@ def _add_machines_to_db(self, vmss_name):
resultserver_port=self.options.az.resultserver_port,
reserved=False,
)
# When we aren't initializing the system, the machine will immediately become available in DB
# When we are initializing, we're going to wait for the machine to be have the Cuckoo agent all set up
# We always wait for the Cuckoo agent to finish setting up if 'wait_for_agent_before_starting' is true or if we are initializing.
# Else, the machine should become immediately available in DB.
if self.initializing or self.options.az.wait_for_agent_before_starting:
thr = threading.Thread(
target=Azure._thr_wait_for_ready_machine,
@@ -607,6 +615,24 @@ def _add_machines_to_db(self, vmss_name):
except Exception as e:
log.error(repr(e), exc_info=True)

# If no machines on any VMSSs are in the db when we leave this method, CAPE will crash.
if not self.machines() and self.required_vmsss[vmss_name]["retries"] > 0:
log.warning(f"No available VMs after initializing {vmss_name}. Attempting to reinitialize VMSS.")
self.required_vmsss[vmss_name]["retries"] -= 1
start_time = timeit.default_timer()

while (timeit.default_timer() - start_time) < 120:
with vms_currently_being_deleted_lock:
if any(
failed_vm in vms_currently_being_deleted
for failed_vm in ready_vmss_vm_threads
):
# VMs not deleted from VMSS yet.
continue
self._get_or_upsert_vmsss(vmsss_dict={vmss_name: self.required_vmsss[vmss_name]})
return
log.debug(f"{vmss_name} initialize retry failed. Timed out waiting for VMs to be deleted.")

def _delete_machines_from_db_if_missing(self, vmss_name):
"""
Delete machine from database if it does not exist in the VMSS.
@@ -636,8 +662,8 @@ def delete_machine(self, label, delete_from_vmss=True):
super(Azure, self).delete_machine(label)

if delete_from_vmss:
vmss_name, instance_id = label.split("_")
# Only add vm to the lists if it isn't there already
vmss_name, instance_id = label.split("_")
with vms_currently_being_deleted_lock:
if label not in vms_currently_being_deleted:
vms_currently_being_deleted.append(label)
@@ -665,13 +691,13 @@ def _thr_wait_for_ready_machine(machine_name, machine_ip):
log.debug(f"{machine_name}: Initializing...")
except socket.error:
log.debug(f"{machine_name}: Initializing...")
time.sleep(10)

if timeit.default_timer() - start >= timeout:
# We didn't do it :(
raise CuckooGuestCriticalTimeout(
f"Machine {machine_name}: the guest initialization hit the critical " "timeout, analysis aborted."
)
if (timeit.default_timer() - start) >= timeout:
# We didn't do it :(
raise CuckooGuestCriticalTimeout(
f"Machine {machine_name}: the guest initialization hit the critical timeout, analysis aborted."
)
time.sleep(10)
log.debug(f"Machine {machine_name} was created and available in {round(timeit.default_timer() - start)}s")

@staticmethod
@@ -723,6 +749,18 @@ def _thr_create_vmss(self, vmss_name, vmss_image_ref, vmss_image_os):
@param vmss_tag: the tag used that represents the OS image
"""

try:
self.subnet_id = Azure._azure_api_call(
self.options.az.vnet_resource_group,
self.options.az.vnet,
self.options.az.subnet,
operation=self.network_client.subnets.get,
).id # note the id attribute here
except CuckooMachineError:
raise CuckooCriticalError(
f"Subnet '{self.options.az.subnet}' does not exist in Virtual Network '{self.options.az.vnet}'"
)

vmss_managed_disk = models.VirtualMachineScaleSetManagedDiskParameters(
storage_account_type=self.options.az.storage_account_type
)
@@ -800,6 +838,7 @@ def _thr_create_vmss(self, vmss_name, vmss_image_ref, vmss_image_os):
"is_scaling_down": False,
"wait": False,
}
self.required_vmsss[vmss_name]["exists"] = True
with self.db.session.begin():
self._add_machines_to_db(vmss_name)

@@ -986,7 +1025,7 @@ def _scale_machine_pool(self, tag, per_platform=False):
)

# We don't want to be stuck in this for longer than the timeout specified
if timeit.default_timer() - start_time > AZURE_TIMEOUT:
if (timeit.default_timer() - start_time) > AZURE_TIMEOUT:
log.debug(f"Breaking out of the while loop within the scale down section for {vmss_name}.")
break
# Get the updated number of relevant machines required
@@ -1250,9 +1289,12 @@ def _thr_reimage_list_reader(self):
vms_currently_being_reimaged.remove(f"{vmss_to_reimage}_{instance_id}")
continue

reimaged = True
# We wait because we want the machine to be fresh before another task is assigned to it
while not async_reimage_some_machines.done():
if (timeit.default_timer() - start_time) > AZURE_TIMEOUT:
reimaged = False

log.debug(
f"Reimaging machines {instance_ids} in {vmss_to_reimage} took too long, deleting them from the DB and the VMSS."
)
@@ -1270,7 +1312,7 @@ def _thr_reimage_list_reader(self):
with current_operations_lock:
current_vmss_operations -= 1
timediff = timeit.default_timer() - start_time
log.debug(f"Reimaging instances {instance_ids} in {vmss_to_reimage} took {round(timediff)}s")
log.debug(f"{'S' if reimaged else 'Uns'}uccessfully reimaging instances {instance_ids} in {vmss_to_reimage} took {round(timediff)}s")
except Exception as e:
log.error(f"Exception occurred in the reimage thread: {e}. Trying again...")

@@ -1297,8 +1339,8 @@ def _thr_delete_list_reader(self):
for vmss_name, count in vmss_vm_delete_counts.items():
if count > max:
max = count
vmss_to_delete = vmss_name
vms_to_delete_from_same_vmss = [vm for vm in delete_vm_list if vm["vmss"] == vmss_to_delete]
vmss_to_delete_from = vmss_name
vms_to_delete_from_same_vmss = [vm for vm in delete_vm_list if vm["vmss"] == vmss_to_delete_from]

for vm in vms_to_delete_from_same_vmss:
delete_vm_list.remove(vm)
@@ -1309,7 +1351,7 @@ def _thr_delete_list_reader(self):
start_time = timeit.default_timer()
async_delete_some_machines = Azure._azure_api_call(
self.options.az.sandbox_resource_group,
vmss_to_delete,
vmss_to_delete_from,
models.VirtualMachineScaleSetVMInstanceIDs(instance_ids=instance_ids),
polling_interval=1,
operation=self.compute_client.virtual_machine_scale_sets.begin_delete_instances,
@@ -1320,24 +1362,30 @@ def _thr_delete_list_reader(self):
current_vmss_operations -= 1
with vms_currently_being_deleted_lock:
for instance_id in instance_ids:
vms_currently_being_deleted.remove(f"{vmss_to_delete}_{instance_id}")
vms_currently_being_deleted.remove(f"{vmss_to_delete_from}_{instance_id}")
continue

deleted = True
# We wait because we want the machine to be fresh before another task is assigned to it
while not async_delete_some_machines.done():
if (timeit.default_timer() - start_time) > AZURE_TIMEOUT:
log.debug(f"Deleting machines {instance_ids} in {vmss_to_delete} took too long.")
log.debug(f"Deleting machines {instance_ids} in {vmss_to_delete_from} took too long.")
deleted = False
break
time.sleep(2)

if self.initializing and deleted:
# All machines should have been removed from the db and the VMSS at this point.
# To force the VMSS to scale to initial_pool_size, set the size to zero here.
log.debug(f"Setting size to 0 for VMSS {vmss_to_delete_from} after successful deletion")
machine_pools[vmss_to_delete_from]["size"] = 0

with vms_currently_being_deleted_lock:
for instance_id in instance_ids:
vms_currently_being_deleted.remove(f"{vmss_to_delete}_{instance_id}")
vms_currently_being_deleted.remove(f"{vmss_to_delete_from}_{instance_id}")

with current_operations_lock:
current_vmss_operations -= 1
log.debug(
f"Deleting instances {instance_ids} in {vmss_to_delete} took {round(timeit.default_timer() - start_time)}s"
)
log.debug(f"{'S' if deleted else 'Uns'}uccessfully deleting instances {instance_ids} in {vmss_to_delete_from} took {round(timeit.default_timer() - start_time)}s")
except Exception as e:
log.error(f"Exception occurred in the delete thread: {e}. Trying again...")
