6 changes: 6 additions & 0 deletions tests/integration-tests/configs/develop.yaml
@@ -479,6 +479,12 @@ test-suites:
instances: {{ common.INSTANCES_DEFAULT_X86 }}
oss: [{{ OS_X86_5 }}]
schedulers: ["slurm"]
test_slurm.py::test_expedited_requeue:
dimensions:
- regions: ["ap-east-1"]
instances: {{ common.INSTANCES_DEFAULT_X86 }}
oss: [{{ OS_X86_5 }}]
schedulers: ["slurm"]
test_slurm.py::test_slurm_config_update:
dimensions:
- regions: [ "ap-east-1" ]
69 changes: 69 additions & 0 deletions tests/integration-tests/tests/common/scaling_common.py
@@ -25,6 +25,8 @@
from utils import get_compute_nodes_instance_count

SCALING_COMMON_DATADIR = pathlib.Path(__file__).parent / "scaling"
RUN_INSTANCES_OVERRIDES_PATH = "/opt/slurm/etc/pcluster/run_instances_overrides.json"
CREATE_FLEET_OVERRIDES_PATH = "/opt/slurm/etc/pcluster/create_fleet_overrides.json"


def validate_and_get_scaling_test_config(scaling_test_config_file):
@@ -364,3 +366,70 @@ def setup_ec2_launch_override_to_emulate_ice(
run_as_root=True,
)
# fmt: on


def _write_json_override(remote_command_executor, path, content):
"""Write a JSON override file on the head node."""
json_str = json.dumps(content)
remote_command_executor.run_remote_command(
f"echo '{json_str}' | sudo tee {path}",
raise_on_error=True,
)


def _build_create_fleet_override(cluster_name, queue, compute_resource, instance_types, subnet_id):
"""Build a create_fleet_overrides dict with LaunchTemplateSpecification, InstanceTypes and SubnetId."""
lt_name = f"{cluster_name}-{queue}-{compute_resource}"
overrides_list = [{"InstanceType": it, "SubnetId": subnet_id} for it in instance_types]
return {
queue: {
compute_resource: {
"LaunchTemplateConfigs": [
{
"LaunchTemplateSpecification": {
"LaunchTemplateName": lt_name,
"Version": "$Latest",
},
"Overrides": overrides_list,
}
],
}
}
}


def setup_create_fleet_override_to_emulate_ice(
remote_command_executor, cluster_name, queue, compute_resource, instance_types, subnet_id
):
"""
Write create_fleet_overrides.json with invalid InstanceTypes to emulate ICE.

This targets multi-instance-type CRs that use the create_fleet API. The override includes
LaunchTemplateSpecification and SubnetId to avoid MissingParameter errors. The invalid
InstanceTypes (prefixed with "ICE-") cause create_fleet to return no instances, which is
detected as InsufficientInstanceCapacity.

To recover, call recover_create_fleet_override_from_ice() which replaces the invalid
InstanceTypes with real ones.
"""
ice_instance_types = [f"ICE-{it}" for it in instance_types]
overrides = _build_create_fleet_override(cluster_name, queue, compute_resource, ice_instance_types, subnet_id)
logging.info("Writing create_fleet_overrides.json with invalid InstanceTypes to emulate ICE "
"for queue=%s, cr=%s", queue, compute_resource)
_write_json_override(remote_command_executor, CREATE_FLEET_OVERRIDES_PATH, overrides)


def recover_create_fleet_override_from_ice(
remote_command_executor, cluster_name, queue, compute_resource, real_instance_types, subnet_id
):
"""
Recover from simulated ICE by changing InstanceTypes in create_fleet_overrides.json back to real ones.

This is the "change instance type in JSON to recover" approach — the invalid "ICE-*" prefixed
InstanceTypes are replaced with real ones (with correct SubnetId), so the next create_fleet
call succeeds.
"""
overrides = _build_create_fleet_override(cluster_name, queue, compute_resource, real_instance_types, subnet_id)
logging.info("Recovering from ICE: writing real InstanceTypes=%s in create_fleet_overrides.json "
"for queue=%s, cr=%s", real_instance_types, queue, compute_resource)
_write_json_override(remote_command_executor, CREATE_FLEET_OVERRIDES_PATH, overrides)
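
For reference, below is a minimal sketch of the override document these new helpers produce, obtained by calling _build_create_fleet_override directly with hypothetical names (cluster "mycluster", queue "queue", compute resource "ice-cr", and a placeholder subnet ID); it assumes the snippet runs in the integration-tests environment where tests.common is importable.

# Illustrative sketch (hypothetical names): the create_fleet_overrides.json content that
# setup_create_fleet_override_to_emulate_ice would write for queue "queue" and compute
# resource "ice-cr" on a cluster named "mycluster"; the subnet ID is a placeholder.
import json

from tests.common.scaling_common import _build_create_fleet_override

overrides = _build_create_fleet_override(
    cluster_name="mycluster",
    queue="queue",
    compute_resource="ice-cr",
    instance_types=["ICE-t3.medium", "ICE-c5.large"],  # "ICE-" prefix makes the types invalid
    subnet_id="subnet-0123456789abcdef0",
)
print(json.dumps(overrides, indent=2))
# Prints (condensed for readability):
# {
#   "queue": {
#     "ice-cr": {
#       "LaunchTemplateConfigs": [
#         {
#           "LaunchTemplateSpecification": {
#             "LaunchTemplateName": "mycluster-queue-ice-cr",
#             "Version": "$Latest"
#           },
#           "Overrides": [
#             {"InstanceType": "ICE-t3.medium", "SubnetId": "subnet-0123456789abcdef0"},
#             {"InstanceType": "ICE-c5.large", "SubnetId": "subnet-0123456789abcdef0"}
#           ]
#         }
#       ]
#     }
#   }
# }

Recovering capacity then amounts to rewriting the same document with the real instance types, which is what recover_create_fleet_override_from_ice does.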
152 changes: 151 additions & 1 deletion tests/integration-tests/tests/schedulers/test_slurm.py
@@ -55,7 +55,11 @@
wait_for_compute_nodes_states,
wait_for_num_nodes_in_scheduler,
)
from tests.common.scaling_common import setup_ec2_launch_override_to_emulate_ice
from tests.common.scaling_common import (
recover_create_fleet_override_from_ice,
setup_create_fleet_override_to_emulate_ice,
setup_ec2_launch_override_to_emulate_ice,
)
from tests.common.schedulers_common import SlurmCommands


@@ -616,6 +620,152 @@ def test_fast_capacity_failover(
)


@pytest.mark.usefixtures("region", "os", "instance", "scheduler")
@pytest.mark.expedited_requeue
def test_expedited_requeue(
pcluster_config_reader,
clusters_factory,
scheduler_commands_factory,
vpc_stack,
):
"""
Test Slurm 25.11+ expedited requeue behavior with recoverable ICE simulation.

Uses create_fleet_overrides.json to simulate ICE (invalid "ICE-" prefixed InstanceTypes),
then recovers by changing them back to real ones.
Verifies that expedited requeue jobs are treated as highest priority after ICE recovery.
"""
cluster_config = pcluster_config_reader()
cluster = clusters_factory(cluster_config)
remote_command_executor = RemoteCommandExecutor(cluster)
clustermgtd_conf_path = retrieve_clustermgtd_conf_path(remote_command_executor)
scheduler_commands = scheduler_commands_factory(remote_command_executor)

partition = "queue"
ice_cr = "ice-cr"
real_instance_types = ["t3.medium", "c5.large"]
subnet_id = vpc_stack.get_private_subnet()

# Set up ICE simulation via create_fleet_overrides.json with invalid InstanceTypes
setup_create_fleet_override_to_emulate_ice(
remote_command_executor,
cluster_name=cluster.cfn_name,
queue=partition,
compute_resource=ice_cr,
instance_types=real_instance_types,
subnet_id=subnet_id,
)

# Set insufficient_capacity_timeout to 180s for quicker reset
_set_insufficient_capacity_timeout(remote_command_executor, 180, clustermgtd_conf_path)

    # Get the dynamic nodes that belong to the ice-cr compute resource
    # (dynamic node names embed the compute resource name, e.g. queue-dy-ice-cr-1)
    nodes_in_scheduler = scheduler_commands.get_compute_nodes(partition, all_nodes=True)
    _, dynamic_nodes = get_partition_nodes(nodes_in_scheduler)
    ice_dynamic_nodes = [node for node in dynamic_nodes if f"-{ice_cr}-" in node]
    logging.info("ICE CR dynamic nodes: %s", ice_dynamic_nodes)
# Pick a specific dynamic node to target
target_node = ice_dynamic_nodes[0]
logging.info("Target dynamic node for ICE test: %s", target_node)

# Clear logs for clean state
remote_command_executor.clear_slurm_resume_log()
remote_command_executor.clear_clustermgtd_log()

# Submit job1 with --requeue=expedite, targeting the specific ICE dynamic node
# Output epoch timestamp for reliable start time parsing from slurm output file
job1_id = scheduler_commands.submit_command_and_assert_job_accepted(
submit_command_args={
"command": 'echo "START_TIME=$(date +%s)"; sleep 30; echo "Job1 done"',
"nodes": 1,
"partition": partition,
"host": target_node,
"other_options": "--requeue=expedite",
}
)
logging.info("Submitted job1 (expedited requeue) ID: %s", job1_id)

# Submit job2 (normal), targeting the same ICE dynamic node
job2_id = scheduler_commands.submit_command_and_assert_job_accepted(
submit_command_args={
"command": 'echo "START_TIME=$(date +%s)"; sleep 30; echo "Job2 done"',
"nodes": 1,
"partition": partition,
"host": target_node,
}
)
logging.info("Submitted job2 (normal) ID: %s", job2_id)

# Wait for ICE to be detected
retry(wait_fixed=seconds(20), stop_max_delay=minutes(3))(assert_lines_in_logs)(
remote_command_executor,
["/var/log/parallelcluster/clustermgtd"],
["The following compute resources are in down state due to insufficient capacity"],
)

# Verify the target dynamic node is down due to ICE
assert_compute_node_states(scheduler_commands, [target_node], expected_states=["down#", "down~"])

# Recover from ICE: change InstanceTypes in JSON override back to real ones
recover_create_fleet_override_from_ice(
remote_command_executor,
cluster_name=cluster.cfn_name,
queue=partition,
compute_resource=ice_cr,
real_instance_types=real_instance_types,
subnet_id=subnet_id,
)

# Wait for insufficient_capacity_timeout to expire and nodes to reset
retry(wait_fixed=seconds(20), stop_max_delay=minutes(4))(assert_lines_in_logs)(
remote_command_executor,
["/var/log/parallelcluster/clustermgtd"],
["Reset the following compute resources because insufficient capacity timeout expired"],
)

# Wait for the target dynamic node to be power-saved (reset)
wait_for_compute_nodes_states(scheduler_commands, [target_node], expected_states=["idle~"], stop_max_delay_secs=600)

# Wait for job1 to run and enter REQUEUE_HOLD.
# Known Slurm 25.11 bug: jobs with --requeue=expedite enter REQUEUE_HOLD after successful
# completion instead of COMPLETED, because _set_job_requeue_exit_value() unconditionally
# triggers expedited requeue without checking exit_code.
# We wait for REQUEUE_HOLD directly and read start time from the slurm output file,
# since StartTime in scontrol resets to Unknown in REQUEUE_HOLD state.
# TODO: Change to wait_job_completed + assert_job_succeeded once the Slurm bug is fixed.
def _assert_job_in_requeue_hold():
result = remote_command_executor.run_remote_command(f"scontrol show jobs -o {job1_id}")
assert_that(result.stdout).contains("JobState=REQUEUE_HOLD")

retry(wait_fixed=seconds(10), stop_max_delay=minutes(15))(_assert_job_in_requeue_hold)()
logging.info("Job1 entered REQUEUE_HOLD as expected (known Slurm 25.11 bug)")
scheduler_commands.cancel_job(job1_id)

# Wait for job2 to complete normally
scheduler_commands.wait_job_completed(job2_id, timeout=15)
scheduler_commands.assert_job_succeeded(job2_id)

# Read start times from slurm output files (epoch timestamps)
job1_output = remote_command_executor.run_remote_command(f"cat ~/slurm-{job1_id}.out").stdout
job2_output = remote_command_executor.run_remote_command(f"cat ~/slurm-{job2_id}.out").stdout
job1_start_epoch = int(re.search(r"START_TIME=(\d+)", job1_output).group(1))
job2_start_epoch = int(re.search(r"START_TIME=(\d+)", job2_output).group(1))
logging.info("Job1 output: %s", job1_output)
logging.info("Job2 output: %s", job2_output)

# Verify expedited requeue job (job1) started before normal job (job2)
logging.info("Job1 start_epoch=%s, Job2 start_epoch=%s", job1_start_epoch, job2_start_epoch)
assert_that(job1_start_epoch).is_less_than_or_equal_to(job2_start_epoch)
logging.info("Verified: expedited requeue job started before normal job (highest priority)")

# Verify no protected mode triggered
assert_no_msg_in_logs(
remote_command_executor,
["/var/log/parallelcluster/clustermgtd"],
["Node bootstrap error"],
)


@pytest.mark.usefixtures("region", "os", "instance", "scheduler")
@pytest.mark.slurm_config_update
def test_slurm_config_update(
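As a standalone illustration of the ordering check at the end of test_expedited_requeue, the sketch below applies the same START_TIME parsing and comparison to hypothetical job output contents (assert_that is assumed to come from assertpy, as is typical in these tests).

# Standalone sketch of the start-time comparison; the two output strings are hypothetical
# examples of what the slurm-<jobid>.out files could contain.
import re

from assertpy import assert_that

job1_output = "START_TIME=1700000100\nJob1 done\n"  # expedited-requeue job
job2_output = "START_TIME=1700000130\nJob2 done\n"  # normal job

# Parse the epoch timestamps that each job echoed when it started running.
job1_start_epoch = int(re.search(r"START_TIME=(\d+)", job1_output).group(1))
job2_start_epoch = int(re.search(r"START_TIME=(\d+)", job2_output).group(1))

# The expedited-requeue job must have started no later than the normal job.
assert_that(job1_start_epoch).is_less_than_or_equal_to(job2_start_epoch)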
@@ -0,0 +1,26 @@
Image:
Os: {{ os }}
HeadNode:
InstanceType: {{ instance }}
Networking:
SubnetId: {{ public_subnet_id }}
Ssh:
KeyName: {{ key_name }}
Scheduling:
Scheduler: slurm
SlurmQueues:
- Name: queue
Networking:
SubnetIds:
- {{ private_subnet_id }}
ComputeResources:
- Name: ice-cr
Instances:
- InstanceType: t3.medium
- InstanceType: c5.large
- Name: normal-cr
InstanceType: {{ instance }}
SharedStorage:
- MountDir: /shared
Name: name1
StorageType: Ebs