Skip to content
This repository has been archived by the owner on Sep 14, 2020. It is now read-only.

Switch to pykube-ng to handle arbitrary resources (pods, jobs, etc) #110

Merged
merged 16 commits into from
Jul 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ env:
- MINIKUBE_VERSION=1.0.1
- E2E=true # to not skip the e2e tests in pytest
matrix:
- KUBERNETES_VERSION=1.14.0 CLIENT=yes # only one "yes" is enough
- KUBERNETES_VERSION=1.14.0 CLIENT=no
- KUBERNETES_VERSION=1.13.0 CLIENT=no
- KUBERNETES_VERSION=1.12.0 CLIENT=no
# - KUBERNETES_VERSION=1.11.10 # Minikube fails on CRI preflight checks
# - KUBERNETES_VERSION=1.10.13 # CRDs require spec.version, which fails on 1.14

Expand All @@ -33,6 +34,7 @@ matrix:

before_script:
- tools/minikube-for-travis.sh
- tools/kubernetes-client.sh

script:
- pytest -v
Expand Down
13 changes: 10 additions & 3 deletions examples/09-testing/test_example_09.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,25 @@

@pytest.fixture(autouse=True)
def crd_exists():
    # Ensure the CRD is registered before every test; fail fast (check=True)
    # and bound the call (timeout=10) so a hung kubectl cannot stall the suite.
    subprocess.run(f"kubectl apply -f {crd_yaml}",
                   check=True, timeout=10, capture_output=True, shell=True)


@pytest.fixture(autouse=True)
def obj_absent():
    # Operator is not running in fixtures, so we need a force-delete (or this patch):
    # clearing the finalizers lets Kubernetes actually remove the object.
    subprocess.run(['kubectl', 'patch', '-f', obj_yaml,
                    '-p', '{"metadata":{"finalizers":[]}}',
                    '--type', 'merge'],
                   check=False, timeout=10, capture_output=True)
    # Best-effort delete (check=False): the object may legitimately not exist.
    subprocess.run(f"kubectl delete -f {obj_yaml}",
                   check=False, timeout=10, capture_output=True, shell=True)


def test_resource_lifecycle(mocker):

# To prevent lengthy threads in the loop executor when the process exits.
mocker.patch('kopf.clients.watching.DEFAULT_STREAM_TIMEOUT', 10)
mocker.patch('kopf.config.WatchersConfig.default_stream_timeout', 10)

# Run an operator and simulate some activity with the operated resource.
with kopf.testing.KopfRunner(['run', '--verbose', '--standalone', example_py]) as runner:
Expand Down
39 changes: 39 additions & 0 deletions examples/10-builtins/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Kopf example for built-in resources

Kopf can also handle the built-in resources, such as Pods, Jobs, etc.

In this example, we take control over all the pods (namespaced/cluster-wide),
and allow the pods to exist for no longer than 30 seconds --
either after creation or after the operator restart.

For no specific reason, just for fun. Maybe, as a way of Chaos Engineering
to force applications to be resilient (tolerant to pod killing).

However, the system namespaces (kube-system, etc) are explicitly protected --
to prevent killing the cluster itself.

Start the operator:

```bash
kopf run example.py --verbose
```

Start a sample pod:

```bash
kubectl run -it --image=ubuntu expr1 -- bash -i
# wait for 30s
```

Since `kubectl run` creates a Deployment, not just a Pod,
a new pod will be created every 30 seconds. Observe with:

```bash
kubectl get pods --watch
```

Cleanup in the end:

```bash
kubectl delete deployment expr1
```
47 changes: 47 additions & 0 deletions examples/10-builtins/example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import asyncio

import kopf
import pykube

# Pending killer tasks per pod, so a deletion event can cancel the timer.
tasks = {}  # dict{namespace: dict{name: asyncio.Task}}

# Prefer the in-cluster service account; fall back to the local kubeconfig
# file when running outside of a cluster (e.g. during development).
try:
    cfg = pykube.KubeConfig.from_service_account()
except FileNotFoundError:
    cfg = pykube.KubeConfig.from_file()
api = pykube.HTTPClient(cfg)


@kopf.on.resume('', 'v1', 'pods')
@kopf.on.create('', 'v1', 'pods')
async def pod_in_sight(namespace, name, logger, **kwargs):
    """Schedule a delayed killer task for every pod seen (created or resumed)."""
    # Guard clause: system namespaces are off-limits.
    if namespace.startswith('kube-'):
        return
    killer = asyncio.create_task(pod_killer(namespace, name, logger))
    tasks.setdefault(namespace, {})[name] = killer


@kopf.on.delete('', 'v1', 'pods')
async def pod_deleted(namespace, name, **kwargs):
    """Cancel the pending killer task when the pod is deleted by other means."""
    task = tasks.get(namespace, {}).get(name)
    if task is not None:
        task.cancel()  # it will also remove from `tasks`


