Skip to content

Commit

Permalink
refactor: changed __next__ iterator returning a callable
Browse files Browse the repository at this point in the history
  • Loading branch information
SteBaum committed Jan 29, 2024
1 parent f8c5713 commit 6d915ea
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 101 deletions.
11 changes: 6 additions & 5 deletions tdp/cli/commands/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
)
from tdp.core.cluster_status import ClusterStatus
from tdp.core.deployment import DeploymentRunner, Executor
from tdp.core.models import DeploymentStateEnum
from tdp.core.models import DeploymentStateEnum, OperationStateEnum

Check failure on line 25 in tdp/cli/commands/deploy.py

View workflow job for this annotation

GitHub Actions / Check linting with Ruff

Ruff (F401)

tdp/cli/commands/deploy.py:25:50: F401 `tdp.core.models.OperationStateEnum` imported but unused
from tdp.core.variables import ClusterVariables

if TYPE_CHECKING:
Expand Down Expand Up @@ -90,14 +90,15 @@ def deploy(

if dry:
for _ in deployment_iterator:
pass
_()
return

# deployment and operations records are mutated by the iterator so we need to
# commit them before iterating and at each iteration
session.commit() # Update deployment status to RUNNING, operations to PENDING
for cluster_status_logs in deployment_iterator:
session.commit() # Update operation status to RUNNING
for cluster_status_logs_process in deployment_iterator:
session.commit() # Update deployment and current operation status to RUNNING and next operations to PENDING
cluster_status_logs = cluster_status_logs_process()
cluster_status_logs
if cluster_status_logs and any(cluster_status_logs):
session.add_all(cluster_status_logs)
session.commit() # Update operation status to SUCCESS, FAILURE or HELD
Expand Down
194 changes: 98 additions & 96 deletions tdp/core/deployment/deployment_iterator.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,115 +106,117 @@ def __init__(
except NothingToReconfigureError:
self._reconfigure_operations = None

def __next__(self) -> Optional[list[SCHStatusLogModel]]:
def __next__(self) -> Callable[[], Optional[list[SCHStatusLogModel]]]:
try:
while True:
operation_rec: OperationModel = next(self._iter)
operation_rec.state = OperationStateEnum.RUNNING

# Return early if deployment failed
if self.deployment.state == DeploymentStateEnum.FAILURE:
operation_rec.state = OperationStateEnum.HELD
return
else:
operation_rec.state = OperationStateEnum.RUNNING

# Retrieve operation to access parsed attributes and playbook
operation = self._collections.get_operation(operation_rec.operation)

# Run the operation
if operation.noop:
# A noop operation is always successful
operation_rec.state = OperationStateEnum.SUCCESS
else:
self._run_operation(operation_rec)

# Set deployment status to failure if the operation failed
if operation_rec.state != OperationStateEnum.SUCCESS:
self.deployment.end_time = datetime.utcnow()
self.deployment.state = DeploymentStateEnum.FAILURE
# Return early as status is not updated
return

# ===== Update the cluster status if success =====

# Skip sleep operation
if operation.name == OPERATION_SLEEP_NAME:
return

sch_status_logs: list[SCHStatusLogModel] = []
sc_name = ServiceComponentName(
service_name=operation.service_name,
component_name=operation.component_name,
)
is_sc_stale = self._cluster_status.is_sc_stale(
sc_name, sc_hosts=operation.host_names
)
def process_operation_rec() -> Optional[list[SCHStatusLogModel]]:
# Return early if deployment failed
if self.deployment.state == DeploymentStateEnum.FAILURE:
operation_rec.state = OperationStateEnum.HELD
return
else:
operation_rec.state = OperationStateEnum.RUNNING

if is_sc_stale:
# Get the first reconfigure operation if any
if self._reconfigure_operations:
try:
first_reconfigure_operation = next(
iter(self._reconfigure_operations)
)
except StopIteration:
first_reconfigure_operation = None
# Retrieve operation to access parsed attributes and playbook
operation = self._collections.get_operation(operation_rec.operation)

# Run the operation
if operation.noop:
# A noop operation is always successful
operation_rec.state = OperationStateEnum.SUCCESS
else:
first_reconfigure_operation = None
self._run_operation(operation_rec)

# Set deployment status to failure if the operation failed
if operation_rec.state != OperationStateEnum.SUCCESS:
self.deployment.end_time = datetime.utcnow()
self.deployment.state = DeploymentStateEnum.FAILURE
# Return early as status is not updated
return

# ===== Update the cluster status if success =====

can_update_stale = self.force_stale_update or (
operation_rec.operation == first_reconfigure_operation
# Skip sleep operation
if operation.name == OPERATION_SLEEP_NAME:
return

sch_status_logs: list[SCHStatusLogModel] = []
sc_name = ServiceComponentName(
service_name=operation.service_name,
component_name=operation.component_name,
)
is_sc_stale = self._cluster_status.is_sc_stale(
sc_name, sc_hosts=operation.host_names
)

# Log a warning if the operation affect a stale SCH which is not the first reconfigure operation (if any)
if not can_update_stale:
logger.warning(
f"can't update stale {sc_name} with {operation_rec.operation}\n"
+ "first operation is {first_reconfigure_operation}"
if is_sc_stale:
# Get the first reconfigure operation if any
if self._reconfigure_operations:
try:
first_reconfigure_operation = next(
iter(self._reconfigure_operations)
)
except StopIteration:
first_reconfigure_operation = None
else:
first_reconfigure_operation = None

can_update_stale = self.force_stale_update or (
operation_rec.operation == first_reconfigure_operation
)
else:
can_update_stale = False

# fmt: off
hosts = (
[None] if operation.noop # A noop operation doesn't have any host
else [operation_rec.host] if operation_rec.host # Only one operation is launched on a single host
else operation.host_names # Host is not specified, hence the operation is launched on all host
)
# fmt: on

# Update the cluster status for each host
for host in hosts:
sch_status_log = self._cluster_status.update_sch(
ServiceComponentHostName(sc_name, host),
action_name=operation.action_name,
version=self._cluster_variables[operation.service_name].version,
can_update_stale=can_update_stale,

# Log a warning if the operation affect a stale SCH which is not the first reconfigure operation (if any)
if not can_update_stale:
logger.warning(
f"can't update stale {sc_name} with {operation_rec.operation}\n"
+ "first operation is {first_reconfigure_operation}"
)
else:
can_update_stale = False

# fmt: off
hosts = (
[None] if operation.noop # A noop operation doesn't have any host
else [operation_rec.host] if operation_rec.host # Only one operation is launched on a single host
else operation.host_names # Host is not specified, hence the operation is launched on all host
)
if sch_status_log:
sch_status_log.deployment_id = self.deployment.id
sch_status_log.source = (
SCHStatusLogSourceEnum.FORCED
if self.force_stale_update
else SCHStatusLogSourceEnum.DEPLOYMENT
# fmt: on

# Update the cluster status for each host
for host in hosts:
sch_status_log = self._cluster_status.update_sch(
ServiceComponentHostName(sc_name, host),
action_name=operation.action_name,
version=self._cluster_variables[operation.service_name].version,
can_update_stale=can_update_stale,
)
sch_status_logs.append(sch_status_log)
if sch_status_log:
sch_status_log.deployment_id = self.deployment.id
sch_status_log.source = (
SCHStatusLogSourceEnum.FORCED
if self.force_stale_update
else SCHStatusLogSourceEnum.DEPLOYMENT
)
sch_status_logs.append(sch_status_log)

# Update the reconfigure_operations dict
if self._reconfigure_operations:
hosts = self._reconfigure_operations.get(
operation_rec.operation, set()
)
# If host is defined and needed to be reconfigured,
# remove it from the reconfigure_operations dict
if operation_rec.host and operation_rec.host in hosts:
hosts.remove(operation_rec.host)
# If no host is defined, or no host is left,
# remove the entire operation from the reconfigure_operations dict
if not operation_rec.host or len(hosts) == 0:
self._reconfigure_operations.pop(operation_rec.operation, None)

return sch_status_logs
# Update the reconfigure_operations dict
if self._reconfigure_operations:
hosts = self._reconfigure_operations.get(
operation_rec.operation, set()
)
# If host is defined and needed to be reconfigured,
# remove it from the reconfigure_operations dict
if operation_rec.host and operation_rec.host in hosts:
hosts.remove(operation_rec.host)
# If no host is defined, or no host is left,
# remove the entire operation from the reconfigure_operations dict
if not operation_rec.host or len(hosts) == 0:
self._reconfigure_operations.pop(operation_rec.operation, None)
return sch_status_logs
return process_operation_rec
# StopIteration is a "normal" exception raised when the iteration has stopped
except StopIteration as e:
self.deployment.end_time = datetime.utcnow()
Expand Down

0 comments on commit 6d915ea

Please sign in to comment.