-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: v.oleynikov <[email protected]>
- Loading branch information
Showing
38 changed files
with
2,433 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
#!/usr/bin/env python3 | ||
# | ||
# Copyright 2023 Flant JSC | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
from deckhouse import hook | ||
from lib.module import module | ||
from typing import Callable | ||
import json | ||
import os | ||
import unittest | ||
|
||
|
||
NAMESPACE = "d8-sds-node-configurator" | ||
MODULE_NAME = "sdsNodeConfigurator" | ||
|
||
def json_load(path: str): | ||
with open(path, "r", encoding="utf-8") as f: | ||
data = json.load(f) | ||
return data | ||
|
||
def get_dir_path() -> str: | ||
return os.path.dirname(os.path.abspath(__file__)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
#!/usr/bin/env python3 | ||
# | ||
# Copyright 2023 Flant JSC | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
|
||
from lib.hooks.internal_tls import GenerateCertificateHook, TlsSecret, default_sans | ||
from lib.module import values as module_values | ||
from deckhouse import hook | ||
from typing import Callable | ||
import common | ||
|
||
def main(): | ||
hook = GenerateCertificateHook( | ||
TlsSecret( | ||
cn="webhooks", | ||
name="webhooks-https-certs", | ||
sansGenerator=default_sans([ | ||
"webhooks", | ||
f"webhooks.{common.NAMESPACE}", | ||
f"webhooks.{common.NAMESPACE}.svc"]), | ||
values_path_prefix=f"{common.MODULE_NAME}.internal.customWebhookCert" | ||
), | ||
cn="node-configurator", | ||
common_ca=True, | ||
namespace=common.NAMESPACE) | ||
|
||
hook.run() | ||
|
||
if __name__ == "__main__": | ||
main() |
Empty file.
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,265 @@ | ||
# | ||
# Copyright 2023 Flant JSC | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
import random | ||
import re | ||
from OpenSSL import crypto | ||
from ipaddress import ip_address | ||
from datetime import datetime, timedelta | ||
from lib.certificate.parse import parse_certificate, get_certificate_san | ||
|
||
class Certificate: | ||
def __init__(self, cn: str, expire: int, key_size: int, algo: str) -> None: | ||
self.key = crypto.PKey() | ||
self.__with_key(algo=algo, size=key_size) | ||
self.cert = crypto.X509() | ||
self.cert.set_version(version=2) | ||
self.cert.get_subject().CN = cn | ||
self.cert.set_serial_number(random.getrandbits(64)) | ||
self.cert.gmtime_adj_notBefore(0) | ||
self.cert.gmtime_adj_notAfter(expire) | ||
|
||
def get_subject(self) -> crypto.X509Name: | ||
return self.cert.get_subject() | ||
|
||
def __with_key(self, algo: str, size: int) -> None: | ||
if algo == "rsa": | ||
self.key.generate_key(crypto.TYPE_RSA, size) | ||
elif algo == "dsa": | ||
self.key.generate_key(crypto.TYPE_DSA, size) | ||
else: | ||
raise Exception(f"Algo {algo} is not support. Only [rsa, dsa]") | ||
|
||
def with_metadata(self, country: str = None, | ||
state: str = None, | ||
locality: str = None, | ||
organisation_name: str = None, | ||
organisational_unit_name: str = None): | ||
""" | ||
Adds subjects to certificate. | ||
:param country: Optional. The country of the entity. | ||
:type country: :py:class:`str` | ||
:param state: Optional. The state or province of the entity. | ||
:type state: :py:class:`str` | ||
:param locality: Optional. The locality of the entity | ||
:type locality: :py:class:`str` | ||
:param organisation_name: Optional. The organization name of the entity. | ||
:type organisation_name: :py:class:`str` | ||
:param organisational_unit_name: Optional. The organizational unit of the entity. | ||
:type organisational_unit_name: :py:class:`str` | ||
""" | ||
|
||
if country is not None: | ||
self.cert.get_subject().C = country | ||
if state is not None: | ||
self.cert.get_subject().ST = state | ||
if locality is not None: | ||
self.cert.get_subject().L = locality | ||
if organisation_name is not None: | ||
self.cert.get_subject().O = organisation_name | ||
if organisational_unit_name is not None: | ||
self.cert.get_subject().OU = organisational_unit_name | ||
return self | ||
|
||
def add_extension(self, type_name: str, | ||
critical: bool, | ||
value: str, | ||
subject: crypto.X509 = None, | ||
issuer: crypto.X509 = None): | ||
""" | ||
Adds extensions to certificate. | ||
:param type_name: The name of the type of extension_ to create. | ||
:type type_name: :py:class:`str` | ||
:param critical: A flag indicating whether this is a critical | ||
extension. | ||
:type critical: :py:class:`bool` | ||
:param value: The OpenSSL textual representation of the extension's | ||
value. | ||
:type value: :py:class:`str` | ||
:param subject: Optional X509 certificate to use as subject. | ||
:type subject: :py:class:`crypto.X509` | ||
:param issuer: Optional X509 certificate to use as issuer. | ||
:type issuer: :py:class:`crypto.X509` | ||
""" | ||
ext = crypto.X509Extension(type_name=str.encode(type_name), | ||
critical=critical, | ||
value=str.encode(value), | ||
subject=subject, | ||
issuer=issuer) | ||
self.cert.add_extensions(extensions=[ext]) | ||
return self | ||
|
||
def generate(self) -> (bytes, bytes): | ||
""" | ||
Generate certificate. | ||
:return: (certificate, key) | ||
:rtype: (:py:data:`bytes`, :py:data:`bytes`) | ||
""" | ||
pub = crypto.dump_certificate(crypto.FILETYPE_PEM, self.cert) | ||
priv = crypto.dump_privatekey(crypto.FILETYPE_PEM, self.key) | ||
return pub, priv | ||
|
||
|
||
class CACertificateGenerator(Certificate): | ||
""" | ||
A class representing a generator CA certificate. | ||
""" | ||
def __sign(self) -> None: | ||
self.cert.set_issuer(self.get_subject()) | ||
self.cert.set_pubkey(self.key) | ||
self.cert.sign(self.key, 'sha256') | ||
|
||
def generate(self) -> (bytes, bytes): | ||
""" | ||
Generate CA certificate. | ||
:return: (ca crt, ca key) | ||
:rtype: (:py:data:`bytes`, :py:data:`bytes`) | ||
""" | ||
self.add_extension(type_name="subjectKeyIdentifier", | ||
critical=False, value="hash", subject=self.cert) | ||
self.add_extension(type_name="authorityKeyIdentifier", | ||
critical=False, value="keyid:always", issuer=self.cert) | ||
self.add_extension(type_name="basicConstraints", | ||
critical=False, value="CA:TRUE") | ||
self.add_extension(type_name="keyUsage", critical=False, | ||
value="keyCertSign, cRLSign, keyEncipherment") | ||
self.__sign() | ||
return super().generate() | ||
|
||
|
||
class CertificateGenerator(Certificate): | ||
""" | ||
A class representing a generator certificate. | ||
""" | ||
def with_hosts(self, *hosts: str): | ||
""" | ||
This function is used to add subject alternative names to a certificate. | ||
It takes a variable number of hosts as parameters, and based on the type of host (IP or DNS). | ||
:param hosts: Variable number of hosts to be added as subject alternative names to the certificate. | ||
:type hosts: :py:class:`tuple` | ||
""" | ||
alt_names = [] | ||
for h in hosts: | ||
try: | ||
ip_address(h) | ||
alt_names.append(f"IP:{h}") | ||
except ValueError: | ||
if not is_valid_hostname(h): | ||
continue | ||
alt_names.append(f"DNS:{h}") | ||
self.add_extension("subjectAltName", False, ", ".join(alt_names)) | ||
return self | ||
|
||
def __sign(self, ca_subj: crypto.X509Name, ca_key: crypto.PKey) -> None: | ||
self.cert.set_issuer(ca_subj) | ||
self.cert.set_pubkey(self.key) | ||
self.cert.sign(ca_key, 'sha256') | ||
|
||
def generate(self, ca_subj: crypto.X509Name, ca_key: crypto.PKey) -> (bytes, bytes): | ||
""" | ||
Generate certificate. | ||
:param ca_subj: CA subject. | ||
:type ca_subj: :py:class:`crypto.X509Name` | ||
:param ca_key: CA Key. | ||
:type ca_key: :py:class:`crypto.PKey` | ||
:return: (certificate, key) | ||
:rtype: (:py:data:`bytes`, :py:data:`bytes`) | ||
""" | ||
self.__sign(ca_subj, ca_key) | ||
return super().generate() | ||
|
||
def is_valid_hostname(hostname: str) -> bool: | ||
if len(hostname) > 255: | ||
return False | ||
hostname.rstrip(".") | ||
allowed = re.compile("(?!-)[A-Z\d-]{1,63}(?<!-)$", re.IGNORECASE) | ||
return all(allowed.match(x) for x in hostname.split(".")) | ||
|
||
|
||
def cert_renew_deadline_exceeded(crt: crypto.X509, cert_outdated_duration: timedelta) -> bool: | ||
""" | ||
Check certificate | ||
:param crt: Certificate | ||
:type crt: :py:class:`crypto.X509` | ||
:param cert_outdated_duration: certificate outdated duration | ||
:type cert_outdated_duration: :py:class:`timedelta` | ||
:return: | ||
if timeNow > expire - cert_outdated_duration: | ||
return True | ||
return False | ||
:rtype: :py:class:`bool` | ||
""" | ||
not_after = datetime.strptime( | ||
crt.get_notAfter().decode('ascii'), '%Y%m%d%H%M%SZ') | ||
if datetime.now() > not_after - cert_outdated_duration: | ||
return True | ||
return False | ||
|
||
|
||
def is_outdated_ca(ca: str, cert_outdated_duration: timedelta) -> bool: | ||
""" | ||
Issue a new certificate if there is no CA in the secret. Without CA it is not possible to validate the certificate. | ||
Check CA duration. | ||
:param ca: Raw CA | ||
:type ca: :py:class:`str` | ||
:param cert_outdated_duration: certificate outdated duration | ||
:type cert_outdated_duration: :py:class:`timedelta` | ||
:rtype: :py:class:`bool` | ||
""" | ||
if len(ca) == 0: | ||
return True | ||
crt = parse_certificate(ca) | ||
return cert_renew_deadline_exceeded(crt, cert_outdated_duration) | ||
|
||
|
||
def is_irrelevant_cert(crt_data: str, sans: list, cert_outdated_duration: timedelta) -> bool: | ||
""" | ||
Check certificate duration and SANs list | ||
:param crt_data: Raw certificate | ||
:type crt_data: :py:class:`str` | ||
:param sans: List of sans. | ||
:type sans: :py:class:`list` | ||
:param cert_outdated_duration: certificate outdated duration | ||
:type cert_outdated_duration: :py:class:`timedelta` | ||
:rtype: :py:class:`bool` | ||
""" | ||
if len(crt_data) == 0: | ||
return True | ||
crt = parse_certificate(crt_data) | ||
if cert_renew_deadline_exceeded(crt, cert_outdated_duration): | ||
return True | ||
alt_names = [] | ||
for san in sans: | ||
try: | ||
ip_address(san) | ||
alt_names.append(f"IP Address:{san}") | ||
except ValueError: | ||
alt_names.append(f"DNS:{san}") | ||
cert_sans = get_certificate_san(crt) | ||
cert_sans.sort() | ||
alt_names.sort() | ||
if cert_sans != alt_names: | ||
return True | ||
return False |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
# | ||
# Copyright 2023 Flant JSC | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
from OpenSSL import crypto | ||
from pprint import pprint | ||
|
||
|
||
def parse_certificate(crt: str) -> crypto.X509: | ||
return crypto.load_certificate(crypto.FILETYPE_PEM, crt) | ||
|
||
|
||
def get_certificate_san(crt: crypto.X509) -> list[str]: | ||
san = '' | ||
ext_count = crt.get_extension_count() | ||
for i in range(0, ext_count): | ||
ext = crt.get_extension(i) | ||
if 'subjectAltName' in str(ext.get_short_name()): | ||
san = ext.__str__() | ||
return san.split(', ') |
Empty file.
Oops, something went wrong.