Skip to content

Commit

Permalink
Support helm and some fixes
Browse files Browse the repository at this point in the history
Signed-off-by: Tyler Gu <[email protected]>
  • Loading branch information
tylergu committed Sep 5, 2024
1 parent 915d233 commit 73ad3d9
Show file tree
Hide file tree
Showing 7 changed files with 247 additions and 82 deletions.
108 changes: 55 additions & 53 deletions acto/deploy.py
Original file line number Diff line number Diff line change
@@ -1,47 +1,36 @@
import logging
import time

import kubernetes
import yaml

import acto.utils as utils
from acto.common import *
from acto.common import kubernetes_client, print_event
from acto.kubectl_client.kubectl import KubectlClient
from acto.lib.operator_config import DELEGATED_NAMESPACE, DeployConfig
from acto.utils import get_thread_logger
from acto.utils.preprocess import add_acto_label


def wait_for_pod_ready(apiclient: kubernetes.client.ApiClient):
logger = get_thread_logger(with_prefix=True)
logger.debug("Waiting for all pods to be ready")
time.sleep(5)
pod_ready = False
for tick in range(600):
# check if all pods are ready
pods = kubernetes.client.CoreV1Api(
apiclient).list_pod_for_all_namespaces().items

all_pods_ready = True
for pod in pods:
if pod.status.phase == "Succeeded":
continue
if not utils.is_pod_ready(pod):
all_pods_ready = False

if all_pods_ready:
logger.info("Operator ready")
pod_ready = True
break
time.sleep(5)
logger.info("All pods took %d seconds to get ready" % (tick * 5))
if not pod_ready:
logger.error("Some pods failed to be ready within timeout")
def wait_for_pod_ready(kubectl_client: KubectlClient) -> bool:
"""Wait for all pods to be ready"""
now = time.time()
p = kubectl_client.wait_for_all_pods(timeout=600)
if p.returncode != 0:
logging.error(
"Failed to wait for all pods to be ready due to error from kubectl"
+ f" (returncode={p.returncode})"
+ f" (stdout={p.stdout})"
+ f" (stderr={p.stderr})"
)
return False
else:
return True
logging.info(
"Waited for all pods to be ready for %d seconds", time.time() - now
)
return True


class Deploy():
class Deploy:
"""Deploy the operator using the deploy config"""

def __init__(self, deploy_config: DeployConfig) -> None:
self._deploy_config = deploy_config
Expand All @@ -52,24 +41,30 @@ def __init__(self, deploy_config: DeployConfig) -> None:
self._operator_yaml = step.apply.file
break
else:
raise Exception("No operator yaml found in deploy config")
raise RuntimeError("No operator yaml found in deploy config")

# Extract the operator_container_name from config
self._operator_container_name = None
for step in self._deploy_config.steps:
if step.apply and step.apply.operator:
self._operator_container_name = step.apply.operator_container_name
self._operator_container_name = (
step.apply.operator_container_name
)
break

@property
def operator_yaml(self) -> str:
"""Get the operator yaml file path"""
return self._operator_yaml

def deploy(self,
kubeconfig: str,
context_name: str,
kubectl_client: KubectlClient,
namespace: str):
def deploy(
self,
kubeconfig: str,
context_name: str,
kubectl_client: KubectlClient,
namespace: str,
):
"""Deploy the operator using the deploy config"""
logger = get_thread_logger(with_prefix=True)
print_event("Deploying operator...")
api_client = kubernetes_client(kubeconfig, context_name)
Expand Down Expand Up @@ -97,22 +92,24 @@ def deploy(self,
p = kubectl_client.kubectl(args, capture_output=True)
if p.returncode != 0:
logger.error(
"Failed to deploy operator due to error from kubectl" +
f" (returncode={p.returncode})" +
f" (stdout={p.stdout})" +
f" (stderr={p.stderr})")
"Failed to deploy operator due to error from kubectl"
+ f" (returncode={p.returncode})"
+ f" (stdout={p.stdout})"
+ f" (stderr={p.stderr})"
)
return False
elif not wait_for_pod_ready(api_client):
elif not wait_for_pod_ready(kubectl_client):
logger.error(
"Failed to deploy operator due to timeout waiting for pod to be ready")
"Failed to deploy operator due to timeout waiting for pod to be ready"
)
return False
elif step.wait:
# Simply wait for the specified duration
time.sleep(step.wait.duration)

