Skip to content
This repository has been archived by the owner on Sep 14, 2020. It is now read-only.

Commit

Permalink
Merge pull request #110 from nolar/pykube
Browse files Browse the repository at this point in the history
Switch to pykube-ng to handle arbitrary resources (pods, jobs, etc)
  • Loading branch information
Sergey Vasilyev authored Jul 9, 2019
2 parents 394ef18 + f105def commit 700c7d1
Show file tree
Hide file tree
Showing 37 changed files with 1,080 additions and 699 deletions.
8 changes: 5 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ env:
- MINIKUBE_HOME=$HOME
- MINIKUBE_VERSION=1.0.1
matrix:
- KUBERNETES_VERSION=1.14.0
- KUBERNETES_VERSION=1.13.0
- KUBERNETES_VERSION=1.12.0
- KUBERNETES_VERSION=1.14.0 CLIENT=yes # only one "yes" is enough
- KUBERNETES_VERSION=1.14.0 CLIENT=no
- KUBERNETES_VERSION=1.13.0 CLIENT=no
- KUBERNETES_VERSION=1.12.0 CLIENT=no
# - KUBERNETES_VERSION=1.11.10 # Minikube fails on CRI preflight checks
# - KUBERNETES_VERSION=1.10.13 # CRDs require spec.version, which fails on 1.14

Expand All @@ -32,6 +33,7 @@ matrix:

before_script:
- tools/minikube-for-travis.sh
- tools/kubernetes-client.sh

script:
- pytest -v
Expand Down
13 changes: 10 additions & 3 deletions examples/09-testing/test_example_09.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,25 @@

@pytest.fixture(autouse=True)
def crd_exists():
subprocess.run(f"kubectl apply -f {crd_yaml}", shell=True, check=True)
subprocess.run(f"kubectl apply -f {crd_yaml}",
check=True, timeout=10, capture_output=True, shell=True)


@pytest.fixture(autouse=True)
def obj_absent():
subprocess.run(f"kubectl delete -f {obj_yaml}", shell=True, check=False)
# Operator is not running in fixtures, so we need a force-delete (or this patch).
subprocess.run(['kubectl', 'patch', '-f', obj_yaml,
'-p', '{"metadata":{"finalizers":[]}}',
'--type', 'merge'],
check=False, timeout=10, capture_output=True)
subprocess.run(f"kubectl delete -f {obj_yaml}",
check=False, timeout=10, capture_output=True, shell=True)


def test_resource_lifecycle(mocker):

# To prevent lengthy threads in the loop executor when the process exits.
mocker.patch('kopf.clients.watching.DEFAULT_STREAM_TIMEOUT', 10)
mocker.patch('kopf.config.WatchersConfig.default_stream_timeout', 10)

# Run an operator and simulate some activity with the operated resource.
with kopf.testing.KopfRunner(['run', '--verbose', '--standalone', example_py]) as runner:
Expand Down
39 changes: 39 additions & 0 deletions examples/10-builtins/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Kopf example for built-in resources

Kopf can also handle the built-in resources, such as Pods, Jobs, etc.

In this example, we take control all over the pods (namespaced/cluster-wide),
and allow the pods to exist for no longer than 30 seconds --
either after creation or after the operator restart.

For no specific reason, just for fun. Maybe, as a way of Chaos Engineering
to force making the resilient applications (tolerant to pod killing).

However, the system namespaces (kube-system, etc) are explicitly protected --
to prevent killing the cluster itself.

Start the operator:

```bash
kopf run example.py --verbose
```

Start a sample pod:

```bash
kubectl run -it --image=ubuntu expr1 -- bash -i
# wait for 30s
```

Since `kubectl run` creates a Deployment, not just a Pod,
a new pod will be created every 30 seconds. Observe with:

```bash
kubectl get pods --watch
```

Cleanup in the end:

```bash
$ kubectl delete deployment expr1
```
47 changes: 47 additions & 0 deletions examples/10-builtins/example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import asyncio

import kopf
import pykube

tasks = {} # dict{namespace: dict{name: asyncio.Task}}

try:
cfg = pykube.KubeConfig.from_service_account()
except FileNotFoundError:
cfg = pykube.KubeConfig.from_file()
api = pykube.HTTPClient(cfg)


