Skip to content

Commit d4f4ac2

Browse files
committed
use unified interface for generating OCSP keys
1 parent 21bb325 commit d4f4ac2

File tree

14 files changed

+100
-126
lines changed

14 files changed

+100
-126
lines changed

ca/ca/settings.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@
112112

113113
# Make this unique, and don't share it with anybody.
114114
SECRET_KEY = ""
115-
SECRET_KEY_FILE = None
115+
SECRET_KEY_FILE = ""
116116

117117
MIDDLEWARE = [
118118
"django.middleware.security.SecurityMiddleware",

ca/ca/settings_utils.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,12 +180,13 @@ class ProjectSettingsModelMixin:
180180
"This setting has no effect if you define the ``LOGGING`` setting."
181181
)
182182
STORAGES: dict[str, dict[str, Any]] = Field(default_factory=dict)
183-
SECRET_KEY_FILE: str | None = Field(
184-
default=None,
183+
SECRET_KEY_FILE: str = Field(
184+
default="",
185185
description="A path to a file that stores Django's `SECRET_KEY "
186186
"<https://docs.djangoproject.com/en/dev/ref/settings/#std:setting-SECRET_KEY>`_. The setting is only "
187187
"used if no ``SECRET_KEY`` is defined.",
188188
examples=["/var/lib/django-ca/certs/ca/shared/secret_key"],
189+
json_schema_extra={"default_explanation": "Not set."},
189190
)
190191
USE_TZ: bool
191192

ca/django_ca/celery/messages.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ class UseCertificateAuthoritiesTaskArgs(CeleryMessageModel):
4646

4747
@model_validator(mode="after")
4848
def validate_exclude(self) -> Self:
49+
"""Validator to make sure that not both `serials` and `exclude` is set."""
4950
if self.serials and self.exclude:
5051
raise ValueError("Message cannot contain both serials and excluded serials.")
5152
return self

ca/django_ca/management/base.py

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,8 @@
1414
"""Command subclasses and argparse helpers for django-ca."""
1515

1616
import abc
17-
import argparse
18-
import typing
19-
from collections.abc import Callable
2017
from datetime import UTC, datetime, timedelta
21-
from typing import Any
18+
from typing import TYPE_CHECKING, Any, ClassVar
2219

2320
from cryptography import x509
2421
from cryptography.x509.oid import AuthorityInformationAccessOID
@@ -44,7 +41,8 @@
4441
ConfigurableExtensionType,
4542
)
4643

47-
if typing.TYPE_CHECKING:
44+
if TYPE_CHECKING:
45+
from celery.local import Proxy # celery.local is defined in our stubs
4846
from django_stubs_ext import StrOrPromise
4947

5048