# Add acto label to the operator pod
add_acto_label(api_client, namespace)
if not wait_for_pod_ready(api_client):
if not wait_for_pod_ready(kubectl_client):
logger.error("Failed to deploy operator")
return False

Expand All @@ -121,22 +118,26 @@ def deploy(self,
print_event("Operator deployed")
return True

def deploy_with_retry(self,
kubeconfig: str,
context_name: str,
kubectl_client: KubectlClient,
namespace: str,
retry_count: int = 3):
def deploy_with_retry(
self,
kubeconfig: str,
context_name: str,
kubectl_client: KubectlClient,
namespace: str,
retry_count: int = 3,
):
"""Deploy the operator with retry"""
logger = get_thread_logger(with_prefix=True)
for i in range(retry_count):
for _ in range(retry_count):
if self.deploy(kubeconfig, context_name, kubectl_client, namespace):
return True
else:
logger.error("Failed to deploy operator, retrying...")
return False

def operator_name(self) -> str:
with open(self._operator_yaml) as f:
"""Get the name of the operator deployment"""
with open(self._operator_yaml, "r", encoding="utf-8") as f:
operator_yamls = yaml.load_all(f, Loader=yaml.FullLoader)
for yaml_ in operator_yamls:
if yaml_["kind"] == "Deployment":
Expand All @@ -145,4 +146,5 @@ def operator_name(self) -> str:

@property
def operator_container_name(self) -> str:
"""Get the name of the operator container"""
return self._operator_container_name
46 changes: 46 additions & 0 deletions acto/kubectl_client/helm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import subprocess
from typing import Optional


class Helm:
"""Helm client class"""

def __init__(self, kubeconfig: str, context_name: str) -> None:
self.kubeconfig = kubeconfig
self.context_name = context_name

def helm(self, args: list) -> subprocess.CompletedProcess:
"""Executes a helm command"""
cmd = ["helm"]
cmd.extend(args)
cmd.extend(["--kubeconfig", self.kubeconfig])
cmd.extend(["--kube-context", self.context_name])
return subprocess.run(cmd, capture_output=True, text=True, check=False)

def repo_add(self, name: str, url: str) -> subprocess.CompletedProcess:
"""Adds a helm repository"""
cmd = ["repo", "add", name, url]
return self.helm(cmd)

def install(
self,
release_name: str,
chart: str,
namespace: str,
repo: Optional[str] = None,
args: Optional[list] = None,
) -> subprocess.CompletedProcess:
"""Installs a helm chart"""
cmd = [
"install",
release_name,
chart,
"--namespace",
namespace,
"--create-namespace",
]
if repo:
cmd.extend(["--repo", repo])
if args:
cmd.extend(args)
return self.helm(cmd)
97 changes: 74 additions & 23 deletions acto/kubectl_client/kubectl.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,95 @@
import logging
import subprocess
from typing import Optional


class KubectlClient:
"""Kubectl client class"""

def __init__(self, kubeconfig: str, context_name: str):

if not kubeconfig:
raise ValueError('kubeconfig is required')
raise ValueError("kubeconfig is required")
if not context_name:
raise ValueError('context_name is required')
raise ValueError("context_name is required")

self.kubeconfig = kubeconfig
self.context_name = context_name

def exec(self,
pod: str,
namespace: str,
commands: list,
capture_output=False,
text=False) -> subprocess.CompletedProcess:
'''Executes a command in a pod'''
cmd = ['exec']
def exec(
self,
pod: str,
namespace: str,
commands: list,
capture_output=False,
text=False,
) -> subprocess.CompletedProcess:
"""Executes a command in a pod"""
cmd = ["exec"]
cmd.extend([pod])
cmd.extend(['--namespace', namespace])
cmd.extend(['--'])
cmd.extend(["--namespace", namespace])
cmd.extend(["--"])
cmd.extend(commands)

return self.kubectl(cmd, capture_output, text)

def kubectl(self,
args: list,
capture_output=False,
text=False,
timeout: int = 600) -> subprocess.CompletedProcess:
'''Executes a kubectl command'''
cmd = ['kubectl']
cmd.extend(['--kubeconfig', self.kubeconfig])
cmd.extend(['--context', self.context_name])
def kubectl(
self, args: list, capture_output=False, text=False, timeout: int = 600
) -> subprocess.CompletedProcess:
"""Executes a kubectl command"""
cmd = ["kubectl"]
cmd.extend(["--kubeconfig", self.kubeconfig])
cmd.extend(["--context", self.context_name])

cmd.extend(args)

p = subprocess.run(cmd, capture_output=capture_output, text=text, timeout=timeout)
return p
logging.info("Running kubectl command: %s", " ".join(cmd))
p = subprocess.run(
cmd,
capture_output=capture_output,
text=text,
timeout=timeout,
check=False,
)
return p

def wait(
self,
file: str,
for_condition: str,
timeout: int = 600,
namespace: Optional[str] = None,
) -> subprocess.CompletedProcess:
"""Waits for a condition to be true"""
cmd = [
"wait",
"-f",
file,
"--for",
for_condition,
"--timeout",
f"{timeout}s",
]
if namespace:
cmd.extend(["-n", namespace])
else:
cmd.extend(["--all-namespaces"])
return self.kubectl(cmd, capture_output=True, text=True)

def wait_for_all_pods(
self, timeout: int = 600, namespace: Optional[str] = None
) -> subprocess.CompletedProcess:
"""Waits for all pods to be ready"""
cmd = [
"wait",
"--for=condition=Ready",
"--timeout",
f"{timeout}s",
"pods",
"--all",
]
if namespace:
cmd.extend(["-n", namespace])
else:
cmd.extend(["--all-namespaces"])
return self.kubectl(cmd, capture_output=True, text=True)
9 changes: 5 additions & 4 deletions acto/kubernetes_engine/kind.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import os
import subprocess
import time
from typing import Any, Dict, List
from typing import Any, Dict, List, Optional

import kubernetes
import yaml
Expand All @@ -22,7 +22,7 @@ def __init__(
posthooks: List[base.KubernetesEnginePostHookType] = None,
feature_gates: Dict[str, bool] = None,
num_nodes=1,
version="",
version: Optional[str] = None,
):
self._config_path = os.path.join(
CONST.CLUSTER_CONFIG_FOLDER, f"KIND-{acto_namespace}.yaml"
Expand All @@ -39,7 +39,7 @@ def __init__(
extra_mounts.append(
{"hostPath": "profile/data", "containerPath": "/tmp/profile"}
)
for _ in range(num_nodes - 1):
for _ in range(num_nodes):
config_dict["nodes"].append(
{
"role": "worker",
Expand Down Expand Up @@ -108,7 +108,8 @@ def create_cluster(self, name: str, kubeconfig: str):

cmd.extend(["--config", self._config_path])

cmd.extend(["--image", f"kindest/node:{self._k8s_version}"])
if self._k8s_version:
cmd.extend(["--image", f"kindest/node:{self._k8s_version}"])

p = subprocess.run(cmd, check=False)
i = 0
Expand Down
3 changes: 2 additions & 1 deletion acto/lib/operator_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
class ApplyStep(pydantic.BaseModel, extra="forbid"):
"""Configuration for each step of kubectl apply"""

file: str = pydantic.Field(description="Path to the file for kubectl apply")
file: str = pydantic.Field(
description="Path to the file for kubectl apply")
operator: bool = pydantic.Field(
description="If the file contains the operator deployment",
default=False,
Expand Down
Loading

0 comments on commit 73ad3d9

Please sign in to comment.