@kopf.on.resume('', 'v1', 'pods')
@kopf.on.create('', 'v1', 'pods')
async def pod_in_sight(namespace, name, logger, **kwargs):
if namespace.startswith('kube-'):
return
else:
task = asyncio.create_task(pod_killer(namespace, name, logger))
tasks.setdefault(namespace, {})
tasks[namespace][name] = task


@kopf.on.delete('', 'v1', 'pods')
async def pod_deleted(namespace, name, **kwargs):
if namespace in tasks and name in tasks[namespace]:
task = tasks[namespace][name]
task.cancel() # it will also remove from `tasks`


async def pod_killer(namespace, name, logger, timeout=30):
try:
logger.info(f"=== Pod killing happens in {timeout}s.")
await asyncio.sleep(timeout)
logger.info(f"=== Pod killing happens NOW!")

pod = pykube.Pod.objects(api, namespace=namespace).get_by_name(name)
pod.delete()

except asyncio.CancelledError:
logger.info(f"=== Pod killing is cancelled!")

finally:
if namespace in tasks and name in tasks[namespace]:
del tasks[namespace][name]
46 changes: 46 additions & 0 deletions examples/10-builtins/test_example_10.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import os.path
import time

import pykube

import kopf.testing


def test_pods_reacted():

example_py = os.path.join(os.path.dirname(__file__), 'example.py')
with kopf.testing.KopfRunner(['run', '--verbose', example_py]) as runner:
_create_pod()
time.sleep(5) # give it some time to react
_delete_pod()
time.sleep(1) # give it some time to react

assert runner.exception is None
assert runner.exit_code == 0

assert '[default/kopf-pod-1] Creation event:' in runner.stdout
assert '[default/kopf-pod-1] === Pod killing happens in 30s.' in runner.stdout
assert '[default/kopf-pod-1] Deletion event:' in runner.stdout
assert '[default/kopf-pod-1] === Pod killing is cancelled!' in runner.stdout


def _create_pod():
api = pykube.HTTPClient(pykube.KubeConfig.from_file())
pod = pykube.Pod(api, {
'apiVersion': 'v1',
'kind': 'Pod',
'metadata': {'name': 'kopf-pod-1', 'namespace': 'default'},
'spec': {
'containers': [{
'name': 'the-only-one',
'image': 'busybox',
'command': ["sh", "-x", "-c", "sleep 1"],
}]},
})
pod.create()


def _delete_pod():
api = pykube.HTTPClient(pykube.KubeConfig.from_file())
pod = pykube.Pod.objects(api, namespace='default').get_by_name('kopf-pod-1')
pod.delete()
1 change: 1 addition & 0 deletions examples/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
kopf
kubernetes
pykube-ng
pyyaml
4 changes: 3 additions & 1 deletion kopf/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@

def cli_login():
try:
auth.login()
auth.login(verify=True)
except auth.LoginError as e:
raise click.ClickException(str(e))
except auth.AccessError as e:
raise click.ClickException(str(e))


def logging_options(fn):
Expand Down
126 changes: 111 additions & 15 deletions kopf/clients/auth.py
Original file line number Diff line number Diff line change
@@ -1,42 +1,138 @@
import logging
from typing import Optional

import kubernetes
import pykube
import requests
import urllib3.exceptions

logger = logging.getLogger(__name__)

# Set in login(), consumed in get_pykube_cfg() and all API calls.
_pykube_cfg: Optional[pykube.KubeConfig] = None


class LoginError(Exception):
""" Raised when the operator cannot login to the API. """


def login():
class AccessError(Exception):
""" Raised when the operator cannot access the cluster API. """


def login(verify=False):
"""
Login the the Kubernetes cluster, locally or remotely.
Login to Kubernetes cluster, locally or remotely.
Keep the logged in state or config object in the global variables,
so that it can be available for future calls via the same function call.
Automatic refresh/reload of the tokens or objects also should be done here.
"""

# Configure the default client credentials for all possible environments.
# Pykube login is mandatory. If it fails, the framework will not run at all.
try:
import pykube
except ImportError:
raise # mandatory
else:
login_pykube(verify=verify)

# We keep the official client library auto-login only because it was
# an implied behavior before switching to pykube -- to keep it so (implied).
try:
import kubernetes
except ImportError:
pass # optional
else:
login_client(verify=verify)