async def pod_killer(namespace, name, logger, timeout=30):
    """Sleep for `timeout` seconds, then delete the pod — unless cancelled first."""
    try:
        logger.info(f"=== Pod killing happens in {timeout}s.")
        await asyncio.sleep(timeout)
        logger.info(f"=== Pod killing happens NOW!")
        victim = pykube.Pod.objects(api, namespace=namespace).get_by_name(name)
        victim.delete()
    except asyncio.CancelledError:
        logger.info(f"=== Pod killing is cancelled!")
    finally:
        # Drop the bookkeeping entry regardless of the outcome.
        per_namespace = tasks.get(namespace)
        if per_namespace is not None and name in per_namespace:
            del per_namespace[name]
46 changes: 46 additions & 0 deletions examples/10-builtins/test_example_10.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import os.path
import time

import pykube

import kopf.testing


def test_pods_reacted():
    """Run the operator against a real cluster and check it reacts to a pod."""
    example_py = os.path.join(os.path.dirname(__file__), 'example.py')
    with kopf.testing.KopfRunner(['run', '--verbose', example_py]) as runner:
        _create_pod()
        time.sleep(5)  # give it some time to react
        _delete_pod()
        time.sleep(1)  # give it some time to react

    assert runner.exception is None
    assert runner.exit_code == 0

    expected_lines = [
        '[default/kopf-pod-1] Creation event:',
        '[default/kopf-pod-1] === Pod killing happens in 30s.',
        '[default/kopf-pod-1] Deletion event:',
        '[default/kopf-pod-1] === Pod killing is cancelled!',
    ]
    for expected in expected_lines:
        assert expected in runner.stdout


def _create_pod():
    """Create a short-lived sample pod in the default namespace."""
    manifest = {
        'apiVersion': 'v1',
        'kind': 'Pod',
        'metadata': {'name': 'kopf-pod-1', 'namespace': 'default'},
        'spec': {
            'containers': [{
                'name': 'the-only-one',
                'image': 'busybox',
                'command': ["sh", "-x", "-c", "sleep 1"],
            }]},
    }
    api = pykube.HTTPClient(pykube.KubeConfig.from_file())
    pykube.Pod(api, manifest).create()


def _delete_pod():
    """Delete the sample pod created by `_create_pod`."""
    api = pykube.HTTPClient(pykube.KubeConfig.from_file())
    query = pykube.Pod.objects(api, namespace='default')
    query.get_by_name('kopf-pod-1').delete()
1 change: 1 addition & 0 deletions examples/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
kopf
kubernetes
pykube-ng
pyyaml
4 changes: 3 additions & 1 deletion kopf/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@

def cli_login():
    """
    Log in to the cluster (with verification) before running any CLI command,
    converting login/access failures into user-friendly CLI errors.
    """
    try:
        auth.login(verify=True)
    except (auth.LoginError, auth.AccessError) as e:
        # Both failure modes are presented identically to the CLI user.
        raise click.ClickException(str(e))


def logging_options(fn):
Expand Down
126 changes: 111 additions & 15 deletions kopf/clients/auth.py
Original file line number Diff line number Diff line change
@@ -1,42 +1,138 @@
import logging
from typing import Optional

import kubernetes
import pykube
import requests
import urllib3.exceptions

logger = logging.getLogger(__name__)

# Set in login(), consumed in get_pykube_cfg() and all API calls.
# None means login() has not been called (or has not succeeded) yet.
_pykube_cfg: Optional[pykube.KubeConfig] = None


class LoginError(Exception):
    """ Raised when the operator cannot login to the API. """


class AccessError(Exception):
    """ Raised when the operator cannot access the cluster API. """


def login(verify=False):
    """
    Login to the Kubernetes cluster, locally or remotely.

    Keep the logged in state or config object in the global variables,
    so that it can be available for future calls via the same function call.

    Automatic refresh/reload of the tokens or objects also should be done here.

    :param verify: when true, make a sample API call to ensure the credentials
        actually work (raises AccessError otherwise).
    """

    # Pykube login is mandatory. If it fails, the framework will not run at all.
    try:
        import pykube
    except ImportError:
        raise  # mandatory
    else:
        login_pykube(verify=verify)

    # We keep the official client library auto-login only because it was
    # an implied behavior before switching to pykube -- to keep it so (implied).
    try:
        import kubernetes
    except ImportError:
        pass  # optional
    else:
        login_client(verify=verify)


