diff --git a/CHANGELOG.rst b/CHANGELOG.rst index df91238b63..61d5344e7d 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -4,6 +4,11 @@ Changelog Unreleased ~~~~~~~~~~~~~~~~~~~~ Added Google CA issuer plugin. This plugin creates certificates via Google CA Manager API. +Added CERTIFICATE_CREATE_REQUEST_VALIDATION: a configurable, plugin-independent callback that can be used to reject requests +based on your specific business logic. For example, you could disallow certs with rotate set and no destinations to reduce +volume of unused certs. +Added the automatically_disable_autorotate_without_endpoint_or_destination celery task, along with a customizable DISABLE_AUTOROTATION_FILTER +function you can use to determine when to disable autorotate. By default, nothing will be changed by this task when scheduled. 1.7.0 - `2024-01-17` ~~~~~~~~~~~~~~~~~~~~ diff --git a/lemur/certificates/cli.py b/lemur/certificates/cli.py index 32ab738c40..c3f222885b 100644 --- a/lemur/certificates/cli.py +++ b/lemur/certificates/cli.py @@ -35,7 +35,7 @@ list_recent_valid_certs_issued_by_authority, get_certificates_with_same_cn_with_rotate_on, identify_and_persist_expiring_deployed_certificates, - send_certificate_expiration_metrics + send_certificate_expiration_metrics, get_all_certs_not_attached_to_endpoint_or_destination_with_autorotate ) from lemur.certificates.verify import verify_string from lemur.constants import SUCCESS_METRIC_STATUS, FAILURE_METRIC_STATUS, CRLReason @@ -877,6 +877,46 @@ def automatically_enable_autorotate_with_endpoint(): database.update(cert) +@cli.command("automatically_disable_autorotate_without_endpoint_or_destination") +def automatically_disable_autorotate_without_endpoint_or_destination_command(): + automatically_disable_autorotate_without_endpoint_or_destination() + + +def automatically_disable_autorotate_without_endpoint_or_destination(): + """ + This function automatically disables auto-rotation for unexpired certificates that are + not attached to an endpoint or destination but have autorotate enabled. + + WARNING: This will overwrite the Auto-rotate toggle! + """ + log_data = { + "function": f"{__name__}.{sys._getframe().f_code.co_name}", + "message": "Disabling auto-rotate for certificate" + } + + eligible_certs = get_all_certs_not_attached_to_endpoint_or_destination_with_autorotate() + for cert in eligible_certs: + if not isinstance(callable, current_app.config.get("DISABLE_AUTOROTATION_FILTER")) or not current_app.config.get("DISABLE_AUTOROTATION_FILTER")(cert): + continue + + log_data["certificate"] = cert.name + log_data["certificate_id"] = cert.id + log_data["authority_id"] = cert.authority_id + log_data["authority_name"] = authorities_get_by_id(cert.authority_id).name + log_data["destination_names"] = "NONE" + current_app.logger.info(log_data) + metrics.send("automatically_disable_autorotate_without_endpoint_or_destination", + "counter", 1, + metric_tags={"certificate": log_data["certificate"], + "certificate_id": log_data["certificate_id"], + "authority_id": log_data["authority_id"], + "authority_name": log_data["authority_name"], + "destination_names": log_data["destination_names"] + }) + cert.rotation = False + database.update(cert) + + @cli.command("automatically_enable_autorotate_with_destination") def automatically_enable_autorotate_with_destination_command(): automatically_enable_autorotate_with_destination() diff --git a/lemur/certificates/service.py b/lemur/certificates/service.py index 9337e356f7..fe93b1b567 100644 --- a/lemur/certificates/service.py +++ b/lemur/certificates/service.py @@ -183,6 +183,23 @@ def get_all_certs_attached_to_endpoint_without_autorotate(): ) +def get_all_certs_not_attached_to_endpoint_or_destination_with_autorotate(): + """ + Retrieves all certificates that are not attached to an endpoint or destinations, but that have autorotate enabled. + + :return: list of certificates not attached to an endpoint or destination with autorotate + """ + return ( + Certificate.query.filter(Certificate.endpoints.none()) + .filter(Certificate.destinations.none()) + .filter(Certificate.rotation == true()) + .filter(Certificate.revoked == false()) + .filter(Certificate.not_after >= arrow.now()) + .filter(not_(Certificate.replaced.any())) + .all() # noqa + ) + + def get_all_certs_attached_to_destination_without_autorotate(plugin_name=None): """ Retrieves all certificates that are attached to a destination, but that do not have autorotate enabled. diff --git a/lemur/certificates/views.py b/lemur/certificates/views.py index 982c84f0fc..5635577bda 100644 --- a/lemur/certificates/views.py +++ b/lemur/certificates/views.py @@ -509,6 +509,11 @@ def post(self, data=None): if not validators.is_valid_owner(data["owner"]): return dict(message=f"Invalid owner: check if {data['owner']} is a valid group email. Individuals cannot be certificate owners."), 412 + if isinstance(current_app.config.get("CERTIFICATE_CREATE_REQUEST_VALIDATION")): + message, code = current_app.config.get("CERTIFICATE_CREATE_REQUEST_VALIDATION")(data) + if message and code: + return dict(message=message), code + role = role_service.get_by_name(data["authority"].owner) # all the authority role members should be allowed diff --git a/lemur/common/celery.py b/lemur/common/celery.py index d66841a138..33565be4a0 100644 --- a/lemur/common/celery.py +++ b/lemur/common/celery.py @@ -880,6 +880,30 @@ def enable_autorotate_for_certs_attached_to_destination(): return log_data +@celery_app.task(soft_time_limit=3600) +def automatically_disable_autorotate_without_endpoint_or_destination(): + """ + This celery task automatically disables autorotation for unexpired certificates that are + attached to no destinations or endpoints and have autorotate enabled. + :return: + """ + function = f"{__name__}.{sys._getframe().f_code.co_name}" + task_id = None + if celery_app.current_task: + task_id = celery_app.current_task.request.id + + log_data = { + "function": function, + "task_id": task_id, + "message": "Disabling autorotate to eligible certificates", + } + current_app.logger.debug(log_data) + + cli_certificate.automatically_disable_autorotate_without_endpoint_or_destination() + metrics.send(f"{function}.success", "counter", 1) + return log_data + + @celery_app.task(soft_time_limit=3600) def deactivate_entrust_test_certificates(): """