def login_pykube(verify=False):
global _pykube_cfg
try:
_pykube_cfg = pykube.KubeConfig.from_service_account()
logger.debug("Pykube is configured in cluster with service account.")
except FileNotFoundError:
try:
_pykube_cfg = pykube.KubeConfig.from_file()
logger.debug("Pykube is configured via kubeconfig file.")
except (pykube.PyKubeError, FileNotFoundError):
raise LoginError(f"Cannot authenticate pykube neither in-cluster, nor via kubeconfig.")

if verify:
verify_pykube()


def login_client(verify=False):
import kubernetes.client
try:
kubernetes.config.load_incluster_config() # cluster env vars
logger.debug("configured in cluster with service account")
logger.debug("Client is configured in cluster with service account.")
except kubernetes.config.ConfigException as e1:
try:
kubernetes.config.load_kube_config() # developer's config files
logger.debug("configured via kubeconfig file")
logger.debug("Client is configured via kubeconfig file.")
except kubernetes.config.ConfigException as e2:
raise LoginError(f"Cannot authenticate neither in-cluster, nor via kubeconfig.")
raise LoginError(f"Cannot authenticate client neither in-cluster, nor via kubeconfig.")

# Make a sample API call to ensure the login is successful,
# and convert some of the known exceptions to the CLI hints.
if verify:
verify_client()


def verify_pykube():
"""
Verify if login has succeeded, and the access configuration is still valid.
All other errors (e.g. 403, 404) are ignored: it means, the host and port
are configured and are reachable, the authentication token is accepted,
and the rest are authorization or configuration errors (not a showstopper).
"""
try:
api = get_pykube_api()
rsp = api.get(version="", base="/")
rsp.raise_for_status()
api.raise_for_status(rsp) # replaces requests's HTTPError with its own.
except requests.exceptions.ConnectionError as e:
raise AccessError("Cannot connect to the Kubernetes API. "
"Please configure the cluster access.")
except pykube.exceptions.HTTPError as e:
if e.code == 401:
raise AccessError("Cannot authenticate to the Kubernetes API. "
"Please login or configure the tokens.")
except requests.exceptions.HTTPError as e:
if e.response.status_code == 401:
raise AccessError("Cannot authenticate to the Kubernetes API. "
"Please login or configure the tokens.")


def verify_client():
"""
Verify if login has succeeded, and the access configuration is still valid.
All other errors (e.g. 403, 404) are ignored: it means, the host and port
are configured and are reachable, the authentication token is accepted,
and the rest are authorization or configuration errors (not a showstopper).
"""
import kubernetes.client.rest
try:
api = kubernetes.client.CoreApi()
api.get_api_versions()
except urllib3.exceptions.HTTPError as e:
raise LoginError("Cannot connect to the Kubernetes API. "
"Please configure the cluster access.")
raise AccessError("Cannot connect to the Kubernetes API. "
"Please configure the cluster access.")
except kubernetes.client.rest.ApiException as e:
if e.status == 401:
raise LoginError("Cannot authenticate to the Kubernetes API. "
"Please login or configure the tokens.")
else:
raise
raise AccessError("Cannot authenticate to the Kubernetes API. "
"Please login or configure the tokens.")


def get_pykube_cfg() -> pykube.KubeConfig:
if _pykube_cfg is None:
raise LoginError("Not logged in with PyKube.")
return _pykube_cfg


# TODO: add some caching, but keep kwargs in mind. Maybe add a key= for purpose/use-place?
def get_pykube_api(timeout=None) -> pykube.HTTPClient:
kwargs = dict(timeout=timeout) if timeout is not None else dict()
return pykube.HTTPClient(get_pykube_cfg(), **kwargs)
23 changes: 23 additions & 0 deletions kopf/clients/classes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from typing import Type

import pykube

from kopf.clients import auth


def _make_cls(resource) -> Type[pykube.objects.APIObject]:
api = auth.get_pykube_api()
api_resources = api.resource_list(resource.api_version)['resources']
resource_kind = next((r['kind'] for r in api_resources if r['name'] == resource.plural), None)
is_namespaced = next((r['namespaced'] for r in api_resources if r['name'] == resource.plural), None)
if not resource_kind:
raise pykube.ObjectDoesNotExist(f"No such CRD: {resource.name}")

cls_name = resource.plural
cls_base = pykube.objects.NamespacedAPIObject if is_namespaced else pykube.objects.APIObject
cls = type(cls_name, (cls_base,), {
'version': resource.api_version,
'endpoint': resource.plural,
'kind': resource_kind,
})
return cls
Loading

0 comments on commit 700c7d1

Please sign in to comment.