Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to turn off autorotate for orphaned certs #4835

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ Changelog
Unreleased
~~~~~~~~~~~~~~~~~~~~
Added Google CA issuer plugin. This plugin creates certificates via Google CA Manager API.
Added CERTIFICATE_CREATE_REQUEST_VALIDATION: a configurable, plugin-independent callback that can be used to reject requests
based on your specific business logic. For example, you could disallow certs with rotate set and no destinations to reduce
volume of unused certs.
Added the automatically_disable_autorotate_without_endpoint_or_destination celery task, along with a customizable DISABLE_AUTOROTATION_FILTER
function you can use to determine when to disable autorotate. By default, nothing will be changed by this task when scheduled.

1.7.0 - `2024-01-17`
~~~~~~~~~~~~~~~~~~~~
Expand Down
42 changes: 41 additions & 1 deletion lemur/certificates/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
list_recent_valid_certs_issued_by_authority,
get_certificates_with_same_cn_with_rotate_on,
identify_and_persist_expiring_deployed_certificates,
send_certificate_expiration_metrics
send_certificate_expiration_metrics, get_all_certs_not_attached_to_endpoint_or_destination_with_autorotate
)
from lemur.certificates.verify import verify_string
from lemur.constants import SUCCESS_METRIC_STATUS, FAILURE_METRIC_STATUS, CRLReason
Expand Down Expand Up @@ -877,6 +877,46 @@ def automatically_enable_autorotate_with_endpoint():
database.update(cert)


@cli.command("automatically_disable_autorotate_without_endpoint_or_destination")
def automatically_disable_autorotate_without_endpoint_or_destination_command():
automatically_disable_autorotate_without_endpoint_or_destination()


def automatically_disable_autorotate_without_endpoint_or_destination():
"""
This function automatically disables auto-rotation for unexpired certificates that are
not attached to an endpoint or destination but have autorotate enabled.

WARNING: This will overwrite the Auto-rotate toggle!
"""
log_data = {
"function": f"{__name__}.{sys._getframe().f_code.co_name}",
"message": "Disabling auto-rotate for certificate"
}

eligible_certs = get_all_certs_not_attached_to_endpoint_or_destination_with_autorotate()
for cert in eligible_certs:
if not isinstance(callable, current_app.config.get("DISABLE_AUTOROTATION_FILTER")) or not current_app.config.get("DISABLE_AUTOROTATION_FILTER")(cert):
continue

log_data["certificate"] = cert.name
log_data["certificate_id"] = cert.id
log_data["authority_id"] = cert.authority_id
log_data["authority_name"] = authorities_get_by_id(cert.authority_id).name
log_data["destination_names"] = "NONE"
current_app.logger.info(log_data)
metrics.send("automatically_disable_autorotate_without_endpoint_or_destination",
"counter", 1,
metric_tags={"certificate": log_data["certificate"],
"certificate_id": log_data["certificate_id"],
"authority_id": log_data["authority_id"],
"authority_name": log_data["authority_name"],
"destination_names": log_data["destination_names"]
})
cert.rotation = False
database.update(cert)


@cli.command("automatically_enable_autorotate_with_destination")
def automatically_enable_autorotate_with_destination_command():
automatically_enable_autorotate_with_destination()
Expand Down
17 changes: 17 additions & 0 deletions lemur/certificates/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,23 @@ def get_all_certs_attached_to_endpoint_without_autorotate():
)


def get_all_certs_not_attached_to_endpoint_or_destination_with_autorotate():
"""
Retrieves all certificates that are not attached to an endpoint or destinations, but that have autorotate enabled.

:return: list of certificates not attached to an endpoint or destination with autorotate
"""
return (
Certificate.query.filter(Certificate.endpoints.none())
.filter(Certificate.destinations.none())
.filter(Certificate.rotation == true())
.filter(Certificate.revoked == false())
.filter(Certificate.not_after >= arrow.now())
.filter(not_(Certificate.replaced.any()))
.all() # noqa
)


def get_all_certs_attached_to_destination_without_autorotate(plugin_name=None):
"""
Retrieves all certificates that are attached to a destination, but that do not have autorotate enabled.
Expand Down
5 changes: 5 additions & 0 deletions lemur/certificates/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,11 @@ def post(self, data=None):
if not validators.is_valid_owner(data["owner"]):
return dict(message=f"Invalid owner: check if {data['owner']} is a valid group email. Individuals cannot be certificate owners."), 412

if isinstance(current_app.config.get("CERTIFICATE_CREATE_REQUEST_VALIDATION")):
message, code = current_app.config.get("CERTIFICATE_CREATE_REQUEST_VALIDATION")(data)
if message and code:
return dict(message=message), code

role = role_service.get_by_name(data["authority"].owner)

# all the authority role members should be allowed
Expand Down
24 changes: 24 additions & 0 deletions lemur/common/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -880,6 +880,30 @@ def enable_autorotate_for_certs_attached_to_destination():
return log_data


@celery_app.task(soft_time_limit=3600)
def automatically_disable_autorotate_without_endpoint_or_destination():
"""
This celery task automatically disables autorotation for unexpired certificates that are
attached to no destinations or endpoints and have autorotate enabled.
:return:
"""
function = f"{__name__}.{sys._getframe().f_code.co_name}"
task_id = None
if celery_app.current_task:
task_id = celery_app.current_task.request.id

log_data = {
"function": function,
"task_id": task_id,
"message": "Disabling autorotate to eligible certificates",
}
current_app.logger.debug(log_data)

cli_certificate.automatically_disable_autorotate_without_endpoint_or_destination()
metrics.send(f"{function}.success", "counter", 1)
return log_data


@celery_app.task(soft_time_limit=3600)
def deactivate_entrust_test_certificates():
"""
Expand Down