From e53a598c856cf9f8e02ef3cb207f15efea85d5f9 Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Sun, 2 Jun 2019 20:19:33 +0200 Subject: [PATCH 01/16] Switch to pykube-ng, keep kubernetes client optional --- examples/requirements.txt | 1 + kopf/cli.py | 3 + kopf/clients/auth.py | 116 +++++++++++++++++-- kopf/clients/classes.py | 48 ++++++++ kopf/clients/events.py | 55 +++++---- kopf/clients/fetching.py | 120 ++++++++----------- kopf/clients/patching.py | 28 ++--- kopf/clients/watching.py | 15 +-- kopf/config.py | 30 +++-- kopf/engines/peering.py | 2 +- kopf/logging/logging.py | 41 +++++++ kopf/reactor/registries.py | 10 ++ requirements.txt | 3 + setup.py | 2 +- tests/cli/conftest.py | 3 +- tests/cli/test_help.py | 4 + tests/cli/test_login.py | 205 +++++++++++++++++++++------------ tests/conftest.py | 82 +++++++++++++ tests/e2e/test_examples.py | 2 +- tests/handling/conftest.py | 9 +- tests/k8s/conftest.py | 17 +-- tests/k8s/test_events.py | 89 ++++++-------- tests/k8s/test_list_objs.py | 63 ++++------ tests/k8s/test_make_list_fn.py | 92 --------------- tests/k8s/test_patching.py | 136 +++++++++------------- tests/k8s/test_read_crd.py | 59 +++++----- tests/k8s/test_read_obj.py | 101 +++++++--------- tests/k8s/test_scopes.py | 47 +++----- tests/k8s/test_watching.py | 34 ++---- tests/reactor/conftest.py | 22 +--- tests/reactor/test_queueing.py | 7 +- tests/testing/test_runner.py | 6 +- 32 files changed, 771 insertions(+), 681 deletions(-) create mode 100644 kopf/clients/classes.py create mode 100644 kopf/logging/logging.py delete mode 100644 tests/k8s/test_make_list_fn.py diff --git a/examples/requirements.txt b/examples/requirements.txt index e7d4c85d..140c5635 100644 --- a/examples/requirements.txt +++ b/examples/requirements.txt @@ -1,3 +1,4 @@ kopf kubernetes +pykube-ng pyyaml diff --git a/kopf/cli.py b/kopf/cli.py index 73a367b0..5d8add8e 100644 --- a/kopf/cli.py +++ b/kopf/cli.py @@ -13,8 +13,11 @@ def cli_login(): try: auth.login() + auth.verify() except 
auth.LoginError as e: raise click.ClickException(str(e)) + except auth.AccessError as e: + raise click.ClickException(str(e)) def logging_options(fn): diff --git a/kopf/clients/auth.py b/kopf/clients/auth.py index 746c2068..e817a073 100644 --- a/kopf/clients/auth.py +++ b/kopf/clients/auth.py @@ -1,42 +1,134 @@ import logging +from typing import Optional -import kubernetes +import pykube +import requests import urllib3.exceptions logger = logging.getLogger(__name__) +# Set in login(), consumed in get_pykube_cfg() and all API calls. +_pykube_cfg: Optional[pykube.KubeConfig] = None + class LoginError(Exception): """ Raised when the operator cannot login to the API. """ +class AccessError(Exception): + """ Raised when the operator cannot access the cluster API. """ + + def login(): """ - Login the the Kubernetes cluster, locally or remotely. + Login to Kubernetes cluster, locally or remotely. + + Keep the logged in state or config object in the global variables, + so that it can be available for future calls via the same function call. + + Automatic refresh/reload of the tokens or objects also should be done here. """ - # Configure the default client credentials for all possible environments. + # Pykube login is mandatory. If it fails, the framework will not run at all. + login_pykube() + + # We keep the official client library auto-login only because it was + # an implied behavior before switching to pykube -- to keep it so (implied). 
+ try: + import kubernetes + except ImportError: + pass + else: + login_client() + + +def login_pykube(): + global _pykube_cfg + try: + _pykube_cfg = pykube.KubeConfig.from_service_account() + logger.debug("Pykube is configured in cluster with service account.") + except FileNotFoundError: + try: + _pykube_cfg = pykube.KubeConfig.from_file() + logger.debug("Pykube is configured via kubeconfig file.") + except (pykube.PyKubeError, FileNotFoundError): + raise LoginError(f"Cannot authenticate pykube neither in-cluster, nor via kubeconfig.") + + +def login_client(): + import kubernetes.client try: kubernetes.config.load_incluster_config() # cluster env vars - logger.debug("configured in cluster with service account") + logger.debug("Client is configured in cluster with service account.") except kubernetes.config.ConfigException as e1: try: kubernetes.config.load_kube_config() # developer's config files - logger.debug("configured via kubeconfig file") + logger.debug("Client is configured via kubeconfig file.") except kubernetes.config.ConfigException as e2: - raise LoginError(f"Cannot authenticate neither in-cluster, nor via kubeconfig.") + raise LoginError(f"Cannot authenticate client neither in-cluster, nor via kubeconfig.") + + +def verify(): + """ + Verify if login has succeeded, and the access configuration is still valid. + + If not, raise `.AccessError`. - # Make a sample API call to ensure the login is successful, - # and convert some of the known exceptions to the CLI hints. + It can also perform the permission checks before the operator + actually starts, i.e. before any fetching or watching API calls. + + If the operator is not logged in, then the check fails, + and the operator stops (as any other API call). + """ + + # Pykube login is mandatory. If it fails, the framework will not run at all. 
+ verify_pykube() + + # We keep the official client library auto-login only because it was + # an implied behavior before switching to pykube -- to keep it so (implied). + try: + import kubernetes + except ImportError: + pass + else: + verify_client() + + +def verify_pykube(): + try: + api = get_pykube_api() + rsp = api.get(version="", base="/") + rsp.raise_for_status() + except requests.exceptions.HTTPError as e: + if e.response.status_code == 401: + raise AccessError("Cannot authenticate to the Kubernetes API. " + "Please login or configure the tokens.") + else: + raise + + +def verify_client(): + import kubernetes.client.rest try: api = kubernetes.client.CoreApi() api.get_api_versions() except urllib3.exceptions.HTTPError as e: - raise LoginError("Cannot connect to the Kubernetes API. " - "Please configure the cluster access.") + raise AccessError("Cannot connect to the Kubernetes API. " + "Please configure the cluster access.") except kubernetes.client.rest.ApiException as e: if e.status == 401: - raise LoginError("Cannot authenticate to the Kubernetes API. " - "Please login or configure the tokens.") + raise AccessError("Cannot authenticate to the Kubernetes API. " + "Please login or configure the tokens.") else: raise + + +def get_pykube_cfg() -> pykube.KubeConfig: + if _pykube_cfg is None: + raise LoginError("Not logged in with PyKube.") + return _pykube_cfg + + +# TODO: add some caching, but keep kwargs in mind. Maybe add a key= for purpose/use-place? +def get_pykube_api(timeout=None) -> pykube.HTTPClient: + return pykube.HTTPClient(get_pykube_cfg(), timeout=timeout) diff --git a/kopf/clients/classes.py b/kopf/clients/classes.py new file mode 100644 index 00000000..21a51433 --- /dev/null +++ b/kopf/clients/classes.py @@ -0,0 +1,48 @@ +import json +from typing import Type + +import pykube + +from kopf.clients import auth + + +# TODO: this mixin has to be migrated into pykube-ng itself. 
+class APIObject(pykube.objects.APIObject): + def patch(self, patch): + ''' + Patch the Kubernetes resource by calling the API. + ''' + r = self.api.patch(**self.api_kwargs( + headers={"Content-Type": "application/merge-patch+json"}, + data=json.dumps(patch), + )) + self.api.raise_for_status(r) + self.set_obj(r.json()) + + +class NamespacedAPIObject(pykube.objects.NamespacedAPIObject, APIObject): + pass + + +class CustomResourceDefinition(APIObject): + version = "apiextensions.k8s.io/v1beta1" + endpoint = "customresourcedefinitions" + kind = "CustomResourceDefinition" + + +def _make_cls(resource) -> Type[APIObject]: + api = auth.get_pykube_api() + api_resources = api.resource_list(resource.api_version)['resources'] + resource_kind = next((r['kind'] for r in api_resources if r['name'] == resource.plural), None) + is_namespaced = next((r['namespaced'] for r in api_resources if r['name'] == resource.plural), None) + if not resource_kind: + raise pykube.ObjectDoesNotExist("No such CRD at all.") + + cls_name = resource.plural + cls_base = NamespacedAPIObject if is_namespaced else APIObject + cls = type(cls_name, (cls_base,), { + 'version': resource.api_version, + 'endpoint': resource.plural, + 'kind': resource_kind, + }) + return cls diff --git a/kopf/clients/events.py b/kopf/clients/events.py index 5554f67b..b598f8d7 100644 --- a/kopf/clients/events.py +++ b/kopf/clients/events.py @@ -1,11 +1,12 @@ import asyncio import datetime -import functools import logging -import kubernetes.client.rest +import pykube +import requests from kopf import config +from kopf.clients import auth from kopf.structs import hierarchies logger = logging.getLogger(__name__) @@ -41,38 +42,36 @@ async def post_event(*, obj=None, ref=None, type, reason, message=''): suffix = message[-MAX_MESSAGE_LENGTH // 2 + (len(infix) - len(infix) // 2):] message = f'{prefix}{infix}{suffix}' - meta = kubernetes.client.V1ObjectMeta( - namespace=namespace, - generate_name='kopf-event-', - ) - body = 
kubernetes.client.V1Event( - metadata=meta, + body = { + 'metadata': { + 'namespace': namespace, + 'generateName': 'kopf-event-', + }, - action='Action?', - type=type, - reason=reason, - message=message, + 'action': 'Action?', + 'type': type, + 'reason': reason, + 'message': message, - reporting_component='kopf', - reporting_instance='dev', - source=kubernetes.client.V1EventSource(component='kopf'), # used in the "From" column in `kubectl describe`. + 'reportingComponent': 'kopf', + 'reportingInstance': 'dev', + 'source' : {'component': 'kopf'}, # used in the "From" column in `kubectl describe`. - involved_object=ref, + 'involvedObject': ref, - first_timestamp=now.isoformat() + 'Z', # '2019-01-28T18:25:03.000000Z' -- seen in `kubectl describe ...` - last_timestamp=now.isoformat() + 'Z', # '2019-01-28T18:25:03.000000Z' - seen in `kubectl get events` - event_time=now.isoformat() + 'Z', # '2019-01-28T18:25:03.000000Z' - ) - - api = kubernetes.client.CoreV1Api() - loop = asyncio.get_running_loop() + 'firstTimestamp': now.isoformat() + 'Z', # '2019-01-28T18:25:03.000000Z' -- seen in `kubectl describe ...` + 'lastTimestamp': now.isoformat() + 'Z', # '2019-01-28T18:25:03.000000Z' - seen in `kubectl get events` + 'eventTime': now.isoformat() + 'Z', # '2019-01-28T18:25:03.000000Z' + } try: - await loop.run_in_executor( - config.WorkersConfig.get_syn_executor(), - functools.partial(api.create_namespaced_event, **{'namespace': namespace, 'body': body}) - ) - except kubernetes.client.rest.ApiException as e: + api = auth.get_pykube_api() + obj = pykube.Event(api, body) + + loop = asyncio.get_running_loop() + await loop.run_in_executor(config.WorkersConfig.get_syn_executor(), obj.create) + + except (requests.exceptions.HTTPError, pykube.exceptions.HTTPError) as e: # Events are helpful but auxiliary, they should not fail the handling cycle. # Yet we want to notice that something went wrong (in logs). logger.warning("Failed to post an event. Ignoring and continuing. 
" diff --git a/kopf/clients/fetching.py b/kopf/clients/fetching.py index 716a7e05..4951dc3b 100644 --- a/kopf/clients/fetching.py +++ b/kopf/clients/fetching.py @@ -1,44 +1,42 @@ -import functools +import pykube +import requests -import kubernetes +from kopf.clients import auth +from kopf.clients import classes _UNSET_ = object() def read_crd(*, resource, default=_UNSET_): try: - name = f'{resource.plural}.{resource.group}' - api = kubernetes.client.ApiextensionsV1beta1Api() - rsp = api.read_custom_resource_definition(name=name) - return rsp - except kubernetes.client.rest.ApiException as e: - if e.status in [404, 403] and default is not _UNSET_: + api = auth.get_pykube_api() + cls = classes.CustomResourceDefinition + obj = cls.objects(api, namespace=None).get_by_name(name=resource.name) + return obj.obj + + except pykube.ObjectDoesNotExist: + if default is not _UNSET_: + return default + raise + except requests.exceptions.HTTPError as e: + if e.response.status_code in [403, 404] and default is not _UNSET_: return default raise def read_obj(*, resource, namespace=None, name=None, default=_UNSET_): try: - if namespace is None: - api = kubernetes.client.CustomObjectsApi() - rsp = api.get_cluster_custom_object( - group=resource.group, - version=resource.version, - plural=resource.plural, - name=name, - ) - else: - api = kubernetes.client.CustomObjectsApi() - rsp = api.get_namespaced_custom_object( - group=resource.group, - version=resource.version, - plural=resource.plural, - namespace=namespace, - name=name, - ) - return rsp - except kubernetes.client.rest.ApiException as e: - if e.status == 404 and default is not _UNSET_: + api = auth.get_pykube_api() + cls = classes._make_cls(resource=resource) + namespace = namespace if issubclass(cls, pykube.objects.NamespacedAPIObject) else None + obj = cls.objects(api, namespace=namespace).get_by_name(name=name) + return obj.obj + except pykube.ObjectDoesNotExist: + if default is not _UNSET_: + return default + raise + except 
requests.exceptions.HTTPError as e: + if e.response.status_code in [403, 404] and default is not _UNSET_: return default raise @@ -56,51 +54,29 @@ def list_objs(*, resource, namespace=None): * The resource is namespace-scoped AND operator is namespaced-restricted. """ - api = kubernetes.client.CustomObjectsApi() - if namespace is None: - rsp = api.list_cluster_custom_object( - group=resource.group, - version=resource.version, - plural=resource.plural, - ) - else: - rsp = api.list_namespaced_custom_object( - group=resource.group, - version=resource.version, - plural=resource.plural, - namespace=namespace, - ) - return rsp - - -def make_list_fn(*, resource, namespace=None): + api = auth.get_pykube_api() + cls = classes._make_cls(resource=resource) + namespace = namespace if issubclass(cls, pykube.objects.NamespacedAPIObject) else None + lst = cls.objects(api, namespace=pykube.all if namespace is None else namespace) + return lst.response + + +def watch_objs(*, resource, namespace=None, timeout=None): """ - Return a function to be called to receive the list of objects. - Needed in that form for the API streaming calls (see watching.py). + Watch the objects of specific resource type. - However, the returned function is already bound to the specified - resource type, and requires no resource-identifying parameters. + The cluster-scoped call is used in two cases: + + * The resource itself is cluster-scoped, and namespacing makes not sense. + * The operator serves all namespaces for the namespaced custom resource. - Docstrings are important! Kubernetes client uses them to guess - the returned object types and the parameters type. - Function wrapping does that: preserves the docstrings. + Otherwise, the namespace-scoped call is used: + + * The resource is namespace-scoped AND operator is namespaced-restricted. 
""" - api = kubernetes.client.CustomObjectsApi() - if namespace is None: - @functools.wraps(api.list_cluster_custom_object) - def list_fn(**kwargs): - return api.list_cluster_custom_object( - group=resource.group, - version=resource.version, - plural=resource.plural, - **kwargs) - else: - @functools.wraps(api.list_cluster_custom_object) - def list_fn(**kwargs): - return api.list_namespaced_custom_object( - group=resource.group, - version=resource.version, - plural=resource.plural, - namespace=namespace, - **kwargs) - return list_fn + api = auth.get_pykube_api(timeout=timeout) + cls = classes._make_cls(resource=resource) + namespace = namespace if issubclass(cls, pykube.objects.NamespacedAPIObject) else None + lst = cls.objects(api, namespace=pykube.all if namespace is None else namespace) + src = lst.watch() + return iter({'type': event.type, 'object': event.object.obj} for event in src) diff --git a/kopf/clients/patching.py b/kopf/clients/patching.py index 2432af3a..0fb944d3 100644 --- a/kopf/clients/patching.py +++ b/kopf/clients/patching.py @@ -1,9 +1,8 @@ import asyncio -import functools - -import kubernetes from kopf import config +from kopf.clients import auth +from kopf.clients import classes async def patch_obj(*, resource, patch, namespace=None, name=None, body=None): @@ -23,19 +22,14 @@ async def patch_obj(*, resource, patch, namespace=None, name=None, body=None): namespace = body.get('metadata', {}).get('namespace') if body is not None else namespace name = body.get('metadata', {}).get('name') if body is not None else name + if body is None: + nskw = {} if namespace is None else dict(namespace=namespace) + body = {'metadata': {'name': name}} + body['metadata'].update(nskw) - api = kubernetes.client.CustomObjectsApi() - request_kwargs = { - 'group': resource.group, - 'version': resource.version, - 'plural': resource.plural, - 'name': name, - 'body': patch - } - patch_func = api.patch_cluster_custom_object - if namespace is not None: - 
request_kwargs['namespace'] = namespace - patch_func = api.patch_namespaced_custom_object - loop = asyncio.get_running_loop() + api = auth.get_pykube_api() + cls = classes._make_cls(resource=resource) + obj = cls(api, body) - await loop.run_in_executor(config.WorkersConfig.get_syn_executor(), functools.partial(patch_func, **request_kwargs)) + loop = asyncio.get_running_loop() + await loop.run_in_executor(config.WorkersConfig.get_syn_executor(), obj.patch, patch) diff --git a/kopf/clients/watching.py b/kopf/clients/watching.py index 36454379..60d44421 100644 --- a/kopf/clients/watching.py +++ b/kopf/clients/watching.py @@ -22,16 +22,12 @@ import logging from typing import Union -import kubernetes - +from kopf import config from kopf.clients import fetching from kopf.reactor import registries logger = logging.getLogger(__name__) -DEFAULT_STREAM_TIMEOUT = None -""" The maximum duration of one streaming request. Patched in some tests. """ - class WatchingError(Exception): """ @@ -86,11 +82,11 @@ async def streaming_watch( # Then, watch the resources starting from the list's resource version. kwargs = {} kwargs.update(dict(resource_version=resource_version) if resource_version else {}) - kwargs.update(dict(timeout_seconds=DEFAULT_STREAM_TIMEOUT) if DEFAULT_STREAM_TIMEOUT else {}) + kwargs.update(dict(timeout_seconds=config.WatchersConfig.default_stream_timeout) if config.WatchersConfig.default_stream_timeout else {}) + # TODO: pass kwargs, specifically: timeout + resource_version loop = asyncio.get_event_loop() - fn = fetching.make_list_fn(resource=resource, namespace=namespace) - watch = kubernetes.watch.Watch() - stream = watch.stream(fn, **kwargs) + stream = fetching.watch_objs(resource=resource, namespace=namespace, + timeout=config.WatchersConfig.default_stream_timeout) async for event in streaming_aiter(stream, loop=loop): # "410 Gone" is for the "resource version too old" error, we must restart watching. 
@@ -131,3 +127,4 @@ async def infinite_watch( while True: async for event in streaming_watch(resource=resource, namespace=namespace): yield event + await asyncio.sleep(config.WatchersConfig.watcher_retry_delay) diff --git a/kopf/config.py b/kopf/config.py index 41c48c5e..3480e035 100644 --- a/kopf/config.py +++ b/kopf/config.py @@ -3,9 +3,6 @@ import logging from typing import Optional -import kubernetes -import kubernetes.client.rest - from kopf.engines import logging as logging_engine format = '[%(asctime)s] %(name)-20.20s [%(levelname)-8.8s] %(message)s' @@ -32,11 +29,16 @@ def configure(debug=None, verbose=None, quiet=None): logger.setLevel(log_level) # Configure the Kubernetes client defaults according to our settings. - config = kubernetes.client.configuration.Configuration() - config.logger_format = format - config.logger_file = None # once again after the constructor to re-apply the formatter - config.debug = debug - kubernetes.client.configuration.Configuration.set_default(config) + try: + import kubernetes + except ImportError: + pass + else: + config = kubernetes.client.configuration.Configuration() + config.logger_format = format + config.logger_file = None # once again after the constructor to re-apply the formatter + config.debug = debug + kubernetes.client.configuration.Configuration.set_default(config) # Kubernetes client is as buggy as hell: it adds its own stream handlers even in non-debug mode, # does not respect the formatting, and dumps too much of the low-level info. @@ -103,3 +105,15 @@ def set_synchronous_tasks_threadpool_limit(new_limit: int): WorkersConfig.synchronous_tasks_threadpool_limit = new_limit if WorkersConfig.threadpool_executor: WorkersConfig.threadpool_executor._max_workers = new_limit + + +class WatchersConfig: + """ + Used to configure the K8s API watchers and streams. + """ + + default_stream_timeout = None + """ The maximum duration of one streaming request. Patched in some tests. 
""" + + watcher_retry_delay = 0.1 + """ How long should a pause be between watch requests (to prevent flooding). """ diff --git a/kopf/engines/peering.py b/kopf/engines/peering.py index 330db897..2db25c1a 100644 --- a/kopf/engines/peering.py +++ b/kopf/engines/peering.py @@ -164,7 +164,7 @@ def _is_peering_legacy(name: str, namespace: Optional[str]): if crd is None: return False - if str(crd.spec.scope).lower() != 'cluster': + if str(crd.get('spec', {}).get('scope', '')).lower() != 'cluster': return False # no legacy mode detected obj = fetching.read_obj(resource=LEGACY_PEERING_RESOURCE, name=name, default=None) diff --git a/kopf/logging/logging.py b/kopf/logging/logging.py new file mode 100644 index 00000000..1bed3560 --- /dev/null +++ b/kopf/logging/logging.py @@ -0,0 +1,41 @@ +import asyncio +import logging + +format = '[%(asctime)s] %(name)-20.20s [%(levelname)-8.8s] %(message)s' + + +def configure(debug=None, verbose=None, quiet=None): + log_level = 'DEBUG' if debug or verbose else 'WARNING' if quiet else 'INFO' + + logger = logging.getLogger() + handler = logging.StreamHandler() + formatter = logging.Formatter(format) + handler.setFormatter(formatter) + logger.addHandler(handler) + logger.setLevel(log_level) + + # Configure the Kubernetes client defaults according to our settings. + try: + import kubernetes + except ImportError: + pass + else: + config = kubernetes.client.configuration.Configuration() + config.logger_format = format + config.logger_file = None # once again after the constructor to re-apply the formatter + config.debug = debug + kubernetes.client.configuration.Configuration.set_default(config) + + # Kubernetes client is as buggy as hell: it adds its own stream handlers even in non-debug mode, + # does not respect the formatting, and dumps too much of the low-level info. 
+ if not debug: + logger = logging.getLogger("urllib3") + del logger.handlers[1:] # everything except the default NullHandler + + # Prevent the low-level logging unless in the debug verbosity mode. Keep only the operator's messages. + logging.getLogger('urllib3').propagate = debug + logging.getLogger('asyncio').propagate = debug + logging.getLogger('kubernetes').propagate = debug + + loop = asyncio.get_event_loop() + loop.set_debug(debug) diff --git a/kopf/reactor/registries.py b/kopf/reactor/registries.py index 8cf5a3bf..22dff296 100644 --- a/kopf/reactor/registries.py +++ b/kopf/reactor/registries.py @@ -23,6 +23,16 @@ class Resource(NamedTuple): version: Text plural: Text + @property + def name(self): + return f'{self.plural}.{self.group}' + + @property + def api_version(self): + # Strip heading/trailing slashes if group is absent (e.g. for pods). + return f'{self.group}/{self.version}'.strip('/') + + # A registered handler (function + event meta info). class Handler(NamedTuple): fn: Callable diff --git a/requirements.txt b/requirements.txt index 3cd41951..f864ffce 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,6 +4,9 @@ urllib3<1.25 # The runtime dependencies of the framework, as if via `pip install kopf`. -e . +# Optional packages are present for testing (presence is auto-detected). +kubernetes + # Everything needed to develop (test, debug) the framework. 
pytest-asyncio pytest-mock diff --git a/setup.py b/setup.py index d5e847e5..75bc7a58 100644 --- a/setup.py +++ b/setup.py @@ -43,6 +43,6 @@ 'click', 'iso8601', 'aiojobs', - 'kubernetes<10.0.0', # see: https://github.com/kubernetes-client/python/issues/866 + 'pykube-ng', ], ) diff --git a/tests/cli/conftest.py b/tests/cli/conftest.py index 557c8976..8aed91ee 100644 --- a/tests/cli/conftest.py +++ b/tests/cli/conftest.py @@ -82,7 +82,8 @@ def invoke(runner): @pytest.fixture() def login(mocker): - return mocker.patch('kopf.clients.auth.login') + mocker.patch('kopf.clients.auth.login') + mocker.patch('kopf.clients.auth.verify') @pytest.fixture() diff --git a/tests/cli/test_help.py b/tests/cli/test_help.py index e35ce473..8db19d41 100644 --- a/tests/cli/test_help.py +++ b/tests/cli/test_help.py @@ -2,11 +2,13 @@ def test_help_in_root(invoke, mocker): login = mocker.patch('kopf.clients.auth.login') + verify = mocker.patch('kopf.clients.auth.verify') result = invoke(['--help']) assert result.exit_code == 0 assert not login.called + assert not verify.called assert 'Usage: kopf [OPTIONS]' in result.output assert ' run ' in result.output @@ -16,6 +18,7 @@ def test_help_in_root(invoke, mocker): def test_help_in_subcommand(invoke, mocker): login = mocker.patch('kopf.clients.auth.login') + verify = mocker.patch('kopf.clients.auth.verify') preload = mocker.patch('kopf.utilities.loaders.preload') real_run = mocker.patch('kopf.reactor.queueing.run') @@ -23,6 +26,7 @@ def test_help_in_subcommand(invoke, mocker): assert result.exit_code == 0 assert not login.called + assert not verify.called assert not preload.called assert not real_run.called diff --git a/tests/cli/test_login.py b/tests/cli/test_login.py index 54fe0696..74097593 100644 --- a/tests/cli/test_login.py +++ b/tests/cli/test_login.py @@ -1,7 +1,12 @@ -import kubernetes +""" +Remember: We do not test the clients, we assume they work when used properly. +We test our own functions here, and check if the clients were called. 
+""" +import kubernetes.client.rest import pytest +import requests -from kopf.clients.auth import login, LoginError +from kopf.clients.auth import login, verify, LoginError, AccessError @pytest.fixture(autouse=True) @@ -9,121 +14,173 @@ def _auto_clean_kubernetes_client(clean_kubernetes_client): pass -def test_direct_auth_works_incluster(mocker): - # We do not test the client, we assume it works when used properly. - core_api = mocker.patch.object(kubernetes.client, 'CoreApi') - load_kube_config = mocker.patch.object(kubernetes.config, 'load_kube_config') - load_incluster_config = mocker.patch.object(kubernetes.config, 'load_incluster_config') +def test_direct_auth_works_incluster(login_mocks): login() - assert load_incluster_config.called - assert not load_kube_config.called - assert core_api.called # to verify that auth worked + assert login_mocks.pykube_in_cluster.called + assert not login_mocks.pykube_from_file.called + assert login_mocks.client_in_cluster.called + assert not login_mocks.client_from_file.called -def test_direct_auth_works_kubeconfig(mocker): - # We do not test the client, we assume it works when used properly. 
- core_api = mocker.patch.object(kubernetes.client, 'CoreApi') - load_kube_config = mocker.patch.object(kubernetes.config, 'load_kube_config') - load_incluster_config = mocker.patch.object(kubernetes.config, 'load_incluster_config') - load_incluster_config.side_effect = kubernetes.config.ConfigException +def test_direct_auth_works_kubeconfig(login_mocks): + login_mocks.pykube_in_cluster.side_effect = FileNotFoundError + login_mocks.client_in_cluster.side_effect = kubernetes.config.ConfigException login() - assert load_incluster_config.called - assert load_kube_config.called - assert core_api.called # to verify that auth worked + assert login_mocks.pykube_in_cluster.called + assert login_mocks.pykube_from_file.called + assert login_mocks.client_in_cluster.called + assert login_mocks.client_from_file.called -def test_direct_auth_fails(mocker): - # We do not test the client, we assume it works when used properly. - core_api = mocker.patch.object(kubernetes.client, 'CoreApi') - load_kube_config = mocker.patch.object(kubernetes.config, 'load_kube_config') - load_incluster_config = mocker.patch.object(kubernetes.config, 'load_incluster_config') - load_incluster_config.side_effect = kubernetes.config.ConfigException - load_kube_config.side_effect = kubernetes.config.ConfigException +def test_direct_auth_fails_on_errors_in_pykube(login_mocks): + login_mocks.pykube_in_cluster.side_effect = FileNotFoundError + login_mocks.pykube_from_file.side_effect = FileNotFoundError with pytest.raises(LoginError): login() - assert load_incluster_config.called - assert load_kube_config.called - assert not core_api.called # to verify that auth worked + assert login_mocks.pykube_in_cluster.called + assert login_mocks.pykube_from_file.called + assert not login_mocks.client_in_cluster.called # because pykube failed + assert not login_mocks.client_from_file.called # because pykube failed -def test_direct_api_fails(mocker): - # We do not test the client, we assume it works when used properly. 
- core_api = mocker.patch.object(kubernetes.client, 'CoreApi') - load_kube_config = mocker.patch.object(kubernetes.config, 'load_kube_config') - load_incluster_config = mocker.patch.object(kubernetes.config, 'load_incluster_config') - core_api.side_effect = kubernetes.client.rest.ApiException(status=401) +def test_direct_auth_fails_on_errors_in_client(login_mocks): + login_mocks.client_in_cluster.side_effect = kubernetes.config.ConfigException + login_mocks.client_from_file.side_effect = kubernetes.config.ConfigException with pytest.raises(LoginError): login() - assert load_incluster_config.called - assert not load_kube_config.called - assert core_api.called # to verify that auth worked + assert login_mocks.pykube_in_cluster.called + assert not login_mocks.pykube_from_file.called + assert login_mocks.client_in_cluster.called + assert login_mocks.client_from_file.called -def test_clirun_auth_works_incluster(invoke, mocker, preload, real_run): - # We do not test the client, we assume it works when used properly. 
- core_api = mocker.patch.object(kubernetes.client, 'CoreApi') - load_kube_config = mocker.patch.object(kubernetes.config, 'load_kube_config') - load_incluster_config = mocker.patch.object(kubernetes.config, 'load_incluster_config') +def test_direct_check_fails_on_errors_in_pykube(login_mocks): + response = requests.Response() + response.status_code = 401 + login_mocks.pykube_checker.side_effect = requests.exceptions.HTTPError(response=response) + + login() + with pytest.raises(AccessError): + verify() + + assert login_mocks.pykube_in_cluster.called + assert not login_mocks.pykube_from_file.called + assert login_mocks.client_in_cluster.called + assert not login_mocks.client_from_file.called + assert login_mocks.pykube_checker.called + assert not login_mocks.client_checker.called # because pykube failed + + +def test_direct_check_fails_on_errors_in_client(login_mocks): + login_mocks.client_checker.side_effect = kubernetes.client.rest.ApiException(status=401) + + login() + with pytest.raises(AccessError): + verify() + + assert login_mocks.pykube_in_cluster.called + assert not login_mocks.pykube_from_file.called + assert login_mocks.client_in_cluster.called + assert not login_mocks.client_from_file.called + assert login_mocks.pykube_checker.called + assert login_mocks.client_checker.called + + +def test_clirun_auth_works_incluster(invoke, login_mocks, preload, real_run): result = invoke(['run']) assert result.exit_code == 0 - assert load_incluster_config.called - assert not load_kube_config.called - assert core_api.called # to verify that auth worked + assert login_mocks.pykube_in_cluster.called + assert not login_mocks.pykube_from_file.called + assert login_mocks.client_in_cluster.called + assert not login_mocks.client_from_file.called + assert login_mocks.pykube_checker.called + assert login_mocks.client_checker.called -def test_clirun_auth_works_kubeconfig(invoke, mocker, preload, real_run): - # We do not test the client, we assume it works when used properly. 
- core_api = mocker.patch.object(kubernetes.client, 'CoreApi') - load_kube_config = mocker.patch.object(kubernetes.config, 'load_kube_config') - load_incluster_config = mocker.patch.object(kubernetes.config, 'load_incluster_config') - load_incluster_config.side_effect = kubernetes.config.ConfigException +def test_clirun_auth_works_kubeconfig(invoke, login_mocks, preload, real_run): + login_mocks.pykube_in_cluster.side_effect = FileNotFoundError + login_mocks.client_in_cluster.side_effect = kubernetes.config.ConfigException result = invoke(['run']) assert result.exit_code == 0 - assert load_incluster_config.called - assert load_kube_config.called - assert core_api.called # to verify that auth worked + assert login_mocks.pykube_in_cluster.called + assert login_mocks.pykube_from_file.called + assert login_mocks.client_in_cluster.called + assert login_mocks.client_from_file.called + assert login_mocks.pykube_checker.called + assert login_mocks.client_checker.called + + +def test_clirun_auth_fails_on_errors_in_pykube(invoke, login_mocks, preload, real_run): + login_mocks.pykube_in_cluster.side_effect = FileNotFoundError + login_mocks.pykube_from_file.side_effect = FileNotFoundError + result = invoke(['run']) + assert result.exit_code != 0 + assert 'neither in-cluster, nor via kubeconfig' in result.stdout -def test_clirun_auth_fails(invoke, mocker, preload, real_run): - # We do not test the client, we assume it works when used properly. 
- core_api = mocker.patch.object(kubernetes.client, 'CoreApi') - load_kube_config = mocker.patch.object(kubernetes.config, 'load_kube_config') - load_incluster_config = mocker.patch.object(kubernetes.config, 'load_incluster_config') - load_incluster_config.side_effect = kubernetes.config.ConfigException - load_kube_config.side_effect = kubernetes.config.ConfigException + assert login_mocks.pykube_in_cluster.called + assert login_mocks.pykube_from_file.called + assert not login_mocks.client_in_cluster.called # because pykube failed + assert not login_mocks.client_from_file.called # because pykube failed + assert not login_mocks.pykube_checker.called + assert not login_mocks.client_checker.called + + +def test_clirun_auth_fails_on_errors_in_client(invoke, login_mocks, preload, real_run): + login_mocks.client_in_cluster.side_effect = kubernetes.config.ConfigException + login_mocks.client_from_file.side_effect = kubernetes.config.ConfigException result = invoke(['run']) assert result.exit_code != 0 assert 'neither in-cluster, nor via kubeconfig' in result.stdout - assert load_incluster_config.called - assert load_kube_config.called - assert not core_api.called # to verify that auth worked + assert login_mocks.pykube_in_cluster.called + assert not login_mocks.pykube_from_file.called + assert login_mocks.client_in_cluster.called + assert login_mocks.client_from_file.called + assert not login_mocks.pykube_checker.called + assert not login_mocks.client_checker.called + + +def test_clirun_check_fails_on_errors_in_pykube(invoke, login_mocks, preload, real_run): + response = requests.Response() + response.status_code = 401 + login_mocks.pykube_checker.side_effect = requests.exceptions.HTTPError(response=response) + + result = invoke(['run']) + assert result.exit_code != 0 + assert 'Please login or configure the tokens' in result.stdout + + assert login_mocks.pykube_in_cluster.called + assert not login_mocks.pykube_from_file.called + assert login_mocks.client_in_cluster.called 
+ assert not login_mocks.client_from_file.called + assert login_mocks.pykube_checker.called + assert not login_mocks.client_checker.called # because pykube failed -def test_clirun_api_fails(invoke, mocker, preload, real_run): - # We do not test the client, we assume it works when used properly. - core_api = mocker.patch.object(kubernetes.client, 'CoreApi') - load_kube_config = mocker.patch.object(kubernetes.config, 'load_kube_config') - load_incluster_config = mocker.patch.object(kubernetes.config, 'load_incluster_config') - core_api.side_effect = kubernetes.client.rest.ApiException(status=401) +def test_clirun_check_fails_on_errors_in_client(invoke, login_mocks, preload, real_run): + login_mocks.client_checker.side_effect = kubernetes.client.rest.ApiException(status=401) result = invoke(['run']) assert result.exit_code != 0 assert 'Please login or configure the tokens' in result.stdout - assert load_incluster_config.called - assert not load_kube_config.called - assert core_api.called # to verify that auth worked + assert login_mocks.pykube_in_cluster.called + assert not login_mocks.pykube_from_file.called + assert login_mocks.client_in_cluster.called + assert not login_mocks.client_from_file.called + assert login_mocks.pykube_checker.called + assert login_mocks.client_checker.called diff --git a/tests/conftest.py b/tests/conftest.py index cb48a5dd..b0648464 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,11 +1,16 @@ import asyncio +import dataclasses import io +import json import logging import os import re import time +from unittest.mock import Mock import asynctest +import kubernetes.client.rest +import pykube import pytest import pytest_mock @@ -16,6 +21,7 @@ def pytest_configure(config): config.addinivalue_line('markers', "e2e: end-to-end tests with real operators.") + config.addinivalue_line('markers', "resource_clustered: (internal parameterization mark).") # This logic is not applied if pytest is started explicitly on ./examples/.
@@ -60,6 +66,82 @@ def resource(): """ The resource used in the tests. Usually mocked, so it does not matter. """ return Resource('zalando.org', 'v1', 'kopfexamples') +# +# Mocks for Kubernetes API clients (any of them). Reasons: +# 1. We do not test the clients, we test the layers on top of them, +# so everything low-level should be mocked and assumed to be functional. +# 2. No external calls must be made under any circumstances. +# The unit-tests must be fully isolated from the environment. +# + +@pytest.fixture() +def req_mock(mocker, resource, request): + + # Pykube config is needed to create a pykube's API instance. + # But we do not want and do not need to actually authenticate, so we mock. + # Some fields are used by pykube's objects: we have to know them ("leaky abstractions"). + cfg_mock = mocker.patch('kopf.clients.auth.get_pykube_cfg').return_value + cfg_mock.cluster = {'server': 'localhost'} + cfg_mock.namespace = 'default' + + # Simulated list of cluster-defined CRDs: all of them at once. See: `resource` fixture(s). + # Simulate the resource as cluster-scoped if there is a marker on the test. + namespaced = not any(marker.name == 'resource_clustered' for marker in request.node.own_markers) + res_mock = mocker.patch('pykube.http.HTTPClient.resource_list') + res_mock.return_value = {'resources': [ + {'name': 'kopfexamples', 'kind': 'KopfExample', 'namespaced': namespaced}, + ]} + + # Prevent ANY outer requests, no matter what. These ones are usually asserted. + req_mock = mocker.patch('requests.Session').return_value + return req_mock + + +@pytest.fixture() +def stream(req_mock): + """ A mock for the stream of events as if returned by K8s client.
""" + def feed(*args): + side_effect = [] + for arg in args: + if isinstance(arg, (list, tuple)): + arg = iter(json.dumps(event).encode('utf-8') for event in arg) + side_effect.append(arg) + req_mock.get.return_value.iter_lines.side_effect = side_effect + return Mock(spec_set=['feed'], feed=feed) + +# +# Mocks for login & checks. Used in specifialised login tests, +# and in all CLI tests (since login is implicit with CLI commands). +# + +@dataclasses.dataclass(frozen=True, eq=False, order=False) +class LoginMocks: + pykube_in_cluster: Mock + pykube_from_file: Mock + pykube_checker: Mock + client_in_cluster: Mock + client_from_file: Mock + client_checker: Mock + + +@pytest.fixture() +def login_mocks(mocker): + # Pykube config is needed to create a pykube's API instance. + # But we do not want and do not need to actually authenticate, so we mock. + # Some fields are used by pykube's objects: we have to know them ("leaky abstractions"). + cfg_mock = mocker.patch('kopf.clients.auth.get_pykube_cfg').return_value + cfg_mock.cluster = {'server': 'localhost'} + cfg_mock.namespace = 'default' + + return LoginMocks( + pykube_in_cluster=mocker.patch.object(pykube.KubeConfig, 'from_service_account'), + pykube_from_file=mocker.patch.object(pykube.KubeConfig, 'from_file'), + pykube_checker=mocker.patch.object(pykube.http.HTTPClient, 'get'), + client_in_cluster=mocker.patch.object(kubernetes.config, 'load_incluster_config'), + client_from_file=mocker.patch.object(kubernetes.config, 'load_kube_config'), + client_checker=mocker.patch.object(kubernetes.client, 'CoreApi'), + ) + # # Helpers for the timing checks. 
# diff --git a/tests/e2e/test_examples.py b/tests/e2e/test_examples.py index c6e3f78e..0303a3c1 100644 --- a/tests/e2e/test_examples.py +++ b/tests/e2e/test_examples.py @@ -29,7 +29,7 @@ def test_all_examples_are_runnable(mocker, with_crd, with_peering, exampledir): mocker.patch('kopf.reactor.handling.DEFAULT_RETRY_DELAY', 1) # To prevent lengthy threads in the loop executor when the process exits. - mocker.patch('kopf.clients.watching.DEFAULT_STREAM_TIMEOUT', 10) + mocker.patch('kopf.config.WatchersConfig.default_stream_timeout', 10) # Run an operator and simulate some activity with the operated resource. with KopfRunner(['run', '--verbose', str(example_py)]) as runner: diff --git a/tests/handling/conftest.py b/tests/handling/conftest.py index c397a498..e25fd6d6 100644 --- a/tests/handling/conftest.py +++ b/tests/handling/conftest.py @@ -39,7 +39,6 @@ from unittest.mock import Mock import pytest -from kubernetes.client.rest import ApiException # to avoid mocking it import kopf from kopf.reactor.causation import Cause, RESUME @@ -53,13 +52,7 @@ class K8sMocks: @pytest.fixture(autouse=True) -def k8s_mocked(mocker): - """ Prevent any actual K8s calls.""" - - # TODO: consolidate with tests/k8s/conftest.py:client_mock() - client_mock = mocker.patch('kubernetes.client') - client_mock.rest.ApiException = ApiException # to be raises and caught - +def k8s_mocked(mocker, req_mock): # We mock on the level of our own K8s API wrappers, not the K8s client. 
return K8sMocks( patch_obj=mocker.patch('kopf.clients.patching.patch_obj'), diff --git a/tests/k8s/conftest.py b/tests/k8s/conftest.py index 92515c0e..62f0b576 100644 --- a/tests/k8s/conftest.py +++ b/tests/k8s/conftest.py @@ -1,19 +1,6 @@ import pytest -from kubernetes.client import V1Event as V1Event -from kubernetes.client import V1EventSource as V1EventSource -from kubernetes.client import V1ObjectMeta as V1ObjectMeta -from kubernetes.client import V1beta1Event as V1beta1Event -from kubernetes.client.rest import ApiException # to avoid mocking it -# We do not test the Kubernetes client, so everything there should be mocked. -# Also, no external calls must be made under any circumstances. Hence, auto-use. @pytest.fixture(autouse=True) -def client_mock(mocker): - client_mock = mocker.patch('kubernetes.client') - client_mock.rest.ApiException = ApiException # to be raises and caught - client_mock.V1Event = V1Event - client_mock.V1beta1Event = V1beta1Event - client_mock.V1EventSource = V1EventSource - client_mock.V1ObjectMeta = V1ObjectMeta - return client_mock +def _autouse_req_mock(req_mock): + pass diff --git a/tests/k8s/test_events.py b/tests/k8s/test_events.py index 370f32de..515daea9 100644 --- a/tests/k8s/test_events.py +++ b/tests/k8s/test_events.py @@ -1,15 +1,12 @@ +import json + import pytest -from asynctest import call, ANY +import requests from kopf.clients.events import post_event -async def test_posting(client_mock): - result = object() - apicls_mock = client_mock.CoreV1Api - apicls_mock.return_value.create_namespaced_event.return_value = result - postfn_mock = apicls_mock.return_value.create_namespaced_event - +async def test_posting(req_mock): obj = {'apiVersion': 'group/version', 'kind': 'kind', 'metadata': {'namespace': 'ns', @@ -17,29 +14,22 @@ async def test_posting(client_mock): 'uid': 'uid'}} await post_event(obj=obj, type='type', reason='reason', message='message') - assert postfn_mock.called - assert postfn_mock.call_count == 1 - assert 
postfn_mock.call_args_list == [call( - namespace='ns', # same as the object's namespace - body=ANY, - )] - - event = postfn_mock.call_args_list[0][1]['body'] - assert event.type == 'type' - assert event.reason == 'reason' - assert event.message == 'message' - assert event.source.component == 'kopf' - assert event.involved_object['apiVersion'] == 'group/version' - assert event.involved_object['kind'] == 'kind' - assert event.involved_object['namespace'] == 'ns' - assert event.involved_object['name'] == 'name' - assert event.involved_object['uid'] == 'uid' - - -async def test_type_is_v1_not_v1beta1(client_mock): - apicls_mock = client_mock.CoreV1Api - postfn_mock = apicls_mock.return_value.create_namespaced_event + assert req_mock.post.called + assert req_mock.post.call_count == 1 + + data = json.loads(req_mock.post.call_args_list[0][1]['data']) + assert data['type'] == 'type' + assert data['reason'] == 'reason' + assert data['message'] == 'message' + assert data['source']['component'] == 'kopf' + assert data['involvedObject']['apiVersion'] == 'group/version' + assert data['involvedObject']['kind'] == 'kind' + assert data['involvedObject']['namespace'] == 'ns' + assert data['involvedObject']['name'] == 'name' + assert data['involvedObject']['uid'] == 'uid' + +async def test_type_is_v1_not_v1beta1(req_mock): obj = {'apiVersion': 'group/version', 'kind': 'kind', 'metadata': {'namespace': 'ns', @@ -47,16 +37,17 @@ async def test_type_is_v1_not_v1beta1(client_mock): 'uid': 'uid'}} await post_event(obj=obj, type='type', reason='reason', message='message') - event = postfn_mock.call_args_list[0][1]['body'] - assert isinstance(event, client_mock.V1Event) - assert not isinstance(event, client_mock.V1beta1Event) + assert req_mock.post.called + + url = req_mock.post.call_args_list[0][1]['url'] + assert 'v1beta1' not in url + assert '/api/v1/namespaces/ns/events' in url -async def test_api_errors_logged_but_suppressed(client_mock, assert_logs): - error = 
client_mock.rest.ApiException('boo!') - apicls_mock = client_mock.CoreV1Api - apicls_mock.return_value.create_namespaced_event.side_effect = error - postfn_mock = apicls_mock.return_value.create_namespaced_event +async def test_api_errors_logged_but_suppressed(req_mock, assert_logs): + response = requests.Response() + error = requests.exceptions.HTTPError("boo!", response=response) + req_mock.post.side_effect = error obj = {'apiVersion': 'group/version', 'kind': 'kind', @@ -65,16 +56,15 @@ async def test_api_errors_logged_but_suppressed(client_mock, assert_logs): 'uid': 'uid'}} await post_event(obj=obj, type='type', reason='reason', message='message') - assert postfn_mock.called + assert req_mock.post.called assert_logs([ "Failed to post an event.*boo!", ]) -async def test_regular_errors_escalate(client_mock): +async def test_regular_errors_escalate(req_mock): error = Exception('boo!') - apicls_mock = client_mock.CoreV1Api - apicls_mock.return_value.create_namespaced_event.side_effect = error + req_mock.post.side_effect = error obj = {'apiVersion': 'group/version', 'kind': 'kind', @@ -88,12 +78,7 @@ async def test_regular_errors_escalate(client_mock): assert excinfo.value is error -async def test_message_is_cut_to_max_length(client_mock): - result = object() - apicls_mock = client_mock.CoreV1Api - apicls_mock.return_value.create_namespaced_event.return_value = result - postfn_mock = apicls_mock.return_value.create_namespaced_event - +async def test_message_is_cut_to_max_length(req_mock): obj = {'apiVersion': 'group/version', 'kind': 'kind', 'metadata': {'namespace': 'ns', @@ -102,8 +87,8 @@ async def test_message_is_cut_to_max_length(client_mock): message = 'start' + ('x' * 2048) + 'end' await post_event(obj=obj, type='type', reason='reason', message=message) - event = postfn_mock.call_args_list[0][1]['body'] - assert len(event.message) <= 1024 # max supported API message length - assert '...' 
in event.message - assert event.message.startswith('start') - assert event.message.endswith('end') + data = json.loads(req_mock.post.call_args_list[0][1]['data']) + assert len(data['message']) <= 1024 # max supported API message length + assert '...' in data['message'] + assert data['message'].startswith('start') + assert data['message'].endswith('end') diff --git a/tests/k8s/test_list_objs.py b/tests/k8s/test_list_objs.py index 6b9af8d7..6a5b5c4a 100644 --- a/tests/k8s/test_list_objs.py +++ b/tests/k8s/test_list_objs.py @@ -1,59 +1,46 @@ -import kubernetes.client.rest import pytest -from asynctest import call +import requests from kopf.clients.fetching import list_objs -def test_when_successful_clustered(client_mock, resource): - result = object() - apicls_mock = client_mock.CustomObjectsApi - apicls_mock.return_value.list_cluster_custom_object.return_value = result - apicls_mock.return_value.list_namespaced_custom_object.return_value = result - sidefn_mock = apicls_mock.return_value.list_namespaced_custom_object - mainfn_mock = apicls_mock.return_value.list_cluster_custom_object +def test_when_successful_clustered(req_mock, resource): + result = {'items': []} + req_mock.get.return_value.json.return_value = result lst = list_objs(resource=resource, namespace=None) assert lst is result - assert not sidefn_mock.called - assert mainfn_mock.call_count == 1 - assert mainfn_mock.call_args_list == [call( - group=resource.group, - version=resource.version, - plural=resource.plural, - )] + assert req_mock.get.called + assert req_mock.get.call_count == 1 + url = req_mock.get.call_args_list[0][1]['url'] + assert 'apis/zalando.org/v1/kopfexamples' in url + assert 'namespaces/' not in url -def test_when_successful_namespaced(client_mock, resource): - result = object() - apicls_mock = client_mock.CustomObjectsApi - apicls_mock.return_value.list_cluster_custom_object.return_value = result - apicls_mock.return_value.list_namespaced_custom_object.return_value = result - 
sidefn_mock = apicls_mock.return_value.list_cluster_custom_object - mainfn_mock = apicls_mock.return_value.list_namespaced_custom_object + +def test_when_successful_namespaced(req_mock, resource): + result = {'items': []} + req_mock.get.return_value.json.return_value = result lst = list_objs(resource=resource, namespace='ns1') assert lst is result - assert not sidefn_mock.called - assert mainfn_mock.call_count == 1 - assert mainfn_mock.call_args_list == [call( - group=resource.group, - version=resource.version, - plural=resource.plural, - namespace='ns1', - )] + assert req_mock.get.called + assert req_mock.get.call_count == 1 + + url = req_mock.get.call_args_list[0][1]['url'] + assert 'apis/zalando.org/v1/namespaces/ns1/kopfexamples' in url @pytest.mark.parametrize('namespace', [None, 'ns1'], ids=['without-namespace', 'with-namespace']) @pytest.mark.parametrize('status', [400, 401, 403, 500, 666]) -def test_raises_api_error(client_mock, resource, namespace, status): - error = kubernetes.client.rest.ApiException(status=status) - apicls_mock = client_mock.CustomObjectsApi - apicls_mock.return_value.list_cluster_custom_object.side_effect = error - apicls_mock.return_value.list_namespaced_custom_object.side_effect = error +def test_raises_api_error(req_mock, resource, namespace, status): + response = requests.Response() + response.status_code = status + error = requests.exceptions.HTTPError("boo!", response=response) + req_mock.get.side_effect = error - with pytest.raises(kubernetes.client.rest.ApiException) as e: + with pytest.raises(requests.exceptions.HTTPError) as e: list_objs(resource=resource, namespace=namespace) - assert e.value.status == status + assert e.value.response.status_code == status diff --git a/tests/k8s/test_make_list_fn.py b/tests/k8s/test_make_list_fn.py deleted file mode 100644 index 8ea5e99e..00000000 --- a/tests/k8s/test_make_list_fn.py +++ /dev/null @@ -1,92 +0,0 @@ -import pydoc - -import kubernetes.client.rest -import pytest -from asynctest 
import call - -from kopf.clients.fetching import make_list_fn - - -def test_when_present_clustered(client_mock, resource): - result = object() - apicls_mock = client_mock.CustomObjectsApi - apicls_mock.return_value.list_cluster_custom_object.return_value = result - apicls_mock.return_value.list_namespaced_custom_object.return_value = result - sidefn_mock = apicls_mock.return_value.list_namespaced_custom_object - mainfn_mock = apicls_mock.return_value.list_cluster_custom_object - - fn = make_list_fn(resource=resource, namespace=None) - assert callable(fn) - - assert not sidefn_mock.called - assert not mainfn_mock.called - - res = fn(opt1='val1', opt2=123) - assert res is result - - assert sidefn_mock.call_count == 0 - assert mainfn_mock.call_count == 1 - assert mainfn_mock.call_args_list == [call( - group=resource.group, - version=resource.version, - plural=resource.plural, - opt1='val1', opt2=123, - )] - - -def test_when_present_namespaced(client_mock, resource): - result = object() - apicls_mock = client_mock.CustomObjectsApi - apicls_mock.return_value.list_cluster_custom_object.return_value = result - apicls_mock.return_value.list_namespaced_custom_object.return_value = result - sidefn_mock = apicls_mock.return_value.list_cluster_custom_object - mainfn_mock = apicls_mock.return_value.list_namespaced_custom_object - - fn = make_list_fn(resource=resource, namespace='ns1') - assert callable(fn) - - assert not sidefn_mock.called - assert not mainfn_mock.called - - res = fn(opt1='val1', opt2=123) - assert res is result - - assert sidefn_mock.call_count == 0 - assert mainfn_mock.call_count == 1 - assert mainfn_mock.call_args_list == [call( - group=resource.group, - version=resource.version, - plural=resource.plural, - namespace='ns1', - opt1='val1', opt2=123, - )] - - -@pytest.mark.parametrize('namespace', [None, 'ns1'], ids=['without-namespace', 'with-namespace']) -@pytest.mark.parametrize('status', [400, 401, 403, 404, 500, 666]) -def 
test_raises_api_error(client_mock, resource, namespace, status): - error = kubernetes.client.rest.ApiException(status=status) - apicls_mock = client_mock.CustomObjectsApi - apicls_mock.return_value.list_cluster_custom_object.side_effect = error - apicls_mock.return_value.list_namespaced_custom_object.side_effect = error - - fn = make_list_fn(resource=resource, namespace=namespace) - with pytest.raises(kubernetes.client.rest.ApiException) as e: - fn(opt1='val1', opt2=123) - assert e.value.status == status - - -@pytest.mark.parametrize('namespace', [None, 'ns1'], ids=['without-namespace', 'with-namespace']) -def test_docstrings_are_preserved(client_mock, resource, namespace): - # Docstrings are important! Kubernetes client uses them to guess - # the returned object types and the parameters type. - docstring = """some doc \n :return: sometype""" - - apicls_mock = client_mock.CustomObjectsApi - apicls_mock.return_value.list_cluster_custom_object.__doc__ = docstring - apicls_mock.return_value.list_namespaced_custom_object.__doc__ = docstring - - fn = make_list_fn(resource=resource, namespace=namespace) - fn_docstring = pydoc.getdoc(fn) # same as k8s client does this - assert isinstance(fn_docstring, str) - assert ':return: sometype' in docstring # it will be reformatted diff --git a/tests/k8s/test_patching.py b/tests/k8s/test_patching.py index 2ce9b32b..6076508e 100644 --- a/tests/k8s/test_patching.py +++ b/tests/k8s/test_patching.py @@ -1,132 +1,100 @@ -import asyncio +import json import pytest -from asynctest import call from kopf.clients.patching import patch_obj -async def test_by_name_clustered(client_mock, resource): - patch = object() - apicls_mock = client_mock.CustomObjectsApi - sidefn_mock = apicls_mock.return_value.patch_namespaced_custom_object - mainfn_mock = apicls_mock.return_value.patch_cluster_custom_object - +@pytest.mark.resource_clustered # see `req_mock` +async def test_by_name_clustered(req_mock, resource): + patch = {'x': 'y'} res = await 
patch_obj(resource=resource, namespace=None, name='name1', patch=patch) assert res is None # never return any k8s-client specific things - assert not sidefn_mock.called - assert mainfn_mock.call_count == 1 - assert mainfn_mock.call_args_list == [call( - group=resource.group, - version=resource.version, - plural=resource.plural, - name='name1', - body=patch, - )] + assert req_mock.patch.called + assert req_mock.patch.call_count == 1 + + url = req_mock.patch.call_args_list[0][1]['url'] + assert 'namespaces/' not in url + assert f'apis/{resource.api_version}/{resource.plural}/name1' in url + data = json.loads(req_mock.patch.call_args_list[0][1]['data']) + assert data == {'x': 'y'} -async def test_by_name_namespaced(client_mock, resource): - patch = object() - apicls_mock = client_mock.CustomObjectsApi - sidefn_mock = apicls_mock.return_value.patch_cluster_custom_object - mainfn_mock = apicls_mock.return_value.patch_namespaced_custom_object +async def test_by_name_namespaced(req_mock, resource): + patch = {'x': 'y'} res = await patch_obj(resource=resource, namespace='ns1', name='name1', patch=patch) assert res is None # never return any k8s-client specific things - assert not sidefn_mock.called - assert mainfn_mock.call_count == 1 - assert mainfn_mock.call_args_list == [call( - group=resource.group, - version=resource.version, - plural=resource.plural, - namespace='ns1', - name='name1', - body=patch, - )] + assert req_mock.patch.called + assert req_mock.patch.call_count == 1 + url = req_mock.patch.call_args_list[0][1]['url'] + assert 'namespaces/' in url + assert f'apis/{resource.api_version}/namespaces/ns1/{resource.plural}/name1' in url -async def test_by_body_clustered(client_mock, resource): - patch = object() - apicls_mock = client_mock.CustomObjectsApi - sidefn_mock = apicls_mock.return_value.patch_namespaced_custom_object - mainfn_mock = apicls_mock.return_value.patch_cluster_custom_object + data = json.loads(req_mock.patch.call_args_list[0][1]['data']) + assert 
data == {'x': 'y'} + +@pytest.mark.resource_clustered # see `req_mock` +async def test_by_body_clustered(req_mock, resource): + patch = {'x': 'y'} body = {'metadata': {'name': 'name1'}} res = await patch_obj(resource=resource, body=body, patch=patch) assert res is None # never return any k8s-client specific things - assert not sidefn_mock.called - assert mainfn_mock.call_count == 1 - assert mainfn_mock.call_args_list == [call( - group=resource.group, - version=resource.version, - plural=resource.plural, - name='name1', - body=patch, - )] + assert req_mock.patch.called + assert req_mock.patch.call_count == 1 + + url = req_mock.patch.call_args_list[0][1]['url'] + assert 'namespaces/' not in url + assert f'apis/{resource.api_version}/{resource.plural}/name1' in url + data = json.loads(req_mock.patch.call_args_list[0][1]['data']) + assert data == {'x': 'y'} -async def test_by_body_namespaced(client_mock, resource): - patch = object() - apicls_mock = client_mock.CustomObjectsApi - sidefn_mock = apicls_mock.return_value.patch_cluster_custom_object - mainfn_mock = apicls_mock.return_value.patch_namespaced_custom_object +async def test_by_body_namespaced(req_mock, resource): + patch = {'x': 'y'} body = {'metadata': {'namespace': 'ns1', 'name': 'name1'}} res = await patch_obj(resource=resource, body=body, patch=patch) assert res is None # never return any k8s-client specific things - assert not sidefn_mock.called - assert mainfn_mock.call_count == 1 - assert mainfn_mock.call_args_list == [call( - group=resource.group, - version=resource.version, - plural=resource.plural, - namespace='ns1', - name='name1', - body=patch, - )] + assert req_mock.patch.called + assert req_mock.patch.call_count == 1 + url = req_mock.patch.call_args_list[0][1]['url'] + assert 'namespaces/' in url + assert f'apis/{resource.api_version}/namespaces/ns1/{resource.plural}/name1' in url -async def test_raises_when_body_conflicts_with_namespace(client_mock, resource): - patch = object() - apicls_mock = 
client_mock.CustomObjectsApi - sidefn_mock = apicls_mock.return_value.patch_namespaced_custom_object - mainfn_mock = apicls_mock.return_value.patch_cluster_custom_object + data = json.loads(req_mock.patch.call_args_list[0][1]['data']) + assert data == {'x': 'y'} + +async def test_raises_when_body_conflicts_with_namespace(req_mock, resource): + patch = {'x': 'y'} body = {'metadata': {'namespace': 'ns1', 'name': 'name1'}} with pytest.raises(TypeError): await patch_obj(resource=resource, body=body, namespace='ns1', patch=patch) - assert not sidefn_mock.called - assert not mainfn_mock.called - + assert not req_mock.patch.called -async def test_raises_when_body_conflicts_with_name(client_mock, resource): - patch = object() - apicls_mock = client_mock.CustomObjectsApi - sidefn_mock = apicls_mock.return_value.patch_namespaced_custom_object - mainfn_mock = apicls_mock.return_value.patch_cluster_custom_object +async def test_raises_when_body_conflicts_with_name(req_mock, resource): + patch = {'x': 'y'} body = {'metadata': {'namespace': 'ns1', 'name': 'name1'}} with pytest.raises(TypeError): await patch_obj(resource=resource, body=body, name='name1', patch=patch) - assert not sidefn_mock.called - assert not mainfn_mock.called - + assert not req_mock.patch.called -async def test_raises_when_body_conflicts_with_ids(client_mock, resource): - patch = object() - apicls_mock = client_mock.CustomObjectsApi - sidefn_mock = apicls_mock.return_value.patch_namespaced_custom_object - mainfn_mock = apicls_mock.return_value.patch_cluster_custom_object +async def test_raises_when_body_conflicts_with_ids(req_mock, resource): + patch = {'x': 'y'} body = {'metadata': {'namespace': 'ns1', 'name': 'name1'}} with pytest.raises(TypeError): await patch_obj(resource=resource, body=body, namespace='ns1', name='name1', patch=patch) - assert not sidefn_mock.called - assert not mainfn_mock.called + assert not req_mock.patch.called diff --git a/tests/k8s/test_read_crd.py b/tests/k8s/test_read_crd.py 
index 7d2655b5..24929d10 100644 --- a/tests/k8s/test_read_crd.py +++ b/tests/k8s/test_read_crd.py @@ -1,55 +1,54 @@ -import kubernetes import pytest -from asynctest import call +import requests from kopf.clients.fetching import read_crd -def test_when_present(client_mock, resource): - apicls_mock = client_mock.ApiextensionsV1beta1Api - readfn_mock = apicls_mock.return_value.read_custom_resource_definition - result_mock = readfn_mock.return_value +def test_when_present(req_mock, resource): + result = {} + req_mock.get.return_value.json.return_value = result crd = read_crd(resource=resource) - assert crd is result_mock + assert crd is result - assert apicls_mock.called - assert apicls_mock.call_count == 1 - assert readfn_mock.called - assert readfn_mock.call_count == 1 - assert readfn_mock.call_args_list == [ - call(name=f'{resource.plural}.{resource.group}') - ] + assert req_mock.get.called + assert req_mock.get.call_count == 1 + + url = req_mock.get.call_args_list[0][1]['url'] + assert '/customresourcedefinitions/kopfexamples.zalando.org' in url @pytest.mark.parametrize('status', [403, 404]) -def test_when_absent_with_no_default(client_mock, resource, status): - error = kubernetes.client.rest.ApiException(status=status) - apicls_mock = client_mock.ApiextensionsV1beta1Api - apicls_mock.return_value.read_custom_resource_definition.side_effect = error +def test_when_absent_with_no_default(req_mock, resource, status): + response = requests.Response() + response.status_code = status + error = requests.exceptions.HTTPError("boo!", response=response) + req_mock.get.side_effect = error - with pytest.raises(kubernetes.client.rest.ApiException) as e: + with pytest.raises(requests.exceptions.HTTPError) as e: read_crd(resource=resource) - assert e.value.status == status + assert e.value.response.status_code == status @pytest.mark.parametrize('default', [None, object()], ids=['none', 'object']) @pytest.mark.parametrize('status', [403, 404]) -def 
test_when_absent_with_default(client_mock, resource, default, status): - error = kubernetes.client.rest.ApiException(status=status) - apicls_mock = client_mock.ApiextensionsV1beta1Api - apicls_mock.return_value.read_custom_resource_definition.side_effect = error +def test_when_absent_with_default(req_mock, resource, default, status): + response = requests.Response() + response.status_code = status + error = requests.exceptions.HTTPError("boo!", response=response) + req_mock.get.side_effect = error crd = read_crd(resource=resource, default=default) assert crd is default @pytest.mark.parametrize('status', [400, 401, 500, 666]) -def test_raises_api_error_despite_default(client_mock, resource, status): - error = kubernetes.client.rest.ApiException(status=status) - apicls_mock = client_mock.ApiextensionsV1beta1Api - apicls_mock.return_value.read_custom_resource_definition.side_effect = error +def test_raises_api_error_despite_default(req_mock, resource, status): + response = requests.Response() + response.status_code = status + error = requests.exceptions.HTTPError("boo!", response=response) + req_mock.get.side_effect = error - with pytest.raises(kubernetes.client.rest.ApiException) as e: + with pytest.raises(requests.exceptions.HTTPError) as e: read_crd(resource=resource, default=object()) - assert e.value.status == status + assert e.value.response.status_code == status diff --git a/tests/k8s/test_read_obj.py b/tests/k8s/test_read_obj.py index 33b8cf13..08294fdf 100644 --- a/tests/k8s/test_read_obj.py +++ b/tests/k8s/test_read_obj.py @@ -1,87 +1,74 @@ -import kubernetes.client.rest +import pykube import pytest -from asynctest import call +import requests from kopf.clients.fetching import read_obj -def test_when_present_clustered(client_mock, resource): - result = object() - apicls_mock = client_mock.CustomObjectsApi - apicls_mock.return_value.get_cluster_custom_object.return_value = result - apicls_mock.return_value.get_namespaced_custom_object.return_value = result - 
sidefn_mock = apicls_mock.return_value.get_namespaced_custom_object - mainfn_mock = apicls_mock.return_value.get_cluster_custom_object +@pytest.mark.resource_clustered # see `req_mock` +def test_when_present_clustered(req_mock, resource): + result = {} + req_mock.get.return_value.json.return_value = result crd = read_obj(resource=resource, namespace=None, name='name1') assert crd is result - assert not sidefn_mock.called - assert mainfn_mock.call_count == 1 - assert mainfn_mock.call_args_list == [call( - group=resource.group, - version=resource.version, - plural=resource.plural, - name='name1', - )] + assert req_mock.get.called + assert req_mock.get.call_count == 1 + url = req_mock.get.call_args_list[0][1]['url'] + assert 'apis/zalando.org/v1/kopfexamples/name1' in url + assert 'namespaces/' not in url -def test_when_present_namespaced(client_mock, resource): - result = object() - apicls_mock = client_mock.CustomObjectsApi - apicls_mock.return_value.get_cluster_custom_object.return_value = result - apicls_mock.return_value.get_namespaced_custom_object.return_value = result - sidefn_mock = apicls_mock.return_value.get_cluster_custom_object - mainfn_mock = apicls_mock.return_value.get_namespaced_custom_object + +def test_when_present_namespaced(req_mock, resource): + result = {} + req_mock.get.return_value.json.return_value = result crd = read_obj(resource=resource, namespace='ns1', name='name1') assert crd is result - assert not sidefn_mock.called - assert mainfn_mock.call_count == 1 - assert mainfn_mock.call_args_list == [call( - group=resource.group, - version=resource.version, - plural=resource.plural, - namespace='ns1', - name='name1', - )] + assert req_mock.get.called + assert req_mock.get.call_count == 1 + + url = req_mock.get.call_args_list[0][1]['url'] + assert 'apis/zalando.org/v1/namespaces/ns1/kopfexamples/name1' in url @pytest.mark.parametrize('namespace', [None, 'ns1'], ids=['without-namespace', 'with-namespace']) -@pytest.mark.parametrize('status', 
[404]) -def test_when_absent_with_no_default(client_mock, resource, namespace, status): - error = kubernetes.client.rest.ApiException(status=status) - apicls_mock = client_mock.CustomObjectsApi - apicls_mock.return_value.get_cluster_custom_object.side_effect = error - apicls_mock.return_value.get_namespaced_custom_object.side_effect = error - - with pytest.raises(kubernetes.client.rest.ApiException) as e: +@pytest.mark.parametrize('status', [403, 404]) +def test_when_absent_with_no_default(req_mock, resource, namespace, status): + response = requests.Response() + response.status_code = status + error = requests.exceptions.HTTPError("boo!", response=response) + req_mock.get.side_effect = error + + with pytest.raises(requests.exceptions.HTTPError) as e: read_obj(resource=resource, namespace=namespace, name='name1') - assert e.value.status == status + assert e.value.response.status_code == status @pytest.mark.parametrize('default', [None, object()], ids=['none', 'object']) @pytest.mark.parametrize('namespace', [None, 'ns1'], ids=['without-namespace', 'with-namespace']) -@pytest.mark.parametrize('status', [404]) -def test_when_absent_with_default(client_mock, resource, namespace, default, status): - error = kubernetes.client.rest.ApiException(status=status) - apicls_mock = client_mock.CustomObjectsApi - apicls_mock.return_value.get_cluster_custom_object.side_effect = error - apicls_mock.return_value.get_namespaced_custom_object.side_effect = error +@pytest.mark.parametrize('status', [403, 404]) +def test_when_absent_with_default(req_mock, resource, namespace, default, status): + response = requests.Response() + response.status_code = status + error = requests.exceptions.HTTPError("boo!", response=response) + req_mock.get.side_effect = error crd = read_obj(resource=resource, namespace=namespace, name='name1', default=default) assert crd is default @pytest.mark.parametrize('namespace', [None, 'ns1'], ids=['without-namespace', 'with-namespace']) 
-@pytest.mark.parametrize('status', [400, 401, 403, 500, 666]) -def test_raises_api_error_despite_default(client_mock, resource, namespace, status): - error = kubernetes.client.rest.ApiException(status=status) - apicls_mock = client_mock.CustomObjectsApi - apicls_mock.return_value.get_cluster_custom_object.side_effect = error - apicls_mock.return_value.get_namespaced_custom_object.side_effect = error - - with pytest.raises(kubernetes.client.rest.ApiException) as e: +@pytest.mark.parametrize('status', [400, 401, 500, 666]) +def test_raises_api_error_despite_default(req_mock, resource, namespace, status): + response = requests.Response() + response.status_code = status + error = requests.exceptions.HTTPError("boo!", response=response) + req_mock.get.side_effect = error + + with pytest.raises(requests.exceptions.HTTPError) as e: read_obj(resource=resource, namespace=namespace, name='name1', default=object()) - assert e.value.status == status + assert e.value.response.status_code == status diff --git a/tests/k8s/test_scopes.py b/tests/k8s/test_scopes.py index 67aa1b30..922027ff 100644 --- a/tests/k8s/test_scopes.py +++ b/tests/k8s/test_scopes.py @@ -9,12 +9,8 @@ class PreventedActualCallError(Exception): pass -async def test_watching_over_cluster(resource, client_mock): - apicls_mock = client_mock.CustomObjectsApi - cl_list_fn = apicls_mock.return_value.list_cluster_custom_object - ns_list_fn = apicls_mock.return_value.list_namespaced_custom_object - cl_list_fn.side_effect = PreventedActualCallError() - ns_list_fn.side_effect = PreventedActualCallError() +async def test_watching_over_cluster(resource, req_mock): + req_mock.get.side_effect = PreventedActualCallError() itr = streaming_watch( resource=resource, @@ -27,25 +23,16 @@ async def test_watching_over_cluster(resource, client_mock): with pytest.raises(PreventedActualCallError): async for _ in itr: pass # fully deplete it - assert apicls_mock.called - assert apicls_mock.call_count == 1 + assert req_mock.get.called 
+ assert req_mock.get.call_count == 1 - # Cluster-scoped listing function is used irrelevant of the resource. - assert not ns_list_fn.called - assert cl_list_fn.called - assert cl_list_fn.call_count == 1 - assert cl_list_fn.call_args[1]['group'] == resource.group - assert cl_list_fn.call_args[1]['version'] == resource.version - assert cl_list_fn.call_args[1]['plural'] == resource.plural - assert 'namespace' not in cl_list_fn.call_args[1] + url = req_mock.get.call_args_list[0][1]['url'] + assert 'apis/zalando.org/v1/kopfexamples' in url + assert 'namespaces/' not in url -async def test_watching_over_namespace(resource, client_mock): - apicls_mock = client_mock.CustomObjectsApi - cl_list_fn = apicls_mock.return_value.list_cluster_custom_object - ns_list_fn = apicls_mock.return_value.list_namespaced_custom_object - cl_list_fn.side_effect = PreventedActualCallError() - ns_list_fn.side_effect = PreventedActualCallError() +async def test_watching_over_namespace(resource, req_mock): + req_mock.get.side_effect = PreventedActualCallError() itr = streaming_watch( resource=resource, @@ -58,14 +45,8 @@ async def test_watching_over_namespace(resource, client_mock): with pytest.raises(PreventedActualCallError): async for _ in itr: pass # fully deplete it - assert apicls_mock.called - assert apicls_mock.call_count == 1 - - # The scope-relevant listing function is used, depending on the resource. 
- assert not cl_list_fn.called - assert ns_list_fn.called - assert ns_list_fn.call_count == 1 - assert ns_list_fn.call_args[1]['group'] == resource.group - assert ns_list_fn.call_args[1]['version'] == resource.version - assert ns_list_fn.call_args[1]['plural'] == resource.plural - assert ns_list_fn.call_args[1]['namespace'] == 'something' + assert req_mock.get.called + assert req_mock.get.call_count == 1 + + url = req_mock.get.call_args_list[0][1]['url'] + assert 'apis/zalando.org/v1/namespaces/something/kopfexamples' in url diff --git a/tests/k8s/test_watching.py b/tests/k8s/test_watching.py index ca2a2556..e72ced31 100644 --- a/tests/k8s/test_watching.py +++ b/tests/k8s/test_watching.py @@ -20,7 +20,7 @@ ] STREAM_WITH_UNKNOWN_EVENT = [ {'type': 'ADDED', 'object': {'spec': 'a'}}, - {'type': 'UNKNOWN'}, + {'type': 'UNKNOWN', 'object': {}}, {'type': 'ADDED', 'object': {'spec': 'b'}}, ] STREAM_WITH_ERROR_410GONE = [ @@ -39,15 +39,8 @@ class SampleException(Exception): pass -@pytest.fixture() -def stream(mocker): - """ A mock for the stream of events as if returned by K8s client. 
""" - stream = mocker.patch('kubernetes.watch.Watch.stream') - return stream - - async def test_empty_stream_yields_nothing(resource, stream): - stream.return_value = iter([]) + stream.feed([]) events = [] async for event in streaming_watch(resource=resource, namespace=None): @@ -57,7 +50,7 @@ async def test_empty_stream_yields_nothing(resource, stream): async def test_event_stream_yields_everything(resource, stream): - stream.return_value = iter(STREAM_WITH_NORMAL_EVENTS) + stream.feed(STREAM_WITH_NORMAL_EVENTS) events = [] async for event in streaming_watch(resource=resource, namespace=None): @@ -70,13 +63,12 @@ async def test_event_stream_yields_everything(resource, stream): async def test_unknown_event_type_ignored(resource, stream, caplog): caplog.set_level(logging.DEBUG) - stream.return_value = iter(STREAM_WITH_UNKNOWN_EVENT) + stream.feed(STREAM_WITH_UNKNOWN_EVENT) events = [] async for event in streaming_watch(resource=resource, namespace=None): events.append(event) - assert stream.call_count == 1 assert len(events) == 2 assert events[0]['object']['spec'] == 'a' assert events[1]['object']['spec'] == 'b' @@ -86,57 +78,53 @@ async def test_unknown_event_type_ignored(resource, stream, caplog): async def test_error_410gone_exits_normally(resource, stream, caplog): caplog.set_level(logging.DEBUG) - stream.return_value = iter(STREAM_WITH_ERROR_410GONE) + stream.feed(STREAM_WITH_ERROR_410GONE) events = [] async for event in streaming_watch(resource=resource, namespace=None): events.append(event) - assert stream.call_count == 1 assert len(events) == 1 assert events[0]['object']['spec'] == 'a' assert "Restarting the watch-stream" in caplog.text async def test_unknown_error_raises_exception(resource, stream): - stream.return_value = iter(STREAM_WITH_ERROR_CODE) + stream.feed(STREAM_WITH_ERROR_CODE) events = [] with pytest.raises(WatchingError) as e: async for event in streaming_watch(resource=resource, namespace=None): events.append(event) - assert stream.call_count 
== 1 assert len(events) == 1 assert events[0]['object']['spec'] == 'a' assert '666' in str(e.value) async def test_exception_escalates(resource, stream): - stream.side_effect = SampleException() + stream.feed(SampleException()) events = [] with pytest.raises(SampleException): async for event in streaming_watch(resource=resource, namespace=None): events.append(event) - assert stream.call_count == 1 assert len(events) == 0 async def test_infinite_watch_never_exits_normally(resource, stream): - stream.side_effect = [ - iter(STREAM_WITH_ERROR_410GONE), # watching restarted - iter(STREAM_WITH_UNKNOWN_EVENT), # event ignored + stream.feed( + STREAM_WITH_ERROR_410GONE, # watching restarted + STREAM_WITH_UNKNOWN_EVENT, # event ignored SampleException(), # to finally exit it somehow - ] + ) events = [] with pytest.raises(SampleException): async for event in infinite_watch(resource=resource, namespace=None): events.append(event) - assert stream.call_count == 3 # 2 streams + 1 exception assert len(events) == 3 assert events[0]['object']['spec'] == 'a' assert events[1]['object']['spec'] == 'a' diff --git a/tests/reactor/conftest.py b/tests/reactor/conftest.py index d0e68abb..d6ad1171 100644 --- a/tests/reactor/conftest.py +++ b/tests/reactor/conftest.py @@ -7,24 +7,10 @@ from kopf.reactor.queueing import watcher from kopf.reactor.queueing import worker as original_worker -# We do not test the Kubernetes client, so everything there should be mocked. -# Also, no external calls must be made under any circumstances. Hence, auto-use. 
-@pytest.fixture(autouse=True) -def client_mock(mocker): - client_mock = mocker.patch('kubernetes.client') - - fake_list = {'items': [], 'metadata': {'resourceVersion': None}} - client_mock.CustomObjectsApi.list_cluster_custom_object.return_value = fake_list - client_mock.CustomObjectsApi.list_namespaced_custom_object.return_value = fake_list - - return client_mock - -@pytest.fixture() -def stream(mocker, client_mock): - """ A mock for the stream of events as if returned by K8s client. """ - stream = mocker.patch('kubernetes.watch.Watch.stream') - return stream +@pytest.fixture(autouse=True) +def _autouse_req_mock(req_mock): + pass @pytest.fixture() @@ -56,7 +42,7 @@ def watcher_limited(mocker): def watcher_in_background(resource, handler, event_loop, worker_spy, stream): # Prevent any real streaming for the very beginning, before it even starts. - stream.return_value = iter([]) + stream.feed([]) # Spawn a watcher in the background. coro = watcher( diff --git a/tests/reactor/test_queueing.py b/tests/reactor/test_queueing.py index e8641e32..86fd54ba 100644 --- a/tests/reactor/test_queueing.py +++ b/tests/reactor/test_queueing.py @@ -48,7 +48,7 @@ async def test_event_multiplexing(worker_mock, timer, resource, handler, stream, """ Verify that every unique uid goes into its own queue+worker, which are never shared. """ # Inject the events of unique objects - to produce few queues/workers. - stream.return_value = iter(events) + stream.feed(events) # Run the watcher (near-instantly and test-blocking). with timer: @@ -120,7 +120,7 @@ async def test_event_batching(mocker, resource, handler, timer, stream, events, mocker.patch('kopf.config.WorkersConfig.worker_exit_timeout', 0.5) # Inject the events of unique objects - to produce few queues/workers. - stream.return_value = iter(events) + stream.feed(events) # Run the watcher (near-instantly and test-blocking). 
with timer: @@ -168,9 +168,10 @@ async def test_garbage_collection_of_queues(mocker, stream, events, unique, work mocker.patch('kopf.config.WorkersConfig.worker_idle_timeout', 0.5) mocker.patch('kopf.config.WorkersConfig.worker_batch_window', 0.1) mocker.patch('kopf.config.WorkersConfig.worker_exit_timeout', 0.5) + mocker.patch('kopf.config.WatchersConfig.watcher_retry_delay', 1.0) # to prevent src depletion # Inject the events of unique objects - to produce few queues/workers. - stream.return_value = iter(events) + stream.feed(events) # Give it a moment to populate the queues and spawn all the workers. # Intercept and remember _any_ seen dict of queues for further checks. diff --git a/tests/testing/test_runner.py b/tests/testing/test_runner.py index 0e970dab..d5a4e652 100644 --- a/tests/testing/test_runner.py +++ b/tests/testing/test_runner.py @@ -4,10 +4,8 @@ @pytest.fixture(autouse=True) -def no_config_needed(mocker): - mocker.patch('kubernetes.config.load_incluster_config') - mocker.patch('kubernetes.config.load_kube_config') - mocker.patch('kubernetes.client.CoreApi') # for login self-check +def no_config_needed(login_mocks): + pass def test_command_invocation_works(): From 6331c1ab95292626b683a2afec06e5a781ffa724 Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Tue, 11 Jun 2019 17:20:48 +0200 Subject: [PATCH 02/16] Test that login works when kubernetes library is not installed --- tests/cli/test_login.py | 34 ++++++++++++++++++++++++++++++++++ tests/conftest.py | 30 ++++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+) diff --git a/tests/cli/test_login.py b/tests/cli/test_login.py index 74097593..33e23ed7 100644 --- a/tests/cli/test_login.py +++ b/tests/cli/test_login.py @@ -14,6 +14,26 @@ def _auto_clean_kubernetes_client(clean_kubernetes_client): pass +@pytest.mark.usefixtures('kubernetes_uninstalled') +def test_kubernetes_uninstalled_has_effect(): + with pytest.raises(ImportError): + import kubernetes + +# +# Tests via the direct function 
invocation. +# + +@pytest.mark.usefixtures('kubernetes_uninstalled') +def test_direct_auth_works_without_client(login_mocks): + login() + verify() + + assert login_mocks.pykube_in_cluster.called + assert not login_mocks.pykube_from_file.called + assert not login_mocks.client_in_cluster.called + assert not login_mocks.client_from_file.called + + def test_direct_auth_works_incluster(login_mocks): login() @@ -93,6 +113,20 @@ def test_direct_check_fails_on_errors_in_client(login_mocks): assert login_mocks.pykube_checker.called assert login_mocks.client_checker.called +# +# The same tests, but via the CLI command run. +# + +@pytest.mark.usefixtures('kubernetes_uninstalled') +def test_clirun_auth_works_without_client(invoke, login_mocks, preload, real_run): + result = invoke(['run']) + assert result.exit_code == 0 + + assert login_mocks.pykube_in_cluster.called + assert not login_mocks.pykube_from_file.called + assert not login_mocks.client_in_cluster.called + assert not login_mocks.client_from_file.called + def test_clirun_auth_works_incluster(invoke, login_mocks, preload, real_run): diff --git a/tests/conftest.py b/tests/conftest.py index b0648464..590fb93a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -5,6 +5,7 @@ import logging import os import re +import sys import time from unittest.mock import Mock @@ -142,6 +143,35 @@ def login_mocks(mocker): client_checker=mocker.patch.object(kubernetes.client, 'CoreApi'), ) +# +# Simulating that Kubernetes client library is not installed. +# + +class ProhibitedImportFinder: + def find_spec(self, fullname, path, target=None): + if fullname == 'kubernetes' or fullname.startswith('kubernetes'): + raise ImportError("Import is prohibited for tests.") + + +@pytest.fixture() +def kubernetes_uninstalled(): + + # Remove any cached modules. 
+ preserved = {} + for name, mod in list(sys.modules.items()): + if name == 'kubernetes' or name.startswith('kubernetes.'): + preserved[name] = mod + del sys.modules[name] + + # Inject the prohibition for loading this module. And restore when done. + finder = ProhibitedImportFinder() + sys.meta_path.insert(0, finder) + try: + yield + finally: + sys.meta_path.remove(finder) + sys.modules.update(preserved) + # # Helpers for the timing checks. # From 490a4d36617db122ee29503c1b3725e89c9d4ae2 Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Tue, 11 Jun 2019 18:23:04 +0200 Subject: [PATCH 03/16] Test with kubernetes client library & its dependencies not pre-installed --- .travis.yml | 8 +++++--- requirements.txt | 3 --- tests/cli/conftest.py | 2 +- tests/cli/test_logging.py | 5 ++++- tests/cli/test_login.py | 7 ++++++- tests/conftest.py | 3 ++- tools/kubernetes-client.sh | 8 ++++++++ 7 files changed, 26 insertions(+), 10 deletions(-) create mode 100755 tools/kubernetes-client.sh diff --git a/.travis.yml b/.travis.yml index 2b0b6757..66b17410 100644 --- a/.travis.yml +++ b/.travis.yml @@ -17,9 +17,10 @@ env: - MINIKUBE_VERSION=1.0.1 - E2E=true # to not skip the e2e tests in pytest matrix: - - KUBERNETES_VERSION=1.14.0 - - KUBERNETES_VERSION=1.13.0 - - KUBERNETES_VERSION=1.12.0 + - KUBERNETES_VERSION=1.14.0 CLIENT=yes # only one "yes" is enough + - KUBERNETES_VERSION=1.14.0 CLIENT=no + - KUBERNETES_VERSION=1.13.0 CLIENT=no + - KUBERNETES_VERSION=1.12.0 CLIENT=no # - KUBERNETES_VERSION=1.11.10 # Minikube fails on CRI preflight checks # - KUBERNETES_VERSION=1.10.13 # CRDs require spec.version, which fails on 1.14 @@ -33,6 +34,7 @@ matrix: before_script: - tools/minikube-for-travis.sh + - tools/kubernetes-client.sh script: - pytest -v diff --git a/requirements.txt b/requirements.txt index f864ffce..3cd41951 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,9 +4,6 @@ urllib3<1.25 # The runtime dependencies of the framework, as if via `pip install kopf`. -e . 
-# Optional packages are present for testing (presence is auto-detected). -kubernetes - # Everything needed to develop (test, debug) the framework. pytest-asyncio pytest-mock diff --git a/tests/cli/conftest.py b/tests/cli/conftest.py index 8aed91ee..9a3b2450 100644 --- a/tests/cli/conftest.py +++ b/tests/cli/conftest.py @@ -2,7 +2,6 @@ import sys import click.testing -import kubernetes import pytest import kopf @@ -65,6 +64,7 @@ def clean_modules_cache(): @pytest.fixture(autouse=True) def clean_kubernetes_client(): + kubernetes = pytest.importorskip('kubernetes') kubernetes.client.configuration.Configuration.set_default(None) diff --git a/tests/cli/test_logging.py b/tests/cli/test_logging.py index 7e970599..d13c5ff9 100644 --- a/tests/cli/test_logging.py +++ b/tests/cli/test_logging.py @@ -1,7 +1,6 @@ import asyncio import logging -import kubernetes import pytest @@ -52,6 +51,8 @@ def test_verbosity(invoke, caplog, options, envvars, expect_debug, expect_info, (['--verbose']), ], ids=['default', 'q', 'quiet', 'v', 'verbose']) def test_no_lowlevel_dumps_in_nondebug(invoke, caplog, options, login, preload, real_run): + kubernetes = pytest.importorskip('kubernetes') + result = invoke(['run'] + options) assert result.exit_code == 0 @@ -70,6 +71,8 @@ def test_no_lowlevel_dumps_in_nondebug(invoke, caplog, options, login, preload, (['--debug']), ], ids=['d', 'debug']) def test_lowlevel_dumps_in_debug_mode(invoke, caplog, options, login, preload, real_run): + kubernetes = pytest.importorskip('kubernetes') + result = invoke(['run'] + options) assert result.exit_code == 0 diff --git a/tests/cli/test_login.py b/tests/cli/test_login.py index 33e23ed7..5ef16ef5 100644 --- a/tests/cli/test_login.py +++ b/tests/cli/test_login.py @@ -2,7 +2,6 @@ Remember: We do not test the clients, we assume they work when used properly. We test our own functions here, and check if the clients were called. 
""" -import kubernetes.client.rest import pytest import requests @@ -45,6 +44,7 @@ def test_direct_auth_works_incluster(login_mocks): def test_direct_auth_works_kubeconfig(login_mocks): + kubernetes = pytest.importorskip('kubernetes') login_mocks.pykube_in_cluster.side_effect = FileNotFoundError login_mocks.client_in_cluster.side_effect = kubernetes.config.ConfigException @@ -70,6 +70,7 @@ def test_direct_auth_fails_on_errors_in_pykube(login_mocks): def test_direct_auth_fails_on_errors_in_client(login_mocks): + kubernetes = pytest.importorskip('kubernetes') login_mocks.client_in_cluster.side_effect = kubernetes.config.ConfigException login_mocks.client_from_file.side_effect = kubernetes.config.ConfigException @@ -100,6 +101,7 @@ def test_direct_check_fails_on_errors_in_pykube(login_mocks): def test_direct_check_fails_on_errors_in_client(login_mocks): + kubernetes = pytest.importorskip('kubernetes') login_mocks.client_checker.side_effect = kubernetes.client.rest.ApiException(status=401) login() @@ -142,6 +144,7 @@ def test_clirun_auth_works_incluster(invoke, login_mocks, preload, real_run): def test_clirun_auth_works_kubeconfig(invoke, login_mocks, preload, real_run): + kubernetes = pytest.importorskip('kubernetes') login_mocks.pykube_in_cluster.side_effect = FileNotFoundError login_mocks.client_in_cluster.side_effect = kubernetes.config.ConfigException @@ -173,6 +176,7 @@ def test_clirun_auth_fails_on_errors_in_pykube(invoke, login_mocks, preload, rea def test_clirun_auth_fails_on_errors_in_client(invoke, login_mocks, preload, real_run): + kubernetes = pytest.importorskip('kubernetes') login_mocks.client_in_cluster.side_effect = kubernetes.config.ConfigException login_mocks.client_from_file.side_effect = kubernetes.config.ConfigException @@ -206,6 +210,7 @@ def test_clirun_check_fails_on_errors_in_pykube(invoke, login_mocks, preload, re def test_clirun_check_fails_on_errors_in_client(invoke, login_mocks, preload, real_run): + kubernetes = 
pytest.importorskip('kubernetes') login_mocks.client_checker.side_effect = kubernetes.client.rest.ApiException(status=401) result = invoke(['run']) diff --git a/tests/conftest.py b/tests/conftest.py index 590fb93a..80e13092 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -10,7 +10,6 @@ from unittest.mock import Mock import asynctest -import kubernetes.client.rest import pykube import pytest import pytest_mock @@ -127,6 +126,8 @@ class LoginMocks: @pytest.fixture() def login_mocks(mocker): + kubernetes = pytest.importorskip('kubernetes') + # Pykube config is needed to create a pykube's API instance. # But we do not want and do not need to actually authenticate, so we mock. # Some fields are used by pykube's objects: we have to know them ("leaky abstractions"). diff --git a/tools/kubernetes-client.sh b/tools/kubernetes-client.sh new file mode 100755 index 00000000..8fd024de --- /dev/null +++ b/tools/kubernetes-client.sh @@ -0,0 +1,8 @@ +#!/bin/bash +set -eu +set -x + +if [[ "${CLIENT:-}" = "yes" ]] ; then + # FIXME: See https://github.com/kubernetes-client/python/issues/866 + pip install 'kubernetes<10.0.0' +fi From 05a30888a2c5b08e0fc3faa6b5c9015ebabd4ced Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Fri, 14 Jun 2019 22:03:09 +0200 Subject: [PATCH 04/16] Pass the list's resource version for watching init state --- kopf/clients/fetching.py | 4 ++-- kopf/clients/watching.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/kopf/clients/fetching.py b/kopf/clients/fetching.py index 4951dc3b..0fa9923f 100644 --- a/kopf/clients/fetching.py +++ b/kopf/clients/fetching.py @@ -61,7 +61,7 @@ def list_objs(*, resource, namespace=None): return lst.response -def watch_objs(*, resource, namespace=None, timeout=None): +def watch_objs(*, resource, namespace=None, timeout=None, since=None): """ Watch the objects of specific resource type. 
@@ -78,5 +78,5 @@ def watch_objs(*, resource, namespace=None, timeout=None): cls = classes._make_cls(resource=resource) namespace = namespace if issubclass(cls, pykube.objects.NamespacedAPIObject) else None lst = cls.objects(api, namespace=pykube.all if namespace is None else namespace) - src = lst.watch() + src = lst.watch(since=since) return iter({'type': event.type, 'object': event.object.obj} for event in src) diff --git a/kopf/clients/watching.py b/kopf/clients/watching.py index 60d44421..5bcb683f 100644 --- a/kopf/clients/watching.py +++ b/kopf/clients/watching.py @@ -83,10 +83,10 @@ async def streaming_watch( kwargs = {} kwargs.update(dict(resource_version=resource_version) if resource_version else {}) kwargs.update(dict(timeout_seconds=config.WatchersConfig.default_stream_timeout) if config.WatchersConfig.default_stream_timeout else {}) - # TODO: pass kwargs, specifically: timeout + resource_version loop = asyncio.get_event_loop() stream = fetching.watch_objs(resource=resource, namespace=namespace, - timeout=config.WatchersConfig.default_stream_timeout) + timeout=config.WatchersConfig.default_stream_timeout, + since=resource_version) async for event in streaming_aiter(stream, loop=loop): # "410 Gone" is for the "resource version too old" error, we must restart watching. 
From 41e1ff6b962f28d05e6f90ea0d1215f8dc9330b8 Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Fri, 14 Jun 2019 22:14:47 +0200 Subject: [PATCH 05/16] Simulate the missing resource kind identifiers in the initial listing --- kopf/clients/watching.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/kopf/clients/watching.py b/kopf/clients/watching.py index 5bcb683f..9e61cb50 100644 --- a/kopf/clients/watching.py +++ b/kopf/clients/watching.py @@ -77,6 +77,9 @@ async def streaming_watch( rsp = fetching.list_objs(resource=resource, namespace=namespace) resource_version = rsp['metadata']['resourceVersion'] for item in rsp['items']: + # FIXME: fix in pykube to inject the missing item's fields from the list's metainfo. + item.setdefault('kind', rsp['kind'][:-4] if rsp['kind'][-4:] == 'List' else rsp['kind']) + item.setdefault('apiVersion', rsp['apiVersion']) yield {'type': None, 'object': item} # Then, watch the resources starting from the list's resource version. From 1a7faef5a036c27d3a141f7a2493db15f4c8e448 Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Tue, 18 Jun 2019 12:01:56 +0200 Subject: [PATCH 06/16] Combine k8s login & access check into one routine, disabled by default --- kopf/cli.py | 3 +-- kopf/clients/auth.py | 51 +++++++++++++++-------------------- tests/cli/conftest.py | 6 +++-- tests/cli/test_help.py | 24 +++++++++++------ tests/cli/test_login.py | 59 ++++++++++++++++++++++++++--------------- 5 files changed, 79 insertions(+), 64 deletions(-) diff --git a/kopf/cli.py b/kopf/cli.py index 5d8add8e..e0fba667 100644 --- a/kopf/cli.py +++ b/kopf/cli.py @@ -12,8 +12,7 @@ def cli_login(): try: - auth.login() - auth.verify() + auth.login(verify=True) except auth.LoginError as e: raise click.ClickException(str(e)) except auth.AccessError as e: diff --git a/kopf/clients/auth.py b/kopf/clients/auth.py index e817a073..02862f46 100644 --- a/kopf/clients/auth.py +++ b/kopf/clients/auth.py @@ -19,7 +19,7 @@ class AccessError(Exception): """ Raised when the 
operator cannot access the cluster API. """ -def login(): +def login(verify=False): """ Login to Kubernetes cluster, locally or remotely. @@ -30,19 +30,24 @@ def login(): """ # Pykube login is mandatory. If it fails, the framework will not run at all. - login_pykube() + try: + import pykube + except ImportError: + raise # mandatory + else: + login_pykube(verify=verify) # We keep the official client library auto-login only because it was # an implied behavior before switching to pykube -- to keep it so (implied). try: import kubernetes except ImportError: - pass + pass # optional else: - login_client() + login_client(verify=verify) -def login_pykube(): +def login_pykube(verify=False): global _pykube_cfg try: _pykube_cfg = pykube.KubeConfig.from_service_account() @@ -54,8 +59,11 @@ def login_pykube(): except (pykube.PyKubeError, FileNotFoundError): raise LoginError(f"Cannot authenticate pykube neither in-cluster, nor via kubeconfig.") + if verify: + verify_pykube() + -def login_client(): +def login_client(verify=False): import kubernetes.client try: kubernetes.config.load_incluster_config() # cluster env vars @@ -67,34 +75,14 @@ def login_client(): except kubernetes.config.ConfigException as e2: raise LoginError(f"Cannot authenticate client neither in-cluster, nor via kubeconfig.") - -def verify(): - """ - Verify if login has succeeded, and the access configuration is still valid. - - If not, raise `.AccessError`. - - It can also perform the permission checks before the operator - actually starts, i.e. before any fetching or watching API calls. - - If the operator is not logged in, then the check fails, - and the operator stops (as any other API call). - """ - - # Pykube login is mandatory. If it fails, the framework will not run at all. - verify_pykube() - - # We keep the official client library auto-login only because it was - # an implied behavior before switching to pykube -- to keep it so (implied). 
- try: - import kubernetes - except ImportError: - pass - else: + if verify: verify_client() def verify_pykube(): + """ + Verify if login has succeeded, and the access configuration is still valid. + """ try: api = get_pykube_api() rsp = api.get(version="", base="/") @@ -108,6 +96,9 @@ def verify_pykube(): def verify_client(): + """ + Verify if login has succeeded, and the access configuration is still valid. + """ import kubernetes.client.rest try: api = kubernetes.client.CoreApi() diff --git a/tests/cli/conftest.py b/tests/cli/conftest.py index 9a3b2450..a69a1b02 100644 --- a/tests/cli/conftest.py +++ b/tests/cli/conftest.py @@ -82,8 +82,10 @@ def invoke(runner): @pytest.fixture() def login(mocker): - mocker.patch('kopf.clients.auth.login') - mocker.patch('kopf.clients.auth.verify') + mocker.patch('kopf.clients.auth.login_pykube') + mocker.patch('kopf.clients.auth.login_client') + mocker.patch('kopf.clients.auth.verify_pykube') + mocker.patch('kopf.clients.auth.verify_client') @pytest.fixture() diff --git a/tests/cli/test_help.py b/tests/cli/test_help.py index 8db19d41..e0261159 100644 --- a/tests/cli/test_help.py +++ b/tests/cli/test_help.py @@ -1,14 +1,18 @@ def test_help_in_root(invoke, mocker): - login = mocker.patch('kopf.clients.auth.login') - verify = mocker.patch('kopf.clients.auth.verify') + login_pykube = mocker.patch('kopf.clients.auth.login_pykube') + login_client = mocker.patch('kopf.clients.auth.login_client') + verify_pykube = mocker.patch('kopf.clients.auth.verify_pykube') + verify_client = mocker.patch('kopf.clients.auth.verify_client') result = invoke(['--help']) assert result.exit_code == 0 - assert not login.called - assert not verify.called + assert not login_pykube.called + assert not login_client.called + assert not verify_pykube.called + assert not verify_client.called assert 'Usage: kopf [OPTIONS]' in result.output assert ' run ' in result.output @@ -17,16 +21,20 @@ def test_help_in_root(invoke, mocker): def 
test_help_in_subcommand(invoke, mocker): - login = mocker.patch('kopf.clients.auth.login') - verify = mocker.patch('kopf.clients.auth.verify') + login_pykube = mocker.patch('kopf.clients.auth.login_pykube') + login_client = mocker.patch('kopf.clients.auth.login_client') + verify_pykube = mocker.patch('kopf.clients.auth.verify_pykube') + verify_client = mocker.patch('kopf.clients.auth.verify_client') preload = mocker.patch('kopf.utilities.loaders.preload') real_run = mocker.patch('kopf.reactor.queueing.run') result = invoke(['run', '--help']) assert result.exit_code == 0 - assert not login.called - assert not verify.called + assert not login_pykube.called + assert not login_client.called + assert not verify_pykube.called + assert not verify_client.called assert not preload.called assert not real_run.called diff --git a/tests/cli/test_login.py b/tests/cli/test_login.py index 5ef16ef5..cbb56901 100644 --- a/tests/cli/test_login.py +++ b/tests/cli/test_login.py @@ -5,7 +5,7 @@ import pytest import requests -from kopf.clients.auth import login, verify, LoginError, AccessError +from kopf.clients.auth import login, LoginError, AccessError @pytest.fixture(autouse=True) @@ -24,11 +24,11 @@ def test_kubernetes_uninstalled_has_effect(): @pytest.mark.usefixtures('kubernetes_uninstalled') def test_direct_auth_works_without_client(login_mocks): - login() - verify() + login(verify=True) assert login_mocks.pykube_in_cluster.called assert not login_mocks.pykube_from_file.called + assert not login_mocks.client_in_cluster.called assert not login_mocks.client_from_file.called @@ -39,6 +39,7 @@ def test_direct_auth_works_incluster(login_mocks): assert login_mocks.pykube_in_cluster.called assert not login_mocks.pykube_from_file.called + assert login_mocks.client_in_cluster.called assert not login_mocks.client_from_file.called @@ -52,6 +53,7 @@ def test_direct_auth_works_kubeconfig(login_mocks): assert login_mocks.pykube_in_cluster.called assert login_mocks.pykube_from_file.called + 
assert login_mocks.client_in_cluster.called assert login_mocks.client_from_file.called @@ -65,8 +67,10 @@ def test_direct_auth_fails_on_errors_in_pykube(login_mocks): assert login_mocks.pykube_in_cluster.called assert login_mocks.pykube_from_file.called - assert not login_mocks.client_in_cluster.called # because pykube failed - assert not login_mocks.client_from_file.called # because pykube failed + + # Because pykube failed, the client is not even tried: + assert not login_mocks.client_in_cluster.called + assert not login_mocks.client_from_file.called def test_direct_auth_fails_on_errors_in_client(login_mocks): @@ -79,6 +83,7 @@ def test_direct_auth_fails_on_errors_in_client(login_mocks): assert login_mocks.pykube_in_cluster.called assert not login_mocks.pykube_from_file.called + assert login_mocks.client_in_cluster.called assert login_mocks.client_from_file.called @@ -88,31 +93,32 @@ def test_direct_check_fails_on_errors_in_pykube(login_mocks): response.status_code = 401 login_mocks.pykube_checker.side_effect = requests.exceptions.HTTPError(response=response) - login() with pytest.raises(AccessError): - verify() + login(verify=True) assert login_mocks.pykube_in_cluster.called assert not login_mocks.pykube_from_file.called - assert login_mocks.client_in_cluster.called - assert not login_mocks.client_from_file.called assert login_mocks.pykube_checker.called - assert not login_mocks.client_checker.called # because pykube failed + + # Because pykube failed, the client is not even tried: + assert not login_mocks.client_in_cluster.called + assert not login_mocks.client_from_file.called + assert not login_mocks.client_checker.called def test_direct_check_fails_on_errors_in_client(login_mocks): kubernetes = pytest.importorskip('kubernetes') login_mocks.client_checker.side_effect = kubernetes.client.rest.ApiException(status=401) - login() with pytest.raises(AccessError): - verify() + login(verify=True) assert login_mocks.pykube_in_cluster.called assert not 
login_mocks.pykube_from_file.called + assert login_mocks.pykube_checker.called + assert login_mocks.client_in_cluster.called assert not login_mocks.client_from_file.called - assert login_mocks.pykube_checker.called assert login_mocks.client_checker.called # @@ -126,6 +132,7 @@ def test_clirun_auth_works_without_client(invoke, login_mocks, preload, real_run assert login_mocks.pykube_in_cluster.called assert not login_mocks.pykube_from_file.called + assert not login_mocks.client_in_cluster.called assert not login_mocks.client_from_file.called @@ -137,9 +144,10 @@ def test_clirun_auth_works_incluster(invoke, login_mocks, preload, real_run): assert login_mocks.pykube_in_cluster.called assert not login_mocks.pykube_from_file.called + assert login_mocks.pykube_checker.called + assert login_mocks.client_in_cluster.called assert not login_mocks.client_from_file.called - assert login_mocks.pykube_checker.called assert login_mocks.client_checker.called @@ -153,9 +161,10 @@ def test_clirun_auth_works_kubeconfig(invoke, login_mocks, preload, real_run): assert login_mocks.pykube_in_cluster.called assert login_mocks.pykube_from_file.called + assert login_mocks.pykube_checker.called + assert login_mocks.client_in_cluster.called assert login_mocks.client_from_file.called - assert login_mocks.pykube_checker.called assert login_mocks.client_checker.called @@ -169,9 +178,11 @@ def test_clirun_auth_fails_on_errors_in_pykube(invoke, login_mocks, preload, rea assert login_mocks.pykube_in_cluster.called assert login_mocks.pykube_from_file.called - assert not login_mocks.client_in_cluster.called # because pykube failed - assert not login_mocks.client_from_file.called # because pykube failed assert not login_mocks.pykube_checker.called + + # Because pykube failed, the client is not even tried: + assert not login_mocks.client_in_cluster.called + assert not login_mocks.client_from_file.called assert not login_mocks.client_checker.called @@ -186,9 +197,10 @@ def 
test_clirun_auth_fails_on_errors_in_client(invoke, login_mocks, preload, rea assert login_mocks.pykube_in_cluster.called assert not login_mocks.pykube_from_file.called + assert login_mocks.pykube_checker.called + assert login_mocks.client_in_cluster.called assert login_mocks.client_from_file.called - assert not login_mocks.pykube_checker.called assert not login_mocks.client_checker.called @@ -203,10 +215,12 @@ def test_clirun_check_fails_on_errors_in_pykube(invoke, login_mocks, preload, re assert login_mocks.pykube_in_cluster.called assert not login_mocks.pykube_from_file.called - assert login_mocks.client_in_cluster.called - assert not login_mocks.client_from_file.called assert login_mocks.pykube_checker.called - assert not login_mocks.client_checker.called # because pykube failed + + # Because pykube failed, the client is not even tried: + assert not login_mocks.client_in_cluster.called + assert not login_mocks.client_from_file.called + assert not login_mocks.client_checker.called def test_clirun_check_fails_on_errors_in_client(invoke, login_mocks, preload, real_run): @@ -219,7 +233,8 @@ def test_clirun_check_fails_on_errors_in_client(invoke, login_mocks, preload, re assert login_mocks.pykube_in_cluster.called assert not login_mocks.pykube_from_file.called + assert login_mocks.pykube_checker.called + assert login_mocks.client_in_cluster.called assert not login_mocks.client_from_file.called - assert login_mocks.pykube_checker.called assert login_mocks.client_checker.called From 0f021c704fede58b47d98bdbfc8e96ccdbeb4069 Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Tue, 18 Jun 2019 12:01:28 +0200 Subject: [PATCH 07/16] Add an example for built-in pods controller --- examples/09-testing/test_example_09.py | 13 +++++-- examples/10-builtins/README.md | 43 ++++++++++++++++++++++ examples/10-builtins/example.py | 47 +++++++++++++++++++++++++ examples/10-builtins/test_example_10.py | 46 ++++++++++++++++++++++++ 4 files changed, 146 insertions(+), 3 deletions(-) 
create mode 100644 examples/10-builtins/README.md create mode 100644 examples/10-builtins/example.py create mode 100644 examples/10-builtins/test_example_10.py diff --git a/examples/09-testing/test_example_09.py b/examples/09-testing/test_example_09.py index 1f0a2cee..57b28811 100644 --- a/examples/09-testing/test_example_09.py +++ b/examples/09-testing/test_example_09.py @@ -13,18 +13,25 @@ @pytest.fixture(autouse=True) def crd_exists(): - subprocess.run(f"kubectl apply -f {crd_yaml}", shell=True, check=True) + subprocess.run(f"kubectl apply -f {crd_yaml}", + check=True, timeout=10, capture_output=True, shell=True) @pytest.fixture(autouse=True) def obj_absent(): - subprocess.run(f"kubectl delete -f {obj_yaml}", shell=True, check=False) + # Operator is not running in fixtures, so we need a force-delete (or this patch). + subprocess.run(['kubectl', 'patch', '-f', obj_yaml, + '-p', '{"metadata":{"finalizers":[]}}', + '--type', 'merge'], + check=False, timeout=10, capture_output=True) + subprocess.run(f"kubectl delete -f {obj_yaml}", + check=False, timeout=10, capture_output=True, shell=True) def test_resource_lifecycle(mocker): # To prevent lengthy threads in the loop executor when the process exits. - mocker.patch('kopf.clients.watching.DEFAULT_STREAM_TIMEOUT', 10) + mocker.patch('kopf.config.WatchersConfig.default_stream_timeout', 10) # Run an operator and simulate some activity with the operated resource. with kopf.testing.KopfRunner(['run', '--verbose', '--standalone', example_py]) as runner: diff --git a/examples/10-builtins/README.md b/examples/10-builtins/README.md new file mode 100644 index 00000000..c10aaed1 --- /dev/null +++ b/examples/10-builtins/README.md @@ -0,0 +1,43 @@ +# Kopf example for built-in resources + +Kopf can also handle the built-in resources, such as Pods, Jobs, etc. 
+ +In this example, we take control all over the pods (namespaced/cluster-wide), +and allow the pods to exist for no longer than 30 seconds -- +either after creation or after the operator restart. + +For no specific reason, just for fun. Maybe, as a way of Chaos Engineering +to force making the resilient applications (tolerant to pod killing). + +However, the system namespaces (kube-system, etc) are explicitly protected -- +to prevent killing the cluster itself. + +Start the operator: + +```bash +kopf run example.py --verbose +``` + +Start a sample pod: + +```bash +kubectl run -it --image=ubuntu expr1 -- bash -i +# wait for 30s +``` + +Since `kubectl run` creates a Deployment, not just a Pod, +a new pod will be created every 30 seconds. Observe with: + +```bash +kubectl get pods --watch +``` + +*Please note that Kopf puts a finalizer on the managed resources, +so the pod deletion will be blocked unless the operator is running +(to remove the finalizer). This will be made optional in #24.* + +Cleanup in the end: + +```bash +$ kubectl delete deployment expr1 +``` diff --git a/examples/10-builtins/example.py b/examples/10-builtins/example.py new file mode 100644 index 00000000..01d99b7a --- /dev/null +++ b/examples/10-builtins/example.py @@ -0,0 +1,47 @@ +import asyncio + +import kopf +import pykube + +tasks = {} # dict{namespace: dict{name: asyncio.Task}} + +try: + cfg = pykube.KubeConfig.from_service_account() +except FileNotFoundError: + cfg = pykube.KubeConfig.from_file() +api = pykube.HTTPClient(cfg) + + +@kopf.on.resume('', 'v1', 'pods') +@kopf.on.create('', 'v1', 'pods') +async def pod_in_sight(namespace, name, logger, **kwargs): + if namespace.startswith('kube-'): + return + else: + task = asyncio.create_task(pod_killer(namespace, name, logger)) + tasks.setdefault(namespace, {}) + tasks[namespace][name] = task + + +@kopf.on.delete('', 'v1', 'pods') +async def pod_deleted(namespace, name, **kwargs): + if namespace in tasks and name in tasks[namespace]: + task = 
tasks[namespace][name] + task.cancel() # it will also remove from `tasks` + + +async def pod_killer(namespace, name, logger, timeout=30): + try: + logger.info(f"=== Pod killing happens in {timeout}s.") + await asyncio.sleep(timeout) + logger.info(f"=== Pod killing happens NOW!") + + pod = pykube.Pod.objects(api, namespace=namespace).get_by_name(name) + pod.delete() + + except asyncio.CancelledError: + logger.info(f"=== Pod killing is cancelled!") + + finally: + if namespace in tasks and name in tasks[namespace]: + del tasks[namespace][name] diff --git a/examples/10-builtins/test_example_10.py b/examples/10-builtins/test_example_10.py new file mode 100644 index 00000000..564355d1 --- /dev/null +++ b/examples/10-builtins/test_example_10.py @@ -0,0 +1,46 @@ +import os.path +import time + +import pykube + +import kopf.testing + + +def test_pods_reacted(): + + example_py = os.path.join(os.path.dirname(__file__), 'example.py') + with kopf.testing.KopfRunner(['run', '--verbose', example_py]) as runner: + _create_pod() + time.sleep(5) # give it some time to react + _delete_pod() + time.sleep(1) # give it some time to react + + assert runner.exception is None + assert runner.exit_code == 0 + + assert '[default/kopf-pod-1] Creation event:' in runner.stdout + assert '[default/kopf-pod-1] === Pod killing happens in 30s.' in runner.stdout + assert '[default/kopf-pod-1] Deletion event:' in runner.stdout + assert '[default/kopf-pod-1] === Pod killing is cancelled!' 
in runner.stdout + + +def _create_pod(): + api = pykube.HTTPClient(pykube.KubeConfig.from_file()) + pod = pykube.Pod(api, { + 'apiVersion': 'v1', + 'kind': 'Pod', + 'metadata': {'name': 'kopf-pod-1', 'namespace': 'default'}, + 'spec': { + 'containers': [{ + 'name': 'the-only-one', + 'image': 'busybox', + 'command': ["sh", "-x", "-c", "sleep 1"], + }]}, + }) + pod.create() + + +def _delete_pod(): + api = pykube.HTTPClient(pykube.KubeConfig.from_file()) + pod = pykube.Pod.objects(api, namespace='default').get_by_name('kopf-pod-1') + pod.delete() From d225cfa636be34b10986869ae130256a0c87f060 Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Tue, 2 Jul 2019 17:55:59 +0200 Subject: [PATCH 08/16] Verify the importing machinery restored properly after import hacks --- tests/conftest.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tests/conftest.py b/tests/conftest.py index 80e13092..a1e536d7 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -150,12 +150,13 @@ def login_mocks(mocker): class ProhibitedImportFinder: def find_spec(self, fullname, path, target=None): - if fullname == 'kubernetes' or fullname.startswith('kubernetes'): + if fullname == 'kubernetes' or fullname.startswith('kubernetes.'): raise ImportError("Import is prohibited for tests.") @pytest.fixture() def kubernetes_uninstalled(): + import kubernetes as kubernetes_before # Remove any cached modules. preserved = {} @@ -173,6 +174,10 @@ def kubernetes_uninstalled(): sys.meta_path.remove(finder) sys.modules.update(preserved) + # Verify if it works and that we didn't break the importing machinery. + import kubernetes as kubernetes_after + assert kubernetes_after is kubernetes_before + # # Helpers for the timing checks. 
# From 27da45c3ef7260ebec055be2730ed17cf22017ca Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Tue, 2 Jul 2019 17:57:17 +0200 Subject: [PATCH 09/16] Make all client libraries optional for mocks, but do not skip the tests --- tests/cli/conftest.py | 9 +++-- tests/cli/test_login.py | 88 +++++++++++++++++++---------------------- tests/conftest.py | 78 ++++++++++++++++++++++++++++-------- 3 files changed, 107 insertions(+), 68 deletions(-) diff --git a/tests/cli/conftest.py b/tests/cli/conftest.py index a69a1b02..57516ea6 100644 --- a/tests/cli/conftest.py +++ b/tests/cli/conftest.py @@ -64,9 +64,12 @@ def clean_modules_cache(): @pytest.fixture(autouse=True) def clean_kubernetes_client(): - kubernetes = pytest.importorskip('kubernetes') - kubernetes.client.configuration.Configuration.set_default(None) - + try: + import kubernetes + except ImportError: + pass # absent client is already "clean" (or not "dirty" at least). + else: + kubernetes.client.configuration.Configuration.set_default(None) @pytest.fixture() diff --git a/tests/cli/test_login.py b/tests/cli/test_login.py index cbb56901..4c37221b 100644 --- a/tests/cli/test_login.py +++ b/tests/cli/test_login.py @@ -13,8 +13,7 @@ def _auto_clean_kubernetes_client(clean_kubernetes_client): pass -@pytest.mark.usefixtures('kubernetes_uninstalled') -def test_kubernetes_uninstalled_has_effect(): +def test_kubernetes_uninstalled_has_effect(no_kubernetes): with pytest.raises(ImportError): import kubernetes @@ -22,19 +21,23 @@ def test_kubernetes_uninstalled_has_effect(): # Tests via the direct function invocation. 
# -@pytest.mark.usefixtures('kubernetes_uninstalled') -def test_direct_auth_works_without_client(login_mocks): - login(verify=True) +def test_direct_auth_works_incluster_without_client(login_mocks, no_kubernetes): + login() assert login_mocks.pykube_in_cluster.called assert not login_mocks.pykube_from_file.called - assert not login_mocks.client_in_cluster.called - assert not login_mocks.client_from_file.called +def test_direct_auth_works_viaconfig_without_client(login_mocks, no_kubernetes): + login_mocks.pykube_in_cluster.side_effect = FileNotFoundError + + login() + + assert login_mocks.pykube_in_cluster.called + assert login_mocks.pykube_from_file.called -def test_direct_auth_works_incluster(login_mocks): +def test_direct_auth_works_incluster_with_client(login_mocks, kubernetes): login() assert login_mocks.pykube_in_cluster.called @@ -44,8 +47,7 @@ def test_direct_auth_works_incluster(login_mocks): assert not login_mocks.client_from_file.called -def test_direct_auth_works_kubeconfig(login_mocks): - kubernetes = pytest.importorskip('kubernetes') +def test_direct_auth_works_viaconfig_with_client(login_mocks, kubernetes): login_mocks.pykube_in_cluster.side_effect = FileNotFoundError login_mocks.client_in_cluster.side_effect = kubernetes.config.ConfigException @@ -58,7 +60,7 @@ def test_direct_auth_works_kubeconfig(login_mocks): assert login_mocks.client_from_file.called -def test_direct_auth_fails_on_errors_in_pykube(login_mocks): +def test_direct_auth_fails_on_errors_in_pykube(login_mocks, any_kubernetes): login_mocks.pykube_in_cluster.side_effect = FileNotFoundError login_mocks.pykube_from_file.side_effect = FileNotFoundError @@ -68,13 +70,8 @@ def test_direct_auth_fails_on_errors_in_pykube(login_mocks): assert login_mocks.pykube_in_cluster.called assert login_mocks.pykube_from_file.called - # Because pykube failed, the client is not even tried: - assert not login_mocks.client_in_cluster.called - assert not login_mocks.client_from_file.called - -def 
test_direct_auth_fails_on_errors_in_client(login_mocks): - kubernetes = pytest.importorskip('kubernetes') +def test_direct_auth_fails_on_errors_in_client(login_mocks, kubernetes): login_mocks.client_in_cluster.side_effect = kubernetes.config.ConfigException login_mocks.client_from_file.side_effect = kubernetes.config.ConfigException @@ -88,7 +85,7 @@ def test_direct_auth_fails_on_errors_in_client(login_mocks): assert login_mocks.client_from_file.called -def test_direct_check_fails_on_errors_in_pykube(login_mocks): +def test_direct_check_fails_on_errors_in_pykube(login_mocks, any_kubernetes): response = requests.Response() response.status_code = 401 login_mocks.pykube_checker.side_effect = requests.exceptions.HTTPError(response=response) @@ -100,14 +97,8 @@ def test_direct_check_fails_on_errors_in_pykube(login_mocks): assert not login_mocks.pykube_from_file.called assert login_mocks.pykube_checker.called - # Because pykube failed, the client is not even tried: - assert not login_mocks.client_in_cluster.called - assert not login_mocks.client_from_file.called - assert not login_mocks.client_checker.called - -def test_direct_check_fails_on_errors_in_client(login_mocks): - kubernetes = pytest.importorskip('kubernetes') +def test_direct_check_fails_on_errors_in_client(login_mocks, kubernetes): login_mocks.client_checker.side_effect = kubernetes.client.rest.ApiException(status=401) with pytest.raises(AccessError): @@ -125,19 +116,28 @@ def test_direct_check_fails_on_errors_in_client(login_mocks): # The same tests, but via the CLI command run. 
# -@pytest.mark.usefixtures('kubernetes_uninstalled') -def test_clirun_auth_works_without_client(invoke, login_mocks, preload, real_run): +def test_clirun_auth_works_incluster_without_client(login_mocks, no_kubernetes, + invoke, preload, real_run): result = invoke(['run']) assert result.exit_code == 0 assert login_mocks.pykube_in_cluster.called assert not login_mocks.pykube_from_file.called - assert not login_mocks.client_in_cluster.called - assert not login_mocks.client_from_file.called +def test_clirun_auth_works_viaconfig_without_client(login_mocks, no_kubernetes, + invoke, preload, real_run): + login_mocks.pykube_in_cluster.side_effect = FileNotFoundError + + result = invoke(['run']) + assert result.exit_code == 0 + + assert login_mocks.pykube_in_cluster.called + assert login_mocks.pykube_from_file.called -def test_clirun_auth_works_incluster(invoke, login_mocks, preload, real_run): + +def test_clirun_auth_works_incluster_with_client(login_mocks, kubernetes, + invoke, preload, real_run): result = invoke(['run']) assert result.exit_code == 0 @@ -151,8 +151,8 @@ def test_clirun_auth_works_incluster(invoke, login_mocks, preload, real_run): assert login_mocks.client_checker.called -def test_clirun_auth_works_kubeconfig(invoke, login_mocks, preload, real_run): - kubernetes = pytest.importorskip('kubernetes') +def test_clirun_auth_works_viaconfig_with_client(login_mocks, kubernetes, + invoke, preload, real_run): login_mocks.pykube_in_cluster.side_effect = FileNotFoundError login_mocks.client_in_cluster.side_effect = kubernetes.config.ConfigException @@ -168,7 +168,8 @@ def test_clirun_auth_works_kubeconfig(invoke, login_mocks, preload, real_run): assert login_mocks.client_checker.called -def test_clirun_auth_fails_on_errors_in_pykube(invoke, login_mocks, preload, real_run): +def test_clirun_auth_fails_on_errors_in_pykube(login_mocks, any_kubernetes, + invoke, preload, real_run): login_mocks.pykube_in_cluster.side_effect = FileNotFoundError 
login_mocks.pykube_from_file.side_effect = FileNotFoundError @@ -180,14 +181,9 @@ def test_clirun_auth_fails_on_errors_in_pykube(invoke, login_mocks, preload, rea assert login_mocks.pykube_from_file.called assert not login_mocks.pykube_checker.called - # Because pykube failed, the client is not even tried: - assert not login_mocks.client_in_cluster.called - assert not login_mocks.client_from_file.called - assert not login_mocks.client_checker.called - -def test_clirun_auth_fails_on_errors_in_client(invoke, login_mocks, preload, real_run): - kubernetes = pytest.importorskip('kubernetes') +def test_clirun_auth_fails_on_errors_in_client(login_mocks, kubernetes, + invoke, preload, real_run): login_mocks.client_in_cluster.side_effect = kubernetes.config.ConfigException login_mocks.client_from_file.side_effect = kubernetes.config.ConfigException @@ -204,7 +200,8 @@ def test_clirun_auth_fails_on_errors_in_client(invoke, login_mocks, preload, rea assert not login_mocks.client_checker.called -def test_clirun_check_fails_on_errors_in_pykube(invoke, login_mocks, preload, real_run): +def test_clirun_check_fails_on_errors_in_pykube(login_mocks, any_kubernetes, + invoke, preload, real_run): response = requests.Response() response.status_code = 401 login_mocks.pykube_checker.side_effect = requests.exceptions.HTTPError(response=response) @@ -217,14 +214,9 @@ def test_clirun_check_fails_on_errors_in_pykube(invoke, login_mocks, preload, re assert not login_mocks.pykube_from_file.called assert login_mocks.pykube_checker.called - # Because pykube failed, the client is not even tried: - assert not login_mocks.client_in_cluster.called - assert not login_mocks.client_from_file.called - assert not login_mocks.client_checker.called - -def test_clirun_check_fails_on_errors_in_client(invoke, login_mocks, preload, real_run): - kubernetes = pytest.importorskip('kubernetes') +def test_clirun_check_fails_on_errors_in_client(login_mocks, kubernetes, + invoke, preload, real_run): 
login_mocks.client_checker.side_effect = kubernetes.client.rest.ApiException(status=401) result = invoke(['run']) diff --git a/tests/conftest.py b/tests/conftest.py index a1e536d7..b381351d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -116,17 +116,16 @@ def feed(*args): @dataclasses.dataclass(frozen=True, eq=False, order=False) class LoginMocks: - pykube_in_cluster: Mock - pykube_from_file: Mock - pykube_checker: Mock - client_in_cluster: Mock - client_from_file: Mock - client_checker: Mock + pykube_in_cluster: Mock = None + pykube_from_file: Mock = None + pykube_checker: Mock = None + client_in_cluster: Mock = None + client_from_file: Mock = None + client_checker: Mock = None @pytest.fixture() def login_mocks(mocker): - kubernetes = pytest.importorskip('kubernetes') # Pykube config is needed to create a pykube's API instance. # But we do not want and do not need to actually authenticate, so we mock. @@ -135,14 +134,30 @@ def login_mocks(mocker): cfg_mock.cluster = {'server': 'localhost'} cfg_mock.namespace = 'default' - return LoginMocks( - pykube_in_cluster=mocker.patch.object(pykube.KubeConfig, 'from_service_account'), - pykube_from_file=mocker.patch.object(pykube.KubeConfig, 'from_file'), - pykube_checker=mocker.patch.object(pykube.http.HTTPClient, 'get'), - client_in_cluster=mocker.patch.object(kubernetes.config, 'load_incluster_config'), - client_from_file=mocker.patch.object(kubernetes.config, 'load_kube_config'), - client_checker=mocker.patch.object(kubernetes.client, 'CoreApi'), - ) + # Make all client libraries potentially optional, but do not skip the tests: + # skipping the tests is the tests' decision, not this mocking fixture's one. 
+ kwargs = {} + try: + import pykube + except ImportError: + pass + else: + kwargs.update( + pykube_in_cluster=mocker.patch.object(pykube.KubeConfig, 'from_service_account'), + pykube_from_file=mocker.patch.object(pykube.KubeConfig, 'from_file'), + pykube_checker=mocker.patch.object(pykube.http.HTTPClient, 'get'), + ) + try: + import kubernetes + except ImportError: + pass + else: + kwargs.update( + client_in_cluster=mocker.patch.object(kubernetes.config, 'load_incluster_config'), + client_from_file=mocker.patch.object(kubernetes.config, 'load_kube_config'), + client_checker=mocker.patch.object(kubernetes.client, 'CoreApi'), + ) + return LoginMocks(**kwargs) # # Simulating that Kubernetes client library is not installed. @@ -155,8 +170,19 @@ def find_spec(self, fullname, path, target=None): @pytest.fixture() -def kubernetes_uninstalled(): - import kubernetes as kubernetes_before +def _kubernetes(): + # If kubernetes client is required, it should either be installed, + # or skip the test: we cannot simulate its presence (unlike its absence). + return pytest.importorskip('kubernetes') + + +@pytest.fixture() +def _no_kubernetes(): + try: + import kubernetes as kubernetes_before + except ImportError: + yield + return # nothing to patch & restore. # Remove any cached modules. 
preserved = {} @@ -178,6 +204,24 @@ def kubernetes_uninstalled(): import kubernetes as kubernetes_after assert kubernetes_after is kubernetes_before + +@pytest.fixture(params=[True], ids=['with-client']) # for hinting suffixes +def kubernetes(request): + return request.getfixturevalue('_kubernetes') + + +@pytest.fixture(params=[False], ids=['no-client']) # for hinting suffixes +def no_kubernetes(request): + return request.getfixturevalue('_no_kubernetes') + + +@pytest.fixture(params=[False, True], ids=['no-client', 'with-client']) +def any_kubernetes(request): + if request.param: + return request.getfixturevalue('_kubernetes') + else: + return request.getfixturevalue('_no_kubernetes') + # # Helpers for the timing checks. # From 1e2b80607689c15c1dd6e0851d7677b5d4fa5a31 Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Tue, 2 Jul 2019 18:45:58 +0200 Subject: [PATCH 10/16] Skip e2e tests if they require kubernetes client when it is absent --- tests/e2e/test_examples.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tests/e2e/test_examples.py b/tests/e2e/test_examples.py index 0303a3c1..1df6f1cb 100644 --- a/tests/e2e/test_examples.py +++ b/tests/e2e/test_examples.py @@ -3,6 +3,8 @@ import subprocess import time +import pytest + from kopf.testing import KopfRunner @@ -25,6 +27,11 @@ def test_all_examples_are_runnable(mocker, with_crd, with_peering, exampledir): if m.group(2): requires_finalizer = not eval(m.group(3)) + # Skip the e2e test if the framework-optional but test-required library is missing. + m = re.search(r'import kubernetes', example_py.read_text(), re.M) + if m: + pytest.importorskip('kubernetes') + # To prevent lengthy sleeps on the simulated retries. 
mocker.patch('kopf.reactor.handling.DEFAULT_RETRY_DELAY', 1) From 44602c1527adefe9419d8b3ce7899cc08a1f38dd Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Tue, 9 Jul 2019 11:31:15 +0200 Subject: [PATCH 11/16] Remove a note for an already solved issue --- examples/10-builtins/README.md | 4 ---- 1 file changed, 4 deletions(-) diff --git a/examples/10-builtins/README.md b/examples/10-builtins/README.md index c10aaed1..6f9c29a1 100644 --- a/examples/10-builtins/README.md +++ b/examples/10-builtins/README.md @@ -32,10 +32,6 @@ a new pod will be created every 30 seconds. Observe with: kubectl get pods --watch ``` -*Please note that Kopf puts a finalizer on the managed resources, -so the pod deletion will be blocked unless the operator is running -(to remove the finalizer). This will be made optional in #24.* - Cleanup in the end: ```bash From 2118b8ed0f9f06822ba694a4751784804267d8dc Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Tue, 9 Jul 2019 11:54:31 +0200 Subject: [PATCH 12/16] Raise only on connection and authentication errors, ignore permissions --- kopf/clients/auth.py | 20 ++++++++-- tests/cli/test_login.py | 88 +++++++++++++++++++++++++++++++++-------- 2 files changed, 88 insertions(+), 20 deletions(-) diff --git a/kopf/clients/auth.py b/kopf/clients/auth.py index 02862f46..62e46e24 100644 --- a/kopf/clients/auth.py +++ b/kopf/clients/auth.py @@ -82,22 +82,36 @@ def login_client(verify=False): def verify_pykube(): """ Verify if login has succeeded, and the access configuration is still valid. + + All other errors (e.g. 403, 404) are ignored: it means, the host and port + are configured and are reachable, the authentication token is accepted, + and the rest are authorization or configuration errors (not a showstopper). """ try: api = get_pykube_api() rsp = api.get(version="", base="/") rsp.raise_for_status() + api.raise_for_status(rsp) # replaces requests's HTTPError with its own. 
+ except requests.exceptions.ConnectionError as e: + raise AccessError("Cannot connect to the Kubernetes API. " + "Please configure the cluster access.") + except pykube.exceptions.HTTPError as e: + if e.code == 401: + raise AccessError("Cannot authenticate to the Kubernetes API. " + "Please login or configure the tokens.") except requests.exceptions.HTTPError as e: if e.response.status_code == 401: raise AccessError("Cannot authenticate to the Kubernetes API. " "Please login or configure the tokens.") - else: - raise def verify_client(): """ Verify if login has succeeded, and the access configuration is still valid. + + All other errors (e.g. 403, 404) are ignored: it means, the host and port + are configured and are reachable, the authentication token is accepted, + and the rest are authorization or configuration errors (not a showstopper). """ import kubernetes.client.rest try: @@ -110,8 +124,6 @@ def verify_client(): if e.status == 401: raise AccessError("Cannot authenticate to the Kubernetes API. 
" "Please login or configure the tokens.") - else: - raise def get_pykube_cfg() -> pykube.KubeConfig: diff --git a/tests/cli/test_login.py b/tests/cli/test_login.py index 4c37221b..04768c36 100644 --- a/tests/cli/test_login.py +++ b/tests/cli/test_login.py @@ -4,9 +4,13 @@ """ import pytest import requests +import urllib3 from kopf.clients.auth import login, LoginError, AccessError +RESPONSE_401 = requests.Response() +RESPONSE_401.status_code = 401 + @pytest.fixture(autouse=True) def _auto_clean_kubernetes_client(clean_kubernetes_client): @@ -85,10 +89,19 @@ def test_direct_auth_fails_on_errors_in_client(login_mocks, kubernetes): assert login_mocks.client_from_file.called -def test_direct_check_fails_on_errors_in_pykube(login_mocks, any_kubernetes): - response = requests.Response() - response.status_code = 401 - login_mocks.pykube_checker.side_effect = requests.exceptions.HTTPError(response=response) +def test_direct_check_fails_on_tcp_error_in_pykube(login_mocks, any_kubernetes): + login_mocks.pykube_checker.side_effect = requests.exceptions.ConnectionError() + + with pytest.raises(AccessError): + login(verify=True) + + assert login_mocks.pykube_in_cluster.called + assert not login_mocks.pykube_from_file.called + assert login_mocks.pykube_checker.called + + +def test_direct_check_fails_on_401_error_in_pykube(login_mocks, any_kubernetes): + login_mocks.pykube_checker.side_effect = requests.exceptions.HTTPError(response=RESPONSE_401) with pytest.raises(AccessError): login(verify=True) @@ -98,7 +111,22 @@ def test_direct_check_fails_on_errors_in_pykube(login_mocks, any_kubernetes): assert login_mocks.pykube_checker.called -def test_direct_check_fails_on_errors_in_client(login_mocks, kubernetes): +def test_direct_check_fails_on_tcp_error_in_client(login_mocks, kubernetes): + login_mocks.client_checker.side_effect = urllib3.exceptions.HTTPError() + + with pytest.raises(AccessError): + login(verify=True) + + assert login_mocks.pykube_in_cluster.called + assert not 
login_mocks.pykube_from_file.called + assert login_mocks.pykube_checker.called + + assert login_mocks.client_in_cluster.called + assert not login_mocks.client_from_file.called + assert login_mocks.client_checker.called + + +def test_direct_check_fails_on_401_error_in_client(login_mocks, kubernetes): login_mocks.client_checker.side_effect = kubernetes.client.rest.ApiException(status=401) with pytest.raises(AccessError): @@ -168,8 +196,8 @@ def test_clirun_auth_works_viaconfig_with_client(login_mocks, kubernetes, assert login_mocks.client_checker.called -def test_clirun_auth_fails_on_errors_in_pykube(login_mocks, any_kubernetes, - invoke, preload, real_run): +def test_clirun_auth_fails_on_config_error_in_pykube(login_mocks, any_kubernetes, + invoke, preload, real_run): login_mocks.pykube_in_cluster.side_effect = FileNotFoundError login_mocks.pykube_from_file.side_effect = FileNotFoundError @@ -182,8 +210,8 @@ def test_clirun_auth_fails_on_errors_in_pykube(login_mocks, any_kubernetes, assert not login_mocks.pykube_checker.called -def test_clirun_auth_fails_on_errors_in_client(login_mocks, kubernetes, - invoke, preload, real_run): +def test_clirun_auth_fails_on_config_error_in_client(login_mocks, kubernetes, + invoke, preload, real_run): login_mocks.client_in_cluster.side_effect = kubernetes.config.ConfigException login_mocks.client_from_file.side_effect = kubernetes.config.ConfigException @@ -200,11 +228,22 @@ def test_clirun_auth_fails_on_errors_in_client(login_mocks, kubernetes, assert not login_mocks.client_checker.called -def test_clirun_check_fails_on_errors_in_pykube(login_mocks, any_kubernetes, - invoke, preload, real_run): - response = requests.Response() - response.status_code = 401 - login_mocks.pykube_checker.side_effect = requests.exceptions.HTTPError(response=response) +def test_clirun_check_fails_on_tcp_error_in_pykube(login_mocks, any_kubernetes, + invoke, preload, real_run): + login_mocks.pykube_checker.side_effect = 
requests.exceptions.ConnectionError() + + result = invoke(['run']) + assert result.exit_code != 0 + assert 'Please configure the cluster access' in result.stdout + + assert login_mocks.pykube_in_cluster.called + assert not login_mocks.pykube_from_file.called + assert login_mocks.pykube_checker.called + + +def test_clirun_check_fails_on_401_error_in_pykube(login_mocks, any_kubernetes, + invoke, preload, real_run): + login_mocks.pykube_checker.side_effect = requests.exceptions.HTTPError(response=RESPONSE_401) result = invoke(['run']) assert result.exit_code != 0 @@ -215,8 +254,25 @@ def test_clirun_check_fails_on_errors_in_pykube(login_mocks, any_kubernetes, assert login_mocks.pykube_checker.called -def test_clirun_check_fails_on_errors_in_client(login_mocks, kubernetes, - invoke, preload, real_run): +def test_clirun_check_fails_on_tcp_error_in_client(login_mocks, kubernetes, + invoke, preload, real_run): + login_mocks.client_checker.side_effect = urllib3.exceptions.HTTPError() + + result = invoke(['run']) + assert result.exit_code != 0 + assert 'Please configure the cluster access' in result.stdout + + assert login_mocks.pykube_in_cluster.called + assert not login_mocks.pykube_from_file.called + assert login_mocks.pykube_checker.called + + assert login_mocks.client_in_cluster.called + assert not login_mocks.client_from_file.called + assert login_mocks.client_checker.called + + +def test_clirun_check_fails_on_401_error_in_client(login_mocks, kubernetes, + invoke, preload, real_run): login_mocks.client_checker.side_effect = kubernetes.client.rest.ApiException(status=401) result = invoke(['run']) From 0db1078306debd742582aba9a40446709093b34f Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Tue, 9 Jul 2019 11:57:40 +0200 Subject: [PATCH 13/16] Use the default timeout of pykube if not passed to `get_pykube_api()` --- kopf/clients/auth.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/kopf/clients/auth.py b/kopf/clients/auth.py index 
62e46e24..aab51483 100644 --- a/kopf/clients/auth.py +++ b/kopf/clients/auth.py @@ -134,4 +134,5 @@ def get_pykube_cfg() -> pykube.KubeConfig: # TODO: add some caching, but keep kwargs in mind. Maybe add a key= for purpose/use-place? def get_pykube_api(timeout=None) -> pykube.HTTPClient: - return pykube.HTTPClient(get_pykube_cfg(), timeout=timeout) + kwargs = dict(timeout=timeout) if timeout is not None else dict() + return pykube.HTTPClient(get_pykube_cfg(), **kwargs) From 495744bd36f7b2433de34593c83753ca2bb2f3a7 Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Tue, 9 Jul 2019 11:59:23 +0200 Subject: [PATCH 14/16] Specify which CRD is absent in an exception --- kopf/clients/classes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kopf/clients/classes.py b/kopf/clients/classes.py index 21a51433..218ba825 100644 --- a/kopf/clients/classes.py +++ b/kopf/clients/classes.py @@ -36,7 +36,7 @@ def _make_cls(resource) -> Type[APIObject]: resource_kind = next((r['kind'] for r in api_resources if r['name'] == resource.plural), None) is_namespaced = next((r['namespaced'] for r in api_resources if r['name'] == resource.plural), None) if not resource_kind: - raise pykube.ObjectDoesNotExist("No such CRD at all.") + raise pykube.ObjectDoesNotExist(f"No such CRD: {resource.name}") cls_name = resource.plural cls_base = NamespacedAPIObject if is_namespaced else APIObject From f75fa7a45d8fd2df805806bbba6587bb80fdc6fb Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Tue, 9 Jul 2019 12:34:57 +0200 Subject: [PATCH 15/16] Remove a file that accidentally sneaked in --- kopf/logging/logging.py | 41 ----------------------------------------- 1 file changed, 41 deletions(-) delete mode 100644 kopf/logging/logging.py diff --git a/kopf/logging/logging.py b/kopf/logging/logging.py deleted file mode 100644 index 1bed3560..00000000 --- a/kopf/logging/logging.py +++ /dev/null @@ -1,41 +0,0 @@ -import asyncio -import logging - -format = '[%(asctime)s] %(name)-20.20s 
[%(levelname)-8.8s] %(message)s' - - -def configure(debug=None, verbose=None, quiet=None): - log_level = 'DEBUG' if debug or verbose else 'WARNING' if quiet else 'INFO' - - logger = logging.getLogger() - handler = logging.StreamHandler() - formatter = logging.Formatter(format) - handler.setFormatter(formatter) - logger.addHandler(handler) - logger.setLevel(log_level) - - # Configure the Kubernetes client defaults according to our settings. - try: - import kubernetes - except ImportError: - pass - else: - config = kubernetes.client.configuration.Configuration() - config.logger_format = format - config.logger_file = None # once again after the constructor to re-apply the formatter - config.debug = debug - kubernetes.client.configuration.Configuration.set_default(config) - - # Kubernetes client is as buggy as hell: it adds its own stream handlers even in non-debug mode, - # does not respect the formatting, and dumps too much of the low-level info. - if not debug: - logger = logging.getLogger("urllib3") - del logger.handlers[1:] # everything except the default NullHandler - - # Prevent the low-level logging unless in the debug verbosity mode. Keep only the operator's messages. 
- logging.getLogger('urllib3').propagate = debug - logging.getLogger('asyncio').propagate = debug - logging.getLogger('kubernetes').propagate = debug - - loop = asyncio.get_event_loop() - loop.set_debug(debug) From f105def0738a0e7dc43804ea7e4cf9cc9fff5a60 Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Tue, 9 Jul 2019 12:50:39 +0200 Subject: [PATCH 16/16] Switch to pykube-ng version with patching & CRDs added --- kopf/clients/classes.py | 29 ++--------------------------- kopf/clients/fetching.py | 2 +- setup.py | 2 +- 3 files changed, 4 insertions(+), 29 deletions(-) diff --git a/kopf/clients/classes.py b/kopf/clients/classes.py index 218ba825..b4e5c54a 100644 --- a/kopf/clients/classes.py +++ b/kopf/clients/classes.py @@ -1,4 +1,3 @@ -import json from typing import Type import pykube @@ -6,31 +5,7 @@ from kopf.clients import auth -# TODO: this mixin has to be migrated into pykube-ng itself. -class APIObject(pykube.objects.APIObject): - def patch(self, patch): - ''' - Patch the Kubernetes resource by calling the API. 
- ''' - r = self.api.patch(**self.api_kwargs( - headers={"Content-Type": "application/merge-patch+json"}, - data=json.dumps(patch), - )) - self.api.raise_for_status(r) - self.set_obj(r.json()) - - -class NamespacedAPIObject(pykube.objects.NamespacedAPIObject, APIObject): - pass - - -class CustomResourceDefinition(APIObject): - version = "apiextensions.k8s.io/v1beta1" - endpoint = "customresourcedefinitions" - kind = "CustomResourceDefinition" - - -def _make_cls(resource) -> Type[APIObject]: +def _make_cls(resource) -> Type[pykube.objects.APIObject]: api = auth.get_pykube_api() api_resources = api.resource_list(resource.api_version)['resources'] resource_kind = next((r['kind'] for r in api_resources if r['name'] == resource.plural), None) @@ -39,7 +14,7 @@ def _make_cls(resource) -> Type[APIObject]: raise pykube.ObjectDoesNotExist(f"No such CRD: {resource.name}") cls_name = resource.plural - cls_base = NamespacedAPIObject if is_namespaced else APIObject + cls_base = pykube.objects.NamespacedAPIObject if is_namespaced else pykube.objects.APIObject cls = type(cls_name, (cls_base,), { 'version': resource.api_version, 'endpoint': resource.plural, diff --git a/kopf/clients/fetching.py b/kopf/clients/fetching.py index 0fa9923f..57b6b313 100644 --- a/kopf/clients/fetching.py +++ b/kopf/clients/fetching.py @@ -10,7 +10,7 @@ def read_crd(*, resource, default=_UNSET_): try: api = auth.get_pykube_api() - cls = classes.CustomResourceDefinition + cls = pykube.CustomResourceDefinition obj = cls.objects(api, namespace=None).get_by_name(name=resource.name) return obj.obj diff --git a/setup.py b/setup.py index 75bc7a58..8aac822c 100644 --- a/setup.py +++ b/setup.py @@ -43,6 +43,6 @@ 'click', 'iso8601', 'aiojobs', - 'pykube-ng', + 'pykube-ng>=0.25', ], )