diff --git a/.travis.yml b/.travis.yml index 2b0b6757..66b17410 100644 --- a/.travis.yml +++ b/.travis.yml @@ -17,9 +17,10 @@ env: - MINIKUBE_VERSION=1.0.1 - E2E=true # to not skip the e2e tests in pytest matrix: - - KUBERNETES_VERSION=1.14.0 - - KUBERNETES_VERSION=1.13.0 - - KUBERNETES_VERSION=1.12.0 + - KUBERNETES_VERSION=1.14.0 CLIENT=yes # only one "yes" is enough + - KUBERNETES_VERSION=1.14.0 CLIENT=no + - KUBERNETES_VERSION=1.13.0 CLIENT=no + - KUBERNETES_VERSION=1.12.0 CLIENT=no # - KUBERNETES_VERSION=1.11.10 # Minikube fails on CRI preflight checks # - KUBERNETES_VERSION=1.10.13 # CRDs require spec.version, which fails on 1.14 @@ -33,6 +34,7 @@ matrix: before_script: - tools/minikube-for-travis.sh + - tools/kubernetes-client.sh script: - pytest -v diff --git a/examples/09-testing/test_example_09.py b/examples/09-testing/test_example_09.py index 1f0a2cee..57b28811 100644 --- a/examples/09-testing/test_example_09.py +++ b/examples/09-testing/test_example_09.py @@ -13,18 +13,25 @@ @pytest.fixture(autouse=True) def crd_exists(): - subprocess.run(f"kubectl apply -f {crd_yaml}", shell=True, check=True) + subprocess.run(f"kubectl apply -f {crd_yaml}", + check=True, timeout=10, capture_output=True, shell=True) @pytest.fixture(autouse=True) def obj_absent(): - subprocess.run(f"kubectl delete -f {obj_yaml}", shell=True, check=False) + # Operator is not running in fixtures, so we need a force-delete (or this patch). + subprocess.run(['kubectl', 'patch', '-f', obj_yaml, + '-p', '{"metadata":{"finalizers":[]}}', + '--type', 'merge'], + check=False, timeout=10, capture_output=True) + subprocess.run(f"kubectl delete -f {obj_yaml}", + check=False, timeout=10, capture_output=True, shell=True) def test_resource_lifecycle(mocker): # To prevent lengthy threads in the loop executor when the process exits. - mocker.patch('kopf.clients.watching.DEFAULT_STREAM_TIMEOUT', 10) + mocker.patch('kopf.config.WatchersConfig.default_stream_timeout', 10) # Run an operator and simulate some activity with the operated resource. with kopf.testing.KopfRunner(['run', '--verbose', '--standalone', example_py]) as runner: diff --git a/examples/10-builtins/README.md b/examples/10-builtins/README.md new file mode 100644 index 00000000..6f9c29a1 --- /dev/null +++ b/examples/10-builtins/README.md @@ -0,0 +1,39 @@ +# Kopf example for built-in resources + +Kopf can also handle the built-in resources, such as Pods, Jobs, etc. + +In this example, we take control all over the pods (namespaced/cluster-wide), +and allow the pods to exist for no longer than 30 seconds -- +either after creation or after the operator restart. + +For no specific reason, just for fun. Maybe, as a way of Chaos Engineering +to force making the resilient applications (tolerant to pod killing). + +However, the system namespaces (kube-system, etc) are explicitly protected -- +to prevent killing the cluster itself. + +Start the operator: + +```bash +kopf run example.py --verbose +``` + +Start a sample pod: + +```bash +kubectl run -it --image=ubuntu expr1 -- bash -i +# wait for 30s +``` + +Since `kubectl run` creates a Deployment, not just a Pod, +a new pod will be created every 30 seconds. Observe with: + +```bash +kubectl get pods --watch +``` + +Cleanup in the end: + +```bash +$ kubectl delete deployment expr1 +``` diff --git a/examples/10-builtins/example.py b/examples/10-builtins/example.py new file mode 100644 index 00000000..01d99b7a --- /dev/null +++ b/examples/10-builtins/example.py @@ -0,0 +1,47 @@ +import asyncio + +import kopf +import pykube + +tasks = {} # dict{namespace: dict{name: asyncio.Task}} + +try: + cfg = pykube.KubeConfig.from_service_account() +except FileNotFoundError: + cfg = pykube.KubeConfig.from_file() +api = pykube.HTTPClient(cfg) + + +@kopf.on.resume('', 'v1', 'pods') +@kopf.on.create('', 'v1', 'pods') +async def pod_in_sight(namespace, name, logger, **kwargs): + if namespace.startswith('kube-'): + return + else: + task = asyncio.create_task(pod_killer(namespace, name, logger)) + tasks.setdefault(namespace, {}) + tasks[namespace][name] = task + + +@kopf.on.delete('', 'v1', 'pods') +async def pod_deleted(namespace, name, **kwargs): + if namespace in tasks and name in tasks[namespace]: + task = tasks[namespace][name] + task.cancel() # it will also remove from `tasks` + + +async def pod_killer(namespace, name, logger, timeout=30): + try: + logger.info(f"=== Pod killing happens in {timeout}s.") + await asyncio.sleep(timeout) + logger.info(f"=== Pod killing happens NOW!") + + pod = pykube.Pod.objects(api, namespace=namespace).get_by_name(name) + pod.delete() + + except asyncio.CancelledError: + logger.info(f"=== Pod killing is cancelled!") + + finally: + if namespace in tasks and name in tasks[namespace]: + del tasks[namespace][name] diff --git a/examples/10-builtins/test_example_10.py b/examples/10-builtins/test_example_10.py new file mode 100644 index 00000000..564355d1 --- /dev/null +++ b/examples/10-builtins/test_example_10.py @@ -0,0 +1,46 @@ +import os.path +import time + +import pykube + +import kopf.testing + + +def test_pods_reacted(): + + example_py = os.path.join(os.path.dirname(__file__), 'example.py') + with kopf.testing.KopfRunner(['run', '--verbose', example_py]) as runner: + _create_pod() + time.sleep(5) # give it some time to react + _delete_pod() + time.sleep(1) # give it some time to react + + assert runner.exception is None + assert runner.exit_code == 0 + + assert '[default/kopf-pod-1] Creation event:' in runner.stdout + assert '[default/kopf-pod-1] === Pod killing happens in 30s.' in runner.stdout + assert '[default/kopf-pod-1] Deletion event:' in runner.stdout + assert '[default/kopf-pod-1] === Pod killing is cancelled!' in runner.stdout + + +def _create_pod(): + api = pykube.HTTPClient(pykube.KubeConfig.from_file()) + pod = pykube.Pod(api, { + 'apiVersion': 'v1', + 'kind': 'Pod', + 'metadata': {'name': 'kopf-pod-1', 'namespace': 'default'}, + 'spec': { + 'containers': [{ + 'name': 'the-only-one', + 'image': 'busybox', + 'command': ["sh", "-x", "-c", "sleep 1"], + }]}, + }) + pod.create() + + +def _delete_pod(): + api = pykube.HTTPClient(pykube.KubeConfig.from_file()) + pod = pykube.Pod.objects(api, namespace='default').get_by_name('kopf-pod-1') + pod.delete() diff --git a/examples/requirements.txt b/examples/requirements.txt index e7d4c85d..140c5635 100644 --- a/examples/requirements.txt +++ b/examples/requirements.txt @@ -1,3 +1,4 @@ kopf kubernetes +pykube-ng pyyaml diff --git a/kopf/cli.py b/kopf/cli.py index 73a367b0..e0fba667 100644 --- a/kopf/cli.py +++ b/kopf/cli.py @@ -12,9 +12,11 @@ def cli_login(): try: - auth.login() + auth.login(verify=True) except auth.LoginError as e: raise click.ClickException(str(e)) + except auth.AccessError as e: + raise click.ClickException(str(e)) def logging_options(fn): diff --git a/kopf/clients/auth.py b/kopf/clients/auth.py index 746c2068..aab51483 100644 --- a/kopf/clients/auth.py +++ b/kopf/clients/auth.py @@ -1,42 +1,138 @@ import logging +from typing import Optional -import kubernetes +import pykube +import requests import urllib3.exceptions logger = logging.getLogger(__name__) +# Set in login(), consumed in get_pykube_cfg() and all API calls. +_pykube_cfg: Optional[pykube.KubeConfig] = None + class LoginError(Exception): """ Raised when the operator cannot login to the API. """ -def login(): +class AccessError(Exception): + """ Raised when the operator cannot access the cluster API. """ + + +def login(verify=False): """ - Login the the Kubernetes cluster, locally or remotely. + Login to Kubernetes cluster, locally or remotely. + + Keep the logged in state or config object in the global variables, + so that it can be available for future calls via the same function call. + + Automatic refresh/reload of the tokens or objects also should be done here. """ - # Configure the default client credentials for all possible environments. + # Pykube login is mandatory. If it fails, the framework will not run at all. + try: + import pykube + except ImportError: + raise # mandatory + else: + login_pykube(verify=verify) + + # We keep the official client library auto-login only because it was + # an implied behavior before switching to pykube -- to keep it so (implied). + try: + import kubernetes + except ImportError: + pass # optional + else: + login_client(verify=verify) + + +def login_pykube(verify=False): + global _pykube_cfg + try: + _pykube_cfg = pykube.KubeConfig.from_service_account() + logger.debug("Pykube is configured in cluster with service account.") + except FileNotFoundError: + try: + _pykube_cfg = pykube.KubeConfig.from_file() + logger.debug("Pykube is configured via kubeconfig file.") + except (pykube.PyKubeError, FileNotFoundError): + raise LoginError(f"Cannot authenticate pykube neither in-cluster, nor via kubeconfig.") + + if verify: + verify_pykube() + + +def login_client(verify=False): + import kubernetes.client try: kubernetes.config.load_incluster_config() # cluster env vars - logger.debug("configured in cluster with service account") + logger.debug("Client is configured in cluster with service account.") except kubernetes.config.ConfigException as e1: try: kubernetes.config.load_kube_config() # developer's config files - logger.debug("configured via kubeconfig file") + logger.debug("Client is configured via kubeconfig file.") except kubernetes.config.ConfigException as e2: - raise LoginError(f"Cannot authenticate neither in-cluster, nor via kubeconfig.") + raise LoginError(f"Cannot authenticate client neither in-cluster, nor via kubeconfig.") - # Make a sample API call to ensure the login is successful, - # and convert some of the known exceptions to the CLI hints. + if verify: + verify_client() + + +def verify_pykube(): + """ + Verify if login has succeeded, and the access configuration is still valid. + + All other errors (e.g. 403, 404) are ignored: it means, the host and port + are configured and are reachable, the authentication token is accepted, + and the rest are authorization or configuration errors (not a showstopper). + """ + try: + api = get_pykube_api() + rsp = api.get(version="", base="/") + rsp.raise_for_status() + api.raise_for_status(rsp) # replaces requests's HTTPError with its own. + except requests.exceptions.ConnectionError as e: + raise AccessError("Cannot connect to the Kubernetes API. " + "Please configure the cluster access.") + except pykube.exceptions.HTTPError as e: + if e.code == 401: + raise AccessError("Cannot authenticate to the Kubernetes API. " + "Please login or configure the tokens.") + except requests.exceptions.HTTPError as e: + if e.response.status_code == 401: + raise AccessError("Cannot authenticate to the Kubernetes API. " + "Please login or configure the tokens.") + + +def verify_client(): + """ + Verify if login has succeeded, and the access configuration is still valid. + + All other errors (e.g. 403, 404) are ignored: it means, the host and port + are configured and are reachable, the authentication token is accepted, + and the rest are authorization or configuration errors (not a showstopper). + """ + import kubernetes.client.rest try: api = kubernetes.client.CoreApi() api.get_api_versions() except urllib3.exceptions.HTTPError as e: - raise LoginError("Cannot connect to the Kubernetes API. " - "Please configure the cluster access.") + raise AccessError("Cannot connect to the Kubernetes API. " + "Please configure the cluster access.") except kubernetes.client.rest.ApiException as e: if e.status == 401: - raise LoginError("Cannot authenticate to the Kubernetes API. " - "Please login or configure the tokens.") - else: - raise + raise AccessError("Cannot authenticate to the Kubernetes API. " + "Please login or configure the tokens.") + + +def get_pykube_cfg() -> pykube.KubeConfig: + if _pykube_cfg is None: + raise LoginError("Not logged in with PyKube.") + return _pykube_cfg + + +# TODO: add some caching, but keep kwargs in mind. Maybe add a key= for purpose/use-place? +def get_pykube_api(timeout=None) -> pykube.HTTPClient: + kwargs = dict(timeout=timeout) if timeout is not None else dict() + return pykube.HTTPClient(get_pykube_cfg(), **kwargs) diff --git a/kopf/clients/classes.py b/kopf/clients/classes.py new file mode 100644 index 00000000..b4e5c54a --- /dev/null +++ b/kopf/clients/classes.py @@ -0,0 +1,23 @@ +from typing import Type + +import pykube + +from kopf.clients import auth + + +def _make_cls(resource) -> Type[pykube.objects.APIObject]: + api = auth.get_pykube_api() + api_resources = api.resource_list(resource.api_version)['resources'] + resource_kind = next((r['kind'] for r in api_resources if r['name'] == resource.plural), None) + is_namespaced = next((r['namespaced'] for r in api_resources if r['name'] == resource.plural), None) + if not resource_kind: + raise pykube.ObjectDoesNotExist(f"No such CRD: {resource.name}") + + cls_name = resource.plural + cls_base = pykube.objects.NamespacedAPIObject if is_namespaced else pykube.objects.APIObject + cls = type(cls_name, (cls_base,), { + 'version': resource.api_version, + 'endpoint': resource.plural, + 'kind': resource_kind, + }) + return cls diff --git a/kopf/clients/events.py b/kopf/clients/events.py index 5554f67b..b598f8d7 100644 --- a/kopf/clients/events.py +++ b/kopf/clients/events.py @@ -1,11 +1,12 @@ import asyncio import datetime -import functools import logging -import kubernetes.client.rest +import pykube +import requests from kopf import config +from kopf.clients import auth from kopf.structs import hierarchies logger = logging.getLogger(__name__) @@ -41,38 +42,36 @@ async def post_event(*, obj=None, ref=None, type, reason, message=''): suffix = message[-MAX_MESSAGE_LENGTH // 2 + (len(infix) - len(infix) // 2):] message = f'{prefix}{infix}{suffix}' - meta = kubernetes.client.V1ObjectMeta( - namespace=namespace, - generate_name='kopf-event-', - ) - body = kubernetes.client.V1Event( - metadata=meta, + body = { + 'metadata': { + 'namespace': namespace, + 'generateName': 'kopf-event-', + }, - action='Action?', - type=type, - reason=reason, - message=message, + 'action': 'Action?', + 'type': type, + 'reason': reason, + 'message': message, - reporting_component='kopf', - reporting_instance='dev', - source=kubernetes.client.V1EventSource(component='kopf'), # used in the "From" column in `kubectl describe`. + 'reportingComponent': 'kopf', + 'reportingInstance': 'dev', + 'source' : {'component': 'kopf'}, # used in the "From" column in `kubectl describe`. - involved_object=ref, + 'involvedObject': ref, - first_timestamp=now.isoformat() + 'Z', # '2019-01-28T18:25:03.000000Z' -- seen in `kubectl describe ...` - last_timestamp=now.isoformat() + 'Z', # '2019-01-28T18:25:03.000000Z' - seen in `kubectl get events` - event_time=now.isoformat() + 'Z', # '2019-01-28T18:25:03.000000Z' - ) - - api = kubernetes.client.CoreV1Api() - loop = asyncio.get_running_loop() + 'firstTimestamp': now.isoformat() + 'Z', # '2019-01-28T18:25:03.000000Z' -- seen in `kubectl describe ...` + 'lastTimestamp': now.isoformat() + 'Z', # '2019-01-28T18:25:03.000000Z' - seen in `kubectl get events` + 'eventTime': now.isoformat() + 'Z', # '2019-01-28T18:25:03.000000Z' + } try: - await loop.run_in_executor( - config.WorkersConfig.get_syn_executor(), - functools.partial(api.create_namespaced_event, **{'namespace': namespace, 'body': body}) - ) - except kubernetes.client.rest.ApiException as e: + api = auth.get_pykube_api() + obj = pykube.Event(api, body) + + loop = asyncio.get_running_loop() + await loop.run_in_executor(config.WorkersConfig.get_syn_executor(), obj.create) + + except (requests.exceptions.HTTPError, pykube.exceptions.HTTPError) as e: # Events are helpful but auxiliary, they should not fail the handling cycle. # Yet we want to notice that something went wrong (in logs). logger.warning("Failed to post an event. Ignoring and continuing. " diff --git a/kopf/clients/fetching.py b/kopf/clients/fetching.py index 716a7e05..57b6b313 100644 --- a/kopf/clients/fetching.py +++ b/kopf/clients/fetching.py @@ -1,44 +1,42 @@ -import functools +import pykube +import requests -import kubernetes +from kopf.clients import auth +from kopf.clients import classes _UNSET_ = object() def read_crd(*, resource, default=_UNSET_): try: - name = f'{resource.plural}.{resource.group}' - api = kubernetes.client.ApiextensionsV1beta1Api() - rsp = api.read_custom_resource_definition(name=name) - return rsp - except kubernetes.client.rest.ApiException as e: - if e.status in [404, 403] and default is not _UNSET_: + api = auth.get_pykube_api() + cls = pykube.CustomResourceDefinition + obj = cls.objects(api, namespace=None).get_by_name(name=resource.name) + return obj.obj + + except pykube.ObjectDoesNotExist: + if default is not _UNSET_: + return default + raise + except requests.exceptions.HTTPError as e: + if e.response.status_code in [403, 404] and default is not _UNSET_: return default raise def read_obj(*, resource, namespace=None, name=None, default=_UNSET_): try: - if namespace is None: - api = kubernetes.client.CustomObjectsApi() - rsp = api.get_cluster_custom_object( - group=resource.group, - version=resource.version, - plural=resource.plural, - name=name, - ) - else: - api = kubernetes.client.CustomObjectsApi() - rsp = api.get_namespaced_custom_object( - group=resource.group, - version=resource.version, - plural=resource.plural, - namespace=namespace, - name=name, - ) - return rsp - except kubernetes.client.rest.ApiException as e: - if e.status == 404 and default is not _UNSET_: + api = auth.get_pykube_api() + cls = classes._make_cls(resource=resource) + namespace = namespace if issubclass(cls, pykube.objects.NamespacedAPIObject) else None + obj = cls.objects(api, namespace=namespace).get_by_name(name=name) + return obj.obj + except pykube.ObjectDoesNotExist: + if default is not _UNSET_: + return default + raise + except requests.exceptions.HTTPError as e: + if e.response.status_code in [403, 404] and default is not _UNSET_: return default raise @@ -56,51 +54,29 @@ def list_objs(*, resource, namespace=None): * The resource is namespace-scoped AND operator is namespaced-restricted. """ - api = kubernetes.client.CustomObjectsApi() - if namespace is None: - rsp = api.list_cluster_custom_object( - group=resource.group, - version=resource.version, - plural=resource.plural, - ) - else: - rsp = api.list_namespaced_custom_object( - group=resource.group, - version=resource.version, - plural=resource.plural, - namespace=namespace, - ) - return rsp - - -def make_list_fn(*, resource, namespace=None): + api = auth.get_pykube_api() + cls = classes._make_cls(resource=resource) + namespace = namespace if issubclass(cls, pykube.objects.NamespacedAPIObject) else None + lst = cls.objects(api, namespace=pykube.all if namespace is None else namespace) + return lst.response + + +def watch_objs(*, resource, namespace=None, timeout=None, since=None): """ - Return a function to be called to receive the list of objects. - Needed in that form for the API streaming calls (see watching.py). + Watch the objects of specific resource type. - However, the returned function is already bound to the specified - resource type, and requires no resource-identifying parameters. + The cluster-scoped call is used in two cases: + + * The resource itself is cluster-scoped, and namespacing makes not sense. + * The operator serves all namespaces for the namespaced custom resource. - Docstrings are important! Kubernetes client uses them to guess - the returned object types and the parameters type. - Function wrapping does that: preserves the docstrings. + Otherwise, the namespace-scoped call is used: + + * The resource is namespace-scoped AND operator is namespaced-restricted. """ - api = kubernetes.client.CustomObjectsApi() - if namespace is None: - @functools.wraps(api.list_cluster_custom_object) - def list_fn(**kwargs): - return api.list_cluster_custom_object( - group=resource.group, - version=resource.version, - plural=resource.plural, - **kwargs) - else: - @functools.wraps(api.list_cluster_custom_object) - def list_fn(**kwargs): - return api.list_namespaced_custom_object( - group=resource.group, - version=resource.version, - plural=resource.plural, - namespace=namespace, - **kwargs) - return list_fn + api = auth.get_pykube_api(timeout=timeout) + cls = classes._make_cls(resource=resource) + namespace = namespace if issubclass(cls, pykube.objects.NamespacedAPIObject) else None + lst = cls.objects(api, namespace=pykube.all if namespace is None else namespace) + src = lst.watch(since=since) + return iter({'type': event.type, 'object': event.object.obj} for event in src) diff --git a/kopf/clients/patching.py b/kopf/clients/patching.py index 2432af3a..0fb944d3 100644 --- a/kopf/clients/patching.py +++ b/kopf/clients/patching.py @@ -1,9 +1,8 @@ import asyncio -import functools - -import kubernetes from kopf import config +from kopf.clients import auth +from kopf.clients import classes async def patch_obj(*, resource, patch, namespace=None, name=None, body=None): @@ -23,19 +22,14 @@ async def patch_obj(*, resource, patch, namespace=None, name=None, body=None): namespace = body.get('metadata', {}).get('namespace') if body is not None else namespace name = body.get('metadata', {}).get('name') if body is not None else name + if body is None: + nskw = {} if namespace is None else dict(namespace=namespace) + body = {'metadata': {'name': name}} + body['metadata'].update(nskw) - api = kubernetes.client.CustomObjectsApi() - request_kwargs = { - 'group': resource.group, - 'version': resource.version, - 'plural': resource.plural, - 'name': name, - 'body': patch - } - patch_func = api.patch_cluster_custom_object - if namespace is not None: - request_kwargs['namespace'] = namespace - patch_func = api.patch_namespaced_custom_object - loop = asyncio.get_running_loop() + api = auth.get_pykube_api() + cls = classes._make_cls(resource=resource) + obj = cls(api, body) - await loop.run_in_executor(config.WorkersConfig.get_syn_executor(), functools.partial(patch_func, **request_kwargs)) + loop = asyncio.get_running_loop() + await loop.run_in_executor(config.WorkersConfig.get_syn_executor(), obj.patch, patch) diff --git a/kopf/clients/watching.py b/kopf/clients/watching.py index 36454379..9e61cb50 100644 --- a/kopf/clients/watching.py +++ b/kopf/clients/watching.py @@ -22,16 +22,12 @@ import logging from typing import Union -import kubernetes - +from kopf import config from kopf.clients import fetching from kopf.reactor import registries logger = logging.getLogger(__name__) -DEFAULT_STREAM_TIMEOUT = None -""" The maximum duration of one streaming request. Patched in some tests. """ - class WatchingError(Exception): """ @@ -81,16 +77,19 @@ async def streaming_watch( rsp = fetching.list_objs(resource=resource, namespace=namespace) resource_version = rsp['metadata']['resourceVersion'] for item in rsp['items']: + # FIXME: fix in pykube to inject the missing item's fields from the list's metainfo. + item.setdefault('kind', rsp['kind'][:-4] if rsp['kind'][-4:] == 'List' else rsp['kind']) + item.setdefault('apiVersion', rsp['apiVersion']) yield {'type': None, 'object': item} # Then, watch the resources starting from the list's resource version. kwargs = {} kwargs.update(dict(resource_version=resource_version) if resource_version else {}) - kwargs.update(dict(timeout_seconds=DEFAULT_STREAM_TIMEOUT) if DEFAULT_STREAM_TIMEOUT else {}) + kwargs.update(dict(timeout_seconds=config.WatchersConfig.default_stream_timeout) if config.WatchersConfig.default_stream_timeout else {}) loop = asyncio.get_event_loop() - fn = fetching.make_list_fn(resource=resource, namespace=namespace) - watch = kubernetes.watch.Watch() - stream = watch.stream(fn, **kwargs) + stream = fetching.watch_objs(resource=resource, namespace=namespace, + timeout=config.WatchersConfig.default_stream_timeout, + since=resource_version) async for event in streaming_aiter(stream, loop=loop): # "410 Gone" is for the "resource version too old" error, we must restart watching. @@ -131,3 +130,4 @@ async def infinite_watch( while True: async for event in streaming_watch(resource=resource, namespace=namespace): yield event + await asyncio.sleep(config.WatchersConfig.watcher_retry_delay) diff --git a/kopf/config.py b/kopf/config.py index 41c48c5e..3480e035 100644 --- a/kopf/config.py +++ b/kopf/config.py @@ -3,9 +3,6 @@ import logging from typing import Optional -import kubernetes -import kubernetes.client.rest - from kopf.engines import logging as logging_engine format = '[%(asctime)s] %(name)-20.20s [%(levelname)-8.8s] %(message)s' @@ -32,11 +29,16 @@ def configure(debug=None, verbose=None, quiet=None): logger.setLevel(log_level) # Configure the Kubernetes client defaults according to our settings. - config = kubernetes.client.configuration.Configuration() - config.logger_format = format - config.logger_file = None # once again after the constructor to re-apply the formatter - config.debug = debug - kubernetes.client.configuration.Configuration.set_default(config) + try: + import kubernetes + except ImportError: + pass + else: + config = kubernetes.client.configuration.Configuration() + config.logger_format = format + config.logger_file = None # once again after the constructor to re-apply the formatter + config.debug = debug + kubernetes.client.configuration.Configuration.set_default(config) # Kubernetes client is as buggy as hell: it adds its own stream handlers even in non-debug mode, # does not respect the formatting, and dumps too much of the low-level info. @@ -103,3 +105,15 @@ def set_synchronous_tasks_threadpool_limit(new_limit: int): WorkersConfig.synchronous_tasks_threadpool_limit = new_limit if WorkersConfig.threadpool_executor: WorkersConfig.threadpool_executor._max_workers = new_limit + + +class WatchersConfig: + """ + Used to configure the K8s API watchers and streams. + """ + + default_stream_timeout = None + """ The maximum duration of one streaming request. Patched in some tests. """ + + watcher_retry_delay = 0.1 + """ How long should a pause be between watch requests (to prevent flooding). """ diff --git a/kopf/engines/peering.py b/kopf/engines/peering.py index 330db897..2db25c1a 100644 --- a/kopf/engines/peering.py +++ b/kopf/engines/peering.py @@ -164,7 +164,7 @@ def _is_peering_legacy(name: str, namespace: Optional[str]): if crd is None: return False - if str(crd.spec.scope).lower() != 'cluster': + if str(crd.get('spec', {}).get('scope', '')).lower() != 'cluster': return False # no legacy mode detected obj = fetching.read_obj(resource=LEGACY_PEERING_RESOURCE, name=name, default=None) diff --git a/kopf/reactor/registries.py b/kopf/reactor/registries.py index 8cf5a3bf..22dff296 100644 --- a/kopf/reactor/registries.py +++ b/kopf/reactor/registries.py @@ -23,6 +23,16 @@ class Resource(NamedTuple): version: Text plural: Text + @property + def name(self): + return f'{self.plural}.{self.group}' + + @property + def api_version(self): + # Strip heading/trailing slashes if group is absent (e.g. for pods). + return f'{self.group}/{self.version}'.strip('/') + + # A registered handler (function + event meta info). class Handler(NamedTuple): fn: Callable diff --git a/setup.py b/setup.py index d5e847e5..8aac822c 100644 --- a/setup.py +++ b/setup.py @@ -43,6 +43,6 @@ 'click', 'iso8601', 'aiojobs', - 'kubernetes<10.0.0', # see: https://github.com/kubernetes-client/python/issues/866 + 'pykube-ng>=0.25', ], ) diff --git a/tests/cli/conftest.py b/tests/cli/conftest.py index 557c8976..57516ea6 100644 --- a/tests/cli/conftest.py +++ b/tests/cli/conftest.py @@ -2,7 +2,6 @@ import sys import click.testing -import kubernetes import pytest import kopf @@ -65,8 +64,12 @@ def clean_modules_cache(): @pytest.fixture(autouse=True) def clean_kubernetes_client(): - kubernetes.client.configuration.Configuration.set_default(None) - + try: + import kubernetes + except ImportError: + pass # absent client is already "clean" (or not "dirty" at least). + else: + kubernetes.client.configuration.Configuration.set_default(None) @pytest.fixture() @@ -82,7 +85,10 @@ def invoke(runner): @pytest.fixture() def login(mocker): - return mocker.patch('kopf.clients.auth.login') + mocker.patch('kopf.clients.auth.login_pykube') + mocker.patch('kopf.clients.auth.login_client') + mocker.patch('kopf.clients.auth.verify_pykube') + mocker.patch('kopf.clients.auth.verify_client') @pytest.fixture() diff --git a/tests/cli/test_help.py b/tests/cli/test_help.py index e35ce473..e0261159 100644 --- a/tests/cli/test_help.py +++ b/tests/cli/test_help.py @@ -1,12 +1,18 @@ def test_help_in_root(invoke, mocker): - login = mocker.patch('kopf.clients.auth.login') + login_pykube = mocker.patch('kopf.clients.auth.login_pykube') + login_client = mocker.patch('kopf.clients.auth.login_client') + verify_pykube = mocker.patch('kopf.clients.auth.verify_pykube') + verify_client = mocker.patch('kopf.clients.auth.verify_client') result = invoke(['--help']) assert result.exit_code == 0 - assert not login.called + assert not login_pykube.called + assert not login_client.called + assert not verify_pykube.called + assert not verify_client.called assert 'Usage: kopf [OPTIONS]' in result.output assert ' run ' in result.output @@ -15,14 +21,20 @@ def test_help_in_root(invoke, mocker): def test_help_in_subcommand(invoke, mocker): - login = mocker.patch('kopf.clients.auth.login') + login_pykube = mocker.patch('kopf.clients.auth.login_pykube') + login_client = mocker.patch('kopf.clients.auth.login_client') + verify_pykube = mocker.patch('kopf.clients.auth.verify_pykube') + verify_client = mocker.patch('kopf.clients.auth.verify_client') preload = mocker.patch('kopf.utilities.loaders.preload') real_run = mocker.patch('kopf.reactor.queueing.run') result = invoke(['run', '--help']) assert result.exit_code == 0 - assert not login.called + assert not login_pykube.called + assert not login_client.called + assert not verify_pykube.called + assert not verify_client.called assert not preload.called assert not real_run.called diff --git a/tests/cli/test_logging.py b/tests/cli/test_logging.py index 7e970599..d13c5ff9 100644 --- a/tests/cli/test_logging.py +++ b/tests/cli/test_logging.py @@ -1,7 +1,6 @@ import asyncio import logging -import kubernetes import pytest @@ -52,6 +51,8 @@ def test_verbosity(invoke, caplog, options, envvars, expect_debug, expect_info, (['--verbose']), ], ids=['default', 'q', 'quiet', 'v', 'verbose']) def test_no_lowlevel_dumps_in_nondebug(invoke, caplog, options, login, preload, real_run): + kubernetes = pytest.importorskip('kubernetes') + result = invoke(['run'] + options) assert result.exit_code == 0 @@ -70,6 +71,8 @@ def test_no_lowlevel_dumps_in_nondebug(invoke, caplog, options, login, preload, (['--debug']), ], ids=['d', 'debug']) def test_lowlevel_dumps_in_debug_mode(invoke, caplog, options, login, preload, real_run): + kubernetes = pytest.importorskip('kubernetes') + result = invoke(['run'] + options) assert result.exit_code == 0 diff --git a/tests/cli/test_login.py b/tests/cli/test_login.py index 54fe0696..04768c36 100644 --- a/tests/cli/test_login.py +++ b/tests/cli/test_login.py @@ -1,7 +1,15 @@ -import kubernetes +""" +Remember: We do not test the clients, we assume they work when used properly. +We test our own functions here, and check if the clients were called. +""" import pytest +import requests +import urllib3 -from kopf.clients.auth import login, LoginError +from kopf.clients.auth import login, LoginError, AccessError + +RESPONSE_401 = requests.Response() +RESPONSE_401.status_code = 401 @pytest.fixture(autouse=True) @@ -9,121 +17,272 @@ def _auto_clean_kubernetes_client(clean_kubernetes_client): pass -def test_direct_auth_works_incluster(mocker): - # We do not test the client, we assume it works when used properly. - core_api = mocker.patch.object(kubernetes.client, 'CoreApi') - load_kube_config = mocker.patch.object(kubernetes.config, 'load_kube_config') - load_incluster_config = mocker.patch.object(kubernetes.config, 'load_incluster_config') +def test_kubernetes_uninstalled_has_effect(no_kubernetes): + with pytest.raises(ImportError): + import kubernetes + +# +# Tests via the direct function invocation. +# +def test_direct_auth_works_incluster_without_client(login_mocks, no_kubernetes): login() - assert load_incluster_config.called - assert not load_kube_config.called - assert core_api.called # to verify that auth worked + assert login_mocks.pykube_in_cluster.called + assert not login_mocks.pykube_from_file.called + + +def test_direct_auth_works_viaconfig_without_client(login_mocks, no_kubernetes): + login_mocks.pykube_in_cluster.side_effect = FileNotFoundError + + login() + assert login_mocks.pykube_in_cluster.called + assert login_mocks.pykube_from_file.called -def test_direct_auth_works_kubeconfig(mocker): - # We do not test the client, we assume it works when used properly. - core_api = mocker.patch.object(kubernetes.client, 'CoreApi') - load_kube_config = mocker.patch.object(kubernetes.config, 'load_kube_config') - load_incluster_config = mocker.patch.object(kubernetes.config, 'load_incluster_config') - load_incluster_config.side_effect = kubernetes.config.ConfigException +def test_direct_auth_works_incluster_with_client(login_mocks, kubernetes): login() - assert load_incluster_config.called - assert load_kube_config.called - assert core_api.called # to verify that auth worked + assert login_mocks.pykube_in_cluster.called + assert not login_mocks.pykube_from_file.called + + assert login_mocks.client_in_cluster.called + assert not login_mocks.client_from_file.called + +def test_direct_auth_works_viaconfig_with_client(login_mocks, kubernetes): + login_mocks.pykube_in_cluster.side_effect = FileNotFoundError + login_mocks.client_in_cluster.side_effect = kubernetes.config.ConfigException + + login() -def test_direct_auth_fails(mocker): - # We do not test the client, we assume it works when used properly. - core_api = mocker.patch.object(kubernetes.client, 'CoreApi') - load_kube_config = mocker.patch.object(kubernetes.config, 'load_kube_config') - load_incluster_config = mocker.patch.object(kubernetes.config, 'load_incluster_config') - load_incluster_config.side_effect = kubernetes.config.ConfigException - load_kube_config.side_effect = kubernetes.config.ConfigException + assert login_mocks.pykube_in_cluster.called + assert login_mocks.pykube_from_file.called + + assert login_mocks.client_in_cluster.called + assert login_mocks.client_from_file.called + + +def test_direct_auth_fails_on_errors_in_pykube(login_mocks, any_kubernetes): + login_mocks.pykube_in_cluster.side_effect = FileNotFoundError + login_mocks.pykube_from_file.side_effect = FileNotFoundError with pytest.raises(LoginError): login() - assert load_incluster_config.called - assert load_kube_config.called - assert not core_api.called # to verify that auth worked + assert login_mocks.pykube_in_cluster.called + assert login_mocks.pykube_from_file.called -def test_direct_api_fails(mocker): - # We do not test the client, we assume it works when used properly. - core_api = mocker.patch.object(kubernetes.client, 'CoreApi') - load_kube_config = mocker.patch.object(kubernetes.config, 'load_kube_config') - load_incluster_config = mocker.patch.object(kubernetes.config, 'load_incluster_config') - core_api.side_effect = kubernetes.client.rest.ApiException(status=401) +def test_direct_auth_fails_on_errors_in_client(login_mocks, kubernetes): + login_mocks.client_in_cluster.side_effect = kubernetes.config.ConfigException + login_mocks.client_from_file.side_effect = kubernetes.config.ConfigException with pytest.raises(LoginError): login() - assert load_incluster_config.called - assert not load_kube_config.called - assert core_api.called # to verify that auth worked + assert login_mocks.pykube_in_cluster.called + assert not login_mocks.pykube_from_file.called + + assert login_mocks.client_in_cluster.called + assert login_mocks.client_from_file.called + + +def test_direct_check_fails_on_tcp_error_in_pykube(login_mocks, any_kubernetes): + login_mocks.pykube_checker.side_effect = requests.exceptions.ConnectionError() + + with pytest.raises(AccessError): + login(verify=True) + + assert login_mocks.pykube_in_cluster.called + assert not login_mocks.pykube_from_file.called + assert login_mocks.pykube_checker.called + + +def test_direct_check_fails_on_401_error_in_pykube(login_mocks, any_kubernetes): + login_mocks.pykube_checker.side_effect = requests.exceptions.HTTPError(response=RESPONSE_401) + + with pytest.raises(AccessError): + login(verify=True) + + assert login_mocks.pykube_in_cluster.called + assert not login_mocks.pykube_from_file.called + assert login_mocks.pykube_checker.called + + +def test_direct_check_fails_on_tcp_error_in_client(login_mocks, kubernetes): + login_mocks.client_checker.side_effect = urllib3.exceptions.HTTPError() + + with pytest.raises(AccessError): + login(verify=True) + + assert login_mocks.pykube_in_cluster.called + assert not login_mocks.pykube_from_file.called + assert login_mocks.pykube_checker.called + + assert login_mocks.client_in_cluster.called + assert not login_mocks.client_from_file.called + assert login_mocks.client_checker.called + + +def test_direct_check_fails_on_401_error_in_client(login_mocks, kubernetes): + login_mocks.client_checker.side_effect = kubernetes.client.rest.ApiException(status=401) + + with pytest.raises(AccessError): + login(verify=True) + + assert login_mocks.pykube_in_cluster.called + assert not login_mocks.pykube_from_file.called + assert login_mocks.pykube_checker.called + + assert login_mocks.client_in_cluster.called + assert not login_mocks.client_from_file.called + assert login_mocks.client_checker.called + +# +# The same tests, but via the CLI command run. +# + +def test_clirun_auth_works_incluster_without_client(login_mocks, no_kubernetes, + invoke, preload, real_run): + result = invoke(['run']) + assert result.exit_code == 0 + + assert login_mocks.pykube_in_cluster.called + assert not login_mocks.pykube_from_file.called -def test_clirun_auth_works_incluster(invoke, mocker, preload, real_run): - # We do not test the client, we assume it works when used properly. - core_api = mocker.patch.object(kubernetes.client, 'CoreApi') - load_kube_config = mocker.patch.object(kubernetes.config, 'load_kube_config') - load_incluster_config = mocker.patch.object(kubernetes.config, 'load_incluster_config') +def test_clirun_auth_works_viaconfig_without_client(login_mocks, no_kubernetes, + invoke, preload, real_run): + login_mocks.pykube_in_cluster.side_effect = FileNotFoundError result = invoke(['run']) assert result.exit_code == 0 - assert load_incluster_config.called - assert not load_kube_config.called - assert core_api.called # to verify that auth worked + assert login_mocks.pykube_in_cluster.called + assert login_mocks.pykube_from_file.called -def test_clirun_auth_works_kubeconfig(invoke, mocker, preload, real_run): - # We do not test the client, we assume it works when used properly. - core_api = mocker.patch.object(kubernetes.client, 'CoreApi') - load_kube_config = mocker.patch.object(kubernetes.config, 'load_kube_config') - load_incluster_config = mocker.patch.object(kubernetes.config, 'load_incluster_config') - load_incluster_config.side_effect = kubernetes.config.ConfigException +def test_clirun_auth_works_incluster_with_client(login_mocks, kubernetes, + invoke, preload, real_run): result = invoke(['run']) assert result.exit_code == 0 - assert load_incluster_config.called - assert load_kube_config.called - assert core_api.called # to verify that auth worked + assert login_mocks.pykube_in_cluster.called + assert not login_mocks.pykube_from_file.called + assert login_mocks.pykube_checker.called + assert login_mocks.client_in_cluster.called + assert not login_mocks.client_from_file.called + assert login_mocks.client_checker.called -def test_clirun_auth_fails(invoke, mocker, preload, real_run): - # We do not test the client, we assume it works when used properly. - core_api = mocker.patch.object(kubernetes.client, 'CoreApi') - load_kube_config = mocker.patch.object(kubernetes.config, 'load_kube_config') - load_incluster_config = mocker.patch.object(kubernetes.config, 'load_incluster_config') - load_incluster_config.side_effect = kubernetes.config.ConfigException - load_kube_config.side_effect = kubernetes.config.ConfigException + +def test_clirun_auth_works_viaconfig_with_client(login_mocks, kubernetes, + invoke, preload, real_run): + login_mocks.pykube_in_cluster.side_effect = FileNotFoundError + login_mocks.client_in_cluster.side_effect = kubernetes.config.ConfigException + + result = invoke(['run']) + assert result.exit_code == 0 + + assert login_mocks.pykube_in_cluster.called + assert login_mocks.pykube_from_file.called + assert login_mocks.pykube_checker.called + + assert login_mocks.client_in_cluster.called + assert login_mocks.client_from_file.called + assert login_mocks.client_checker.called + + +def test_clirun_auth_fails_on_config_error_in_pykube(login_mocks, any_kubernetes, + invoke, preload, real_run): + login_mocks.pykube_in_cluster.side_effect = FileNotFoundError + login_mocks.pykube_from_file.side_effect = FileNotFoundError result = invoke(['run']) assert result.exit_code != 0 assert 'neither in-cluster, nor via kubeconfig' in result.stdout - assert load_incluster_config.called - assert load_kube_config.called - assert not core_api.called # to verify that auth worked + assert login_mocks.pykube_in_cluster.called + assert login_mocks.pykube_from_file.called + assert not login_mocks.pykube_checker.called -def test_clirun_api_fails(invoke, mocker, preload, real_run): - # We do not test the client, we assume it works when used properly. - core_api = mocker.patch.object(kubernetes.client, 'CoreApi') - load_kube_config = mocker.patch.object(kubernetes.config, 'load_kube_config') - load_incluster_config = mocker.patch.object(kubernetes.config, 'load_incluster_config') - core_api.side_effect = kubernetes.client.rest.ApiException(status=401) +def test_clirun_auth_fails_on_config_error_in_client(login_mocks, kubernetes, + invoke, preload, real_run): + login_mocks.client_in_cluster.side_effect = kubernetes.config.ConfigException + login_mocks.client_from_file.side_effect = kubernetes.config.ConfigException + + result = invoke(['run']) + assert result.exit_code != 0 + assert 'neither in-cluster, nor via kubeconfig' in result.stdout + + assert login_mocks.pykube_in_cluster.called + assert not login_mocks.pykube_from_file.called + assert login_mocks.pykube_checker.called + + assert login_mocks.client_in_cluster.called + assert login_mocks.client_from_file.called + assert not login_mocks.client_checker.called + + +def test_clirun_check_fails_on_tcp_error_in_pykube(login_mocks, any_kubernetes, + invoke, preload, real_run): + login_mocks.pykube_checker.side_effect = requests.exceptions.ConnectionError() + + result = invoke(['run']) + assert result.exit_code != 0 + assert 'Please configure the cluster access' in result.stdout + + assert login_mocks.pykube_in_cluster.called + assert not login_mocks.pykube_from_file.called + assert login_mocks.pykube_checker.called + + +def test_clirun_check_fails_on_401_error_in_pykube(login_mocks, any_kubernetes, + invoke, preload, real_run): + login_mocks.pykube_checker.side_effect = requests.exceptions.HTTPError(response=RESPONSE_401) + + result = invoke(['run']) + assert result.exit_code != 0 + assert 'Please login or configure the tokens' in result.stdout + + assert login_mocks.pykube_in_cluster.called + assert not login_mocks.pykube_from_file.called + assert login_mocks.pykube_checker.called + + +def test_clirun_check_fails_on_tcp_error_in_client(login_mocks, kubernetes, + invoke, preload, real_run): + login_mocks.client_checker.side_effect = urllib3.exceptions.HTTPError() + + result = invoke(['run']) + assert result.exit_code != 0 + assert 'Please configure the cluster access' in result.stdout + + assert login_mocks.pykube_in_cluster.called + assert not login_mocks.pykube_from_file.called + assert login_mocks.pykube_checker.called + + assert login_mocks.client_in_cluster.called + assert not login_mocks.client_from_file.called + assert login_mocks.client_checker.called + + +def test_clirun_check_fails_on_401_error_in_client(login_mocks, kubernetes, + invoke, preload, real_run): + login_mocks.client_checker.side_effect = kubernetes.client.rest.ApiException(status=401) result = invoke(['run']) assert result.exit_code != 0 assert 'Please login or configure the tokens' in result.stdout - assert load_incluster_config.called - assert not load_kube_config.called - assert core_api.called # to verify that auth worked + assert login_mocks.pykube_in_cluster.called + assert not login_mocks.pykube_from_file.called + assert login_mocks.pykube_checker.called + + assert login_mocks.client_in_cluster.called + assert not login_mocks.client_from_file.called + assert login_mocks.client_checker.called diff --git a/tests/conftest.py b/tests/conftest.py index cb48a5dd..b381351d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,11 +1,16 @@ import asyncio +import dataclasses import io +import json import logging import os import re +import sys import time +from unittest.mock import Mock import asynctest +import pykube import pytest import pytest_mock @@ -16,6 +21,7 @@ def pytest_configure(config): config.addinivalue_line('markers', "e2e: end-to-end tests with real operators.") + config.addinivalue_line('markers', "resource_clustered: (internal parameterizatiom mark).") # This logic is not applied if pytest is started explicitly on ./examples/. @@ -60,6 +66,162 @@ def resource(): """ The resource used in the tests. Usually mocked, so it does not matter. """ return Resource('zalando.org', 'v1', 'kopfexamples') +# +# Mocks for Kubernetes API clients (any of them). Reasons: +# 1. We do not test the clients, we test the layers on top of them, +# so everything low-level should be mocked and assumed to be functional. +# 2. No external calls must be made under any circumstances. +# The unit-tests must be fully isolated from the environment. +# + +@pytest.fixture() +def req_mock(mocker, resource, request): + + # Pykube config is needed to create a pykube's API instance. + # But we do not want and do not need to actually authenticate, so we mock. + # Some fields are used by pykube's objects: we have to know them ("leaky abstractions"). + cfg_mock = mocker.patch('kopf.clients.auth.get_pykube_cfg').return_value + cfg_mock.cluster = {'server': 'localhost'} + cfg_mock.namespace = 'default' + + # Simulated list of cluster-defined CRDs: all of them at once. See: `resource` fixture(s). + # Simulate the resource as cluster-scoped is there is a marker on the test. + namespaced = not any(marker.name == 'resource_clustered' for marker in request.node.own_markers) + res_mock = mocker.patch('pykube.http.HTTPClient.resource_list') + res_mock.return_value = {'resources': [ + {'name': 'kopfexamples', 'kind': 'KopfExample', 'namespaced': namespaced}, + ]} + + # Prevent ANY outer requests, no matter what. These ones are usually asserted. + req_mock = mocker.patch('requests.Session').return_value + return req_mock + + +@pytest.fixture() +def stream(req_mock): + """ A mock for the stream of events as if returned by K8s client. """ + def feed(*args): + side_effect = [] + for arg in args: + if isinstance(arg, (list, tuple)): + arg = iter(json.dumps(event).encode('utf-8') for event in arg) + side_effect.append(arg) + req_mock.get.return_value.iter_lines.side_effect = side_effect + return Mock(spec_set=['feed'], feed=feed) + +# +# Mocks for login & checks. Used in specifialised login tests, +# and in all CLI tests (since login is implicit with CLI commands). +# + +@dataclasses.dataclass(frozen=True, eq=False, order=False) +class LoginMocks: + pykube_in_cluster: Mock = None + pykube_from_file: Mock = None + pykube_checker: Mock = None + client_in_cluster: Mock = None + client_from_file: Mock = None + client_checker: Mock = None + + +@pytest.fixture() +def login_mocks(mocker): + + # Pykube config is needed to create a pykube's API instance. + # But we do not want and do not need to actually authenticate, so we mock. + # Some fields are used by pykube's objects: we have to know them ("leaky abstractions"). + cfg_mock = mocker.patch('kopf.clients.auth.get_pykube_cfg').return_value + cfg_mock.cluster = {'server': 'localhost'} + cfg_mock.namespace = 'default' + + # Make all client libraries potentially optional, but do not skip the tests: + # skipping the tests is the tests' decision, not this mocking fixture's one. + kwargs = {} + try: + import pykube + except ImportError: + pass + else: + kwargs.update( + pykube_in_cluster=mocker.patch.object(pykube.KubeConfig, 'from_service_account'), + pykube_from_file=mocker.patch.object(pykube.KubeConfig, 'from_file'), + pykube_checker=mocker.patch.object(pykube.http.HTTPClient, 'get'), + ) + try: + import kubernetes + except ImportError: + pass + else: + kwargs.update( + client_in_cluster=mocker.patch.object(kubernetes.config, 'load_incluster_config'), + client_from_file=mocker.patch.object(kubernetes.config, 'load_kube_config'), + client_checker=mocker.patch.object(kubernetes.client, 'CoreApi'), + ) + return LoginMocks(**kwargs) + +# +# Simulating that Kubernetes client library is not installed. +# + +class ProhibitedImportFinder: + def find_spec(self, fullname, path, target=None): + if fullname == 'kubernetes' or fullname.startswith('kubernetes.'): + raise ImportError("Import is prohibited for tests.") + + +@pytest.fixture() +def _kubernetes(): + # If kubernetes client is required, it should either be installed, + # or skip the test: we cannot simulate its presence (unlike its absence). + return pytest.importorskip('kubernetes') + + +@pytest.fixture() +def _no_kubernetes(): + try: + import kubernetes as kubernetes_before + except ImportError: + yield + return # nothing to patch & restore. + + # Remove any cached modules. + preserved = {} + for name, mod in list(sys.modules.items()): + if name == 'kubernetes' or name.startswith('kubernetes.'): + preserved[name] = mod + del sys.modules[name] + + # Inject the prohibition for loading this module. And restore when done. + finder = ProhibitedImportFinder() + sys.meta_path.insert(0, finder) + try: + yield + finally: + sys.meta_path.remove(finder) + sys.modules.update(preserved) + + # Verify if it works and that we didn't break the importing machinery. + import kubernetes as kubernetes_after + assert kubernetes_after is kubernetes_before + + +@pytest.fixture(params=[True], ids=['with-client']) # for hinting suffixes +def kubernetes(request): + return request.getfixturevalue('_kubernetes') + + +@pytest.fixture(params=[False], ids=['no-client']) # for hinting suffixes +def no_kubernetes(request): + return request.getfixturevalue('_no_kubernetes') + + +@pytest.fixture(params=[False, True], ids=['no-client', 'with-client']) +def any_kubernetes(request): + if request.param: + return request.getfixturevalue('_kubernetes') + else: + return request.getfixturevalue('_no_kubernetes') + # # Helpers for the timing checks. # diff --git a/tests/e2e/test_examples.py b/tests/e2e/test_examples.py index c6e3f78e..1df6f1cb 100644 --- a/tests/e2e/test_examples.py +++ b/tests/e2e/test_examples.py @@ -3,6 +3,8 @@ import subprocess import time +import pytest + from kopf.testing import KopfRunner @@ -25,11 +27,16 @@ def test_all_examples_are_runnable(mocker, with_crd, with_peering, exampledir): if m.group(2): requires_finalizer = not eval(m.group(3)) + # Skip the e2e test if the framework-optional but test-required library is missing. + m = re.search(r'import kubernetes', example_py.read_text(), re.M) + if m: + pytest.importorskip('kubernetes') + # To prevent lengthy sleeps on the simulated retries. mocker.patch('kopf.reactor.handling.DEFAULT_RETRY_DELAY', 1) # To prevent lengthy threads in the loop executor when the process exits. - mocker.patch('kopf.clients.watching.DEFAULT_STREAM_TIMEOUT', 10) + mocker.patch('kopf.config.WatchersConfig.default_stream_timeout', 10) # Run an operator and simulate some activity with the operated resource. with KopfRunner(['run', '--verbose', str(example_py)]) as runner: diff --git a/tests/handling/conftest.py b/tests/handling/conftest.py index c397a498..e25fd6d6 100644 --- a/tests/handling/conftest.py +++ b/tests/handling/conftest.py @@ -39,7 +39,6 @@ from unittest.mock import Mock import pytest -from kubernetes.client.rest import ApiException # to avoid mocking it import kopf from kopf.reactor.causation import Cause, RESUME @@ -53,13 +52,7 @@ class K8sMocks: @pytest.fixture(autouse=True) -def k8s_mocked(mocker): - """ Prevent any actual K8s calls.""" - - # TODO: consolidate with tests/k8s/conftest.py:client_mock() - client_mock = mocker.patch('kubernetes.client') - client_mock.rest.ApiException = ApiException # to be raises and caught - +def k8s_mocked(mocker, req_mock): # We mock on the level of our own K8s API wrappers, not the K8s client. return K8sMocks( patch_obj=mocker.patch('kopf.clients.patching.patch_obj'), diff --git a/tests/k8s/conftest.py b/tests/k8s/conftest.py index 92515c0e..62f0b576 100644 --- a/tests/k8s/conftest.py +++ b/tests/k8s/conftest.py @@ -1,19 +1,6 @@ import pytest -from kubernetes.client import V1Event as V1Event -from kubernetes.client import V1EventSource as V1EventSource -from kubernetes.client import V1ObjectMeta as V1ObjectMeta -from kubernetes.client import V1beta1Event as V1beta1Event -from kubernetes.client.rest import ApiException # to avoid mocking it -# We do not test the Kubernetes client, so everything there should be mocked. -# Also, no external calls must be made under any circumstances. Hence, auto-use. @pytest.fixture(autouse=True) -def client_mock(mocker): - client_mock = mocker.patch('kubernetes.client') - client_mock.rest.ApiException = ApiException # to be raises and caught - client_mock.V1Event = V1Event - client_mock.V1beta1Event = V1beta1Event - client_mock.V1EventSource = V1EventSource - client_mock.V1ObjectMeta = V1ObjectMeta - return client_mock +def _autouse_req_mock(req_mock): + pass diff --git a/tests/k8s/test_events.py b/tests/k8s/test_events.py index 370f32de..515daea9 100644 --- a/tests/k8s/test_events.py +++ b/tests/k8s/test_events.py @@ -1,15 +1,12 @@ +import json + import pytest -from asynctest import call, ANY +import requests from kopf.clients.events import post_event -async def test_posting(client_mock): - result = object() - apicls_mock = client_mock.CoreV1Api - apicls_mock.return_value.create_namespaced_event.return_value = result - postfn_mock = apicls_mock.return_value.create_namespaced_event - +async def test_posting(req_mock): obj = {'apiVersion': 'group/version', 'kind': 'kind', 'metadata': {'namespace': 'ns', @@ -17,29 +14,22 @@ async def test_posting(client_mock): 'uid': 'uid'}} await post_event(obj=obj, type='type', reason='reason', message='message') - assert postfn_mock.called - assert postfn_mock.call_count == 1 - assert postfn_mock.call_args_list == [call( - namespace='ns', # same as the object's namespace - body=ANY, - )] - - event = postfn_mock.call_args_list[0][1]['body'] - assert event.type == 'type' - assert event.reason == 'reason' - assert event.message == 'message' - assert event.source.component == 'kopf' - assert event.involved_object['apiVersion'] == 'group/version' - assert event.involved_object['kind'] == 'kind' - assert event.involved_object['namespace'] == 'ns' - assert event.involved_object['name'] == 'name' - assert event.involved_object['uid'] == 'uid' - - -async def test_type_is_v1_not_v1beta1(client_mock): - apicls_mock = client_mock.CoreV1Api - postfn_mock = apicls_mock.return_value.create_namespaced_event + assert req_mock.post.called + assert req_mock.post.call_count == 1 + + data = json.loads(req_mock.post.call_args_list[0][1]['data']) + assert data['type'] == 'type' + assert data['reason'] == 'reason' + assert data['message'] == 'message' + assert data['source']['component'] == 'kopf' + assert data['involvedObject']['apiVersion'] == 'group/version' + assert data['involvedObject']['kind'] == 'kind' + assert data['involvedObject']['namespace'] == 'ns' + assert data['involvedObject']['name'] == 'name' + assert data['involvedObject']['uid'] == 'uid' + +async def test_type_is_v1_not_v1beta1(req_mock): obj = {'apiVersion': 'group/version', 'kind': 'kind', 'metadata': {'namespace': 'ns', @@ -47,16 +37,17 @@ async def test_type_is_v1_not_v1beta1(client_mock): 'uid': 'uid'}} await post_event(obj=obj, type='type', reason='reason', message='message') - event = postfn_mock.call_args_list[0][1]['body'] - assert isinstance(event, client_mock.V1Event) - assert not isinstance(event, client_mock.V1beta1Event) + assert req_mock.post.called + + url = req_mock.post.call_args_list[0][1]['url'] + assert 'v1beta1' not in url + assert '/api/v1/namespaces/ns/events' in url -async def test_api_errors_logged_but_suppressed(client_mock, assert_logs): - error = client_mock.rest.ApiException('boo!') - apicls_mock = client_mock.CoreV1Api - apicls_mock.return_value.create_namespaced_event.side_effect = error - postfn_mock = apicls_mock.return_value.create_namespaced_event +async def test_api_errors_logged_but_suppressed(req_mock, assert_logs): + response = requests.Response() + error = requests.exceptions.HTTPError("boo!", response=response) + req_mock.post.side_effect = error obj = {'apiVersion': 'group/version', 'kind': 'kind', @@ -65,16 +56,15 @@ async def test_api_errors_logged_but_suppressed(client_mock, assert_logs): 'uid': 'uid'}} await post_event(obj=obj, type='type', reason='reason', message='message') - assert postfn_mock.called + assert req_mock.post.called assert_logs([ "Failed to post an event.*boo!", ]) -async def test_regular_errors_escalate(client_mock): +async def test_regular_errors_escalate(req_mock): error = Exception('boo!') - apicls_mock = client_mock.CoreV1Api - apicls_mock.return_value.create_namespaced_event.side_effect = error + req_mock.post.side_effect = error obj = {'apiVersion': 'group/version', 'kind': 'kind', @@ -88,12 +78,7 @@ async def test_regular_errors_escalate(client_mock): assert excinfo.value is error -async def test_message_is_cut_to_max_length(client_mock): - result = object() - apicls_mock = client_mock.CoreV1Api - apicls_mock.return_value.create_namespaced_event.return_value = result - postfn_mock = apicls_mock.return_value.create_namespaced_event - +async def test_message_is_cut_to_max_length(req_mock): obj = {'apiVersion': 'group/version', 'kind': 'kind', 'metadata': {'namespace': 'ns', @@ -102,8 +87,8 @@ async def test_message_is_cut_to_max_length(client_mock): message = 'start' + ('x' * 2048) + 'end' await post_event(obj=obj, type='type', reason='reason', message=message) - event = postfn_mock.call_args_list[0][1]['body'] - assert len(event.message) <= 1024 # max supported API message length - assert '...' in event.message - assert event.message.startswith('start') - assert event.message.endswith('end') + data = json.loads(req_mock.post.call_args_list[0][1]['data']) + assert len(data['message']) <= 1024 # max supported API message length + assert '...' in data['message'] + assert data['message'].startswith('start') + assert data['message'].endswith('end') diff --git a/tests/k8s/test_list_objs.py b/tests/k8s/test_list_objs.py index 6b9af8d7..6a5b5c4a 100644 --- a/tests/k8s/test_list_objs.py +++ b/tests/k8s/test_list_objs.py @@ -1,59 +1,46 @@ -import kubernetes.client.rest import pytest -from asynctest import call +import requests from kopf.clients.fetching import list_objs -def test_when_successful_clustered(client_mock, resource): - result = object() - apicls_mock = client_mock.CustomObjectsApi - apicls_mock.return_value.list_cluster_custom_object.return_value = result - apicls_mock.return_value.list_namespaced_custom_object.return_value = result - sidefn_mock = apicls_mock.return_value.list_namespaced_custom_object - mainfn_mock = apicls_mock.return_value.list_cluster_custom_object +def test_when_successful_clustered(req_mock, resource): + result = {'items': []} + req_mock.get.return_value.json.return_value = result lst = list_objs(resource=resource, namespace=None) assert lst is result - assert not sidefn_mock.called - assert mainfn_mock.call_count == 1 - assert mainfn_mock.call_args_list == [call( - group=resource.group, - version=resource.version, - plural=resource.plural, - )] + assert req_mock.get.called + assert req_mock.get.call_count == 1 + url = req_mock.get.call_args_list[0][1]['url'] + assert 'apis/zalando.org/v1/kopfexamples' in url + assert 'namespaces/' not in url -def test_when_successful_namespaced(client_mock, resource): - result = object() - apicls_mock = client_mock.CustomObjectsApi - apicls_mock.return_value.list_cluster_custom_object.return_value = result - apicls_mock.return_value.list_namespaced_custom_object.return_value = result - sidefn_mock = apicls_mock.return_value.list_cluster_custom_object - mainfn_mock = apicls_mock.return_value.list_namespaced_custom_object + +def test_when_successful_namespaced(req_mock, resource): + result = {'items': []} + req_mock.get.return_value.json.return_value = result lst = list_objs(resource=resource, namespace='ns1') assert lst is result - assert not sidefn_mock.called - assert mainfn_mock.call_count == 1 - assert mainfn_mock.call_args_list == [call( - group=resource.group, - version=resource.version, - plural=resource.plural, - namespace='ns1', - )] + assert req_mock.get.called + assert req_mock.get.call_count == 1 + + url = req_mock.get.call_args_list[0][1]['url'] + assert 'apis/zalando.org/v1/namespaces/ns1/kopfexamples' in url @pytest.mark.parametrize('namespace', [None, 'ns1'], ids=['without-namespace', 'with-namespace']) @pytest.mark.parametrize('status', [400, 401, 403, 500, 666]) -def test_raises_api_error(client_mock, resource, namespace, status): - error = kubernetes.client.rest.ApiException(status=status) - apicls_mock = client_mock.CustomObjectsApi - apicls_mock.return_value.list_cluster_custom_object.side_effect = error - apicls_mock.return_value.list_namespaced_custom_object.side_effect = error +def test_raises_api_error(req_mock, resource, namespace, status): + response = requests.Response() + response.status_code = status + error = requests.exceptions.HTTPError("boo!", response=response) + req_mock.get.side_effect = error - with pytest.raises(kubernetes.client.rest.ApiException) as e: + with pytest.raises(requests.exceptions.HTTPError) as e: list_objs(resource=resource, namespace=namespace) - assert e.value.status == status + assert e.value.response.status_code == status diff --git a/tests/k8s/test_make_list_fn.py b/tests/k8s/test_make_list_fn.py deleted file mode 100644 index 8ea5e99e..00000000 --- a/tests/k8s/test_make_list_fn.py +++ /dev/null @@ -1,92 +0,0 @@ -import pydoc - -import kubernetes.client.rest -import pytest -from asynctest import call - -from kopf.clients.fetching import make_list_fn - - -def test_when_present_clustered(client_mock, resource): - result = object() - apicls_mock = client_mock.CustomObjectsApi - apicls_mock.return_value.list_cluster_custom_object.return_value = result - apicls_mock.return_value.list_namespaced_custom_object.return_value = result - sidefn_mock = apicls_mock.return_value.list_namespaced_custom_object - mainfn_mock = apicls_mock.return_value.list_cluster_custom_object - - fn = make_list_fn(resource=resource, namespace=None) - assert callable(fn) - - assert not sidefn_mock.called - assert not mainfn_mock.called - - res = fn(opt1='val1', opt2=123) - assert res is result - - assert sidefn_mock.call_count == 0 - assert mainfn_mock.call_count == 1 - assert mainfn_mock.call_args_list == [call( - group=resource.group, - version=resource.version, - plural=resource.plural, - opt1='val1', opt2=123, - )] - - -def test_when_present_namespaced(client_mock, resource): - result = object() - apicls_mock = client_mock.CustomObjectsApi - apicls_mock.return_value.list_cluster_custom_object.return_value = result - apicls_mock.return_value.list_namespaced_custom_object.return_value = result - sidefn_mock = apicls_mock.return_value.list_cluster_custom_object - mainfn_mock = apicls_mock.return_value.list_namespaced_custom_object - - fn = make_list_fn(resource=resource, namespace='ns1') - assert callable(fn) - - assert not sidefn_mock.called - assert not mainfn_mock.called - - res = fn(opt1='val1', opt2=123) - assert res is result - - assert sidefn_mock.call_count == 0 - assert mainfn_mock.call_count == 1 - assert mainfn_mock.call_args_list == [call( - group=resource.group, - version=resource.version, - plural=resource.plural, - namespace='ns1', - opt1='val1', opt2=123, - )] - - -@pytest.mark.parametrize('namespace', [None, 'ns1'], ids=['without-namespace', 'with-namespace']) -@pytest.mark.parametrize('status', [400, 401, 403, 404, 500, 666]) -def test_raises_api_error(client_mock, resource, namespace, status): - error = kubernetes.client.rest.ApiException(status=status) - apicls_mock = client_mock.CustomObjectsApi - apicls_mock.return_value.list_cluster_custom_object.side_effect = error - apicls_mock.return_value.list_namespaced_custom_object.side_effect = error - - fn = make_list_fn(resource=resource, namespace=namespace) - with pytest.raises(kubernetes.client.rest.ApiException) as e: - fn(opt1='val1', opt2=123) - assert e.value.status == status - - -@pytest.mark.parametrize('namespace', [None, 'ns1'], ids=['without-namespace', 'with-namespace']) -def test_docstrings_are_preserved(client_mock, resource, namespace): - # Docstrings are important! Kubernetes client uses them to guess - # the returned object types and the parameters type. - docstring = """some doc \n :return: sometype""" - - apicls_mock = client_mock.CustomObjectsApi - apicls_mock.return_value.list_cluster_custom_object.__doc__ = docstring - apicls_mock.return_value.list_namespaced_custom_object.__doc__ = docstring - - fn = make_list_fn(resource=resource, namespace=namespace) - fn_docstring = pydoc.getdoc(fn) # same as k8s client does this - assert isinstance(fn_docstring, str) - assert ':return: sometype' in docstring # it will be reformatted diff --git a/tests/k8s/test_patching.py b/tests/k8s/test_patching.py index 2ce9b32b..6076508e 100644 --- a/tests/k8s/test_patching.py +++ b/tests/k8s/test_patching.py @@ -1,132 +1,100 @@ -import asyncio +import json import pytest -from asynctest import call from kopf.clients.patching import patch_obj -async def test_by_name_clustered(client_mock, resource): - patch = object() - apicls_mock = client_mock.CustomObjectsApi - sidefn_mock = apicls_mock.return_value.patch_namespaced_custom_object - mainfn_mock = apicls_mock.return_value.patch_cluster_custom_object - +@pytest.mark.resource_clustered # see `req_mock` +async def test_by_name_clustered(req_mock, resource): + patch = {'x': 'y'} res = await patch_obj(resource=resource, namespace=None, name='name1', patch=patch) assert res is None # never return any k8s-client specific things - assert not sidefn_mock.called - assert mainfn_mock.call_count == 1 - assert mainfn_mock.call_args_list == [call( - group=resource.group, - version=resource.version, - plural=resource.plural, - name='name1', - body=patch, - )] + assert req_mock.patch.called + assert req_mock.patch.call_count == 1 + + url = req_mock.patch.call_args_list[0][1]['url'] + assert 'namespaces/' not in url + assert f'apis/{resource.api_version}/{resource.plural}/name1' in url + data = json.loads(req_mock.patch.call_args_list[0][1]['data']) + assert data == {'x': 'y'} -async def test_by_name_namespaced(client_mock, resource): - patch = object() - apicls_mock = client_mock.CustomObjectsApi - sidefn_mock = apicls_mock.return_value.patch_cluster_custom_object - mainfn_mock = apicls_mock.return_value.patch_namespaced_custom_object +async def test_by_name_namespaced(req_mock, resource): + patch = {'x': 'y'} res = await patch_obj(resource=resource, namespace='ns1', name='name1', patch=patch) assert res is None # never return any k8s-client specific things - assert not sidefn_mock.called - assert mainfn_mock.call_count == 1 - assert mainfn_mock.call_args_list == [call( - group=resource.group, - version=resource.version, - plural=resource.plural, - namespace='ns1', - name='name1', - body=patch, - )] + assert req_mock.patch.called + assert req_mock.patch.call_count == 1 + url = req_mock.patch.call_args_list[0][1]['url'] + assert 'namespaces/' in url + assert f'apis/{resource.api_version}/namespaces/ns1/{resource.plural}/name1' in url -async def test_by_body_clustered(client_mock, resource): - patch = object() - apicls_mock = client_mock.CustomObjectsApi - sidefn_mock = apicls_mock.return_value.patch_namespaced_custom_object - mainfn_mock = apicls_mock.return_value.patch_cluster_custom_object + data = json.loads(req_mock.patch.call_args_list[0][1]['data']) + assert data == {'x': 'y'} + +@pytest.mark.resource_clustered # see `req_mock` +async def test_by_body_clustered(req_mock, resource): + patch = {'x': 'y'} body = {'metadata': {'name': 'name1'}} res = await patch_obj(resource=resource, body=body, patch=patch) assert res is None # never return any k8s-client specific things - assert not sidefn_mock.called - assert mainfn_mock.call_count == 1 - assert mainfn_mock.call_args_list == [call( - group=resource.group, - version=resource.version, - plural=resource.plural, - name='name1', - body=patch, - )] + assert req_mock.patch.called + assert req_mock.patch.call_count == 1 + + url = req_mock.patch.call_args_list[0][1]['url'] + assert 'namespaces/' not in url + assert f'apis/{resource.api_version}/{resource.plural}/name1' in url + data = json.loads(req_mock.patch.call_args_list[0][1]['data']) + assert data == {'x': 'y'} -async def test_by_body_namespaced(client_mock, resource): - patch = object() - apicls_mock = client_mock.CustomObjectsApi - sidefn_mock = apicls_mock.return_value.patch_cluster_custom_object - mainfn_mock = apicls_mock.return_value.patch_namespaced_custom_object +async def test_by_body_namespaced(req_mock, resource): + patch = {'x': 'y'} body = {'metadata': {'namespace': 'ns1', 'name': 'name1'}} res = await patch_obj(resource=resource, body=body, patch=patch) assert res is None # never return any k8s-client specific things - assert not sidefn_mock.called - assert mainfn_mock.call_count == 1 - assert mainfn_mock.call_args_list == [call( - group=resource.group, - version=resource.version, - plural=resource.plural, - namespace='ns1', - name='name1', - body=patch, - )] + assert req_mock.patch.called + assert req_mock.patch.call_count == 1 + url = req_mock.patch.call_args_list[0][1]['url'] + assert 'namespaces/' in url + assert f'apis/{resource.api_version}/namespaces/ns1/{resource.plural}/name1' in url -async def test_raises_when_body_conflicts_with_namespace(client_mock, resource): - patch = object() - apicls_mock = client_mock.CustomObjectsApi - sidefn_mock = apicls_mock.return_value.patch_namespaced_custom_object - mainfn_mock = apicls_mock.return_value.patch_cluster_custom_object + data = json.loads(req_mock.patch.call_args_list[0][1]['data']) + assert data == {'x': 'y'} + +async def test_raises_when_body_conflicts_with_namespace(req_mock, resource): + patch = {'x': 'y'} body = {'metadata': {'namespace': 'ns1', 'name': 'name1'}} with pytest.raises(TypeError): await patch_obj(resource=resource, body=body, namespace='ns1', patch=patch) - assert not sidefn_mock.called - assert not mainfn_mock.called - + assert not req_mock.patch.called -async def test_raises_when_body_conflicts_with_name(client_mock, resource): - patch = object() - apicls_mock = client_mock.CustomObjectsApi - sidefn_mock = apicls_mock.return_value.patch_namespaced_custom_object - mainfn_mock = apicls_mock.return_value.patch_cluster_custom_object +async def test_raises_when_body_conflicts_with_name(req_mock, resource): + patch = {'x': 'y'} body = {'metadata': {'namespace': 'ns1', 'name': 'name1'}} with pytest.raises(TypeError): await patch_obj(resource=resource, body=body, name='name1', patch=patch) - assert not sidefn_mock.called - assert not mainfn_mock.called - + assert not req_mock.patch.called -async def test_raises_when_body_conflicts_with_ids(client_mock, resource): - patch = object() - apicls_mock = client_mock.CustomObjectsApi - sidefn_mock = apicls_mock.return_value.patch_namespaced_custom_object - mainfn_mock = apicls_mock.return_value.patch_cluster_custom_object +async def test_raises_when_body_conflicts_with_ids(req_mock, resource): + patch = {'x': 'y'} body = {'metadata': {'namespace': 'ns1', 'name': 'name1'}} with pytest.raises(TypeError): await patch_obj(resource=resource, body=body, namespace='ns1', name='name1', patch=patch) - assert not sidefn_mock.called - assert not mainfn_mock.called + assert not req_mock.patch.called diff --git a/tests/k8s/test_read_crd.py b/tests/k8s/test_read_crd.py index 7d2655b5..24929d10 100644 --- a/tests/k8s/test_read_crd.py +++ b/tests/k8s/test_read_crd.py @@ -1,55 +1,54 @@ -import kubernetes import pytest -from asynctest import call +import requests from kopf.clients.fetching import read_crd -def test_when_present(client_mock, resource): - apicls_mock = client_mock.ApiextensionsV1beta1Api - readfn_mock = apicls_mock.return_value.read_custom_resource_definition - result_mock = readfn_mock.return_value +def test_when_present(req_mock, resource): + result = {} + req_mock.get.return_value.json.return_value = result crd = read_crd(resource=resource) - assert crd is result_mock + assert crd is result - assert apicls_mock.called - assert apicls_mock.call_count == 1 - assert readfn_mock.called - assert readfn_mock.call_count == 1 - assert readfn_mock.call_args_list == [ - call(name=f'{resource.plural}.{resource.group}') - ] + assert req_mock.get.called + assert req_mock.get.call_count == 1 + + url = req_mock.get.call_args_list[0][1]['url'] + assert '/customresourcedefinitions/kopfexamples.zalando.org' in url @pytest.mark.parametrize('status', [403, 404]) -def test_when_absent_with_no_default(client_mock, resource, status): - error = kubernetes.client.rest.ApiException(status=status) - apicls_mock = client_mock.ApiextensionsV1beta1Api - apicls_mock.return_value.read_custom_resource_definition.side_effect = error +def test_when_absent_with_no_default(req_mock, resource, status): + response = requests.Response() + response.status_code = status + error = requests.exceptions.HTTPError("boo!", response=response) + req_mock.get.side_effect = error - with pytest.raises(kubernetes.client.rest.ApiException) as e: + with pytest.raises(requests.exceptions.HTTPError) as e: read_crd(resource=resource) - assert e.value.status == status + assert e.value.response.status_code == status @pytest.mark.parametrize('default', [None, object()], ids=['none', 'object']) @pytest.mark.parametrize('status', [403, 404]) -def test_when_absent_with_default(client_mock, resource, default, status): - error = kubernetes.client.rest.ApiException(status=status) - apicls_mock = client_mock.ApiextensionsV1beta1Api - apicls_mock.return_value.read_custom_resource_definition.side_effect = error +def test_when_absent_with_default(req_mock, resource, default, status): + response = requests.Response() + response.status_code = status + error = requests.exceptions.HTTPError("boo!", response=response) + req_mock.get.side_effect = error crd = read_crd(resource=resource, default=default) assert crd is default @pytest.mark.parametrize('status', [400, 401, 500, 666]) -def test_raises_api_error_despite_default(client_mock, resource, status): - error = kubernetes.client.rest.ApiException(status=status) - apicls_mock = client_mock.ApiextensionsV1beta1Api - apicls_mock.return_value.read_custom_resource_definition.side_effect = error +def test_raises_api_error_despite_default(req_mock, resource, status): + response = requests.Response() + response.status_code = status + error = requests.exceptions.HTTPError("boo!", response=response) + req_mock.get.side_effect = error - with pytest.raises(kubernetes.client.rest.ApiException) as e: + with pytest.raises(requests.exceptions.HTTPError) as e: read_crd(resource=resource, default=object()) - assert e.value.status == status + assert e.value.response.status_code == status diff --git a/tests/k8s/test_read_obj.py b/tests/k8s/test_read_obj.py index 33b8cf13..08294fdf 100644 --- a/tests/k8s/test_read_obj.py +++ b/tests/k8s/test_read_obj.py @@ -1,87 +1,74 @@ -import kubernetes.client.rest +import pykube import pytest -from asynctest import call +import requests from kopf.clients.fetching import read_obj -def test_when_present_clustered(client_mock, resource): - result = object() - apicls_mock = client_mock.CustomObjectsApi - apicls_mock.return_value.get_cluster_custom_object.return_value = result - apicls_mock.return_value.get_namespaced_custom_object.return_value = result - sidefn_mock = apicls_mock.return_value.get_namespaced_custom_object - mainfn_mock = apicls_mock.return_value.get_cluster_custom_object +@pytest.mark.resource_clustered # see `req_mock` +def test_when_present_clustered(req_mock, resource): + result = {} + req_mock.get.return_value.json.return_value = result crd = read_obj(resource=resource, namespace=None, name='name1') assert crd is result - assert not sidefn_mock.called - assert mainfn_mock.call_count == 1 - assert mainfn_mock.call_args_list == [call( - group=resource.group, - version=resource.version, - plural=resource.plural, - name='name1', - )] + assert req_mock.get.called + assert req_mock.get.call_count == 1 + url = req_mock.get.call_args_list[0][1]['url'] + assert 'apis/zalando.org/v1/kopfexamples/name1' in url + assert 'namespaces/' not in url -def test_when_present_namespaced(client_mock, resource): - result = object() - apicls_mock = client_mock.CustomObjectsApi - apicls_mock.return_value.get_cluster_custom_object.return_value = result - apicls_mock.return_value.get_namespaced_custom_object.return_value = result - sidefn_mock = apicls_mock.return_value.get_cluster_custom_object - mainfn_mock = apicls_mock.return_value.get_namespaced_custom_object + +def test_when_present_namespaced(req_mock, resource): + result = {} + req_mock.get.return_value.json.return_value = result crd = read_obj(resource=resource, namespace='ns1', name='name1') assert crd is result - assert not sidefn_mock.called - assert mainfn_mock.call_count == 1 - assert mainfn_mock.call_args_list == [call( - group=resource.group, - version=resource.version, - plural=resource.plural, - namespace='ns1', - name='name1', - )] + assert req_mock.get.called + assert req_mock.get.call_count == 1 + + url = req_mock.get.call_args_list[0][1]['url'] + assert 'apis/zalando.org/v1/namespaces/ns1/kopfexamples/name1' in url @pytest.mark.parametrize('namespace', [None, 'ns1'], ids=['without-namespace', 'with-namespace']) -@pytest.mark.parametrize('status', [404]) -def test_when_absent_with_no_default(client_mock, resource, namespace, status): - error = kubernetes.client.rest.ApiException(status=status) - apicls_mock = client_mock.CustomObjectsApi - apicls_mock.return_value.get_cluster_custom_object.side_effect = error - apicls_mock.return_value.get_namespaced_custom_object.side_effect = error - - with pytest.raises(kubernetes.client.rest.ApiException) as e: +@pytest.mark.parametrize('status', [403, 404]) +def test_when_absent_with_no_default(req_mock, resource, namespace, status): + response = requests.Response() + response.status_code = status + error = requests.exceptions.HTTPError("boo!", response=response) + req_mock.get.side_effect = error + + with pytest.raises(requests.exceptions.HTTPError) as e: read_obj(resource=resource, namespace=namespace, name='name1') - assert e.value.status == status + assert e.value.response.status_code == status @pytest.mark.parametrize('default', [None, object()], ids=['none', 'object']) @pytest.mark.parametrize('namespace', [None, 'ns1'], ids=['without-namespace', 'with-namespace']) -@pytest.mark.parametrize('status', [404]) -def test_when_absent_with_default(client_mock, resource, namespace, default, status): - error = kubernetes.client.rest.ApiException(status=status) - apicls_mock = client_mock.CustomObjectsApi - apicls_mock.return_value.get_cluster_custom_object.side_effect = error - apicls_mock.return_value.get_namespaced_custom_object.side_effect = error +@pytest.mark.parametrize('status', [403, 404]) +def test_when_absent_with_default(req_mock, resource, namespace, default, status): + response = requests.Response() + response.status_code = status + error = requests.exceptions.HTTPError("boo!", response=response) + req_mock.get.side_effect = error crd = read_obj(resource=resource, namespace=namespace, name='name1', default=default) assert crd is default @pytest.mark.parametrize('namespace', [None, 'ns1'], ids=['without-namespace', 'with-namespace']) -@pytest.mark.parametrize('status', [400, 401, 403, 500, 666]) -def test_raises_api_error_despite_default(client_mock, resource, namespace, status): - error = kubernetes.client.rest.ApiException(status=status) - apicls_mock = client_mock.CustomObjectsApi - apicls_mock.return_value.get_cluster_custom_object.side_effect = error - apicls_mock.return_value.get_namespaced_custom_object.side_effect = error - - with pytest.raises(kubernetes.client.rest.ApiException) as e: +@pytest.mark.parametrize('status', [400, 401, 500, 666]) +def test_raises_api_error_despite_default(req_mock, resource, namespace, status): + response = requests.Response() + response.status_code = status + error = requests.exceptions.HTTPError("boo!", response=response) + req_mock.get.side_effect = error + + with pytest.raises(requests.exceptions.HTTPError) as e: read_obj(resource=resource, namespace=namespace, name='name1', default=object()) - assert e.value.status == status + assert e.value.response.status_code == status diff --git a/tests/k8s/test_scopes.py b/tests/k8s/test_scopes.py index 67aa1b30..922027ff 100644 --- a/tests/k8s/test_scopes.py +++ b/tests/k8s/test_scopes.py @@ -9,12 +9,8 @@ class PreventedActualCallError(Exception): pass -async def test_watching_over_cluster(resource, client_mock): - apicls_mock = client_mock.CustomObjectsApi - cl_list_fn = apicls_mock.return_value.list_cluster_custom_object - ns_list_fn = apicls_mock.return_value.list_namespaced_custom_object - cl_list_fn.side_effect = PreventedActualCallError() - ns_list_fn.side_effect = PreventedActualCallError() +async def test_watching_over_cluster(resource, req_mock): + req_mock.get.side_effect = PreventedActualCallError() itr = streaming_watch( resource=resource, @@ -27,25 +23,16 @@ async def test_watching_over_cluster(resource, client_mock): with pytest.raises(PreventedActualCallError): async for _ in itr: pass # fully deplete it - assert apicls_mock.called - assert apicls_mock.call_count == 1 + assert req_mock.get.called + assert req_mock.get.call_count == 1 - # Cluster-scoped listing function is used irrelevant of the resource. - assert not ns_list_fn.called - assert cl_list_fn.called - assert cl_list_fn.call_count == 1 - assert cl_list_fn.call_args[1]['group'] == resource.group - assert cl_list_fn.call_args[1]['version'] == resource.version - assert cl_list_fn.call_args[1]['plural'] == resource.plural - assert 'namespace' not in cl_list_fn.call_args[1] + url = req_mock.get.call_args_list[0][1]['url'] + assert 'apis/zalando.org/v1/kopfexamples' in url + assert 'namespaces/' not in url -async def test_watching_over_namespace(resource, client_mock): - apicls_mock = client_mock.CustomObjectsApi - cl_list_fn = apicls_mock.return_value.list_cluster_custom_object - ns_list_fn = apicls_mock.return_value.list_namespaced_custom_object - cl_list_fn.side_effect = PreventedActualCallError() - ns_list_fn.side_effect = PreventedActualCallError() +async def test_watching_over_namespace(resource, req_mock): + req_mock.get.side_effect = PreventedActualCallError() itr = streaming_watch( resource=resource, @@ -58,14 +45,8 @@ async def test_watching_over_namespace(resource, client_mock): with pytest.raises(PreventedActualCallError): async for _ in itr: pass # fully deplete it - assert apicls_mock.called - assert apicls_mock.call_count == 1 - - # The scope-relevant listing function is used, depending on the resource. - assert not cl_list_fn.called - assert ns_list_fn.called - assert ns_list_fn.call_count == 1 - assert ns_list_fn.call_args[1]['group'] == resource.group - assert ns_list_fn.call_args[1]['version'] == resource.version - assert ns_list_fn.call_args[1]['plural'] == resource.plural - assert ns_list_fn.call_args[1]['namespace'] == 'something' + assert req_mock.get.called + assert req_mock.get.call_count == 1 + + url = req_mock.get.call_args_list[0][1]['url'] + assert 'apis/zalando.org/v1/namespaces/something/kopfexamples' in url diff --git a/tests/k8s/test_watching.py b/tests/k8s/test_watching.py index ca2a2556..e72ced31 100644 --- a/tests/k8s/test_watching.py +++ b/tests/k8s/test_watching.py @@ -20,7 +20,7 @@ ] STREAM_WITH_UNKNOWN_EVENT = [ {'type': 'ADDED', 'object': {'spec': 'a'}}, - {'type': 'UNKNOWN'}, + {'type': 'UNKNOWN', 'object': {}}, {'type': 'ADDED', 'object': {'spec': 'b'}}, ] STREAM_WITH_ERROR_410GONE = [ @@ -39,15 +39,8 @@ class SampleException(Exception): pass -@pytest.fixture() -def stream(mocker): - """ A mock for the stream of events as if returned by K8s client. """ - stream = mocker.patch('kubernetes.watch.Watch.stream') - return stream - - async def test_empty_stream_yields_nothing(resource, stream): - stream.return_value = iter([]) + stream.feed([]) events = [] async for event in streaming_watch(resource=resource, namespace=None): @@ -57,7 +50,7 @@ async def test_empty_stream_yields_nothing(resource, stream): async def test_event_stream_yields_everything(resource, stream): - stream.return_value = iter(STREAM_WITH_NORMAL_EVENTS) + stream.feed(STREAM_WITH_NORMAL_EVENTS) events = [] async for event in streaming_watch(resource=resource, namespace=None): @@ -70,13 +63,12 @@ async def test_event_stream_yields_everything(resource, stream): async def test_unknown_event_type_ignored(resource, stream, caplog): caplog.set_level(logging.DEBUG) - stream.return_value = iter(STREAM_WITH_UNKNOWN_EVENT) + stream.feed(STREAM_WITH_UNKNOWN_EVENT) events = [] async for event in streaming_watch(resource=resource, namespace=None): events.append(event) - assert stream.call_count == 1 assert len(events) == 2 assert events[0]['object']['spec'] == 'a' assert events[1]['object']['spec'] == 'b' @@ -86,57 +78,53 @@ async def test_unknown_event_type_ignored(resource, stream, caplog): async def test_error_410gone_exits_normally(resource, stream, caplog): caplog.set_level(logging.DEBUG) - stream.return_value = iter(STREAM_WITH_ERROR_410GONE) + stream.feed(STREAM_WITH_ERROR_410GONE) events = [] async for event in streaming_watch(resource=resource, namespace=None): events.append(event) - assert stream.call_count == 1 assert len(events) == 1 assert events[0]['object']['spec'] == 'a' assert "Restarting the watch-stream" in caplog.text async def test_unknown_error_raises_exception(resource, stream): - stream.return_value = iter(STREAM_WITH_ERROR_CODE) + stream.feed(STREAM_WITH_ERROR_CODE) events = [] with pytest.raises(WatchingError) as e: async for event in streaming_watch(resource=resource, namespace=None): events.append(event) - assert stream.call_count == 1 assert len(events) == 1 assert events[0]['object']['spec'] == 'a' assert '666' in str(e.value) async def test_exception_escalates(resource, stream): - stream.side_effect = SampleException() + stream.feed(SampleException()) events = [] with pytest.raises(SampleException): async for event in streaming_watch(resource=resource, namespace=None): events.append(event) - assert stream.call_count == 1 assert len(events) == 0 async def test_infinite_watch_never_exits_normally(resource, stream): - stream.side_effect = [ - iter(STREAM_WITH_ERROR_410GONE), # watching restarted - iter(STREAM_WITH_UNKNOWN_EVENT), # event ignored + stream.feed( + STREAM_WITH_ERROR_410GONE, # watching restarted + STREAM_WITH_UNKNOWN_EVENT, # event ignored SampleException(), # to finally exit it somehow - ] + ) events = [] with pytest.raises(SampleException): async for event in infinite_watch(resource=resource, namespace=None): events.append(event) - assert stream.call_count == 3 # 2 streams + 1 exception assert len(events) == 3 assert events[0]['object']['spec'] == 'a' assert events[1]['object']['spec'] == 'a' diff --git a/tests/reactor/conftest.py b/tests/reactor/conftest.py index d0e68abb..d6ad1171 100644 --- a/tests/reactor/conftest.py +++ b/tests/reactor/conftest.py @@ -7,24 +7,10 @@ from kopf.reactor.queueing import watcher from kopf.reactor.queueing import worker as original_worker -# We do not test the Kubernetes client, so everything there should be mocked. -# Also, no external calls must be made under any circumstances. Hence, auto-use. -@pytest.fixture(autouse=True) -def client_mock(mocker): - client_mock = mocker.patch('kubernetes.client') - - fake_list = {'items': [], 'metadata': {'resourceVersion': None}} - client_mock.CustomObjectsApi.list_cluster_custom_object.return_value = fake_list - client_mock.CustomObjectsApi.list_namespaced_custom_object.return_value = fake_list - - return client_mock - -@pytest.fixture() -def stream(mocker, client_mock): - """ A mock for the stream of events as if returned by K8s client. """ - stream = mocker.patch('kubernetes.watch.Watch.stream') - return stream +@pytest.fixture(autouse=True) +def _autouse_req_mock(req_mock): + pass @pytest.fixture() @@ -56,7 +42,7 @@ def watcher_limited(mocker): def watcher_in_background(resource, handler, event_loop, worker_spy, stream): # Prevent any real streaming for the very beginning, before it even starts. - stream.return_value = iter([]) + stream.feed([]) # Spawn a watcher in the background. coro = watcher( diff --git a/tests/reactor/test_queueing.py b/tests/reactor/test_queueing.py index e8641e32..86fd54ba 100644 --- a/tests/reactor/test_queueing.py +++ b/tests/reactor/test_queueing.py @@ -48,7 +48,7 @@ async def test_event_multiplexing(worker_mock, timer, resource, handler, stream, """ Verify that every unique uid goes into its own queue+worker, which are never shared. """ # Inject the events of unique objects - to produce few queues/workers. - stream.return_value = iter(events) + stream.feed(events) # Run the watcher (near-instantly and test-blocking). with timer: @@ -120,7 +120,7 @@ async def test_event_batching(mocker, resource, handler, timer, stream, events, mocker.patch('kopf.config.WorkersConfig.worker_exit_timeout', 0.5) # Inject the events of unique objects - to produce few queues/workers. - stream.return_value = iter(events) + stream.feed(events) # Run the watcher (near-instantly and test-blocking). with timer: @@ -168,9 +168,10 @@ async def test_garbage_collection_of_queues(mocker, stream, events, unique, work mocker.patch('kopf.config.WorkersConfig.worker_idle_timeout', 0.5) mocker.patch('kopf.config.WorkersConfig.worker_batch_window', 0.1) mocker.patch('kopf.config.WorkersConfig.worker_exit_timeout', 0.5) + mocker.patch('kopf.config.WatchersConfig.watcher_retry_delay', 1.0) # to prevent src depletion # Inject the events of unique objects - to produce few queues/workers. - stream.return_value = iter(events) + stream.feed(events) # Give it a moment to populate the queues and spawn all the workers. # Intercept and remember _any_ seen dict of queues for further checks. diff --git a/tests/testing/test_runner.py b/tests/testing/test_runner.py index 0e970dab..d5a4e652 100644 --- a/tests/testing/test_runner.py +++ b/tests/testing/test_runner.py @@ -4,10 +4,8 @@ @pytest.fixture(autouse=True) -def no_config_needed(mocker): - mocker.patch('kubernetes.config.load_incluster_config') - mocker.patch('kubernetes.config.load_kube_config') - mocker.patch('kubernetes.client.CoreApi') # for login self-check +def no_config_needed(login_mocks): + pass def test_command_invocation_works(): diff --git a/tools/kubernetes-client.sh b/tools/kubernetes-client.sh new file mode 100755 index 00000000..8fd024de --- /dev/null +++ b/tools/kubernetes-client.sh @@ -0,0 +1,8 @@ +#!/bin/bash +set -eu +set -x + +if [[ "${CLIENT:-}" = "yes" ]] ; then + # FIXME: See https://github.com/kubernetes-client/python/issues/866 + pip install 'kubernetes<10.0.0' +fi