Skip to content

Commit

Permalink
Update lint rule.
Browse files Browse the repository at this point in the history
  • Loading branch information
dajiaji committed Dec 11, 2021
1 parent 881eac9 commit 166b7e0
Show file tree
Hide file tree
Showing 36 changed files with 268 additions and 799 deletions.
4 changes: 4 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ Changes
Unreleased
----------

- Migrate to poetry. `#191 <https://github.com/dajiaji/python-cwt/pull/191>`__
- Change max line length to 128. `#191 <https://github.com/dajiaji/python-cwt/pull/191>`__
- Fix updated flake8 warning. `#191 <https://github.com/dajiaji/python-cwt/pull/191>`__

Version 1.4.2
-------------

Expand Down
8 changes: 2 additions & 6 deletions cwt/algs/asymmetric.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,8 @@ def validate_certificate(self, ca_certs: List[bytes]) -> bool:

ctx = ValidationContext(trust_roots=ca_certs)
try:
validator = CertificateValidator(
self._cert, self._intermediates, validation_context=ctx
)
validator = CertificateValidator(self._cert, self._intermediates, validation_context=ctx)
validator.validate_usage(set(["digital_signature"]), extended_optional=True)
except Exception as err:
raise VerifyError(
"Failed to validate the certificate bound to the key."
) from err
raise VerifyError("Failed to validate the certificate bound to the key.") from err
return True
46 changes: 11 additions & 35 deletions cwt/algs/ec2.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,7 @@ def __init__(self, params: Dict[int, Any]):
if not (set(self._key_ops) & set([1, 2])):
raise ValueError("Invalid key_ops for signing key.")
if set(self._key_ops) & set([7, 8]):
raise ValueError(
"Signing key should not be used for key derivation."
)
raise ValueError("Signing key should not be used for key derivation.")
else:
# public key for signing.
if 2 not in self._key_ops or len(self._key_ops) != 1:
Expand All @@ -114,14 +112,10 @@ def __init__(self, params: Dict[int, Any]):
if not (set(self._key_ops) & set([7, 8])):
raise ValueError("Invalid key_ops for key derivation.")
if set(self._key_ops) & set([1, 2]):
raise ValueError(
"ECDHE key should not be used for signing."
)
raise ValueError("ECDHE key should not be used for signing.")
else:
# public key for key derivation.
raise ValueError(
"Public key for ECDHE should not have key_ops."
)
raise ValueError("Public key for ECDHE should not have key_ops.")
else:
if -2 not in params and -3 not in params:
# private key for key derivation.
Expand All @@ -134,9 +128,7 @@ def __init__(self, params: Dict[int, Any]):
if set(self._key_ops) & set([1, 2]):
# private key for signing.
if set(self._key_ops) & set([7, 8]):
raise ValueError(
"EC2 Private key should not be used for both signing and key derivation."
)
raise ValueError("EC2 Private key should not be used for both signing and key derivation.")
if self._crv == 1:
self._alg = -7 # ES256
elif self._crv == 2:
Expand Down Expand Up @@ -198,9 +190,7 @@ def __init__(self, params: Dict[int, Any]):
raise ValueError("d(-4) should be bytes(bstr).")
self._d = params[-4]
if len(self._d) != len(self._x):
raise ValueError(
f"d(-4) should be {len(self._x)} bytes for curve {self._crv}"
)
raise ValueError(f"d(-4) should be {len(self._x)} bytes for curve {self._crv}")
try:
self._private_key = ec.EllipticCurvePrivateNumbers(
int.from_bytes(self._d, byteorder="big"), public_numbers
Expand All @@ -211,9 +201,7 @@ def __init__(self, params: Dict[int, Any]):
return

@staticmethod
def to_cose_key(
k: Union[EllipticCurvePrivateKey, EllipticCurvePublicKey]
) -> Dict[int, Any]:
def to_cose_key(k: Union[EllipticCurvePrivateKey, EllipticCurvePublicKey]) -> Dict[int, Any]:
key_len: int = 32
cose_key: Dict[int, Any] = {}

Expand All @@ -234,15 +222,9 @@ def to_cose_key(
cose_key[-2] = k.public_numbers().x.to_bytes(key_len, byteorder="big")
cose_key[-3] = k.public_numbers().y.to_bytes(key_len, byteorder="big")
return cose_key
cose_key[-2] = (
k.public_key().public_numbers().x.to_bytes(key_len, byteorder="big")
)
cose_key[-3] = (
k.public_key().public_numbers().y.to_bytes(key_len, byteorder="big")
)
cose_key[-4] = k.private_numbers().private_value.to_bytes(
key_len, byteorder="big"
)
cose_key[-2] = k.public_key().public_numbers().x.to_bytes(key_len, byteorder="big")
cose_key[-3] = k.public_key().public_numbers().y.to_bytes(key_len, byteorder="big")
cose_key[-4] = k.private_numbers().private_value.to_bytes(key_len, byteorder="big")
return cose_key

@property
Expand Down Expand Up @@ -275,9 +257,7 @@ def verify(self, msg: bytes, sig: bytes):
try:
if self._private_key:
der_sig = self._os_to_der(self._private_key.curve.key_size, sig)
self._private_key.public_key().verify(
der_sig, msg, ec.ECDSA(self._hash_alg())
)
self._private_key.public_key().verify(der_sig, msg, ec.ECDSA(self._hash_alg()))
else:
der_sig = self._os_to_der(self._public_key.curve.key_size, sig)
self._public_key.verify(der_sig, msg, ec.ECDSA(self._hash_alg()))
Expand Down Expand Up @@ -309,11 +289,7 @@ def derive_key(
self._validate_context(context)

# Derive key.
self._key = (
self._private_key
if self._private_key
else ec.generate_private_key(self._crv_obj)
)
self._key = self._private_key if self._private_key else ec.generate_private_key(self._crv_obj)
shared_key = self._key.exchange(ec.ECDH(), public_key.key)
hkdf = HKDF(
algorithm=self._hash_alg(),
Expand Down
62 changes: 15 additions & 47 deletions cwt/algs/okp.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,7 @@ def __init__(self, params: Dict[int, Any]):
elif self._alg in [-26, -28]:
self._hash_alg = hashes.SHA512
else:
raise ValueError(
f"Unsupported or unknown alg used with X25519/X448: {self._alg}."
)
raise ValueError(f"Unsupported or unknown alg used with X25519/X448: {self._alg}.")

# Validate alg and key_ops.
if self._key_ops:
Expand All @@ -99,9 +97,7 @@ def __init__(self, params: Dict[int, Any]):
if not (set(self._key_ops) & set([1, 2])):
raise ValueError("Invalid key_ops for signing key.")
if set(self._key_ops) & set([7, 8]):
raise ValueError(
"Signing key should not be used for key derivation."
)
raise ValueError("Signing key should not be used for key derivation.")
else:
# public key for signing.
if 2 not in self._key_ops or len(self._key_ops) != 1:
Expand All @@ -112,15 +108,11 @@ def __init__(self, params: Dict[int, Any]):
if not (set(self._key_ops) & set([7, 8])):
raise ValueError("Invalid key_ops for key derivation.")
if set(self._key_ops) & set([1, 2]):
raise ValueError(
"Private key for ECDHE should not be used for signing."
)
raise ValueError("Private key for ECDHE should not be used for signing.")
else:
# public key for key derivation.
if self._key_ops:
raise ValueError(
"Public key for ECDHE should not have key_ops."
)
raise ValueError("Public key for ECDHE should not have key_ops.")
else:
raise ValueError(f"Unsupported or unknown alg(3) for OKP: {self._alg}.")
else:
Expand All @@ -129,9 +121,7 @@ def __init__(self, params: Dict[int, Any]):
if set(self._key_ops) & set([1, 2]):
# private key for signing.
if set(self._key_ops) & set([7, 8]):
raise ValueError(
"OKP private key should not be used for both signing and key derivation."
)
raise ValueError("OKP private key should not be used for both signing and key derivation.")
self._alg = -8 # EdDSA
else:
# public key.
Expand Down Expand Up @@ -206,45 +196,29 @@ def to_cose_key(
if isinstance(k, Ed25519PublicKey):
cose_key[-2] = k.public_bytes(Encoding.Raw, PublicFormat.Raw)
else:
cose_key[-2] = k.public_key().public_bytes(
Encoding.Raw, PublicFormat.Raw
)
cose_key[-4] = k.private_bytes(
Encoding.Raw, PrivateFormat.Raw, NoEncryption()
)
cose_key[-2] = k.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw)
cose_key[-4] = k.private_bytes(Encoding.Raw, PrivateFormat.Raw, NoEncryption())
elif isinstance(k, (Ed448PublicKey, Ed448PrivateKey)):
cose_key[-1] = 7 # Ed448
if isinstance(k, Ed448PublicKey):
cose_key[-2] = k.public_bytes(Encoding.Raw, PublicFormat.Raw)
else:
cose_key[-2] = k.public_key().public_bytes(
Encoding.Raw, PublicFormat.Raw
)
cose_key[-4] = k.private_bytes(
Encoding.Raw, PrivateFormat.Raw, NoEncryption()
)
cose_key[-2] = k.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw)
cose_key[-4] = k.private_bytes(Encoding.Raw, PrivateFormat.Raw, NoEncryption())
elif isinstance(k, (X25519PublicKey, X25519PrivateKey)):
cose_key[-1] = 4 # X25519
if isinstance(k, X25519PublicKey):
cose_key[-2] = k.public_bytes(Encoding.Raw, PublicFormat.Raw)
else:
cose_key[-2] = k.public_key().public_bytes(
Encoding.Raw, PublicFormat.Raw
)
cose_key[-4] = k.private_bytes(
Encoding.Raw, PrivateFormat.Raw, NoEncryption()
)
cose_key[-2] = k.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw)
cose_key[-4] = k.private_bytes(Encoding.Raw, PrivateFormat.Raw, NoEncryption())
elif isinstance(k, (X448PublicKey, X448PrivateKey)):
cose_key[-1] = 5 # X448
if isinstance(k, X448PublicKey):
cose_key[-2] = k.public_bytes(Encoding.Raw, PublicFormat.Raw)
else:
cose_key[-2] = k.public_key().public_bytes(
Encoding.Raw, PublicFormat.Raw
)
cose_key[-4] = k.private_bytes(
Encoding.Raw, PrivateFormat.Raw, NoEncryption()
)
cose_key[-2] = k.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw)
cose_key[-4] = k.private_bytes(Encoding.Raw, PrivateFormat.Raw, NoEncryption())
else:
raise ValueError("Unsupported or unknown key for OKP.")
return cose_key
Expand Down Expand Up @@ -304,9 +278,7 @@ def derive_key(
raise ValueError("Public key cannot be used for key derivation.")
if not public_key:
raise ValueError("public_key should be set.")
if not isinstance(public_key.key, X25519PublicKey) and not isinstance(
public_key.key, X448PublicKey
):
if not isinstance(public_key.key, X25519PublicKey) and not isinstance(public_key.key, X448PublicKey):
raise ValueError("public_key should be x25519/x448 public key.")
# if self._alg not in COSE_ALGORITHMS_CKDM_KEY_AGREEMENT.values():
# raise ValueError(f"Invalid alg for key derivation: {self._alg}.")
Expand All @@ -321,11 +293,7 @@ def derive_key(
if self._private_key:
self._key = self._private_key
else:
self._key = (
X25519PrivateKey.generate()
if self._crv == 4
else X448PrivateKey.generate()
)
self._key = X25519PrivateKey.generate() if self._crv == 4 else X448PrivateKey.generate()
shared_key = self._key.exchange(public_key.key)
hkdf = HKDF(
algorithm=self._hash_alg(),
Expand Down
28 changes: 6 additions & 22 deletions cwt/algs/rsa.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,7 @@ def __init__(self, params: Dict[int, Any]):
else:
raise ValueError(f"Unsupported or unknown alg(3) for RSA: {params[3]}.")
if params[3] in [-37, -38, -39]:
self._padding = padding.PSS(
mgf=padding.MGF1(self._hash()), salt_length=padding.PSS.MAX_LENGTH
)
self._padding = padding.PSS(mgf=padding.MGF1(self._hash()), salt_length=padding.PSS.MAX_LENGTH)
else:
self._padding = padding.PKCS1v15()

Expand All @@ -60,28 +58,16 @@ def __init__(self, params: Dict[int, Any]):
if not self._key_ops:
self._key_ops = RSAKey._ACCEPTABLE_PUBLIC_KEY_OPS
else:
prohibited = [
ops
for ops in self._key_ops
if ops not in RSAKey._ACCEPTABLE_PUBLIC_KEY_OPS
]
prohibited = [ops for ops in self._key_ops if ops not in RSAKey._ACCEPTABLE_PUBLIC_KEY_OPS]
if prohibited:
raise ValueError(
f"Unknown or not permissible key_ops(4) for RSAKey: {prohibited[0]}."
)
raise ValueError(f"Unknown or not permissible key_ops(4) for RSAKey: {prohibited[0]}.")
else:
if not self._key_ops:
self._key_ops = RSAKey._ACCEPTABLE_PRIVATE_KEY_OPS
else:
prohibited = [
ops
for ops in self._key_ops
if ops not in RSAKey._ACCEPTABLE_PRIVATE_KEY_OPS
]
prohibited = [ops for ops in self._key_ops if ops not in RSAKey._ACCEPTABLE_PRIVATE_KEY_OPS]
if prohibited:
raise ValueError(
f"Unknown or not permissible key_ops(4) for RSAKey: {prohibited[0]}."
)
raise ValueError(f"Unknown or not permissible key_ops(4) for RSAKey: {prohibited[0]}.")

# Validate RSA specific parameters.
if -1 not in params or not isinstance(params[-1], bytes):
Expand All @@ -97,9 +83,7 @@ def __init__(self, params: Dict[int, Any]):
if -3 not in params: # the RSA private exponent d.
private_props = [p for p in params.keys() if p in [-4, -5, -6, -7, -8]]
if private_props:
raise ValueError(
f"RSA public key should not have private parameter: {private_props[0]}."
)
raise ValueError(f"RSA public key should not have private parameter: {private_props[0]}.")
self._key = public_numbers.public_key()
return

Expand Down
Loading

0 comments on commit 166b7e0

Please sign in to comment.