diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 426e80d..95d6773 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -19,7 +19,7 @@ jobs: strategy: matrix: platform: [ubuntu-latest, windows-latest] - python-version: ["3.6", "3.7", "3.8", "3.9", "3.10"] + python-version: ["3.6", "3.7", "3.8", "3.9"] steps: - uses: actions/checkout@v2 @@ -29,8 +29,7 @@ jobs: python-version: ${{ matrix.python-version }} - name: Install dependencies run: | - python -m pip install --upgrade pip setuptools wheel - python -m pip install --upgrade coverage[toml] virtualenv tox tox-gh-actions + python -m pip install --upgrade coverage[toml] virtualenv tox tox-gh-actions poetry - name: Run tox targets for ${{ matrix.python-version }} run: python -m tox @@ -63,11 +62,12 @@ jobs: with: python-version: "3.8" - - name: Install pep517 and twine - run: python -m pip install pep517 twine + - name: Install poetry + run: python -m pip install poetry twine - name: Build package - run: python -m pep517.build --source --binary . + run: | + python -m poetry build - name: List result run: ls -l dist @@ -90,7 +90,7 @@ jobs: python-version: "3.8" - name: Install in dev mode - run: python -m pip install -e .[dev] + run: python -m pip install . - name: Import package run: python -c 'import cwt; print(cwt.__version__)' diff --git a/.github/workflows/python-publish.yml b/.github/workflows/python-publish.yml index 1a03a7b..3816494 100644 --- a/.github/workflows/python-publish.yml +++ b/.github/workflows/python-publish.yml @@ -13,7 +13,8 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - name: Checkout sources + uses: actions/checkout@v2 - name: Set up Python uses: actions/setup-python@v2 with: @@ -21,11 +22,11 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - pip install setuptools wheel twine + pip install poetry - name: Build and publish env: - TWINE_USERNAME: ${{ secrets.PYPI_USERNAME }} - TWINE_PASSWORD: ${{ secrets.PYPI_PASSWORD }} + POETRY_USERNAME: ${{ secrets.PYPI_USERNAME }} + POETRY_PASSWORD: ${{ secrets.PYPI_PASSWORD }} run: | - python setup.py sdist bdist_wheel - twine upload dist/* + poetry build + poetry publish -u $POETRY_USERNAME -p $POETRY_PASSWORD diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index ac3e88c..7ff5e29 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -3,34 +3,30 @@ repos: rev: 21.12b0 hooks: - id: black - args: ["--target-version=py36"] + args: [--line-length, "128"] - repo: https://github.com/asottile/blacken-docs rev: v1.12.0 hooks: - id: blacken-docs - args: ["--target-version=py36"] - repo: https://github.com/PyCQA/flake8 rev: 4.0.1 hooks: - id: flake8 - language_version: python3.8 + args: [--ignore, "E203,E501,B006,W503"] + additional_dependencies: [flake8-bugbear] - repo: https://github.com/PyCQA/isort rev: 5.10.1 hooks: - id: isort + args: [--profile, "black"] - repo: https://github.com/pre-commit/pre-commit-hooks rev: v4.0.1 hooks: - - id: trailing-whitespace - - id: end-of-file-fixer - id: debug-statements - - - repo: https://github.com/mgedmin/check-manifest - rev: "0.47" - hooks: - - id: check-manifest - args: [--no-build-isolation] + - id: end-of-file-fixer + - id: fix-byte-order-marker + - id: trailing-whitespace diff --git a/.readthedocs.yml b/.readthedocs.yml index af9fedc..9515c3f 100644 --- a/.readthedocs.yml +++ b/.readthedocs.yml @@ -1,2 +1,9 @@ +version: 2 + python: - setup_py_install: true + version: "3.8" + install: + - method: pip + path: . + extra_requirements: + - docs diff --git a/CHANGES.rst b/CHANGES.rst index af38bb7..bcd6452 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -4,6 +4,10 @@ Changes Unreleased ---------- +- Migrate to poetry. `#191 `__ +- Change max line length to 128. `#191 `__ +- Fix updated flake8 warning. `#191 `__ + Version 1.4.2 ------------- diff --git a/MANIFEST.in b/MANIFEST.in deleted file mode 100644 index 43e5a11..0000000 --- a/MANIFEST.in +++ /dev/null @@ -1,12 +0,0 @@ -include .pre-commit-config.yaml -include CHANGES.rst -include CODE_OF_CONDUCT.md -include LICENSE -include tox.ini -include cwt/py.typed -graft docs -prune docs/_build -graft tests -global-exclude *.py[co] -exclude .readthedocs.yml -recursive-exclude * __pycache__ diff --git a/cwt/algs/asymmetric.py b/cwt/algs/asymmetric.py index 5407b9b..5e2ae96 100644 --- a/cwt/algs/asymmetric.py +++ b/cwt/algs/asymmetric.py @@ -33,12 +33,8 @@ def validate_certificate(self, ca_certs: List[bytes]) -> bool: ctx = ValidationContext(trust_roots=ca_certs) try: - validator = CertificateValidator( - self._cert, self._intermediates, validation_context=ctx - ) + validator = CertificateValidator(self._cert, self._intermediates, validation_context=ctx) validator.validate_usage(set(["digital_signature"]), extended_optional=True) except Exception as err: - raise VerifyError( - "Failed to validate the certificate bound to the key." - ) from err + raise VerifyError("Failed to validate the certificate bound to the key.") from err return True diff --git a/cwt/algs/ec2.py b/cwt/algs/ec2.py index aad973e..e14d7de 100644 --- a/cwt/algs/ec2.py +++ b/cwt/algs/ec2.py @@ -92,9 +92,7 @@ def __init__(self, params: Dict[int, Any]): if not (set(self._key_ops) & set([1, 2])): raise ValueError("Invalid key_ops for signing key.") if set(self._key_ops) & set([7, 8]): - raise ValueError( - "Signing key should not be used for key derivation." - ) + raise ValueError("Signing key should not be used for key derivation.") else: # public key for signing. if 2 not in self._key_ops or len(self._key_ops) != 1: @@ -114,14 +112,10 @@ def __init__(self, params: Dict[int, Any]): if not (set(self._key_ops) & set([7, 8])): raise ValueError("Invalid key_ops for key derivation.") if set(self._key_ops) & set([1, 2]): - raise ValueError( - "ECDHE key should not be used for signing." - ) + raise ValueError("ECDHE key should not be used for signing.") else: # public key for key derivation. - raise ValueError( - "Public key for ECDHE should not have key_ops." - ) + raise ValueError("Public key for ECDHE should not have key_ops.") else: if -2 not in params and -3 not in params: # private key for key derivation. @@ -134,9 +128,7 @@ def __init__(self, params: Dict[int, Any]): if set(self._key_ops) & set([1, 2]): # private key for signing. if set(self._key_ops) & set([7, 8]): - raise ValueError( - "EC2 Private key should not be used for both signing and key derivation." - ) + raise ValueError("EC2 Private key should not be used for both signing and key derivation.") if self._crv == 1: self._alg = -7 # ES256 elif self._crv == 2: @@ -198,9 +190,7 @@ def __init__(self, params: Dict[int, Any]): raise ValueError("d(-4) should be bytes(bstr).") self._d = params[-4] if len(self._d) != len(self._x): - raise ValueError( - f"d(-4) should be {len(self._x)} bytes for curve {self._crv}" - ) + raise ValueError(f"d(-4) should be {len(self._x)} bytes for curve {self._crv}") try: self._private_key = ec.EllipticCurvePrivateNumbers( int.from_bytes(self._d, byteorder="big"), public_numbers @@ -211,9 +201,7 @@ def __init__(self, params: Dict[int, Any]): return @staticmethod - def to_cose_key( - k: Union[EllipticCurvePrivateKey, EllipticCurvePublicKey] - ) -> Dict[int, Any]: + def to_cose_key(k: Union[EllipticCurvePrivateKey, EllipticCurvePublicKey]) -> Dict[int, Any]: key_len: int = 32 cose_key: Dict[int, Any] = {} @@ -234,15 +222,9 @@ def to_cose_key( cose_key[-2] = k.public_numbers().x.to_bytes(key_len, byteorder="big") cose_key[-3] = k.public_numbers().y.to_bytes(key_len, byteorder="big") return cose_key - cose_key[-2] = ( - k.public_key().public_numbers().x.to_bytes(key_len, byteorder="big") - ) - cose_key[-3] = ( - k.public_key().public_numbers().y.to_bytes(key_len, byteorder="big") - ) - cose_key[-4] = k.private_numbers().private_value.to_bytes( - key_len, byteorder="big" - ) + cose_key[-2] = k.public_key().public_numbers().x.to_bytes(key_len, byteorder="big") + cose_key[-3] = k.public_key().public_numbers().y.to_bytes(key_len, byteorder="big") + cose_key[-4] = k.private_numbers().private_value.to_bytes(key_len, byteorder="big") return cose_key @property @@ -275,9 +257,7 @@ def verify(self, msg: bytes, sig: bytes): try: if self._private_key: der_sig = self._os_to_der(self._private_key.curve.key_size, sig) - self._private_key.public_key().verify( - der_sig, msg, ec.ECDSA(self._hash_alg()) - ) + self._private_key.public_key().verify(der_sig, msg, ec.ECDSA(self._hash_alg())) else: der_sig = self._os_to_der(self._public_key.curve.key_size, sig) self._public_key.verify(der_sig, msg, ec.ECDSA(self._hash_alg())) @@ -309,11 +289,7 @@ def derive_key( self._validate_context(context) # Derive key. - self._key = ( - self._private_key - if self._private_key - else ec.generate_private_key(self._crv_obj) - ) + self._key = self._private_key if self._private_key else ec.generate_private_key(self._crv_obj) shared_key = self._key.exchange(ec.ECDH(), public_key.key) hkdf = HKDF( algorithm=self._hash_alg(), diff --git a/cwt/algs/okp.py b/cwt/algs/okp.py index a1a5d4e..5bd5ad5 100644 --- a/cwt/algs/okp.py +++ b/cwt/algs/okp.py @@ -79,9 +79,7 @@ def __init__(self, params: Dict[int, Any]): elif self._alg in [-26, -28]: self._hash_alg = hashes.SHA512 else: - raise ValueError( - f"Unsupported or unknown alg used with X25519/X448: {self._alg}." - ) + raise ValueError(f"Unsupported or unknown alg used with X25519/X448: {self._alg}.") # Validate alg and key_ops. if self._key_ops: @@ -99,9 +97,7 @@ def __init__(self, params: Dict[int, Any]): if not (set(self._key_ops) & set([1, 2])): raise ValueError("Invalid key_ops for signing key.") if set(self._key_ops) & set([7, 8]): - raise ValueError( - "Signing key should not be used for key derivation." - ) + raise ValueError("Signing key should not be used for key derivation.") else: # public key for signing. if 2 not in self._key_ops or len(self._key_ops) != 1: @@ -112,15 +108,11 @@ def __init__(self, params: Dict[int, Any]): if not (set(self._key_ops) & set([7, 8])): raise ValueError("Invalid key_ops for key derivation.") if set(self._key_ops) & set([1, 2]): - raise ValueError( - "Private key for ECDHE should not be used for signing." - ) + raise ValueError("Private key for ECDHE should not be used for signing.") else: # public key for key derivation. if self._key_ops: - raise ValueError( - "Public key for ECDHE should not have key_ops." - ) + raise ValueError("Public key for ECDHE should not have key_ops.") else: raise ValueError(f"Unsupported or unknown alg(3) for OKP: {self._alg}.") else: @@ -129,9 +121,7 @@ def __init__(self, params: Dict[int, Any]): if set(self._key_ops) & set([1, 2]): # private key for signing. if set(self._key_ops) & set([7, 8]): - raise ValueError( - "OKP private key should not be used for both signing and key derivation." - ) + raise ValueError("OKP private key should not be used for both signing and key derivation.") self._alg = -8 # EdDSA else: # public key. @@ -206,45 +196,29 @@ def to_cose_key( if isinstance(k, Ed25519PublicKey): cose_key[-2] = k.public_bytes(Encoding.Raw, PublicFormat.Raw) else: - cose_key[-2] = k.public_key().public_bytes( - Encoding.Raw, PublicFormat.Raw - ) - cose_key[-4] = k.private_bytes( - Encoding.Raw, PrivateFormat.Raw, NoEncryption() - ) + cose_key[-2] = k.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw) + cose_key[-4] = k.private_bytes(Encoding.Raw, PrivateFormat.Raw, NoEncryption()) elif isinstance(k, (Ed448PublicKey, Ed448PrivateKey)): cose_key[-1] = 7 # Ed448 if isinstance(k, Ed448PublicKey): cose_key[-2] = k.public_bytes(Encoding.Raw, PublicFormat.Raw) else: - cose_key[-2] = k.public_key().public_bytes( - Encoding.Raw, PublicFormat.Raw - ) - cose_key[-4] = k.private_bytes( - Encoding.Raw, PrivateFormat.Raw, NoEncryption() - ) + cose_key[-2] = k.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw) + cose_key[-4] = k.private_bytes(Encoding.Raw, PrivateFormat.Raw, NoEncryption()) elif isinstance(k, (X25519PublicKey, X25519PrivateKey)): cose_key[-1] = 4 # X25519 if isinstance(k, X25519PublicKey): cose_key[-2] = k.public_bytes(Encoding.Raw, PublicFormat.Raw) else: - cose_key[-2] = k.public_key().public_bytes( - Encoding.Raw, PublicFormat.Raw - ) - cose_key[-4] = k.private_bytes( - Encoding.Raw, PrivateFormat.Raw, NoEncryption() - ) + cose_key[-2] = k.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw) + cose_key[-4] = k.private_bytes(Encoding.Raw, PrivateFormat.Raw, NoEncryption()) elif isinstance(k, (X448PublicKey, X448PrivateKey)): cose_key[-1] = 5 # X448 if isinstance(k, X448PublicKey): cose_key[-2] = k.public_bytes(Encoding.Raw, PublicFormat.Raw) else: - cose_key[-2] = k.public_key().public_bytes( - Encoding.Raw, PublicFormat.Raw - ) - cose_key[-4] = k.private_bytes( - Encoding.Raw, PrivateFormat.Raw, NoEncryption() - ) + cose_key[-2] = k.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw) + cose_key[-4] = k.private_bytes(Encoding.Raw, PrivateFormat.Raw, NoEncryption()) else: raise ValueError("Unsupported or unknown key for OKP.") return cose_key @@ -304,9 +278,7 @@ def derive_key( raise ValueError("Public key cannot be used for key derivation.") if not public_key: raise ValueError("public_key should be set.") - if not isinstance(public_key.key, X25519PublicKey) and not isinstance( - public_key.key, X448PublicKey - ): + if not isinstance(public_key.key, X25519PublicKey) and not isinstance(public_key.key, X448PublicKey): raise ValueError("public_key should be x25519/x448 public key.") # if self._alg not in COSE_ALGORITHMS_CKDM_KEY_AGREEMENT.values(): # raise ValueError(f"Invalid alg for key derivation: {self._alg}.") @@ -321,11 +293,7 @@ def derive_key( if self._private_key: self._key = self._private_key else: - self._key = ( - X25519PrivateKey.generate() - if self._crv == 4 - else X448PrivateKey.generate() - ) + self._key = X25519PrivateKey.generate() if self._crv == 4 else X448PrivateKey.generate() shared_key = self._key.exchange(public_key.key) hkdf = HKDF( algorithm=self._hash_alg(), diff --git a/cwt/algs/rsa.py b/cwt/algs/rsa.py index 92fdc07..2c6db67 100644 --- a/cwt/algs/rsa.py +++ b/cwt/algs/rsa.py @@ -49,9 +49,7 @@ def __init__(self, params: Dict[int, Any]): else: raise ValueError(f"Unsupported or unknown alg(3) for RSA: {params[3]}.") if params[3] in [-37, -38, -39]: - self._padding = padding.PSS( - mgf=padding.MGF1(self._hash()), salt_length=padding.PSS.MAX_LENGTH - ) + self._padding = padding.PSS(mgf=padding.MGF1(self._hash()), salt_length=padding.PSS.MAX_LENGTH) else: self._padding = padding.PKCS1v15() @@ -60,28 +58,16 @@ def __init__(self, params: Dict[int, Any]): if not self._key_ops: self._key_ops = RSAKey._ACCEPTABLE_PUBLIC_KEY_OPS else: - prohibited = [ - ops - for ops in self._key_ops - if ops not in RSAKey._ACCEPTABLE_PUBLIC_KEY_OPS - ] + prohibited = [ops for ops in self._key_ops if ops not in RSAKey._ACCEPTABLE_PUBLIC_KEY_OPS] if prohibited: - raise ValueError( - f"Unknown or not permissible key_ops(4) for RSAKey: {prohibited[0]}." - ) + raise ValueError(f"Unknown or not permissible key_ops(4) for RSAKey: {prohibited[0]}.") else: if not self._key_ops: self._key_ops = RSAKey._ACCEPTABLE_PRIVATE_KEY_OPS else: - prohibited = [ - ops - for ops in self._key_ops - if ops not in RSAKey._ACCEPTABLE_PRIVATE_KEY_OPS - ] + prohibited = [ops for ops in self._key_ops if ops not in RSAKey._ACCEPTABLE_PRIVATE_KEY_OPS] if prohibited: - raise ValueError( - f"Unknown or not permissible key_ops(4) for RSAKey: {prohibited[0]}." - ) + raise ValueError(f"Unknown or not permissible key_ops(4) for RSAKey: {prohibited[0]}.") # Validate RSA specific parameters. if -1 not in params or not isinstance(params[-1], bytes): @@ -97,9 +83,7 @@ def __init__(self, params: Dict[int, Any]): if -3 not in params: # the RSA private exponent d. private_props = [p for p in params.keys() if p in [-4, -5, -6, -7, -8]] if private_props: - raise ValueError( - f"RSA public key should not have private parameter: {private_props[0]}." - ) + raise ValueError(f"RSA public key should not have private parameter: {private_props[0]}.") self._key = public_numbers.public_key() return diff --git a/cwt/algs/symmetric.py b/cwt/algs/symmetric.py index a04bb15..9691d5a 100644 --- a/cwt/algs/symmetric.py +++ b/cwt/algs/symmetric.py @@ -60,15 +60,9 @@ def __init__(self, params: Dict[int, Any]): if not self._key_ops: self._key_ops = MACAuthenticationKey._ACCEPTABLE_KEY_OPS return - not_acceptable = [ - ops - for ops in self._key_ops - if ops not in MACAuthenticationKey._ACCEPTABLE_KEY_OPS - ] + not_acceptable = [ops for ops in self._key_ops if ops not in MACAuthenticationKey._ACCEPTABLE_KEY_OPS] if not_acceptable: - raise ValueError( - f"Unknown or not permissible key_ops(4) for MACAuthenticationKey: {not_acceptable[0]}." - ) + raise ValueError(f"Unknown or not permissible key_ops(4) for MACAuthenticationKey: {not_acceptable[0]}.") class ContentEncryptionKey(SymmetricKey): @@ -86,15 +80,9 @@ def __init__(self, params: Dict[int, Any]): if not self._key_ops: self._key_ops = ContentEncryptionKey._ACCEPTABLE_KEY_OPS return - not_acceptable = [ - ops - for ops in self._key_ops - if ops not in ContentEncryptionKey._ACCEPTABLE_KEY_OPS - ] + not_acceptable = [ops for ops in self._key_ops if ops not in ContentEncryptionKey._ACCEPTABLE_KEY_OPS] if not_acceptable: - raise ValueError( - f"Unknown or not permissible key_ops(4) for ContentEncryptionKey: {not_acceptable[0]}." - ) + raise ValueError(f"Unknown or not permissible key_ops(4) for ContentEncryptionKey: {not_acceptable[0]}.") class HMACKey(MACAuthenticationKey): @@ -104,17 +92,15 @@ def __init__(self, params: Dict[int, Any]): """ """ super().__init__(params) - self._hash_alg = None + self._hash_alg = hashlib.sha256 self._trunc = 0 # Validate alg. if self._alg == 4: # HMAC 256/64 - self._hash_alg = hashlib.sha256 self._trunc = 8 if not self._key: self._key = token_bytes(_CWT_DEFAULT_KEY_SIZE_HMAC256) elif self._alg == 5: # HMAC 256/256 - self._hash_alg = hashlib.sha256 self._trunc = 32 if not self._key: self._key = token_bytes(_CWT_DEFAULT_KEY_SIZE_HMAC256) @@ -160,72 +146,56 @@ def __init__(self, params: Dict[int, Any]): if not self._key: self._key = AESCCM.generate_key(bit_length=128) if len(self._key) != 16: - raise ValueError( - "The length of AES-CCM-16-64-128 key should be 16 bytes." - ) + raise ValueError("The length of AES-CCM-16-64-128 key should be 16 bytes.") self._cipher = AESCCM(self._key, tag_length=8) self._nonce_len = 13 elif self._alg == 11: # AES-CCM-16-64-256 if not self._key: self._key = AESCCM.generate_key(bit_length=256) if len(self._key) != 32: - raise ValueError( - "The length of AES-CCM-16-64-256 key should be 32 bytes." - ) + raise ValueError("The length of AES-CCM-16-64-256 key should be 32 bytes.") self._cipher = AESCCM(self._key, tag_length=8) self._nonce_len = 13 elif self._alg == 12: # AES-CCM-64-64-128 if not self._key: self._key = AESCCM.generate_key(bit_length=128) if len(self._key) != 16: - raise ValueError( - "The length of AES-CCM-64-64-128 key should be 16 bytes." - ) + raise ValueError("The length of AES-CCM-64-64-128 key should be 16 bytes.") self._cipher = AESCCM(self._key, tag_length=8) self._nonce_len = 7 elif self._alg == 13: # AES-CCM-64-64-256 if not self._key: self._key = AESCCM.generate_key(bit_length=256) if len(self._key) != 32: - raise ValueError( - "The length of AES-CCM-64-64-256 key should be 32 bytes." - ) + raise ValueError("The length of AES-CCM-64-64-256 key should be 32 bytes.") self._cipher = AESCCM(self._key, tag_length=8) self._nonce_len = 7 elif self._alg == 30: # AES-CCM-16-128-128 if not self._key: self._key = AESCCM.generate_key(bit_length=128) if len(self._key) != 16: - raise ValueError( - "The length of AES-CCM-16-128-128 key should be 16 bytes." - ) + raise ValueError("The length of AES-CCM-16-128-128 key should be 16 bytes.") self._cipher = AESCCM(self._key) self._nonce_len = 13 elif self._alg == 31: # AES-CCM-16-128-256 if not self._key: self._key = AESCCM.generate_key(bit_length=256) if len(self._key) != 32: - raise ValueError( - "The length of AES-CCM-16-128-256 key should be 32 bytes." - ) + raise ValueError("The length of AES-CCM-16-128-256 key should be 32 bytes.") self._cipher = AESCCM(self._key) self._nonce_len = 13 elif self._alg == 32: # AES-CCM-64-128-128 if not self._key: self._key = AESCCM.generate_key(bit_length=128) if len(self._key) != 16: - raise ValueError( - "The length of AES-CCM-64-128-128 key should be 16 bytes." - ) + raise ValueError("The length of AES-CCM-64-128-128 key should be 16 bytes.") self._cipher = AESCCM(self._key) self._nonce_len = 7 elif self._alg == 33: # AES-CCM-64-128-256 if not self._key: self._key = AESCCM.generate_key(bit_length=256) if len(self._key) != 32: - raise ValueError( - "The length of AES-CCM-64-128-256 key should be 32 bytes." - ) + raise ValueError("The length of AES-CCM-64-128-256 key should be 32 bytes.") self._cipher = AESCCM(self._key) self._nonce_len = 7 else: @@ -237,9 +207,7 @@ def generate_nonce(self): def encrypt(self, msg: bytes, nonce: bytes, aad: Optional[bytes] = None) -> bytes: """ """ if len(nonce) != self._nonce_len: - raise ValueError( - "The length of nonce should be %d bytes." % self._nonce_len - ) + raise ValueError("The length of nonce should be %d bytes." % self._nonce_len) try: return self._cipher.encrypt(nonce, msg, aad) except Exception as err: @@ -248,9 +216,7 @@ def encrypt(self, msg: bytes, nonce: bytes, aad: Optional[bytes] = None) -> byte def decrypt(self, msg: bytes, nonce: bytes, aad: Optional[bytes] = None) -> bytes: """ """ if len(nonce) != self._nonce_len: - raise ValueError( - "The length of nonce should be %d bytes." % self._nonce_len - ) + raise ValueError("The length of nonce should be %d bytes." % self._nonce_len) try: return self._cipher.decrypt(nonce, msg, aad) except Exception as err: @@ -312,9 +278,7 @@ def __init__(self, params: Dict[int, Any]): # Validate alg. if self._alg != 24: # ChaCha20/Poly1305 - raise ValueError( - f"Unsupported or unknown alg(3) for ChaCha20: {self._alg}." - ) + raise ValueError(f"Unsupported or unknown alg(3) for ChaCha20: {self._alg}.") if not self._key: self._key = ChaCha20Poly1305.generate_key() @@ -371,13 +335,9 @@ def __init__(self, params: Dict[int, Any]): if not self._key_ops: self._key_ops = AESKeyWrap._ACCEPTABLE_KEY_OPS return - not_acceptable = [ - ops for ops in self._key_ops if ops not in AESKeyWrap._ACCEPTABLE_KEY_OPS - ] + not_acceptable = [ops for ops in self._key_ops if ops not in AESKeyWrap._ACCEPTABLE_KEY_OPS] if not_acceptable: - raise ValueError( - f"Unknown or not permissible key_ops(4) for AES key wrap: {not_acceptable[0]}." - ) + raise ValueError(f"Unknown or not permissible key_ops(4) for AES key wrap: {not_acceptable[0]}.") def wrap_key(self, key_to_wrap: bytes) -> bytes: try: diff --git a/cwt/cbor_processor.py b/cwt/cbor_processor.py index 445cc6d..1025586 100644 --- a/cwt/cbor_processor.py +++ b/cwt/cbor_processor.py @@ -34,7 +34,5 @@ def _validate_context(self, context: List[Any]): if not isinstance(context[2], list) or len(context[2]) != 3: raise ValueError("PartyVInfo should be list(size=3).") # SuppPubInfo - if not isinstance(context[3], list) or ( - len(context[3]) != 2 and len(context[3]) != 3 - ): + if not isinstance(context[3], list) or (len(context[3]) != 2 and len(context[3]) != 3): raise ValueError("SuppPubInfo should be list(size=2 or 3).") diff --git a/cwt/claims.py b/cwt/claims.py index ad3ecca..df219c2 100644 --- a/cwt/claims.py +++ b/cwt/claims.py @@ -34,17 +34,11 @@ def __init__( for c in claims[3]: if not isinstance(c, str): raise ValueError("aud(3) should be str or list[str].") - if 4 in claims and not ( - isinstance(claims[4], int) or isinstance(claims[4], float) - ): + if 4 in claims and not (isinstance(claims[4], int) or isinstance(claims[4], float)): raise ValueError("exp(4) should be int or float.") - if 5 in claims and not ( - isinstance(claims[5], int) or isinstance(claims[5], float) - ): + if 5 in claims and not (isinstance(claims[5], int) or isinstance(claims[5], float)): raise ValueError("nbf(5) should be int or float.") - if 6 in claims and not ( - isinstance(claims[6], int) or isinstance(claims[6], float) - ): + if 6 in claims and not (isinstance(claims[6], int) or isinstance(claims[6], float)): raise ValueError("iat(6) should be int or float.") if 7 in claims and not isinstance(claims[7], bytes): raise ValueError("cti(7) should be bytes.") @@ -61,9 +55,7 @@ def __init__( if not isinstance(claims[8][3], bytes): raise ValueError("kid in cnf(8) should be bytes.") else: - raise ValueError( - "cnf(8) should include COSE_Key, Encrypted_COSE_Key, or kid." - ) + raise ValueError("cnf(8) should include COSE_Key, Encrypted_COSE_Key, or kid.") self._claims = claims self._claim_names = claim_names return diff --git a/cwt/cose.py b/cwt/cose.py index 14420a3..7940136 100644 --- a/cwt/cose.py +++ b/cwt/cose.py @@ -142,9 +142,7 @@ def encode_and_mac( ValueError: Invalid arguments. EncodeError: Failed to encode data. """ - p: Union[Dict[int, Any], bytes] = ( - to_cose_header(protected) if not isinstance(protected, bytes) else protected - ) + p: Union[Dict[int, Any], bytes] = to_cose_header(protected) if not isinstance(protected, bytes) else protected u = to_cose_header(unprotected) ctx = "MAC0" if not recipients else "MAC" @@ -176,9 +174,7 @@ def encode_and_mac( if self._kid_auto_inclusion and key.kid: u[4] = key.kid else: - raise NotImplementedError( - "Algorithms other than direct are not supported for recipients." - ) + raise NotImplementedError("Algorithms other than direct are not supported for recipients.") if isinstance(p, bytes): b_protected = p @@ -229,9 +225,7 @@ def encode_and_sign( ValueError: Invalid arguments. EncodeError: Failed to encode data. """ - p: Union[Dict[int, Any], bytes] = ( - to_cose_header(protected) if not isinstance(protected, bytes) else protected - ) + p: Union[Dict[int, Any], bytes] = to_cose_header(protected) if not isinstance(protected, bytes) else protected u = to_cose_header(unprotected) ctx = "Signature" if signers else "Signature1" @@ -301,9 +295,7 @@ def encode_and_encrypt( ValueError: Invalid arguments. EncodeError: Failed to encode data. """ - p: Union[Dict[int, Any], bytes] = ( - to_cose_header(protected) if not isinstance(protected, bytes) else protected - ) + p: Union[Dict[int, Any], bytes] = to_cose_header(protected) if not isinstance(protected, bytes) else protected u = to_cose_header(unprotected) ctx = "Encrypt0" if not recipients else "Encrypt" @@ -312,9 +304,7 @@ def encode_and_encrypt( try: nonce = key.generate_nonce() except NotImplementedError: - raise ValueError( - "Nonce generation is not supported for the key. Set a nonce explicitly." - ) + raise ValueError("Nonce generation is not supported for the key. Set a nonce explicitly.") # Encrypt0 if not recipients: @@ -344,9 +334,7 @@ def encode_and_encrypt( u[4] = key.kid u[5] = nonce else: - raise NotImplementedError( - "Algorithms other than direct are not supported for recipients." - ) + raise NotImplementedError("Algorithms other than direct are not supported for recipients.") if isinstance(p, bytes): b_protected = p else: @@ -437,7 +425,7 @@ def decode( aad = self._dumps(["Encrypt0", data.value[0], external_aad]) nonce = unprotected.get(5, None) if kid: - for i, k in enumerate(keys): + for _, k in enumerate(keys): if k.kid != kid: continue try: @@ -445,7 +433,7 @@ def decode( except Exception as e: err = e raise err - for i, k in enumerate(keys): + for _, k in enumerate(keys): try: return k.decrypt(data.value[2], nonce, aad) except Exception as e: @@ -465,7 +453,7 @@ def decode( kid = self._get_kid(protected, unprotected) msg = self._dumps(["MAC0", data.value[0], external_aad, data.value[2]]) if kid: - for i, k in enumerate(keys): + for _, k in enumerate(keys): if k.kid != kid: continue try: @@ -474,7 +462,7 @@ def decode( except Exception as e: err = e raise err - for i, k in enumerate(keys): + for _, k in enumerate(keys): try: k.verify(msg, data.value[3]) return data.value[2] @@ -484,9 +472,7 @@ def decode( # MAC if data.tag == 97: - to_be_maced = self._dumps( - ["MAC", data.value[0], external_aad, data.value[2]] - ) + to_be_maced = self._dumps(["MAC", data.value[0], external_aad, data.value[2]]) rs = Recipients.from_list(data.value[4], self._verify_kid) mac_auth_key = rs.extract(keys, context, alg) mac_auth_key.verify(to_be_maced, data.value[3]) @@ -495,11 +481,9 @@ def decode( # Signature1 if data.tag == 18: kid = self._get_kid(protected, unprotected) - to_be_signed = self._dumps( - ["Signature1", data.value[0], external_aad, data.value[2]] - ) + to_be_signed = self._dumps(["Signature1", data.value[0], external_aad, data.value[2]]) if kid: - for i, k in enumerate(keys): + for _, k in enumerate(keys): if k.kid != kid: continue try: @@ -510,7 +494,7 @@ def decode( except Exception as e: err = e raise err - for i, k in enumerate(keys): + for _, k in enumerate(keys): try: if self._ca_certs: k.validate_certificate(self._ca_certs) @@ -532,12 +516,10 @@ def decode( protected = self._loads(sig[0]) if sig[0] else b"" unprotected = sig[1] if not isinstance(unprotected, dict): - raise ValueError( - "unprotected header in signature structure should be dict." - ) + raise ValueError("unprotected header in signature structure should be dict.") kid = self._get_kid(protected, unprotected) if kid: - for i, k in enumerate(keys): + for _, k in enumerate(keys): if k.kid != kid: continue try: @@ -555,7 +537,7 @@ def decode( except Exception as e: err = e continue - for i, k in enumerate(keys): + for _, k in enumerate(keys): try: to_be_signed = self._dumps( [ @@ -572,9 +554,7 @@ def decode( err = e raise err - def _filter_by_key_ops( - self, keys: List[COSEKeyInterface], op: int - ) -> List[COSEKeyInterface]: + def _filter_by_key_ops(self, keys: List[COSEKeyInterface], op: int) -> List[COSEKeyInterface]: res: List[COSEKeyInterface] = [] for k in keys: if op in k.key_ops: diff --git a/cwt/cose_key.py b/cwt/cose_key.py index 07a09d9..e4cc7af 100644 --- a/cwt/cose_key.py +++ b/cwt/cose_key.py @@ -61,9 +61,7 @@ def new(params: Dict[int, Any]) -> COSEKeyInterface: if params[1] == 3: return RSAKey(params) if params[1] == 4: - if 3 not in params or ( - not isinstance(params[3], int) and not isinstance(params[3], str) - ): + if 3 not in params or (not isinstance(params[3], int) and not isinstance(params[3], str)): raise ValueError("alg(3) should be int or str(tstr).") if params[3] in [1, 2, 3]: return AESGCMKey(params) @@ -251,9 +249,7 @@ def from_pem( params[-7] = uint_to_bytes(priv_nums.dmq1) # dQ params[-8] = uint_to_bytes(priv_nums.iqmp) # qInv - elif isinstance(k, EllipticCurvePrivateKey) or isinstance( - k, EllipticCurvePublicKey - ): + elif isinstance(k, EllipticCurvePrivateKey) or isinstance(k, EllipticCurvePublicKey): if alg: if isinstance(alg, str): if alg in COSE_ALGORITHMS_CKDM_KEY_AGREEMENT: diff --git a/cwt/cose_key_interface.py b/cwt/cose_key_interface.py index deda82d..6517861 100644 --- a/cwt/cose_key_interface.py +++ b/cwt/cose_key_interface.py @@ -31,9 +31,7 @@ def __init__(self, params: Dict[int, Any]): raise ValueError(f"Unknown kty: {params[1]}") if isinstance(params[1], str) and params[1] not in COSE_KEY_TYPES: raise ValueError(f"Unknown kty: {params[1]}") - self._kty: int = ( - params[1] if isinstance(params[1], int) else COSE_KEY_TYPES[params[1]] - ) + self._kty: int = params[1] if isinstance(params[1], int) else COSE_KEY_TYPES[params[1]] # kid if 2 in params and not isinstance(params[2], bytes): @@ -45,16 +43,9 @@ def __init__(self, params: Dict[int, Any]): if 3 in params: if not isinstance(params[3], int) and not isinstance(params[3], str): raise ValueError("alg(3) should be int or str(tstr).") - if ( - isinstance(params[3], str) - and params[3] not in COSE_NAMED_ALGORITHMS_SUPPORTED - ): + if isinstance(params[3], str) and params[3] not in COSE_NAMED_ALGORITHMS_SUPPORTED: raise ValueError(f"Unsupported or unknown alg(3): {params[3]}.") - self._alg = ( - params[3] - if isinstance(params[3], int) - else COSE_NAMED_ALGORITHMS_SUPPORTED[params[3]] - ) + self._alg = params[3] if isinstance(params[3], int) else COSE_NAMED_ALGORITHMS_SUPPORTED[params[3]] # key_ops if 4 in params and not isinstance(params[4], list): diff --git a/cwt/cwt.py b/cwt/cwt.py index b9fad5d..7cc29b1 100644 --- a/cwt/cwt.py +++ b/cwt/cwt.py @@ -222,9 +222,7 @@ def encode_and_mac( claims = claims.to_dict() self._set_default_value(claims) b_claims = self._dumps(claims) - res = self._cose.encode_and_mac( - b_claims, key, {}, {}, recipients, out="cbor2/CBORTag" - ) + res = self._cose.encode_and_mac(b_claims, key, {}, {}, recipients, out="cbor2/CBORTag") if tagged: return self._dumps(CBORTag(CWT.CBOR_TAG, res)) return self._dumps(res) @@ -261,9 +259,7 @@ def encode_and_sign( claims = claims.to_dict() self._set_default_value(claims) b_claims = self._dumps(claims) - res = self._cose.encode_and_sign( - b_claims, key, {}, {}, signers=signers, out="cbor2/CBORTag" - ) + res = self._cose.encode_and_sign(b_claims, key, {}, {}, signers=signers, out="cbor2/CBORTag") if tagged: return self._dumps(CBORTag(CWT.CBOR_TAG, res)) return self._dumps(res) diff --git a/cwt/encrypted_cose_key.py b/cwt/encrypted_cose_key.py index a0853c1..f53ba8b 100644 --- a/cwt/encrypted_cose_key.py +++ b/cwt/encrypted_cose_key.py @@ -38,16 +38,12 @@ def from_cose_key( EncodeError: Failed to encrypt the COSE key. """ protected: Dict[int, Any] = {1: encryption_key.alg} - unprotected: Dict[int, Any] = ( - {4: encryption_key.kid} if encryption_key.kid else {} - ) + unprotected: Dict[int, Any] = {4: encryption_key.kid} if encryption_key.kid else {} if not nonce: try: nonce = encryption_key.generate_nonce() except NotImplementedError: - raise ValueError( - "Nonce generation is not supported for the key. Set a nonce explicitly." - ) + raise ValueError("Nonce generation is not supported for the key. Set a nonce explicitly.") unprotected[5] = nonce b_payload = cbor2.dumps(key.to_dict()) res: CBORTag = COSE().encode_and_encrypt( @@ -61,9 +57,7 @@ def from_cose_key( return res.value @staticmethod - def to_cose_key( - key: List[Any], encryption_key: COSEKeyInterface - ) -> COSEKeyInterface: + def to_cose_key(key: List[Any], encryption_key: COSEKeyInterface) -> COSEKeyInterface: """ Returns an decrypted COSE key. diff --git a/cwt/recipient.py b/cwt/recipient.py index c38e153..c7107be 100644 --- a/cwt/recipient.py +++ b/cwt/recipient.py @@ -157,9 +157,7 @@ def from_list(cls, recipient: List[Any]) -> RecipientInterface: ValueError: Invalid arguments. DecodeError: Failed to decode the key data. """ - if not isinstance(recipient, list) or ( - len(recipient) != 3 and len(recipient) != 4 - ): + if not isinstance(recipient, list) or (len(recipient) != 3 and len(recipient) != 4): raise ValueError("Invalid recipient format.") if not isinstance(recipient[0], bytes): raise ValueError("protected header should be bytes.") diff --git a/cwt/recipient_algs/ecdh_direct_hkdf.py b/cwt/recipient_algs/ecdh_direct_hkdf.py index 60825d3..086df1c 100644 --- a/cwt/recipient_algs/ecdh_direct_hkdf.py +++ b/cwt/recipient_algs/ecdh_direct_hkdf.py @@ -98,9 +98,7 @@ def apply( self._unprotected[-25] = self._applied_ctx[2][1] # Derive key. - derived_key = self._sender_key.derive_key( - self._applied_ctx, public_key=recipient_key - ) + derived_key = self._sender_key.derive_key(self._applied_ctx, public_key=recipient_key) if self._alg in [-25, -26]: # ECDH-ES self._unprotected[-1] = self._to_cose_key(self._sender_key.key.public_key()) diff --git a/cwt/recipient_interface.py b/cwt/recipient_interface.py index 186d25e..9f635b8 100644 --- a/cwt/recipient_interface.py +++ b/cwt/recipient_interface.py @@ -212,9 +212,7 @@ def extract( """ raise NotImplementedError - def _to_cose_key( - self, k: Union[EllipticCurvePublicKey, X25519PublicKey, X448PublicKey] - ) -> Dict[int, Any]: + def _to_cose_key(self, k: Union[EllipticCurvePublicKey, X25519PublicKey, X448PublicKey]) -> Dict[int, Any]: if isinstance(k, EllipticCurvePublicKey): return EC2Key.to_cose_key(k) return OKPKey.to_cose_key(k) diff --git a/cwt/signer.py b/cwt/signer.py index e1265dc..1965dda 100644 --- a/cwt/signer.py +++ b/cwt/signer.py @@ -79,9 +79,7 @@ def new( ValueError: Invalid arguments. """ p: Union[Dict[int, Any], bytes] = ( - to_cose_header(protected, algs=COSE_ALGORITHMS_SIGNATURE) - if isinstance(protected, dict) - else protected + to_cose_header(protected, algs=COSE_ALGORITHMS_SIGNATURE) if isinstance(protected, dict) else protected ) u = to_cose_header(unprotected, algs=COSE_ALGORITHMS_SIGNATURE) return cls(cose_key, p, u, signature) @@ -110,9 +108,7 @@ def from_jwk(cls, data: Union[str, bytes, Dict[str, Any]]): # alg if cose_key.alg not in COSE_ALGORITHMS_SIGNATURE.values(): - raise ValueError( - f"Unsupported or unknown alg for signature: {cose_key.alg}." - ) + raise ValueError(f"Unsupported or unknown alg for signature: {cose_key.alg}.") protected[1] = cose_key.alg # kid diff --git a/cwt/utils.py b/cwt/utils.py index f1b14ae..940cd3f 100644 --- a/cwt/utils.py +++ b/cwt/utils.py @@ -29,7 +29,7 @@ def i2osp(x: int, x_len: int) -> bytes: while x: digits.append(int(x % 256)) x //= 256 - for i in range(x_len - len(digits)): + for _ in range(x_len - len(digits)): digits.append(0) return bytes.fromhex("".join("%.2x" % x for x in digits[::-1])) @@ -120,13 +120,8 @@ def to_cis(context: Dict[str, Any], recipient_alg: Optional[int] = None) -> List if "alg" not in context: raise ValueError("alg not found.") # if context["alg"] not in COSE_NAMED_ALGORITHMS_SUPPORTED: - if ( - context["alg"] not in COSE_ALGORITHMS_CEK - and context["alg"] not in COSE_ALGORITHMS_MAC - ): - raise ValueError( - f'Unsupported or unknown alg for context information: {context["alg"]}.' - ) + if context["alg"] not in COSE_ALGORITHMS_CEK and context["alg"] not in COSE_ALGORITHMS_MAC: + raise ValueError(f'Unsupported or unknown alg for context information: {context["alg"]}.') alg = COSE_NAMED_ALGORITHMS_SUPPORTED[context["alg"]] res.append(alg) @@ -165,9 +160,7 @@ def to_cis(context: Dict[str, Any], recipient_alg: Optional[int] = None) -> List return res -def to_cose_header( - data: Optional[dict] = None, algs: Dict[str, int] = {} -) -> Dict[int, Any]: +def to_cose_header(data: Optional[dict] = None, algs: Dict[str, int] = {}) -> Dict[int, Any]: if data is None: return {} res: Dict[int, Any] = {} diff --git a/pyproject.toml b/pyproject.toml index 098c834..d86b45e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,49 @@ [build-system] -requires = ["setuptools", "wheel"] -build-backend = "setuptools.build_meta:__legacy__" +requires = ["poetry-core>=1.0.0"] +build-backend = "poetry.core.masonry.api" + +[tool.poetry] +name = "cwt" +version = "1.4.2" +description = "A Python implementation of CWT/COSE." +authors = ["Ajitomi Daisuke "] +readme = "README.md" +repository = "https://github.com/dajiaji/cwt" + +include = [ + ".pre-commit-config.yaml", + "CHANGES.rst", + "CODE_OF_CONDUCT.md", + "LICENSE", + "tox.ini", + "cwt/py.typed", +] + +[tool.poetry.dependencies] +python = "^3.6.1" +asn1crypto = "^1.4.0" +cbor2 = "^5.4.2" +certvalidator = "^0.11.1" +cryptography = "^36.0.0" +Sphinx = {version = "^4.3.1", extras = ["docs"]} +sphinx-autodoc-typehints = {version = "^1.12.0", extras = ["docs"]} +sphinx-rtd-theme = {version = "^1.0.0", extras = ["docs"]} + +[tool.poetry.extras] +docs = [ + "Sphinx", + "sphinx-rtd-theme", + "sphinx-autodoc-typehints", +] + +[tool.poetry.dev-dependencies] +pytest = "^5.2" +pytest-cov = "^3.0.0" +coverage = {extras = ["toml"], version = "^6.2"} +tox = "^3.24.4" +pre-commit = "^2.15.0" +mypy = "^0.910" +Sphinx = "^4.3.1" [tool.coverage.run] parallel = true @@ -13,8 +56,8 @@ source = ["cwt", ".tox/*/site-packages"] [tool.coverage.report] show_missing = true +[tool.flake8] +ignore = "E203,E501," -[tool.isort] -profile = "black" -atomic = true -combine_as_imports = true +[tool.mypy] +ignore_missing_imports = true diff --git a/samples/eudcc/verifier.py b/samples/eudcc/verifier.py index ca4b4de..2b0a5a6 100644 --- a/samples/eudcc/verifier.py +++ b/samples/eudcc/verifier.py @@ -26,17 +26,11 @@ def refresh_trustlist(self): headers = None # Get new DSCs - x_resume_token = ( - self._trustlist[len(self._trustlist) - 1]["x_resume_token"] - if self._trustlist - else "" - ) + x_resume_token = self._trustlist[len(self._trustlist) - 1]["x_resume_token"] if self._trustlist else "" while status == 200: if x_resume_token: headers = {"X-RESUME-TOKEN": x_resume_token} - r = requests.get( - self._base_url + "/signercertificateUpdate", headers=headers - ) + r = requests.get(self._base_url + "/signercertificateUpdate", headers=headers) status = r.status_code if status == 204: break @@ -66,9 +60,7 @@ def refresh_trustlist(self): # Update trustlist store. with open(self._trustlist_store_path, "w") as f: - json.dump( - [v for v in self._trustlist if v["x_kid"] in active_kids], f, indent=4 - ) + json.dump([v for v in self._trustlist if v["x_kid"] in active_kids], f, indent=4) return def verify_and_decode(self, eudcc: bytes) -> bytes: diff --git a/setup.cfg b/setup.cfg deleted file mode 100644 index 1ce2afb..0000000 --- a/setup.cfg +++ /dev/null @@ -1,69 +0,0 @@ -[metadata] -name = cwt -version = attr: cwt.__version__ -author = AJITOMI Daisuke -author_email = ajitomi@gmail.com -description = A Python implementation of CBOR Web Token (CWT) and CBOR Object Signing and Encryption (COSE). -long_description = file: README.md -long_description_content_type = text/markdown -license = MIT -keywords = cbor cwt cose security signature encryption token -url = https://github.com/dajiaji/python-cwt -classifiers = - Development Status :: 5 - Production/Stable - Intended Audience :: Developers - Natural Language :: English - License :: OSI Approved :: MIT License - Topic :: Utilities - Programming Language :: Python - Programming Language :: Python :: 3 - Programming Language :: Python :: 3 :: Only - Programming Language :: Python :: 3.6 - Programming Language :: Python :: 3.7 - Programming Language :: Python :: 3.8 - Programming Language :: Python :: 3.9 - Programming Language :: Python :: 3.10 - -[options] -zip_safe = false -include_package_data = true -python_requires = >=3.6 -packages = find: -install_requires = - asn1crypto>=1.4.0 - cbor2>=5.0.0 - certvalidator>=0.11.1 - cryptography>=3.3.1 - -[options.package_data] -* = py.typed - -[options.extras_require] -docs = - sphinx - sphinx-rtd-theme - sphinx-autodoc-typehints >= 1.2.0 -tests = - pytest>=6.0.0 - pytest-cov>=2.0 - coverage[toml]==5.0.4 -dev = - sphinx - sphinx-rtd-theme - cryptography>=3.3.1 - pytest>=6.0.0,<7.0.0 - coverage[toml]==5.0.4 - mypy - -[options.packages.find] -exclude = - tests - tests.* - -[flake8] -extend-ignore = E203, E501 - -[mypy] -python_version = 3.6 -ignore_missing_imports = true -warn_unused_ignores = true diff --git a/setup.py b/setup.py deleted file mode 100644 index 6068493..0000000 --- a/setup.py +++ /dev/null @@ -1,3 +0,0 @@ -from setuptools import setup - -setup() diff --git a/tests/test_algs_aes_key_wrap.py b/tests/test_algs_aes_key_wrap.py index 9654042..34b2d0a 100644 --- a/tests/test_algs_aes_key_wrap.py +++ b/tests/test_algs_aes_key_wrap.py @@ -37,9 +37,7 @@ def test_aes_key_wrap_constructor_with_invalid_key_ops(self): with pytest.raises(ValueError) as err: AESKeyWrap({1: 4, 3: -3, 4: [1, 2]}) pytest.fail("AESKeyWrap() should fail.") - assert "Unknown or not permissible key_ops(4) for AES key wrap: 1." in str( - err.value - ) + assert "Unknown or not permissible key_ops(4) for AES key wrap: 1." in str(err.value) def test_aes_key_wrap_unwrap_key_with_invalid_alg(self): key = AESKeyWrap({1: 4, 3: -3}) diff --git a/tests/test_algs_ec2.py b/tests/test_algs_ec2.py index 64f7f55..020ac92 100644 --- a/tests/test_algs_ec2.py +++ b/tests/test_algs_ec2.py @@ -57,10 +57,7 @@ def test_ec2_key_constructor_with_es256_key(self): == b'\xe9\x16\x0c\xa96\x8d\xfa\xbc\xd5\xda"ua\xec\xf7\x96\r\x15\xf7_\xf3rb{\xb1\xde;\x99\x88\xafNh' ) public_key_obj = public_key.to_dict() - assert ( - public_key_obj[-2] - == b"\xa7\xddc*\xff\xc2?\x8b\xf8\x9c:\xad\xccDF\x9cZ \x04P\xef\x99\x0c=\xe6 w1\x08&\xba\xd9" - ) + assert public_key_obj[-2] == b"\xa7\xddc*\xff\xc2?\x8b\xf8\x9c:\xad\xccDF\x9cZ \x04P\xef\x99\x0c=\xe6 w1\x08&\xba\xd9" try: sig = private_key.sign(b"Hello world!") public_key.verify(b"Hello world!", sig) @@ -755,10 +752,7 @@ def test_ec2_key_derive_key_with_invalid_alg(self, invalid_alg): with pytest.raises(ValueError) as err: private_key.derive_key({"alg": invalid_alg}, public_key=pub_key) pytest.fail("derive_key() should fail.") - assert ( - f"Unsupported or unknown alg for context information: {invalid_alg}." - in str(err.value) - ) + assert f"Unsupported or unknown alg for context information: {invalid_alg}." in str(err.value) def test_ec2_key_derive_key_without_public_key(self): private_key = EC2Key( diff --git a/tests/test_algs_okp.py b/tests/test_algs_okp.py index 95659a3..aae05df 100644 --- a/tests/test_algs_okp.py +++ b/tests/test_algs_okp.py @@ -53,8 +53,7 @@ def test_okp_key_constructor_with_ed25519_key(self): assert public_key.base_iv is None private_key_obj = private_key.to_dict() assert ( - private_key_obj[-4] - == b"B\xc6u\xd0|-\x07\xe7)\x8d\x1c\x13\x14\xa2\x8dFC1\xdf3sQ\x049|\x14\xc1\xed\x01\xe5\xdb\xa9" + private_key_obj[-4] == b"B\xc6u\xd0|-\x07\xe7)\x8d\x1c\x13\x14\xa2\x8dFC1\xdf3sQ\x049|\x14\xc1\xed\x01\xe5\xdb\xa9" ) public_key_obj = public_key.to_dict() assert ( @@ -151,8 +150,7 @@ def test_okp_key_constructor_with_x25519_key(self): assert public_key.base_iv is None private_key_obj = private_key.to_dict() assert ( - private_key_obj[-4] - == b"\xbe\xc2u\xa1~M6-\x08\x19\xdc\x06\x95\xd8\x9as\xbek\xf9Kf\xabrj\xe0\xb1\xaf\xe3\xc4?A\xce" + private_key_obj[-4] == b"\xbe\xc2u\xa1~M6-\x08\x19\xdc\x06\x95\xd8\x9as\xbek\xf9Kf\xabrj\xe0\xb1\xaf\xe3\xc4?A\xce" ) public_key_obj = public_key.to_dict() assert ( diff --git a/tests/test_claims.py b/tests/test_claims.py index deeb673..27f07b1 100644 --- a/tests/test_claims.py +++ b/tests/test_claims.py @@ -69,7 +69,7 @@ def test_claims_from_json(self, json, expected): for k, v in claims.items(): assert v == expected[k] assert isinstance(v, type(expected[k])) - len(claims) == len(expected) + assert len(claims) == len(expected) @pytest.mark.parametrize( "json", diff --git a/tests/test_cose.py b/tests/test_cose.py index 646bcb5..31f29e6 100644 --- a/tests/test_cose.py +++ b/tests/test_cose.py @@ -170,9 +170,7 @@ def test_cose_encode_and_decode_signature1_with_protected_bytes(self): assert b"Hello world!" == ctx.decode(encoded, sig_key) def test_cose_encode_and_decode_mac0_with_verify_kid(self): - ctx = COSE.new( - alg_auto_inclusion=True, kid_auto_inclusion=True, verify_kid=True - ) + ctx = COSE.new(alg_auto_inclusion=True, kid_auto_inclusion=True, verify_kid=True) # MAC0 mac_key = COSEKey.from_symmetric_key(alg="HS256", kid="01") @@ -180,9 +178,7 @@ def test_cose_encode_and_decode_mac0_with_verify_kid(self): assert b"Hello world!" == ctx.decode(encoded, mac_key) def test_cose_encode_and_decode_mac0_without_kid_with_verify_kid(self): - ctx = COSE.new( - alg_auto_inclusion=True, kid_auto_inclusion=True, verify_kid=True - ) + ctx = COSE.new(alg_auto_inclusion=True, kid_auto_inclusion=True, verify_kid=True) # MAC0 mac_key = COSEKey.from_symmetric_key(alg="HS256") @@ -262,9 +258,7 @@ def test_cose_encode_and_mac_with_recipient_has_unsupported_alg(self, ctx): recipients=[RecipientInterface(unprotected={1: 0, 4: b"our-secret"})], ) pytest.fail("encode_and_mac should fail.") - assert "Algorithms other than direct are not supported for recipients." in str( - err.value - ) + assert "Algorithms other than direct are not supported for recipients." in str(err.value) def test_cose_encode_and_encrypt_with_recipient_has_unsupported_alg(self, ctx): key = COSEKey.from_jwk( @@ -284,9 +278,7 @@ def test_cose_encode_and_encrypt_with_recipient_has_unsupported_alg(self, ctx): recipients=[RecipientInterface(unprotected={1: 0, 4: b"our-secret"})], ) pytest.fail("encode_and_encrypt should fail.") - assert "Algorithms other than direct are not supported for recipients." in str( - err.value - ) + assert "Algorithms other than direct are not supported for recipients." in str(err.value) def test_cose_encode_and_mac_with_invalid_payload(self, ctx): key = COSEKey.from_symmetric_key(alg="HS256") @@ -507,7 +499,7 @@ def test_cose_decode_mac0_with_multiple_kid(self): key2 = COSEKey.from_symmetric_key(alg="HS256", kid="01") key3 = COSEKey.from_symmetric_key(alg="HS256", kid="02") encoded = ctx.encode_and_mac(b"Hello world!", key2) - b"Hello world!" == ctx.decode(encoded, [key1, key2, key3]) + assert b"Hello world!" == ctx.decode(encoded, [key1, key2, key3]) def test_cose_decode_mac_with_multiple_keys_without_kid(self, ctx): key = COSEKey.from_symmetric_key(alg="HS256") @@ -525,7 +517,7 @@ def test_cose_decode_mac_with_multiple_keys_without_kid(self, ctx): shared_key1 = COSEKey.from_jwk({"kty": "oct", "alg": "A128KW", "k": material1}) shared_key3 = COSEKey.from_jwk({"kty": "oct", "alg": "A128KW", "k": material3}) - b"Hello world!" == ctx.decode(encoded, keys=[shared_key3, shared_key1]) + assert b"Hello world!" == ctx.decode(encoded, keys=[shared_key3, shared_key1]) def test_cose_decode_mac_with_multiple_keys_with_verify_kid(self): ctx = COSE.new(alg_auto_inclusion=True, verify_kid=True) @@ -535,23 +527,15 @@ def test_cose_decode_mac_with_multiple_keys_with_verify_kid(self): material2 = base64.urlsafe_b64encode(token_bytes(16)).decode() material3 = base64.urlsafe_b64encode(token_bytes(16)).decode() - r1 = Recipient.from_jwk( - {"kid": "01", "kty": "oct", "alg": "A128KW", "k": material1} - ) - r2 = Recipient.from_jwk( - {"kid": "02", "kty": "oct", "alg": "A128KW", "k": material2} - ) + r1 = Recipient.from_jwk({"kid": "01", "kty": "oct", "alg": "A128KW", "k": material1}) + r2 = Recipient.from_jwk({"kid": "02", "kty": "oct", "alg": "A128KW", "k": material2}) r1.apply(key) r2.apply(key) encoded = ctx.encode_and_mac(b"Hello world!", key, recipients=[r2, r1]) - shared_key1 = COSEKey.from_jwk( - {"kid": "01", "kty": "oct", "alg": "A128KW", "k": material1} - ) - shared_key3 = COSEKey.from_jwk( - {"kid": "03", "kty": "oct", "alg": "A128KW", "k": material3} - ) + shared_key1 = COSEKey.from_jwk({"kid": "01", "kty": "oct", "alg": "A128KW", "k": material1}) + shared_key3 = COSEKey.from_jwk({"kid": "03", "kty": "oct", "alg": "A128KW", "k": material3}) assert b"Hello world!" == ctx.decode(encoded, keys=[shared_key3, shared_key1]) @@ -574,12 +558,8 @@ def test_cose_decode_mac_with_multiple_keys_with_verify_kid_and_protected_kid(se encoded = ctx.encode_and_mac(b"Hello world!", key, recipients=[r2, r1]) - shared_key1 = COSEKey.from_jwk( - {"kid": "01", "kty": "oct", "alg": "A128KW", "k": material1} - ) - shared_key3 = COSEKey.from_jwk( - {"kid": "03", "kty": "oct", "alg": "A128KW", "k": material3} - ) + shared_key1 = COSEKey.from_jwk({"kid": "01", "kty": "oct", "alg": "A128KW", "k": material1}) + shared_key3 = COSEKey.from_jwk({"kid": "03", "kty": "oct", "alg": "A128KW", "k": material3}) assert b"Hello world!" == ctx.decode(encoded, keys=[shared_key3, shared_key1]) @@ -623,12 +603,8 @@ def test_cose_decode_mac_with_multiple_keys_without_recipient_kid_with_verify_ki encoded = ctx.encode_and_mac(b"Hello world!", key, recipients=[r2, r1]) - shared_key1 = COSEKey.from_jwk( - {"kid": "01", "kty": "oct", "alg": "A128KW", "k": material1} - ) - shared_key3 = COSEKey.from_jwk( - {"kid": "02", "kty": "oct", "alg": "A128KW", "k": material3} - ) + shared_key1 = COSEKey.from_jwk({"kid": "01", "kty": "oct", "alg": "A128KW", "k": material1}) + shared_key3 = COSEKey.from_jwk({"kid": "02", "kty": "oct", "alg": "A128KW", "k": material3}) with pytest.raises(ValueError) as err: ctx.decode(encoded, keys=[shared_key3, shared_key1]) @@ -643,19 +619,13 @@ def test_cose_decode_mac_with_different_multiple_keys(self, ctx): material2 = base64.urlsafe_b64encode(token_bytes(16)).decode() material3 = base64.urlsafe_b64encode(token_bytes(16)).decode() - r2 = Recipient.from_jwk( - {"kid": "03", "kty": "oct", "alg": "A128KW", "k": material2} - ) + r2 = Recipient.from_jwk({"kid": "03", "kty": "oct", "alg": "A128KW", "k": material2}) r2.apply(key) encoded = ctx.encode_and_mac(b"Hello world!", key, recipients=[r2]) - shared_key1 = COSEKey.from_jwk( - {"kid": "01", "kty": "oct", "alg": "A128KW", "k": material1} - ) - shared_key3 = COSEKey.from_jwk( - {"kid": "03", "kty": "oct", "alg": "A128KW", "k": material3} - ) + shared_key1 = COSEKey.from_jwk({"kid": "01", "kty": "oct", "alg": "A128KW", "k": material1}) + shared_key3 = COSEKey.from_jwk({"kid": "03", "kty": "oct", "alg": "A128KW", "k": material3}) with pytest.raises(DecodeError) as err: ctx.decode(encoded, keys=[shared_key1, shared_key3]) @@ -670,19 +640,13 @@ def test_cose_decode_mac_with_different_multiple_keys_2(self): material2 = base64.urlsafe_b64encode(token_bytes(16)).decode() material3 = base64.urlsafe_b64encode(token_bytes(16)).decode() - r2 = Recipient.from_jwk( - {"kid": "03", "kty": "oct", "alg": "A128KW", "k": material2} - ) + r2 = Recipient.from_jwk({"kid": "03", "kty": "oct", "alg": "A128KW", "k": material2}) r2.apply(key) encoded = ctx.encode_and_mac(b"Hello world!", key, recipients=[r2]) - shared_key1 = COSEKey.from_jwk( - {"kid": "01", "kty": "oct", "alg": "A128KW", "k": material1} - ) - shared_key3 = COSEKey.from_jwk( - {"kid": "02", "kty": "oct", "alg": "A128KW", "k": material3} - ) + shared_key1 = COSEKey.from_jwk({"kid": "01", "kty": "oct", "alg": "A128KW", "k": material1}) + shared_key3 = COSEKey.from_jwk({"kid": "02", "kty": "oct", "alg": "A128KW", "k": material3}) with pytest.raises(ValueError) as err: ctx.decode(encoded, keys=[shared_key1, shared_key3]) @@ -743,23 +707,15 @@ def test_cose_decode_encrypt_with_multiple_keys_with_verify_kid(self): material2 = base64.urlsafe_b64encode(token_bytes(16)).decode() material3 = base64.urlsafe_b64encode(token_bytes(16)).decode() - r1 = Recipient.from_jwk( - {"kid": "01", "kty": "oct", "alg": "A128KW", "k": material1} - ) - r2 = Recipient.from_jwk( - {"kid": "02", "kty": "oct", "alg": "A128KW", "k": material2} - ) + r1 = Recipient.from_jwk({"kid": "01", "kty": "oct", "alg": "A128KW", "k": material1}) + r2 = Recipient.from_jwk({"kid": "02", "kty": "oct", "alg": "A128KW", "k": material2}) r1.apply(key) r2.apply(key) encoded = ctx.encode_and_encrypt(b"Hello world!", key, recipients=[r2, r1]) - shared_key1 = COSEKey.from_jwk( - {"kid": "01", "kty": "oct", "alg": "A128KW", "k": material1} - ) - shared_key3 = COSEKey.from_jwk( - {"kid": "03", "kty": "oct", "alg": "A128KW", "k": material3} - ) + shared_key1 = COSEKey.from_jwk({"kid": "01", "kty": "oct", "alg": "A128KW", "k": material1}) + shared_key3 = COSEKey.from_jwk({"kid": "03", "kty": "oct", "alg": "A128KW", "k": material3}) assert b"Hello world!" == ctx.decode(encoded, keys=[shared_key3, shared_key1]) @@ -947,9 +903,7 @@ def test_cose_decode_signature_with_key_not_found(self): def test_cose_decode_ecdh_es_hkdf_256_without_context(self): with open(key_path("public_key_es256.pem")) as key_file: public_key = COSEKey.from_pem(key_file.read(), kid="01") - recipient = Recipient.from_jwk( - {"kty": "EC", "crv": "P-256", "alg": "ECDH-ES+HKDF-256"} - ) + recipient = Recipient.from_jwk({"kty": "EC", "crv": "P-256", "alg": "ECDH-ES+HKDF-256"}) enc_key = recipient.apply(recipient_key=public_key, context={"alg": "A128GCM"}) ctx = COSE.new(alg_auto_inclusion=True) encoded = ctx.encode_and_encrypt( @@ -969,9 +923,7 @@ def test_cose_decode_ecdh_aes_key_wrap_without_context(self): with open(key_path("public_key_es256.pem")) as key_file: public_key = COSEKey.from_pem(key_file.read(), kid="01") enc_key = COSEKey.from_symmetric_key(alg="A128GCM") - recipient = Recipient.from_jwk( - {"kty": "EC", "crv": "P-256", "alg": "ECDH-ES+A128KW"} - ) + recipient = Recipient.from_jwk({"kty": "EC", "crv": "P-256", "alg": "ECDH-ES+A128KW"}) recipient.apply(enc_key, recipient_key=public_key, context={"alg": "A128GCM"}) ctx = COSE.new(alg_auto_inclusion=True) encoded = ctx.encode_and_encrypt( diff --git a/tests/test_cose_key.py b/tests/test_cose_key.py index 756fd42..edddc08 100644 --- a/tests/test_cose_key.py +++ b/tests/test_cose_key.py @@ -540,17 +540,13 @@ def test_key_builder_from_jwk_without_use(self): ("private_key_rsa.json", "public_key_rsa.json"), ], ) - def test_key_builder_from_jwk_with_encode_and_sign( - self, private_key_path, public_key_path - ): + def test_key_builder_from_jwk_with_encode_and_sign(self, private_key_path, public_key_path): with open(key_path(private_key_path)) as key_file: private_key = COSEKey.from_jwk(key_file.read()) with open(key_path(public_key_path)) as key_file: public_key = COSEKey.from_jwk(key_file.read()) token = cwt.encode_and_sign( - Claims.from_json( - {"iss": "coaps://as.example", "sub": "dajiaji", "cti": "123"} - ), + Claims.from_json({"iss": "coaps://as.example", "sub": "dajiaji", "cti": "123"}), private_key, ) # token = cwt.encode( diff --git a/tests/test_cose_sample.py b/tests/test_cose_sample.py index 2f9a8a6..6d5beee 100644 --- a/tests/test_cose_sample.py +++ b/tests/test_cose_sample.py @@ -75,9 +75,7 @@ def test_cose_usage_examples_cose_mac_direct_hkdf_sha_256(self): key=mac_key, recipients=[r], ) - assert b"Hello world!" == ctx.decode( - encoded, shared_key, context={"alg": "HS256"} - ) + assert b"Hello world!" == ctx.decode(encoded, shared_key, context={"alg": "HS256"}) def test_cose_usage_examples_cose_mac_aes_key_wrap(self): @@ -142,9 +140,7 @@ def test_cose_usage_examples_cose_mac_ecdh_direct_hkdf_p256(self): "d": "r_kHyZ-a06rmxM3yESK84r1otSg-aQcVStkRhA-iCM8", } ) - assert b"Hello world!" == ctx.decode( - encoded, priv_key, context={"alg": "HS256"} - ) + assert b"Hello world!" == ctx.decode(encoded, priv_key, context={"alg": "HS256"}) def test_cose_usage_examples_cose_mac_ecdh_direct_hkdf_x25519(self): @@ -181,9 +177,7 @@ def test_cose_usage_examples_cose_mac_ecdh_direct_hkdf_x25519(self): "d": "vsJ1oX5NNi0IGdwGldiac75r-Utmq3Jq4LGv48Q_Qc4", } ) - assert b"Hello world!" == ctx.decode( - encoded, priv_key, context={"alg": "HS256"} - ) + assert b"Hello world!" == ctx.decode(encoded, priv_key, context={"alg": "HS256"}) def test_cose_usage_examples_cose_mac_ecdh_direct_hkdf_x448(self): @@ -220,9 +214,7 @@ def test_cose_usage_examples_cose_mac_ecdh_direct_hkdf_x448(self): "d": "rJJRG3nshyCtd9CgXld8aNaB9YXKR0UOi7zj7hApg9YH4XdBO0G8NcAFNz_uPH2GnCZVcSDgV5c", } ) - assert b"Hello world!" == ctx.decode( - encoded, priv_key, context={"alg": "HS256"} - ) + assert b"Hello world!" == ctx.decode(encoded, priv_key, context={"alg": "HS256"}) def test_cose_usage_examples_cose_mac_ecdh_ss_a128kw(self): @@ -267,9 +259,7 @@ def test_cose_usage_examples_cose_mac_ecdh_ss_a128kw(self): "d": "r_kHyZ-a06rmxM3yESK84r1otSg-aQcVStkRhA-iCM8", } ) - assert b"Hello world!" == ctx.decode( - encoded, priv_key, context={"alg": "HS256"} - ) + assert b"Hello world!" == ctx.decode(encoded, priv_key, context={"alg": "HS256"}) def test_cose_usage_examples_cose_encrypt0(self): enc_key = COSEKey.from_symmetric_key(alg="ChaCha20/Poly1305", kid="01") @@ -354,9 +344,7 @@ def test_cose_usage_examples_cose_encrypt_direct_hkdf_sha_256(self): key=enc_key, recipients=[r], ) - assert b"Hello world!" == ctx.decode( - encoded, shared_key, context={"alg": "A256GCM"} - ) + assert b"Hello world!" == ctx.decode(encoded, shared_key, context={"alg": "A256GCM"}) def test_cose_usage_examples_cose_encrypt_aes_key_wrap_a128kw(self): # A key to wrap @@ -419,9 +407,7 @@ def test_cose_usage_examples_cose_encrypt_ecdh_direct_hkdf_p256(self): "d": "r_kHyZ-a06rmxM3yESK84r1otSg-aQcVStkRhA-iCM8", } ) - assert b"Hello world!" == ctx.decode( - encoded, priv_key, context={"alg": "A128GCM"} - ) + assert b"Hello world!" == ctx.decode(encoded, priv_key, context={"alg": "A128GCM"}) def test_cose_usage_examples_cose_encrypt_ecdh_direct_hkdf_x25519(self): @@ -458,9 +444,7 @@ def test_cose_usage_examples_cose_encrypt_ecdh_direct_hkdf_x25519(self): "d": "vsJ1oX5NNi0IGdwGldiac75r-Utmq3Jq4LGv48Q_Qc4", } ) - assert b"Hello world!" == ctx.decode( - encoded, priv_key, context={"alg": "A128GCM"} - ) + assert b"Hello world!" == ctx.decode(encoded, priv_key, context={"alg": "A128GCM"}) def test_cose_usage_examples_cose_encrypt_ecdh_direct_hkdf_x448(self): @@ -497,9 +481,7 @@ def test_cose_usage_examples_cose_encrypt_ecdh_direct_hkdf_x448(self): "d": "rJJRG3nshyCtd9CgXld8aNaB9YXKR0UOi7zj7hApg9YH4XdBO0G8NcAFNz_uPH2GnCZVcSDgV5c", } ) - assert b"Hello world!" == ctx.decode( - encoded, priv_key, context={"alg": "A128GCM"} - ) + assert b"Hello world!" == ctx.decode(encoded, priv_key, context={"alg": "A128GCM"}) def test_cose_usage_examples_cose_encrypt_ecdh_ss_a128kw(self): @@ -546,9 +528,7 @@ def test_cose_usage_examples_cose_encrypt_ecdh_ss_a128kw(self): "d": "r_kHyZ-a06rmxM3yESK84r1otSg-aQcVStkRhA-iCM8", } ) - assert b"Hello world!" == ctx.decode( - encoded, priv_key, context={"alg": "A128GCM"} - ) + assert b"Hello world!" == ctx.decode(encoded, priv_key, context={"alg": "A128GCM"}) def test_cose_usage_examples_cose_signature1(self): @@ -631,9 +611,7 @@ def test_cose_usage_examples_cose_encrypt_ecdh_aes_key_wrap(self): "d": "r_kHyZ-a06rmxM3yESK84r1otSg-aQcVStkRhA-iCM8", } ) - assert b"Hello world!" == ctx.decode( - encoded, priv_key, context={"alg": "A128GCM"} - ) + assert b"Hello world!" == ctx.decode(encoded, priv_key, context={"alg": "A128GCM"}) def test_cose_usage_examples_cose_signature(self): diff --git a/tests/test_cose_wg_examples.py b/tests/test_cose_wg_examples.py index 795a1aa..c34740a 100644 --- a/tests/test_cose_wg_examples.py +++ b/tests/test_cose_wg_examples.py @@ -107,12 +107,7 @@ def test_cose_wg_examples_sign1_pass_02(self): unprotected={4: b"11"}, external_aad=bytes.fromhex("11aa22bb33cc44dd55006699"), ) - assert ( - ctx.decode( - encoded, key, external_aad=bytes.fromhex("11aa22bb33cc44dd55006699") - ) - == b"This is the content." - ) + assert ctx.decode(encoded, key, external_aad=bytes.fromhex("11aa22bb33cc44dd55006699")) == b"This is the content." assert ( ctx.decode( bytes.fromhex(cwt_str), @@ -197,10 +192,7 @@ def test_cose_wg_examples_ecdsa_01(self): protected={3: 0}, ) assert ctx.decode(encoded, signer.cose_key) == b"This is the content." - assert ( - ctx.decode(bytes.fromhex(cwt_str), signer.cose_key) - == b"This is the content." - ) + assert ctx.decode(bytes.fromhex(cwt_str), signer.cose_key) == b"This is the content." def test_cose_wg_examples_eddsa_01(self): cwt_str = "D8628443A10300A054546869732069732074686520636F6E74656E742E818343A10127A104423131584077F3EACD11852C4BF9CB1D72FABE6B26FBA1D76092B2B5B7EC83B83557652264E69690DBC1172DDC0BF88411C0D25A507FDB247A20C40D5E245FABD3FC9EC106" @@ -209,18 +201,10 @@ def test_cose_wg_examples_eddsa_01(self): "kty": "OKP", "kid": "11", "crv": "Ed25519", - "x": base64.urlsafe_b64encode( - bytes.fromhex( - "d75a980182b10ab7d54bfed3c964073a0ee172f3daa62325af021a68f707511a" - ) - ) + "x": base64.urlsafe_b64encode(bytes.fromhex("d75a980182b10ab7d54bfed3c964073a0ee172f3daa62325af021a68f707511a")) .replace(b"=", b"") .decode("ascii"), - "d": base64.urlsafe_b64encode( - bytes.fromhex( - "9d61b19deffd5a60ba844af492ec2cc44449c5697b326919703bac031cae7f60" - ) - ) + "d": base64.urlsafe_b64encode(bytes.fromhex("9d61b19deffd5a60ba844af492ec2cc44449c5697b326919703bac031cae7f60")) .replace(b"=", b"") .decode("ascii"), } @@ -272,18 +256,10 @@ def test_cose_wg_examples_eddsa_sig_01(self): "kty": "OKP", "kid": "11", "crv": "Ed25519", - "x": base64.urlsafe_b64encode( - bytes.fromhex( - "d75a980182b10ab7d54bfed3c964073a0ee172f3daa62325af021a68f707511a" - ) - ) + "x": base64.urlsafe_b64encode(bytes.fromhex("d75a980182b10ab7d54bfed3c964073a0ee172f3daa62325af021a68f707511a")) .replace(b"=", b"") .decode("ascii"), - "d": base64.urlsafe_b64encode( - bytes.fromhex( - "9d61b19deffd5a60ba844af492ec2cc44449c5697b326919703bac031cae7f60" - ) - ) + "d": base64.urlsafe_b64encode(bytes.fromhex("9d61b19deffd5a60ba844af492ec2cc44449c5697b326919703bac031cae7f60")) .replace(b"=", b"") .decode("ascii"), } @@ -411,9 +387,7 @@ def test_cose_wg_examples_chacha_poly_enc_01(self, ctx): def test_cose_wg_examples_rfc8152_c_3_2(self): cwt_str = "D8608443A1010AA1054D89F52F65A1C580933B5261A76C581C753548A19B1307084CA7B2056924ED95F2E3B17006DFE931B687B847818343A10129A2335061616262636364646565666667676868044A6F75722D73656372657440" - recipient = Recipient.new( - {1: -10}, {-20: b"aabbccddeeffgghh", 4: b"our-secret"} - ) + recipient = Recipient.new({1: -10}, {-20: b"aabbccddeeffgghh", 4: b"our-secret"}) material = COSEKey.from_symmetric_key( key=base64url_decode("hJtXIZ2uSN5kbQfbtTNWbpdmhkV8FJG-Onbc6mxCcYg"), alg="A256GCM", @@ -452,9 +426,7 @@ def test_cose_wg_examples_rfc8152_c_3_2(self): def test_cose_wg_examples_rfc8152_c_3_2_with_json(self): cwt_str = "D8608443A1010AA1054D89F52F65A1C580933B5261A76C581C753548A19B1307084CA7B2056924ED95F2E3B17006DFE931B687B847818343A10129A2335061616262636364646565666667676868044A6F75722D73656372657440" - recipient = Recipient.new( - {1: -10}, {-20: b"aabbccddeeffgghh", 4: b"our-secret"} - ) + recipient = Recipient.new({1: -10}, {-20: b"aabbccddeeffgghh", 4: b"our-secret"}) context = { "alg": "AES-CCM-16-64-128", "apu": { @@ -572,9 +544,7 @@ def test_cose_wg_examples_ecdh_direct_p256_hkdf_256_01(self): "d": "r_kHyZ-a06rmxM3yESK84r1otSg-aQcVStkRhA-iCM8", } ) - assert b"This is the content." == ctx.decode( - encoded, priv_key, context={"alg": "A128GCM"} - ) + assert b"This is the content." == ctx.decode(encoded, priv_key, context={"alg": "A128GCM"}) def test_cose_wg_examples_ecdh_wrap_p256_ss_wrap_128_01(self): # The sender side: @@ -623,6 +593,4 @@ def test_cose_wg_examples_ecdh_wrap_p256_ss_wrap_128_01(self): "d": "r_kHyZ-a06rmxM3yESK84r1otSg-aQcVStkRhA-iCM8", } ) - assert b"This is the content." == ctx.decode( - encoded, priv_key, context={"alg": "A128GCM"} - ) + assert b"This is the content." == ctx.decode(encoded, priv_key, context={"alg": "A128GCM"}) diff --git a/tests/test_cwt.py b/tests/test_cwt.py index d11e6d5..b713f9b 100644 --- a/tests/test_cwt.py +++ b/tests/test_cwt.py @@ -108,9 +108,7 @@ def test_cwt_leeway_with_negative_value(self): def test_cwt_encode_with_claims_object(self, ctx): key = COSEKey.from_symmetric_key(alg="HS256", kid="01") token = ctx.encode( - Claims.from_json( - {"iss": "https://as.example", "sub": "someone", "cti": b"123"} - ), + Claims.from_json({"iss": "https://as.example", "sub": "someone", "cti": b"123"}), key, ) decoded = ctx.decode(token, key) @@ -135,9 +133,7 @@ def test_cwt_encode_with_invalid_key(self, ctx, invalid_key): def test_cwt_encode_and_mac_with_default_alg(self, ctx): key = COSEKey.from_symmetric_key("mysecret", alg="HS256", kid="01") - token = ctx.encode_and_mac( - {1: "https://as.example", 2: "someone", 7: b"123"}, key - ) + token = ctx.encode_and_mac({1: "https://as.example", 2: "someone", 7: b"123"}, key) decoded = ctx.decode(token, key) assert 1 in decoded and decoded[1] == "https://as.example" assert 2 in decoded and decoded[2] == "someone" @@ -159,9 +155,7 @@ def test_cwt_encode_and_mac_with_default_alg(self, ctx): ) def test_cwt_encode_and_mac_with_valid_alg_hmac(self, ctx, alg): key = COSEKey.from_symmetric_key("mysecret", alg=alg, kid="01") - token = ctx.encode_and_mac( - {1: "https://as.example", 2: "someone", 7: b"123"}, key - ) + token = ctx.encode_and_mac({1: "https://as.example", 2: "someone", 7: b"123"}, key) decoded = ctx.decode(token, key) assert 1 in decoded and decoded[1] == "https://as.example" assert 2 in decoded and decoded[2] == "someone" @@ -181,9 +175,7 @@ def test_cwt_encode_and_mac_with_tagged(self, ctx): def test_cwt_encode_and_mac_with_recipient(self, ctx): recipient = RecipientInterface(unprotected={1: -6, 4: b"our-secret"}) - key = COSEKey.from_symmetric_key( - "mysecret", alg="HMAC 256/64", kid="our-secret" - ) + key = COSEKey.from_symmetric_key("mysecret", alg="HMAC 256/64", kid="our-secret") token = ctx.encode_and_mac( {1: "https://as.example", 2: "someone", 7: b"123"}, key, @@ -318,10 +310,7 @@ def test_cwt_encode_and_encrypt_with_invalid_key_and_without_nonce(self, ctx): {1: "https://as.example", 2: "someone", 7: b"123"}, enc_key, ) - assert ( - "Nonce generation is not supported for the key. Set a nonce explicitly." - in str(err.value) - ) + assert "Nonce generation is not supported for the key. Set a nonce explicitly." in str(err.value) @pytest.mark.parametrize( "alg", @@ -387,9 +376,7 @@ def test_cwt_encode_and_encrypt_with_tagged(self, ctx): assert 7 in decoded and decoded[7] == b"123" def test_cwt_encode_and_encrypt_with_recipient_direct(self, ctx): - enc_key = COSEKey.from_symmetric_key( - token_bytes(16), alg="AES-CCM-16-64-128", kid="our-secret" - ) + enc_key = COSEKey.from_symmetric_key(token_bytes(16), alg="AES-CCM-16-64-128", kid="our-secret") recipient = RecipientInterface(unprotected={1: -6, 4: b"our-secret"}) token = ctx.encode_and_encrypt( {1: "https://as.example", 2: "someone", 7: b"123"}, @@ -413,9 +400,7 @@ def test_cwt_encode_and_encrypt_with_recipient_direct(self, ctx): # ("private_key_x448.pem", "public_key_x448.pem"), ], ) - def test_cwt_encode_and_sign_with_valid_alg( - self, ctx, private_key_path, public_key_path - ): + def test_cwt_encode_and_sign_with_valid_alg(self, ctx, private_key_path, public_key_path): with open(key_path(private_key_path)) as key_file: private_key = COSEKey.from_pem(key_file.read(), kid="01") with open(key_path(public_key_path)) as key_file: @@ -526,9 +511,7 @@ def test_cwt_encode_and_sign_with_signatures_kid_mismatch(self, ctx): def test_cwt_decode_with_invalid_mac_key(self, ctx): key = COSEKey.from_symmetric_key("mysecret", alg="HS256", kid="01") - token = ctx.encode_and_mac( - {1: "https://as.example", 2: "someone", 7: b"123"}, key - ) + token = ctx.encode_and_mac({1: "https://as.example", 2: "someone", 7: b"123"}, key) wrong_key = COSEKey.from_symmetric_key("xxxxxxxxxx", alg="HS256", kid="01") with pytest.raises(VerifyError) as err: res = ctx.decode(token, wrong_key) @@ -551,12 +534,8 @@ def test_cwt_decode_with_invalid_cwt(self, ctx, invalid): assert "Failed to decode." in str(err.value) def test_cwt_decode_with_invalid_enc_key(self, ctx): - enc_key = COSEKey.from_symmetric_key( - token_bytes(16), alg="AES-CCM-16-64-128", kid="01" - ) - wrong_key = COSEKey.from_symmetric_key( - token_bytes(16), alg="AES-CCM-16-64-128", kid="01" - ) + enc_key = COSEKey.from_symmetric_key(token_bytes(16), alg="AES-CCM-16-64-128", kid="01") + wrong_key = COSEKey.from_symmetric_key(token_bytes(16), alg="AES-CCM-16-64-128", kid="01") token = ctx.encode_and_encrypt( {1: "https://as.example", 2: "someone", 7: b"123"}, enc_key, diff --git a/tests/test_cwt_sample.py b/tests/test_cwt_sample.py index 5af79ba..c6be1bf 100644 --- a/tests/test_cwt_sample.py +++ b/tests/test_cwt_sample.py @@ -16,14 +16,11 @@ from .utils import key_path, now # A sample of 128-Bit Symmetric Key referred from RFC8392 -SAMPLE_COSE_KEY_RFC8392_A2_1 = ( - "a42050231f4c4d4d3051fdc2ec0a3851d5b3830104024c53796d6d6574726963313238030a" -) +SAMPLE_COSE_KEY_RFC8392_A2_1 = "a42050231f4c4d4d3051fdc2ec0a3851d5b3830104024c53796d6d6574726963313238030a" # A sample of 256-Bit Symmetric Key referred from RFC8392 SAMPLE_COSE_KEY_RFC8392_A2_2 = ( - "a4205820403697de87af64611c1d32a05dab0fe1fcb715a86ab435f1ec99192d" - "795693880104024c53796d6d6574726963323536030a" + "a4205820403697de87af64611c1d32a05dab0fe1fcb715a86ab435f1ec99192d" "795693880104024c53796d6d6574726963323536030a" ) # A sample of ECDSA P-256 256-Bit COSE Key referred from RFC8392 @@ -81,9 +78,7 @@ class TestSample: def test_sample_readme_maced_cwt_with_json_dict_old(self): key = COSEKey.from_symmetric_key("mysecretpassword", alg="HS256", kid="01") encoded = cwt.encode_and_mac( - Claims.from_json( - {"iss": "coaps://as.example", "sub": "dajiaji", "cti": "123"} - ), + Claims.from_json({"iss": "coaps://as.example", "sub": "dajiaji", "cti": "123"}), key, ) decoded = cwt.decode(encoded, key) @@ -96,18 +91,14 @@ def test_sample_readme_maced_cwt_with_json_dict_old(self): def test_sample_readme_maced_cwt_with_json_dict(self): key = COSEKey.from_symmetric_key(alg="HMAC 256/256", kid="01") - token = cwt.encode( - {"iss": "coaps://as.example", "sub": "dajiaji", "cti": "123"}, key - ) + token = cwt.encode({"iss": "coaps://as.example", "sub": "dajiaji", "cti": "123"}, key) decoded = cwt.decode(token, key) assert 1 in decoded and decoded[1] == "coaps://as.example" def test_sample_readme_maced_cwt_with_json_str_old(self): key = COSEKey.from_symmetric_key("mysecretpassword", alg="HS256", kid="01") encoded = cwt.encode_and_mac( - Claims.from_json( - '{"iss":"coaps://as.example","sub":"dajiaji","cti":"123"}' - ), + Claims.from_json('{"iss":"coaps://as.example","sub":"dajiaji","cti":"123"}'), key, ) decoded = cwt.decode(encoded, key) @@ -115,18 +106,14 @@ def test_sample_readme_maced_cwt_with_json_str_old(self): def test_sample_readme_maced_cwt_with_json_str(self): key = COSEKey.from_symmetric_key(alg="HMAC 256/256", kid="01") - token = cwt.encode( - '{"iss":"coaps://as.example","sub":"dajiaji","cti":"123"}', key - ) + token = cwt.encode('{"iss":"coaps://as.example","sub":"dajiaji","cti":"123"}', key) decoded = cwt.decode(token, key) assert 1 in decoded and decoded[1] == "coaps://as.example" def test_sample_readme_maced_cwt_with_json_bytes_old(self): key = COSEKey.from_symmetric_key("mysecretpassword", alg="HS256", kid="01") encoded = cwt.encode_and_mac( - Claims.from_json( - b'{"iss":"coaps://as.example","sub":"dajiaji","cti":"123"}' - ), + Claims.from_json(b'{"iss":"coaps://as.example","sub":"dajiaji","cti":"123"}'), key, ) decoded = cwt.decode(encoded, key) @@ -134,17 +121,13 @@ def test_sample_readme_maced_cwt_with_json_bytes_old(self): def test_sample_readme_maced_cwt_with_json_bytes(self): key = COSEKey.from_symmetric_key(alg="HMAC 256/256", kid="01") - token = cwt.encode( - b'{"iss":"coaps://as.example","sub":"dajiaji","cti":"123"}', key - ) + token = cwt.encode(b'{"iss":"coaps://as.example","sub":"dajiaji","cti":"123"}', key) decoded = cwt.decode(token, key) assert 1 in decoded and decoded[1] == "coaps://as.example" def test_sample_readme_maced_cwt_old(self): key = COSEKey.from_symmetric_key("mysecretpassword", alg="HS256", kid="01") - encoded = cwt.encode_and_mac( - {1: "coaps://as.example", 2: "dajiaji", 7: b"123"}, key - ) + encoded = cwt.encode_and_mac({1: "coaps://as.example", 2: "dajiaji", 7: b"123"}, key) decoded = cwt.decode(encoded, key) assert 1 in decoded and decoded[1] == "coaps://as.example" @@ -161,9 +144,7 @@ def test_sample_readme_signed_cwt_es256_old(self): public_key = COSEKey.from_pem(key_file.read(), kid="01") encoded = cwt.encode_and_sign( - Claims.from_json( - {"iss": "coaps://as.example", "sub": "dajiaji", "cti": "123"} - ), + Claims.from_json({"iss": "coaps://as.example", "sub": "dajiaji", "cti": "123"}), private_key, ) @@ -176,9 +157,7 @@ def test_sample_readme_signed_cwt_es256(self): with open(key_path("public_key_es256.pem")) as key_file: public_key = COSEKey.from_pem(key_file.read(), kid="01") - token = cwt.encode( - {"iss": "coaps://as.example", "sub": "dajiaji", "cti": "123"}, private_key - ) + token = cwt.encode({"iss": "coaps://as.example", "sub": "dajiaji", "cti": "123"}, private_key) decoded = cwt.decode(token, public_key) assert 1 in decoded and decoded[1] == "coaps://as.example" @@ -201,9 +180,7 @@ def test_sample_readme_signed_cwt_es256_with_cert(self): with open(key_path("cert_es256.json")) as f: public_key = COSEKey.from_jwk(f.read()) - token = cwt.encode( - {"iss": "coaps://as.example", "sub": "dajiaji", "cti": "123"}, private_key - ) + token = cwt.encode({"iss": "coaps://as.example", "sub": "dajiaji", "cti": "123"}, private_key) decoder = CWT.new(ca_certs=key_path("cacert.pem")) decoded = decoder.decode(token, public_key) @@ -216,9 +193,7 @@ def test_sample_readme_signed_cwt_es256_with_cert_without_intermediates(self): with open(key_path("cert_es256_2.json")) as f: public_key = COSEKey.from_jwk(f.read()) - token = cwt.encode( - {"iss": "coaps://as.example", "sub": "dajiaji", "cti": "123"}, private_key - ) + token = cwt.encode({"iss": "coaps://as.example", "sub": "dajiaji", "cti": "123"}, private_key) decoder = CWT.new(ca_certs=key_path("cacert.pem")) decoded = decoder.decode(token, public_key) @@ -231,9 +206,7 @@ def test_sample_readme_signed_cwt_es256_with_another_ca_cert(self): with open(key_path("cert_es256.json")) as f: public_key = COSEKey.from_jwk(f.read()) - token = cwt.encode( - {"iss": "coaps://as.example", "sub": "dajiaji", "cti": "123"}, private_key - ) + token = cwt.encode({"iss": "coaps://as.example", "sub": "dajiaji", "cti": "123"}, private_key) decoder = CWT.new(ca_certs=key_path("cacert_2.pem")) with pytest.raises(VerifyError) as err: @@ -268,9 +241,7 @@ def test_sample_readme_signed_cwt_es384_old(self): def test_sample_readme_signed_cwt_es384(self): with open(key_path("private_key_es384.pem")) as key_file: private_key = COSEKey.from_pem(key_file.read(), kid="01") - token = cwt.encode( - {"iss": "coaps://as.example", "sub": "dajiaji", "cti": "123"}, private_key - ) + token = cwt.encode({"iss": "coaps://as.example", "sub": "dajiaji", "cti": "123"}, private_key) with open(key_path("public_key_es384.pem")) as key_file: public_key = COSEKey.from_pem(key_file.read(), kid="01") @@ -281,9 +252,7 @@ def test_sample_readme_signed_cwt_es512_old(self): with open(key_path("private_key_es512.pem")) as key_file: private_key = COSEKey.from_pem(key_file.read(), kid="01") encoded = cwt.encode_and_sign( - Claims.from_json( - {"iss": "coaps://as.example", "sub": "dajiaji", "cti": "123"} - ), + Claims.from_json({"iss": "coaps://as.example", "sub": "dajiaji", "cti": "123"}), private_key, ) @@ -295,9 +264,7 @@ def test_sample_readme_signed_cwt_es512_old(self): def test_sample_readme_signed_cwt_es512(self): with open(key_path("private_key_es512.pem")) as key_file: private_key = COSEKey.from_pem(key_file.read(), kid="01") - token = cwt.encode( - {"iss": "coaps://as.example", "sub": "dajiaji", "cti": "123"}, private_key - ) + token = cwt.encode({"iss": "coaps://as.example", "sub": "dajiaji", "cti": "123"}, private_key) with open(key_path("public_key_es512.pem")) as key_file: public_key = COSEKey.from_pem(key_file.read(), kid="01") @@ -308,9 +275,7 @@ def test_sample_readme_signed_cwt_es256k_old(self): with open(key_path("private_key_es256k.pem")) as key_file: private_key = COSEKey.from_pem(key_file.read(), kid="01") encoded = cwt.encode_and_sign( - Claims.from_json( - {"iss": "coaps://as.example", "sub": "dajiaji", "cti": "123"} - ), + Claims.from_json({"iss": "coaps://as.example", "sub": "dajiaji", "cti": "123"}), private_key, ) @@ -322,9 +287,7 @@ def test_sample_readme_signed_cwt_es256k_old(self): def test_sample_readme_signed_cwt_es256k(self): with open(key_path("private_key_es256k.pem")) as key_file: private_key = COSEKey.from_pem(key_file.read(), kid="01") - token = cwt.encode( - {"iss": "coaps://as.example", "sub": "dajiaji", "cti": "123"}, private_key - ) + token = cwt.encode({"iss": "coaps://as.example", "sub": "dajiaji", "cti": "123"}, private_key) with open(key_path("public_key_es256k.pem")) as key_file: public_key = COSEKey.from_pem(key_file.read(), kid="01") @@ -335,9 +298,7 @@ def test_sample_readme_signed_cwt_ed25519_old(self): with open(key_path("private_key_ed25519.pem")) as key_file: private_key = COSEKey.from_pem(key_file.read(), kid="01") encoded = cwt.encode_and_sign( - Claims.from_json( - {"iss": "coaps://as.example", "sub": "dajiaji", "cti": "123"} - ), + Claims.from_json({"iss": "coaps://as.example", "sub": "dajiaji", "cti": "123"}), private_key, ) @@ -349,9 +310,7 @@ def test_sample_readme_signed_cwt_ed25519_old(self): def test_sample_readme_signed_cwt_ed25519(self): with open(key_path("private_key_ed25519.pem")) as key_file: private_key = COSEKey.from_pem(key_file.read(), kid="01") - token = cwt.encode( - {"iss": "coaps://as.example", "sub": "dajiaji", "cti": "123"}, private_key - ) + token = cwt.encode({"iss": "coaps://as.example", "sub": "dajiaji", "cti": "123"}, private_key) with open(key_path("public_key_ed25519.pem")) as key_file: public_key = COSEKey.from_pem(key_file.read(), kid="01") @@ -372,9 +331,7 @@ def test_sample_readme_signed_cwt_ed25519_with_jwk(self): "d": "L8JS08VsFZoZxGa9JvzYmCWOwg7zaKcei3KZmYsj7dc", } ) - token = cwt.encode( - {"iss": "coaps://as.example", "sub": "dajiaji", "cti": "123"}, private_key - ) + token = cwt.encode({"iss": "coaps://as.example", "sub": "dajiaji", "cti": "123"}, private_key) # The recipient side: public_key = COSEKey.from_jwk( @@ -392,9 +349,7 @@ def test_sample_readme_signed_cwt_ed25519_with_jwk(self): def test_sample_readme_signed_cwt_rs256(self): with open(key_path("private_key_rsa.pem")) as key_file: private_key = COSEKey.from_pem(key_file.read(), alg="RS256", kid="01") - token = cwt.encode( - {"iss": "coaps://as.example", "sub": "dajiaji", "cti": "123"}, private_key - ) + token = cwt.encode({"iss": "coaps://as.example", "sub": "dajiaji", "cti": "123"}, private_key) with open(key_path("public_key_rsa.pem")) as key_file: public_key = COSEKey.from_pem(key_file.read(), alg="RS256", kid="01") @@ -404,9 +359,7 @@ def test_sample_readme_signed_cwt_rs256(self): def test_sample_readme_signed_cwt_ps256(self): with open(key_path("private_key_rsa.pem")) as key_file: private_key = COSEKey.from_pem(key_file.read(), alg="PS256", kid="01") - token = cwt.encode( - {"iss": "coaps://as.example", "sub": "dajiaji", "cti": "123"}, private_key - ) + token = cwt.encode({"iss": "coaps://as.example", "sub": "dajiaji", "cti": "123"}, private_key) with open(key_path("public_key_rsa.pem")) as key_file: public_key = COSEKey.from_pem(key_file.read(), alg="PS256", kid="01") @@ -416,13 +369,9 @@ def test_sample_readme_signed_cwt_ps256(self): def test_sample_readme_encrypted_cwt_old(self): nonce = token_bytes(13) mysecret = token_bytes(32) - enc_key = COSEKey.from_symmetric_key( - mysecret, alg="AES-CCM-16-64-256", kid="01" - ) + enc_key = COSEKey.from_symmetric_key(mysecret, alg="AES-CCM-16-64-256", kid="01") encoded = cwt.encode_and_encrypt( - Claims.from_json( - {"iss": "coaps://as.example", "sub": "dajiaji", "cti": "123"} - ), + Claims.from_json({"iss": "coaps://as.example", "sub": "dajiaji", "cti": "123"}), enc_key, nonce=nonce, ) @@ -431,9 +380,7 @@ def test_sample_readme_encrypted_cwt_old(self): def test_sample_readme_encrypted_cwt(self): enc_key = COSEKey.from_symmetric_key(alg="ChaCha20/Poly1305", kid="01") - token = cwt.encode( - {"iss": "coaps://as.example", "sub": "dajiaji", "cti": "123"}, enc_key - ) + token = cwt.encode({"iss": "coaps://as.example", "sub": "dajiaji", "cti": "123"}, enc_key) decoded = cwt.decode(token, enc_key) assert 1 in decoded and decoded[1] == "coaps://as.example" @@ -441,16 +388,12 @@ def test_sample_readme_nested_cwt_old(self): with open(key_path("private_key_es256.pem")) as key_file: private_key = COSEKey.from_pem(key_file.read(), kid="01") encoded = cwt.encode_and_sign( - Claims.from_json( - {"iss": "coaps://as.example", "sub": "dajiaji", "cti": "123"} - ), + Claims.from_json({"iss": "coaps://as.example", "sub": "dajiaji", "cti": "123"}), private_key, ) nonce = token_bytes(13) mysecret = token_bytes(32) - enc_key = COSEKey.from_symmetric_key( - mysecret, alg="AES-CCM-16-64-256", kid="02" - ) + enc_key = COSEKey.from_symmetric_key(mysecret, alg="AES-CCM-16-64-256", kid="02") nested = cwt.encode_and_encrypt(encoded, enc_key, nonce=nonce) with open(key_path("public_key_es256.pem")) as key_file: @@ -461,9 +404,7 @@ def test_sample_readme_nested_cwt_old(self): def test_sample_readme_nested_cwt(self): with open(key_path("private_key_es256.pem")) as key_file: private_key = COSEKey.from_pem(key_file.read(), kid="01") - token = cwt.encode( - {"iss": "coaps://as.example", "sub": "dajiaji", "cti": "123"}, private_key - ) + token = cwt.encode({"iss": "coaps://as.example", "sub": "dajiaji", "cti": "123"}, private_key) enc_key = COSEKey.from_symmetric_key(alg="ChaCha20/Poly1305", kid="02") nested = cwt.encode(token, enc_key) @@ -479,9 +420,7 @@ def test_sample_readme_nested_cwt_without_kid(self): private_key = COSEKey.from_pem(key_file.read()) ctx = CWT.new() ctx.cose.verify_kid = False - token = ctx.encode( - {"iss": "coaps://as.example", "sub": "dajiaji", "cti": "123"}, private_key - ) + token = ctx.encode({"iss": "coaps://as.example", "sub": "dajiaji", "cti": "123"}, private_key) nested = ctx.encode(token, enc_key) with open(key_path("public_key_es256.pem")) as key_file: @@ -823,9 +762,7 @@ def test_sample_rfc8392_a3_with_encoding(self): def test_sample_rfc8392_a4_old(self): key = COSEKey.new( { - -1: bytes.fromhex( - "403697de87af64611c1d32a05dab0fe1fcb715a86ab435f1ec99192d79569388" - ), + -1: bytes.fromhex("403697de87af64611c1d32a05dab0fe1fcb715a86ab435f1ec99192d79569388"), 1: 4, # Symmetric 2: bytes.fromhex("53796d6d6574726963323536"), 3: 4, # HMAC256/64 @@ -851,9 +788,7 @@ def test_sample_rfc8392_a4_old(self): def test_sample_rfc8392_a4(self): key = COSEKey.new( { - -1: bytes.fromhex( - "403697de87af64611c1d32a05dab0fe1fcb715a86ab435f1ec99192d79569388" - ), + -1: bytes.fromhex("403697de87af64611c1d32a05dab0fe1fcb715a86ab435f1ec99192d79569388"), 1: 4, # Symmetric 2: bytes.fromhex("53796d6d6574726963323536"), 3: 4, # HMAC256/64 diff --git a/tests/test_encrypted_cose_key.py b/tests/test_encrypted_cose_key.py index 53757fc..2e422ff 100644 --- a/tests/test_encrypted_cose_key.py +++ b/tests/test_encrypted_cose_key.py @@ -29,7 +29,4 @@ def test_encrypted_cose_key_from_cose_key_with_invalid_encryption_key(self): with pytest.raises(ValueError) as err: EncryptedCOSEKey.from_cose_key(pop_key, enc_key) pytest.fail("to_ should fail.") - assert ( - "Nonce generation is not supported for the key. Set a nonce explicitly." - in str(err.value) - ) + assert "Nonce generation is not supported for the key. Set a nonce explicitly." in str(err.value) diff --git a/tests/test_recipient.py b/tests/test_recipient.py index a6be1f7..facde71 100644 --- a/tests/test_recipient.py +++ b/tests/test_recipient.py @@ -117,9 +117,7 @@ def test_recipient_constructor_with_alg_a128kw(self): assert len(res) == 3 def test_recipient_constructor_with_alg_a128kw_with_iv(self): - r = RecipientInterface( - protected={1: -3}, unprotected={4: b"our-secret", 5: b"aabbccddee"} - ) + r = RecipientInterface(protected={1: -3}, unprotected={4: b"our-secret", 5: b"aabbccddee"}) assert isinstance(r, RecipientInterface) assert r.alg == -3 assert isinstance(r.protected, dict) @@ -190,9 +188,7 @@ def test_recipient_constructor_with_alg_a128kw_with_iv(self): ), ], ) - def test_recipient_constructor_with_invalid_args( - self, protected, unprotected, ciphertext, recipients, msg - ): + def test_recipient_constructor_with_invalid_args(self, protected, unprotected, ciphertext, recipients, msg): with pytest.raises(ValueError) as err: RecipientInterface(protected, unprotected, ciphertext, recipients) pytest.fail("RecipientInterface() should fail.") @@ -238,16 +234,12 @@ def test_recipient_from_jwk_with_str(self): assert recipient.alg == -6 def test_recipient_from_jwk_with_dict(self): - recipient = Recipient.from_jwk( - {"kty": "oct", "alg": "A128KW", "key_ops": ["wrapKey"]} - ) + recipient = Recipient.from_jwk({"kty": "oct", "alg": "A128KW", "key_ops": ["wrapKey"]}) assert isinstance(recipient, RecipientInterface) assert recipient.alg == -3 def test_recipient_from_jwk_with_dict_and_with_byte_formatted_kid(self): - recipient = Recipient.from_jwk( - {"kty": "oct", "kid": b"01", "alg": "A128KW", "key_ops": ["wrapKey"]} - ) + recipient = Recipient.from_jwk({"kty": "oct", "kid": b"01", "alg": "A128KW", "key_ops": ["wrapKey"]}) assert isinstance(recipient, RecipientInterface) assert recipient.alg == -3 assert recipient.kid == b"01" @@ -418,9 +410,7 @@ def test_recipients_constructor(self): assert isinstance(r, Recipients) def test_recipients_constructor_with_recipient_alg_direct(self): - key = COSEKey.from_symmetric_key( - "mysecret", alg="HMAC 256/64", kid="our-secret" - ) + key = COSEKey.from_symmetric_key("mysecret", alg="HMAC 256/64", kid="our-secret") r = Recipients([Recipient.new(unprotected={1: -6, 4: b"our-secret"})]) key = r.extract([key]) assert key.kty == 4 @@ -518,9 +508,7 @@ def test_recipients_extract_with_multiple_keys(self, material): assert key.kid == b"03" def test_recipients_extract_with_different_kid(self): - key = COSEKey.from_symmetric_key( - "mysecret", alg="HMAC 256/64", kid="our-secret" - ) + key = COSEKey.from_symmetric_key("mysecret", alg="HMAC 256/64", kid="our-secret") r = Recipients([RecipientInterface(unprotected={1: -6, 4: b"your-secret"})]) with pytest.raises(ValueError) as err: r.extract([key]) @@ -529,17 +517,13 @@ def test_recipients_extract_with_different_kid(self): def test_recipients_from_list(self): try: - Recipients.from_list( - [[cbor2.dumps({1: -10}), {-20: b"aabbccddeefff"}, b""]] - ) + Recipients.from_list([[cbor2.dumps({1: -10}), {-20: b"aabbccddeefff"}, b""]]) except Exception: pytest.fail("from_list() should not fail.") def test_recipients_from_list_with_empty_recipients(self): try: - Recipients.from_list( - [[cbor2.dumps({1: -10}), {-20: b"aabbccddeefff"}, b"", []]] - ) + Recipients.from_list([[cbor2.dumps({1: -10}), {-20: b"aabbccddeefff"}, b"", []]]) except Exception: pytest.fail("from_list() should not fail.") diff --git a/tests/test_recipient_algs_aes_key_wrap.py b/tests/test_recipient_algs_aes_key_wrap.py index 4227315..5ed5eb8 100644 --- a/tests/test_recipient_algs_aes_key_wrap.py +++ b/tests/test_recipient_algs_aes_key_wrap.py @@ -17,23 +17,17 @@ class TestAESKeyWrap: """ def test_aes_key_wrap_constructor_a128kw(self): - ctx = AESKeyWrap( - {1: -3}, {}, sender_key=COSEKey.from_symmetric_key(alg="A128KW") - ) + ctx = AESKeyWrap({1: -3}, {}, sender_key=COSEKey.from_symmetric_key(alg="A128KW")) assert isinstance(ctx, AESKeyWrap) assert ctx.alg == -3 def test_aes_key_wrap_constructor_a192kw(self): - ctx = AESKeyWrap( - {1: -4}, {}, sender_key=COSEKey.from_symmetric_key(alg="A192KW") - ) + ctx = AESKeyWrap({1: -4}, {}, sender_key=COSEKey.from_symmetric_key(alg="A192KW")) assert isinstance(ctx, AESKeyWrap) assert ctx.alg == -4 def test_aes_key_wrap_constructor_a256kw(self): - ctx = AESKeyWrap( - {1: -5}, {}, sender_key=COSEKey.from_symmetric_key(alg="A256KW") - ) + ctx = AESKeyWrap({1: -5}, {}, sender_key=COSEKey.from_symmetric_key(alg="A256KW")) assert isinstance(ctx, AESKeyWrap) assert ctx.alg == -5 @@ -112,18 +106,14 @@ def test_aes_key_wrap_constructor_with_invalid_alg(self): def test_aes_key_wrap_apply_with_invalid_key(self): key = COSEKey.from_symmetric_key(key="xxx", alg="HS256", kid="01") - ctx = AESKeyWrap( - {1: -3}, {}, sender_key=COSEKey.from_symmetric_key(alg="A128KW") - ) + ctx = AESKeyWrap({1: -3}, {}, sender_key=COSEKey.from_symmetric_key(alg="A128KW")) with pytest.raises(EncodeError) as err: ctx.apply(key, context={"alg": "A128GCM"}) pytest.fail("apply() should fail.") assert "Failed to wrap key." in str(err.value) def test_aes_key_wrap_apply_without_key(self): - ctx = AESKeyWrap( - {1: -3}, {}, sender_key=COSEKey.from_symmetric_key(alg="A128KW") - ) + ctx = AESKeyWrap({1: -3}, {}, sender_key=COSEKey.from_symmetric_key(alg="A128KW")) with pytest.raises(ValueError) as err: ctx.apply() pytest.fail("apply() should fail.") @@ -131,9 +121,7 @@ def test_aes_key_wrap_apply_without_key(self): def test_aes_key_wrap_wrap_key_without_alg(self): key = COSEKey.from_symmetric_key(alg="A128GCM", kid="01") - ctx = AESKeyWrap( - {1: -3}, {}, sender_key=COSEKey.from_symmetric_key(alg="A128KW") - ) + ctx = AESKeyWrap({1: -3}, {}, sender_key=COSEKey.from_symmetric_key(alg="A128KW")) with pytest.raises(ValueError) as err: ctx.extract(key=key) pytest.fail("extract() should fail.") @@ -141,9 +129,7 @@ def test_aes_key_wrap_wrap_key_without_alg(self): def test_aes_key_wrap_wrap_key_without_ciphertext(self): key = COSEKey.from_symmetric_key(alg="A128GCM", kid="01") - ctx = AESKeyWrap( - {1: -3}, {}, sender_key=COSEKey.from_symmetric_key(alg="A128KW") - ) + ctx = AESKeyWrap({1: -3}, {}, sender_key=COSEKey.from_symmetric_key(alg="A128KW")) with pytest.raises(DecodeError) as err: ctx.extract(key=key, alg="A128GCM") pytest.fail("extract() should fail.") diff --git a/tests/test_recipient_algs_direct.py b/tests/test_recipient_algs_direct.py index 411eede..f4716bb 100644 --- a/tests/test_recipient_algs_direct.py +++ b/tests/test_recipient_algs_direct.py @@ -130,9 +130,7 @@ def test_direct_hkdf_constructor_with_hkdf_sha_512(self): ), ], ) - def test_direct_hkdf_constructor_with_invalid_arg( - self, protected, unprotected, msg - ): + def test_direct_hkdf_constructor_with_invalid_arg(self, protected, unprotected, msg): with pytest.raises(ValueError) as err: DirectHKDF(protected, unprotected) pytest.fail("DirectHKDF() should fail.") diff --git a/tests/test_recipient_algs_ecdh_aes_key_wrap.py b/tests/test_recipient_algs_ecdh_aes_key_wrap.py index 4bc2fdc..252aaf2 100644 --- a/tests/test_recipient_algs_ecdh_aes_key_wrap.py +++ b/tests/test_recipient_algs_ecdh_aes_key_wrap.py @@ -96,31 +96,21 @@ def test_ecdh_aes_key_wrap_encode_and_extract_with_ecdh_es( ): enc_key = COSEKey.from_symmetric_key(alg="ChaCha20/Poly1305") sender = ECDH_AESKeyWrap({1: -29}, {4: b"01"}, sender_key=sender_key_es) - sender.apply( - enc_key, recipient_key=recipient_public_key, context={"alg": "A128GCM"} - ) + sender.apply(enc_key, recipient_key=recipient_public_key, context={"alg": "A128GCM"}) assert sender.ciphertext is not None encoded = sender.to_list() recipient = Recipient.from_list(encoded) - decoded_key = recipient.extract( - recipient_private_key, alg="ChaCha20/Poly1305", context={"alg": "A128GCM"} - ) + decoded_key = recipient.extract(recipient_private_key, alg="ChaCha20/Poly1305", context={"alg": "A128GCM"}) assert enc_key.key == decoded_key.key - def test_ecdh_aes_key_wrap_through_cose_api( - self, recipient_public_key, recipient_private_key - ): + def test_ecdh_aes_key_wrap_through_cose_api(self, recipient_public_key, recipient_private_key): enc_key = COSEKey.from_symmetric_key(alg="ChaCha20/Poly1305") rec = Recipient.from_jwk({"kty": "EC", "crv": "P-256", "alg": "ECDH-ES+A128KW"}) - rec.apply( - enc_key, recipient_key=recipient_public_key, context={"alg": "A128GCM"} - ) + rec.apply(enc_key, recipient_key=recipient_public_key, context={"alg": "A128GCM"}) ctx = COSE.new(alg_auto_inclusion=True) encoded = ctx.encode_and_encrypt(b"Hello world!", enc_key, recipients=[rec]) - assert b"Hello world!" == ctx.decode( - encoded, recipient_private_key, context={"alg": "A128GCM"} - ) + assert b"Hello world!" == ctx.decode(encoded, recipient_private_key, context={"alg": "A128GCM"}) def test_ecdh_aes_key_wrap_through_cose_api_without_kid(self): enc_key = COSEKey.from_symmetric_key(alg="ChaCha20/Poly1305") @@ -148,9 +138,7 @@ def test_ecdh_aes_key_wrap_through_cose_api_without_kid(self): } ) encoded = ctx.encode_and_encrypt(b"Hello world!", enc_key, recipients=[rec]) - assert b"Hello world!" == ctx.decode( - encoded, priv_key, context={"alg": "A128GCM"} - ) + assert b"Hello world!" == ctx.decode(encoded, priv_key, context={"alg": "A128GCM"}) def test_ecdh_aes_key_wrap_apply_without_key(self, sender_key_es): sender = ECDH_AESKeyWrap({1: -29}, {4: b"01"}, sender_key=sender_key_es) @@ -163,9 +151,7 @@ def test_ecdh_aes_key_wrap_apply_without_sender_key(self, recipient_public_key): enc_key = COSEKey.from_symmetric_key(alg="ChaCha20/Poly1305") sender = ECDH_AESKeyWrap({1: -29}, {4: b"01"}) with pytest.raises(ValueError) as err: - sender.apply( - enc_key, recipient_key=recipient_public_key, context={"alg": "A128GCM"} - ) + sender.apply(enc_key, recipient_key=recipient_public_key, context={"alg": "A128GCM"}) pytest.fail("apply() should fail.") assert "sender_key should be set in advance." in str(err.value) @@ -185,31 +171,19 @@ def test_ecdh_aes_key_wrap_apply_without_context(self, sender_key_es): pytest.fail("apply() should fail.") assert "context should be set." in str(err.value) - def test_ecdh_aes_key_wrap_apply_with_invalid_recipient_key( - self, sender_key_es, recipient_private_key - ): + def test_ecdh_aes_key_wrap_apply_with_invalid_recipient_key(self, sender_key_es, recipient_private_key): enc_key = COSEKey.from_symmetric_key(alg="ChaCha20/Poly1305") - rec = Recipient.new( - protected={"alg": "ECDH-ES+A128KW"}, sender_key=sender_key_es - ) + rec = Recipient.new(protected={"alg": "ECDH-ES+A128KW"}, sender_key=sender_key_es) with pytest.raises(ValueError) as err: - rec.apply( - enc_key, recipient_key=recipient_private_key, context={"alg": "A128GCM"} - ) + rec.apply(enc_key, recipient_key=recipient_private_key, context={"alg": "A128GCM"}) pytest.fail("apply() should fail.") assert "public_key should be elliptic curve public key." in str(err.value) - def test_ecdh_aes_key_wrap_apply_with_invalid_key_to_wrap( - self, sender_key_es, recipient_public_key - ): + def test_ecdh_aes_key_wrap_apply_with_invalid_key_to_wrap(self, sender_key_es, recipient_public_key): mac_key = COSEKey.from_symmetric_key(key="xxx", alg="HS256") - rec = Recipient.new( - protected={"alg": "ECDH-ES+A128KW"}, sender_key=sender_key_es - ) + rec = Recipient.new(protected={"alg": "ECDH-ES+A128KW"}, sender_key=sender_key_es) with pytest.raises(EncodeError) as err: - rec.apply( - mac_key, recipient_key=recipient_public_key, context={"alg": "A128GCM"} - ) + rec.apply(mac_key, recipient_key=recipient_public_key, context={"alg": "A128GCM"}) pytest.fail("apply() should fail.") assert "Failed to wrap key." in str(err.value) @@ -229,14 +203,10 @@ def test_ecdh_aes_key_wrap_extract_without_context(self): pytest.fail("extract() should fail.") assert "context should be set." in str(err.value) - def test_ecdh_aes_key_wrap_extract_with_invalid_recipient_private_key( - self, recipient_public_key - ): + def test_ecdh_aes_key_wrap_extract_with_invalid_recipient_private_key(self, recipient_public_key): enc_key = COSEKey.from_symmetric_key(alg="ChaCha20/Poly1305") rec = Recipient.from_jwk({"kty": "EC", "crv": "P-256", "alg": "ECDH-ES+A128KW"}) - rec.apply( - enc_key, recipient_key=recipient_public_key, context={"alg": "A128GCM"} - ) + rec.apply(enc_key, recipient_key=recipient_public_key, context={"alg": "A128GCM"}) ctx = COSE.new(alg_auto_inclusion=True) recipient_private_key = COSEKey.from_jwk( { diff --git a/tests/test_recipient_algs_ecdh_direct_hkdf.py b/tests/test_recipient_algs_ecdh_direct_hkdf.py index bf3af50..7f5b591 100644 --- a/tests/test_recipient_algs_ecdh_direct_hkdf.py +++ b/tests/test_recipient_algs_ecdh_direct_hkdf.py @@ -100,19 +100,13 @@ def test_ecdh_direct_hkdf_apply_with_ecdh_ss_p256(self): ) with open(key_path("public_key_es256.pem")) as key_file: pub_key = COSEKey.from_pem(key_file.read(), kid="01") - enc_key = rec.apply( - recipient_key=pub_key, salt=token_bytes(32), context={"alg": "A128GCM"} - ) + enc_key = rec.apply(recipient_key=pub_key, salt=token_bytes(32), context={"alg": "A128GCM"}) ctx = COSE.new(alg_auto_inclusion=True) encoded = ctx.encode_and_encrypt(b"Hello world!", enc_key, recipients=[rec]) with open(key_path("private_key_es256.pem")) as key_file: - priv_key = COSEKey.from_pem( - key_file.read(), kid="01", alg="ECDH-SS+HKDF-256" - ) - assert b"Hello world!" == ctx.decode( - encoded, priv_key, context={"alg": "A128GCM"} - ) + priv_key = COSEKey.from_pem(key_file.read(), kid="01", alg="ECDH-SS+HKDF-256") + assert b"Hello world!" == ctx.decode(encoded, priv_key, context={"alg": "A128GCM"}) def test_ecdh_direct_hkdf_apply_with_ecdh_ss_p521(self): rec = Recipient.from_jwk( @@ -128,24 +122,16 @@ def test_ecdh_direct_hkdf_apply_with_ecdh_ss_p521(self): ) with open(key_path("public_key_es512.pem")) as key_file: pub_key = COSEKey.from_pem(key_file.read(), kid="01") - enc_key = rec.apply( - recipient_key=pub_key, salt=token_bytes(64), context={"alg": "A128GCM"} - ) + enc_key = rec.apply(recipient_key=pub_key, salt=token_bytes(64), context={"alg": "A128GCM"}) ctx = COSE.new(alg_auto_inclusion=True) encoded = ctx.encode_and_encrypt(b"Hello world!", enc_key, recipients=[rec]) with open(key_path("private_key_es512.pem")) as key_file: - priv_key = COSEKey.from_pem( - key_file.read(), kid="01", alg="ECDH-SS+HKDF-512" - ) - assert b"Hello world!" == ctx.decode( - encoded, priv_key, context={"alg": "A128GCM"} - ) + priv_key = COSEKey.from_pem(key_file.read(), kid="01", alg="ECDH-SS+HKDF-512") + assert b"Hello world!" == ctx.decode(encoded, priv_key, context={"alg": "A128GCM"}) def test_ecdh_direct_hkdf_apply_with_raw_context(self): - rec = Recipient.from_jwk( - {"kty": "EC", "crv": "P-256", "alg": "ECDH-ES+HKDF-256"} - ) + rec = Recipient.from_jwk({"kty": "EC", "crv": "P-256", "alg": "ECDH-ES+HKDF-256"}) with open(key_path("public_key_es256.pem")) as key_file: pub_key = COSEKey.from_pem(key_file.read(), kid="01") enc_key = rec.apply( @@ -161,12 +147,8 @@ def test_ecdh_direct_hkdf_apply_with_raw_context(self): encoded = ctx.encode_and_encrypt(b"Hello world!", enc_key, recipients=[rec]) with open(key_path("private_key_es256.pem")) as key_file: - priv_key = COSEKey.from_pem( - key_file.read(), kid="01", alg="ECDH-ES+HKDF-256" - ) - assert b"Hello world!" == ctx.decode( - encoded, priv_key, context={"alg": "A128GCM"} - ) + priv_key = COSEKey.from_pem(key_file.read(), kid="01", alg="ECDH-ES+HKDF-256") + assert b"Hello world!" == ctx.decode(encoded, priv_key, context={"alg": "A128GCM"}) def test_ecdh_direct_hkdf_apply_with_ecdh_ss_p521_without_salt(self): rec = Recipient.from_jwk( @@ -187,12 +169,8 @@ def test_ecdh_direct_hkdf_apply_with_ecdh_ss_p521_without_salt(self): encoded = ctx.encode_and_encrypt(b"Hello world!", enc_key, recipients=[rec]) with open(key_path("private_key_es512.pem")) as key_file: - priv_key = COSEKey.from_pem( - key_file.read(), kid="01", alg="ECDH-SS+HKDF-512" - ) - assert b"Hello world!" == ctx.decode( - encoded, priv_key, context={"alg": "A128GCM"} - ) + priv_key = COSEKey.from_pem(key_file.read(), kid="01", alg="ECDH-SS+HKDF-512") + assert b"Hello world!" == ctx.decode(encoded, priv_key, context={"alg": "A128GCM"}) def test_ecdh_direct_hkdf_apply_with_ecdh_ss_p521_with_default_salt(self): rec = Recipient.from_jwk( @@ -214,17 +192,11 @@ def test_ecdh_direct_hkdf_apply_with_ecdh_ss_p521_with_default_salt(self): encoded = ctx.encode_and_encrypt(b"Hello world!", enc_key, recipients=[rec]) with open(key_path("private_key_es512.pem")) as key_file: - priv_key = COSEKey.from_pem( - key_file.read(), kid="01", alg="ECDH-SS+HKDF-512" - ) - assert b"Hello world!" == ctx.decode( - encoded, priv_key, context={"alg": "A128GCM"} - ) + priv_key = COSEKey.from_pem(key_file.read(), kid="01", alg="ECDH-SS+HKDF-512") + assert b"Hello world!" == ctx.decode(encoded, priv_key, context={"alg": "A128GCM"}) def test_ecdh_direct_hkdf_apply_without_salt(self): - rec = Recipient.from_jwk( - {"kty": "EC", "crv": "P-256", "alg": "ECDH-ES+HKDF-256"} - ) + rec = Recipient.from_jwk({"kty": "EC", "crv": "P-256", "alg": "ECDH-ES+HKDF-256"}) with open(key_path("public_key_es256.pem")) as key_file: pub_key = COSEKey.from_pem(key_file.read(), kid="01") enc_key = rec.apply( @@ -240,18 +212,12 @@ def test_ecdh_direct_hkdf_apply_without_salt(self): encoded = ctx.encode_and_encrypt(b"Hello world!", enc_key, recipients=[rec]) with open(key_path("private_key_es256.pem")) as key_file: - priv_key = COSEKey.from_pem( - key_file.read(), kid="01", alg="ECDH-ES+HKDF-256" - ) - assert b"Hello world!" == ctx.decode( - encoded, priv_key, context={"alg": "A128GCM"} - ) + priv_key = COSEKey.from_pem(key_file.read(), kid="01", alg="ECDH-ES+HKDF-256") + assert b"Hello world!" == ctx.decode(encoded, priv_key, context={"alg": "A128GCM"}) def test_ecdh_direct_hkdf_apply_with_party_u_nonce(self): nonce = token_bytes(32) - rec = Recipient.from_jwk( - {"kty": "EC", "crv": "P-256", "alg": "ECDH-ES+HKDF-256"} - ) + rec = Recipient.from_jwk({"kty": "EC", "crv": "P-256", "alg": "ECDH-ES+HKDF-256"}) with open(key_path("public_key_es256.pem")) as key_file: pub_key = COSEKey.from_pem(key_file.read(), kid="01") context = [ @@ -266,16 +232,12 @@ def test_ecdh_direct_hkdf_apply_with_party_u_nonce(self): encoded = ctx.encode_and_encrypt(b"Hello world!", enc_key, recipients=[rec]) with open(key_path("private_key_es256.pem")) as key_file: - priv_key = COSEKey.from_pem( - key_file.read(), kid="01", alg="ECDH-ES+HKDF-256" - ) + priv_key = COSEKey.from_pem(key_file.read(), kid="01", alg="ECDH-ES+HKDF-256") assert b"Hello world!" == ctx.decode(encoded, priv_key, context=context) def test_ecdh_direct_hkdf_apply_with_party_v_nonce(self): nonce = token_bytes(32) - rec = Recipient.from_jwk( - {"kty": "EC", "crv": "P-256", "alg": "ECDH-ES+HKDF-256"} - ) + rec = Recipient.from_jwk({"kty": "EC", "crv": "P-256", "alg": "ECDH-ES+HKDF-256"}) with open(key_path("public_key_es256.pem")) as key_file: pub_key = COSEKey.from_pem(key_file.read(), kid="01") enc_key = rec.apply( @@ -292,9 +254,7 @@ def test_ecdh_direct_hkdf_apply_with_party_v_nonce(self): def test_ecdh_direct_hkdf_apply_with_supp_pub_other(self): nonce = token_bytes(32) - rec = Recipient.from_jwk( - {"kty": "EC", "crv": "P-256", "alg": "ECDH-ES+HKDF-256"} - ) + rec = Recipient.from_jwk({"kty": "EC", "crv": "P-256", "alg": "ECDH-ES+HKDF-256"}) with open(key_path("public_key_es256.pem")) as key_file: pub_key = COSEKey.from_pem(key_file.read(), kid="01") enc_key = rec.apply( @@ -330,30 +290,20 @@ def test_ecdh_direct_hkdf_apply_without_context(self, sender_key_es): pytest.fail("apply() should fail.") assert "context should be set." in str(err.value) - def test_ecdh_direct_hkdf_apply_with_invalid_recipient_key( - self, sender_key_es, recipient_private_key - ): - rec = Recipient.new( - protected={"alg": "ECDH-ES+HKDF-256"}, sender_key=sender_key_es - ) + def test_ecdh_direct_hkdf_apply_with_invalid_recipient_key(self, sender_key_es, recipient_private_key): + rec = Recipient.new(protected={"alg": "ECDH-ES+HKDF-256"}, sender_key=sender_key_es) with pytest.raises(ValueError) as err: rec.apply(recipient_key=recipient_private_key, context={"alg": "A128GCM"}) pytest.fail("apply() should fail.") assert "public_key should be elliptic curve public key." in str(err.value) - def test_ecdh_direct_hkdf_apply_and_extract_with_ecdh_es( - self, sender_key_es, recipient_public_key, recipient_private_key - ): + def test_ecdh_direct_hkdf_apply_and_extract_with_ecdh_es(self, sender_key_es, recipient_public_key, recipient_private_key): sender = ECDH_DirectHKDF({1: -25}, {4: b"01"}, sender_key=sender_key_es) - enc_key = sender.apply( - recipient_key=recipient_public_key, context={"alg": "A128GCM"} - ) + enc_key = sender.apply(recipient_key=recipient_public_key, context={"alg": "A128GCM"}) encoded = sender.to_list() recipient = Recipient.from_list(encoded) - decoded_key = recipient.extract( - recipient_private_key, context={"alg": "A128GCM"} - ) + decoded_key = recipient.extract(recipient_private_key, context={"alg": "A128GCM"}) assert enc_key.key == decoded_key.key @pytest.mark.parametrize( @@ -363,9 +313,7 @@ def test_ecdh_direct_hkdf_apply_and_extract_with_ecdh_es( (-25, 1, "private_key_es256.pem", "public_key_es256.pem"), ], ) - def test_ecdh_direct_hkdf_through_cose_api_with_ecdh_es( - self, alg, crv, private_key_path, public_key_path - ): + def test_ecdh_direct_hkdf_through_cose_api_with_ecdh_es(self, alg, crv, private_key_path, public_key_path): sender_key = COSEKey.new({1: 2, -1: crv, 3: alg}) rec = Recipient.new(protected={1: alg}, sender_key=sender_key) with open(key_path(public_key_path)) as key_file: @@ -376,29 +324,17 @@ def test_ecdh_direct_hkdf_through_cose_api_with_ecdh_es( with open(key_path(private_key_path)) as key_file: priv_key = COSEKey.from_pem(key_file.read(), kid="01", alg=alg) - assert b"Hello world!" == ctx.decode( - encoded, priv_key, context={"alg": "A128GCM"} - ) + assert b"Hello world!" == ctx.decode(encoded, priv_key, context={"alg": "A128GCM"}) - def test_ecdh_direct_hkdf_through_cose_api( - self, recipient_public_key, recipient_private_key - ): - rec = Recipient.from_jwk( - {"kty": "EC", "crv": "P-256", "alg": "ECDH-ES+HKDF-256"} - ) - enc_key = rec.apply( - recipient_key=recipient_public_key, context={"alg": "A128GCM"} - ) + def test_ecdh_direct_hkdf_through_cose_api(self, recipient_public_key, recipient_private_key): + rec = Recipient.from_jwk({"kty": "EC", "crv": "P-256", "alg": "ECDH-ES+HKDF-256"}) + enc_key = rec.apply(recipient_key=recipient_public_key, context={"alg": "A128GCM"}) ctx = COSE.new(alg_auto_inclusion=True) encoded = ctx.encode_and_encrypt(b"Hello world!", enc_key, recipients=[rec]) - assert b"Hello world!" == ctx.decode( - encoded, recipient_private_key, context={"alg": "A128GCM"} - ) + assert b"Hello world!" == ctx.decode(encoded, recipient_private_key, context={"alg": "A128GCM"}) def test_ecdh_direct_hkdf_through_cose_api_without_kid(self): - rec = Recipient.from_jwk( - {"kty": "EC", "crv": "P-256", "alg": "ECDH-ES+HKDF-256"} - ) + rec = Recipient.from_jwk({"kty": "EC", "crv": "P-256", "alg": "ECDH-ES+HKDF-256"}) pub_key = COSEKey.from_jwk( { "kty": "EC", @@ -422,19 +358,11 @@ def test_ecdh_direct_hkdf_through_cose_api_without_kid(self): } ) encoded = ctx.encode_and_encrypt(b"Hello world!", enc_key, recipients=[rec]) - assert b"Hello world!" == ctx.decode( - encoded, priv_key, context={"alg": "A128GCM"} - ) + assert b"Hello world!" == ctx.decode(encoded, priv_key, context={"alg": "A128GCM"}) - def test_ecdh_direct_hkdf_extract_with_invalid_private_key( - self, recipient_public_key - ): - rec = Recipient.from_jwk( - {"kty": "EC", "crv": "P-256", "alg": "ECDH-ES+HKDF-256"} - ) - enc_key = rec.apply( - recipient_key=recipient_public_key, context={"alg": "A128GCM"} - ) + def test_ecdh_direct_hkdf_extract_with_invalid_private_key(self, recipient_public_key): + rec = Recipient.from_jwk({"kty": "EC", "crv": "P-256", "alg": "ECDH-ES+HKDF-256"}) + enc_key = rec.apply(recipient_key=recipient_public_key, context={"alg": "A128GCM"}) ctx = COSE.new(alg_auto_inclusion=True) encoded = ctx.encode_and_encrypt(b"Hello world!", enc_key, recipients=[rec]) another_priv_key = COSEKey.from_jwk( diff --git a/tests/test_signer.py b/tests/test_signer.py index 0d66970..d39a8af 100644 --- a/tests/test_signer.py +++ b/tests/test_signer.py @@ -178,9 +178,7 @@ def test_signer_from_jwt_with_key_ops(self): def test_signer_from_jwk_with_invalid_alg(self): with pytest.raises(ValueError) as err: - Signer.from_jwk( - {"kty": "oct", "alg": "HS256", "kid": "01", "k": "xxxxxxxxxx"} - ) + Signer.from_jwk({"kty": "oct", "alg": "HS256", "kid": "01", "k": "xxxxxxxxxx"}) assert "Unsupported or unknown alg for signature: 5." in str(err.value) def test_signer_from_jwt_with_invalid_key_ops(self): diff --git a/tox.ini b/tox.ini index 7a3ecfa..6782487 100644 --- a/tox.ini +++ b/tox.ini @@ -1,76 +1,65 @@ -[pytest] -addopts = -ra -testpaths = tests -filterwarnings = - once::Warning - ignore:::pympler[.*] +[tox] +envlist = + lint + typing + py{36,37,38,39,310} + docs + pypi + coverage +isolated_build = True [gh-actions] python = 3.6: py36 3.7: py37 - 3.8: py38, docs, typing + 3.8: py38, lint, typing, docs, pypi 3.9: py39 3.10: py310 -[tox] -envlist = - lint - typing - py{36,37,38,39,310} - docs - pypi-description - coverage-report -isolated_build = True - - [testenv] -# Prevent random setuptools/pip breakages like -# https://github.com/pypa/setuptools/issues/1042 from breaking our builds. -setenv = - VIRTUALENV_NO_DOWNLOAD=1 -extras = - tests -commands = {envpython} -b -m coverage run -m pytest {posargs} +whitelist_externals = poetry +commands = + poetry install + poetry run coverage run -m pytest -ra tests -[testenv:docs] -basepython = python3.8 -extras = docs +[testenv:lint] +whitelist_externals = poetry +skip_install = true commands = - sphinx-build -n -T -W -b html -d {envtmpdir}/doctrees docs docs/_build/html + poetry install + poetry run pre-commit run --all-files [testenv:typing] -basepython = python3.8 -extras = dev -commands = mypy cwt +whitelist_externals = poetry +skip_install = true +commands = + poetry install + poetry run mypy cwt -[testenv:lint] -basepython = python3.8 -extras = dev -passenv = HOMEPATH # needed on Windows -commands = pre-commit run --all-files +[testenv:docs] +whitelist_externals = poetry +skip_install = true +commands = + poetry install -E docs + sphinx-build -n -T -W -b html -d {envtmpdir}/doctrees docs docs/_build/html -[testenv:pypi-description] -basepython = python3.8 +[testenv:pypi] +whitelist_externals = poetry skip_install = true -deps = - twine - pip >= 18.0.0 commands = - pip wheel -w {envtmpdir}/build --no-deps . - twine check {envtmpdir}/build/* + poetry build -[testenv:coverage-report] -basepython = python3.8 +[testenv:coverage] +whitelist_externals = poetry skip_install = true -deps = coverage[toml]==5.0.4 commands = - coverage combine - coverage report + poetry install + poetry run coverage combine + poetry run coverage report