@@ -381,7 +379,7 @@ class BaseSignCertCommand(UsePrivateKeyMixin, BaseSignCommand, metaclass=abc.ABC
381379
"""Base class for commands signing certificates (sign_cert, resign_cert)."""
382380

383381
add_extensions_help = "" # concrete classes should set this
384-
subject_help: typing.ClassVar[str] # concrete classes should set this
382+
subject_help: ClassVar[str] # concrete classes should set this
385383

386384
def add_base_args(self, parser: CommandParser, no_default_ca: bool = False) -> ArgumentGroup:
387385
"""Add common arguments for signing certificates."""
@@ -584,10 +582,10 @@ class GenerateCommandBase(UsePrivateKeyMixin, BaseCommand):
584582
"""Base class for commands to generate CRLs and OCSP keys."""
585583

586584
what: str
587-
single_task: Callable[..., Any]
588-
multiple_task: Callable[..., Any]
585+
single_task: "Proxy[[UseCertificateAuthorityTaskArgs], Any]"
586+
multiple_task: "Proxy[[UseCertificateAuthoritiesTaskArgs], Any]"
589587

590-
def add_arguments(self, parser: argparse.ArgumentParser) -> None:
588+
def add_arguments(self, parser: CommandParser) -> None:
591589
parser.add_argument(
592590
"serials",
593591
metavar="SERIAL",
@@ -614,14 +612,23 @@ def handle(self, serials: list[str], exclude: list[str], force: bool, **options:
614612
raise CommandError("Cannot name serials and exclude list at the same time.")
615613

616614
if len(serials) == 1:
617-
ca, key_backend_options = self.get_key_backend_options(serials[0], options)
618-
619-
data = UseCertificateAuthorityTaskArgs(
620-
serial=ca.serial,
621-
force=force,
622-
key_backend_options=key_backend_options.model_dump(mode="json", exclude_unset=True),
623-
)
624-
run_task(self.single_task, data)
615+
try:
616+
ca, key_backend_options = self.get_key_backend_options(serials[0], options)
617+
618+
single_data = UseCertificateAuthorityTaskArgs(
619+
serial=ca.serial,
620+
force=force,
621+
key_backend_options=key_backend_options.model_dump(mode="json", exclude_unset=True),
622+
)
623+
624+
run_task(self.single_task, single_data)
625+
except Exception as ex:
626+
# Exceptions only happen if Celery is not used or the connection to the broker fails
627+
raise CommandError(ex) from ex
625628
else:
626-
data = UseCertificateAuthoritiesTaskArgs(serials=serials, exclude=exclude, force=force)
627-
run_task(self.multiple_task, data)
629+
multiple_data = UseCertificateAuthoritiesTaskArgs(serials=serials, exclude=exclude, force=force)
630+
try:
631+
run_task(self.multiple_task, multiple_data)
632+
except Exception as ex: # pragma: no cover # does not usually happen
633+
# Exceptions only happen if Celery is not used or the connection to the broker fails
634+
raise CommandError(ex) from ex

ca/django_ca/management/commands/cache_crls.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@
2626
class Command(GenerateCrlsCommand): # noqa: D101
2727
help = "(Deprecated) Cache CRLs. Use generate_crls instead."
2828

29-
def handle(self, serials: list[str], **options: Any) -> None:
29+
def handle(self, serials: list[str], exclude: list[str], force: bool, **options: Any) -> None:
3030
self.stderr.write(
3131
self.style.WARNING(
3232
"Warning: This command is deprecated. Please use generate_crls instead. "
3333
"This alias will be removed in django_ca~=3.2.0."
3434
)
3535
)
36-
super().handle(serials, **options)
36+
super().handle(serials, exclude=exclude, force=force, **options)

ca/django_ca/management/commands/generate_ocsp_keys.py

Lines changed: 6 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -16,81 +16,14 @@
1616
.. seealso:: https://docs.djangoproject.com/en/dev/howto/custom-management-commands/
1717
"""
1818

19-
from collections.abc import Iterable
20-
from typing import Any
19+
from django_ca.management.base import GenerateCommandBase
20+
from django_ca.tasks import generate_ocsp_key, generate_ocsp_keys
2121

22-
from pydantic import ValidationError
2322

24-
from django.core.management.base import CommandError, CommandParser
25-
26-
from django_ca.celery import run_task
27-
from django_ca.celery.messages import UseCertificateAuthorityTaskArgs
28-
from django_ca.conf import model_settings
29-
from django_ca.management.base import BaseCommand
30-
from django_ca.management.mixins import UsePrivateKeyMixin
31-
from django_ca.models import CertificateAuthority
32-
from django_ca.tasks import generate_ocsp_key
33-
from django_ca.utils import add_colons
34-
35-
36-
class Command(UsePrivateKeyMixin, BaseCommand):
23+
class Command(GenerateCommandBase):
3724
"""Implement the :command:`manage.py generate_ocsp_keys` command."""
3825

3926
help = "Generate OCSP keys."
40-
41-
def add_arguments(self, parser: CommandParser) -> None:
42-
parser.add_argument(
43-
"serials",
44-
metavar="SERIAL",
45-
nargs="*",
46-
help="Generate OCSP keys only for the given CA. If omitted, generate keys for all CAs.",
47-
)
48-
49-
parser.add_argument(
50-
"--force",
51-
action="store_true",
52-
default=False,
53-
help="Force regeneration of OCSP responder certificates.",
54-
)
55-
parser.add_argument("--quiet", action="store_true", default=False, help="Do not output warnings.")
56-
self.add_use_private_key_arguments(parser)
57-
58-
def handle(self, serials: Iterable[str], quiet: bool, force: bool, **options: Any) -> None:
59-
if not serials:
60-
serials = CertificateAuthority.objects.all().order_by("serial").values_list("serial", flat=True)
61-
62-
if "ocsp" not in model_settings.CA_PROFILES:
63-
raise CommandError("ocsp: Undefined profile.")
64-
65-
errors = 0
66-
for serial in serials:
67-
serial = serial.replace(":", "").strip().upper()
68-
hr_serial = add_colons(serial)
69-
try:
70-
ca: CertificateAuthority = CertificateAuthority.objects.get(serial=serial)
71-
except CertificateAuthority.DoesNotExist:
72-
self.stderr.write(self.style.ERROR(f"{hr_serial}: Unknown CA."))
73-
continue
74-
75-
try:
76-
key_backend_options = ca.key_backend.get_use_private_key_options(ca, options)
77-
except ValidationError as ex:
78-
self.validation_error_to_command_error(ex)
79-
except Exception as ex: # pragma: no cover # pylint: disable=broad-exception-caught
80-
if quiet is False:
81-
self.stderr.write(self.style.WARNING(f"{hr_serial}: {ex}"))
82-
continue
83-
84-
try:
85-
message = UseCertificateAuthorityTaskArgs(
86-
serial=serial,
87-
key_backend_options=key_backend_options.model_dump(mode="json", exclude_unset=True),
88-
force=force,
89-
)
90-
run_task(generate_ocsp_key, message)
91-
except Exception as ex: # pylint: disable=broad-exception-caught
92-
self.stderr.write(f"{serial}: {ex}")
93-
errors += 1
94-
95-
if errors:
96-
raise CommandError(f"Generation of {errors} OCSP key(s) failed.")
27+
what = "OCSP keys"
28+
single_task = generate_ocsp_key
29+
multiple_task = generate_ocsp_keys

ca/django_ca/management/mixins.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -674,9 +674,10 @@ def get_signing_options(
674674

675675
return key_backend_options, algorithm
676676

677-
def get_key_backend_options(
677+
def get_key_backend_options( # type: ignore[return] # b/c of validation_error_to_command_error
678678
self, serial: str, options: dict[str, Any]
679679
) -> tuple[CertificateAuthority, BaseModel]:
680+
"""Get CA and key backend options for the given input data."""
680681
serial = sanitize_serial(serial)
681682
hr_serial = add_colons(serial)
682683
try:
@@ -687,6 +688,6 @@ def get_key_backend_options(
687688
try:
688689
return ca, ca.key_backend.get_use_private_key_options(ca, options)
689690
except ValidationError as ex:
690-
self.validation_error_to_command_error(ex)
691-
except Exception as ex: # pragma: no cover # pylint: disable=broad-exception-caught
691+
self.validation_error_to_command_error(ex) # type: ignore[attr-defined]
692+
except Exception as ex: # pragma: no cover
692693
raise CommandError(str(ex)) from ex

ca/django_ca/tasks.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ def generate_crls(data: UseCertificateAuthoritiesTaskArgs | None = None) -> None
126126
# NOTE: When using Celery, an exception will only be raised here if task.delay() itself raises an
127127
# exception, e.g. if the connection to the broker fails. Without celery, exceptions in
128128
# `generate_crl()` are raised here directly.
129-
log.exception("Error caching CRL for %s", serial)
129+
log.exception("Error generating CRL for %s", serial)
130130

131131

132132
@shared_task(base=DjangoCaTask)

ca/django_ca/tests/commands/test_generate_crls.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,4 +95,4 @@ def test_deprecated_cmd_name(usable_root: CertificateAuthority) -> None:
9595
def test_with_serial_and_exclude() -> None:
9696
"""Test passing both an explicit serial and an exclusion (makes no sense)."""
9797
with assert_command_error(r"^Cannot name serials and exclude list at the same time\.$"):
98-
cmd("generate_crls", serials=["abc"], exclude=["def"])
98+
cmd("generate_crls", ["abc"], exclude=["def"])

ca/django_ca/tests/commands/test_generate_ocsp_keys.py

Lines changed: 14 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313

1414
"""Test the generate_ocsp_keys management command."""
1515

16-
import io
1716
import typing
1817
from collections.abc import Iterable
1918
from datetime import UTC, datetime, timedelta
@@ -143,32 +142,28 @@ def test_with_celery(settings: SettingsWrapper, usable_root: CertificateAuthorit
143142
assert_no_key(usable_root.serial)
144143

145144

146-
def test_without_serial(
147-
settings: SettingsWrapper, root: CertificateAuthority, ec: CertificateAuthority
148-
) -> None:
145+
def test_without_serial(settings: SettingsWrapper) -> None:
149146
"""Test for all CAs."""
150147
settings.CA_USE_CELERY = True
151-
kwargs = {"key_backend_options": {"password": None}, "force": False}
152148
with mock_celery_task(
153-
"django_ca.tasks.generate_ocsp_key",
154-
(mock.call(tuple(), {"data": {"serial": root.serial, **kwargs}})),
155-
(mock.call(tuple(), {"data": {"serial": ec.serial, **kwargs}})),
149+
"django_ca.tasks.generate_ocsp_keys",
150+
(mock.call(tuple(), {"data": {"serials": [], "exclude": [], "force": False}})),
156151
):
157152
cmd("generate_ocsp_keys")
158153

159154

160155
@pytest.mark.django_db
161-
def test_wrong_serial() -> None:
162-
"""Try passing an unknown CA."""
163-
generate_ocsp_keys("ZZZZZ", stderr="0Z:ZZ:ZZ: Unknown CA.\n", no_color=True)
156+
def test_unknown_serial() -> None:
157+
"""Try passing an invalid CA."""
158+
with assert_command_error(r"^0A:BC: Unknown CA\.$"):
159+
generate_ocsp_keys("abc")
164160

165161

166-
def test_no_ocsp_profile(settings: SettingsWrapper, root: CertificateAuthority) -> None:
167-
"""Try when there is no OCSP profile."""
168-
settings.CA_PROFILES = {"ocsp": None}
169-
with assert_command_error(r"^ocsp: Undefined profile\.$"):
170-
generate_ocsp_keys(root.serial)
171-
assert_no_key(root.serial)
162+
@pytest.mark.django_db
163+
def test_invalid_serial() -> None:
164+
"""Try passing an invalid CA."""
165+
with assert_command_error(r"^ZZZZZ: Serial has invalid characters$"):
166+
generate_ocsp_keys("ZZZZZ")
172167

173168

174169
def test_deprecated_command_name(usable_root: CertificateAuthority) -> None:
@@ -185,10 +180,8 @@ def test_deprecated_command_name(usable_root: CertificateAuthority) -> None:
185180
@pytest.mark.usefixtures("tmpcadir")
186181
def test_without_private_key(root: CertificateAuthority) -> None:
187182
"""Try regenerating the OCSP key when no CA private key is available."""
188-
stderr_buffer = io.StringIO()
189-
with assert_command_error(r"Generation of 1 OCSP key\(s\) failed\."):
190-
cmd("generate_ocsp_keys", root.serial, stderr=stderr_buffer)
191-
assert "No such file or directory" in stderr_buffer.getvalue()
183+
with assert_command_error(r"No such file or directory"):
184+
cmd("generate_ocsp_keys", root.serial)
192185

193186

194187
def test_model_validation_error(root: CertificateAuthority) -> None:

0 commit comments

Comments
 (0)