Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract kubernetes cluster collection functionality to a different file #265

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion acto/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from acto.input.value_with_schema import (ValueWithSchema,
attach_schema_to_value)
from acto.input.valuegenerator import ArrayGenerator
from acto.kubectl_client import KubectlClient
from acto.kubernetes_io import KubectlClient
from acto.kubernetes_engine import base, k3d, kind
from acto.oracle_handle import OracleHandle
from acto.runner import Runner
Expand Down
44 changes: 0 additions & 44 deletions acto/kubectl_client/kubectl.py

This file was deleted.

File renamed without changes.
72 changes: 72 additions & 0 deletions acto/kubernetes_io/kubectl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import subprocess
import tempfile

import kubernetes
import yaml


class KubectlClient:

def __init__(self, kubeconfig: str, context_name: str):

if not kubeconfig:
raise ValueError('kubeconfig is required')
if not context_name:
raise ValueError('context_name is required')

self.kubeconfig = kubeconfig
self.context_name = context_name

def exec(self,
pod: str,
namespace: str,
commands: list,
capture_output=False,
text=False) -> subprocess.CompletedProcess:
"""Executes a command in a pod"""
cmd = ['exec']
cmd.extend([pod])
cmd.extend(['--namespace', namespace])
cmd.extend(['--'])
cmd.extend(commands)

return self.kubectl(cmd, capture_output, text)

def kubectl(self,
args: list,
capture_output=False,
text=False,
timeout: int = 600) -> subprocess.CompletedProcess:
"""Executes a kubectl command"""
cmd = ['kubectl']
cmd.extend(['--kubeconfig', self.kubeconfig])
cmd.extend(['--context', self.context_name])

cmd.extend(args)

p = subprocess.run(cmd, capture_output=capture_output, text=text, timeout=timeout)
return p

def apply(self, file_content: dict, **kwargs) -> subprocess.CompletedProcess:
args = {
'namespace': '-n',
'server_side': '--server-side',
}
with tempfile.NamedTemporaryFile(mode='w+', suffix='.yaml') as f:
if isinstance(file_content, list):
yaml.safe_dump_all(file_content, f)
else:
yaml.safe_dump(file_content, f)
cmd = ['apply', '-f', f.name]
for k, v in kwargs.items():
if k in args:
cmd.extend([args[k]])
if v:
cmd.extend([v])
else:
raise ValueError(f'Invalid argument {k}')
return self.kubectl(cmd, capture_output=True, text=True)

def init_kube_api_client(self) -> kubernetes.client.ApiClient:
return kubernetes.config.kube_config.new_client_from_config(config_file=self.kubeconfig,
context=self.context_name)
Loading