def login_pykube(verify=False):
    """
    Configure pykube: in-cluster service account first, kubeconfig as fallback.

    Stores the config in the module-level `_pykube_cfg` for later API calls.
    Raises LoginError if neither source of credentials is available.
    """
    global _pykube_cfg
    try:
        _pykube_cfg = pykube.KubeConfig.from_service_account()
        logger.debug("Pykube is configured in cluster with service account.")
    except FileNotFoundError:
        try:
            _pykube_cfg = pykube.KubeConfig.from_file()
            logger.debug("Pykube is configured via kubeconfig file.")
        except (pykube.PyKubeError, FileNotFoundError):
            raise LoginError("Cannot authenticate pykube neither in-cluster, nor via kubeconfig.")

    if verify:
        verify_pykube()


def login_client(verify=False):
    """
    Configure the official Kubernetes client library (optional, if installed):
    in-cluster service account first, kubeconfig file as fallback.
    """
    import kubernetes.client
    try:
        kubernetes.config.load_incluster_config()  # cluster env vars
        logger.debug("Client is configured in cluster with service account.")
    except kubernetes.config.ConfigException as e1:
        try:
            kubernetes.config.load_kube_config()  # developer's config files
            logger.debug("Client is configured via kubeconfig file.")
        except kubernetes.config.ConfigException as e2:
            raise LoginError("Cannot authenticate client neither in-cluster, nor via kubeconfig.")

    # Make a sample API call to ensure the login is successful,
    # and convert some of the known exceptions to the CLI hints.
    if verify:
        verify_client()


def verify_pykube():
    """
    Verify if login has succeeded, and the access configuration is still valid.

    All other errors (e.g. 403, 404) are ignored: it means, the host and port
    are configured and are reachable, the authentication token is accepted,
    and the rest are authorization or configuration errors (not a showstopper).
    """
    try:
        api = get_pykube_api()
        rsp = api.get(version="", base="/")
        # No separate rsp.raise_for_status() needed: api.raise_for_status()
        # performs the same check, replacing requests's HTTPError with its own.
        api.raise_for_status(rsp)
    except requests.exceptions.ConnectionError as e:
        raise AccessError("Cannot connect to the Kubernetes API. "
                          "Please configure the cluster access.")
    except pykube.exceptions.HTTPError as e:
        if e.code == 401:
            raise AccessError("Cannot authenticate to the Kubernetes API. "
                              "Please login or configure the tokens.")
    except requests.exceptions.HTTPError as e:
        if e.response.status_code == 401:
            raise AccessError("Cannot authenticate to the Kubernetes API. "
                              "Please login or configure the tokens.")


def verify_client():
    """
    Verify if login has succeeded, and the access configuration is still valid.

    All other errors (e.g. 403, 404) are ignored: it means, the host and port
    are configured and are reachable, the authentication token is accepted,
    and the rest are authorization or configuration errors (not a showstopper).
    """
    import kubernetes.client.rest
    try:
        api = kubernetes.client.CoreApi()
        api.get_api_versions()
    except urllib3.exceptions.HTTPError as e:
        raise AccessError("Cannot connect to the Kubernetes API. "
                          "Please configure the cluster access.")
    except kubernetes.client.rest.ApiException as e:
        # Only 401 is a showstopper; other statuses mean the API is reachable.
        if e.status == 401:
            raise AccessError("Cannot authenticate to the Kubernetes API. "
                              "Please login or configure the tokens.")


def get_pykube_cfg() -> pykube.KubeConfig:
    """Return the pykube config established by a prior login()."""
    cfg = _pykube_cfg
    if cfg is None:
        raise LoginError("Not logged in with PyKube.")
    return cfg


# TODO: add some caching, but keep kwargs in mind. Maybe add a key= for purpose/use-place?
def get_pykube_api(timeout=None) -> pykube.HTTPClient:
    """Build an HTTP client for the configured cluster, with an optional timeout."""
    kwargs = {} if timeout is None else {'timeout': timeout}
    return pykube.HTTPClient(get_pykube_cfg(), **kwargs)
23 changes: 23 additions & 0 deletions kopf/clients/classes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from typing import Type

import pykube

from kopf.clients import auth


def _make_cls(resource) -> Type[pykube.objects.APIObject]:
    """
    Build a pykube APIObject subclass for an arbitrary resource on the fly.

    Queries the cluster's resource list for the resource's kind and scope
    (namespaced or cluster-wide), and synthesizes a matching class.
    Raises pykube.ObjectDoesNotExist if the resource/CRD is not served.
    """
    api = auth.get_pykube_api()
    api_resources = api.resource_list(resource.api_version)['resources']
    # Single pass over the resource list instead of one scan per field.
    info = next((r for r in api_resources if r['name'] == resource.plural), None)
    if info is None or not info['kind']:
        raise pykube.ObjectDoesNotExist(f"No such CRD: {resource.name}")

    cls_name = resource.plural
    cls_base = pykube.objects.NamespacedAPIObject if info['namespaced'] else pykube.objects.APIObject
    cls = type(cls_name, (cls_base,), {
        'version': resource.api_version,
        'endpoint': resource.plural,
        'kind': info['kind'],
    })
    return cls
Loading