Skip to content

Commit

Permalink
Merge pull request #39 from dajiaji/add-docstring
Browse files Browse the repository at this point in the history
Add docstring
  • Loading branch information
dajiaji authored Apr 29, 2021
2 parents 6a4ecfc + 8b10ecc commit 4029174
Show file tree
Hide file tree
Showing 8 changed files with 259 additions and 37 deletions.
4 changes: 2 additions & 2 deletions cwt/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .claims import Claims, claims
from .claims_builder import ClaimsBuilder, claims
from .cose import COSE
from .cose_key import COSEKey
from .cwt import CWT, decode, encode_and_encrypt, encode_and_mac, encode_and_sign
Expand All @@ -23,7 +23,7 @@
"decode",
"CWT",
"COSE",
"Claims",
"ClaimsBuilder",
"COSEKey",
"KeyBuilder",
"cose_key",
Expand Down
14 changes: 10 additions & 4 deletions cwt/claims.py → cwt/claims_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,19 @@
from .const import CWT_CLAIM_NAMES


class Claims:
class ClaimsBuilder:
"""
CBOR Web Token (CWT) Claims Generator.
CBOR Web Token (CWT) Claims Builder.
``cwt.claims`` is a global object of this class initialized with default settings.
"""

def __init__(self, options: Optional[Dict[str, Any]] = None):
""""""
"""
Constructor.
At the current implementation, any ``options`` will be ignored.
"""
self._options = options
return

Expand Down Expand Up @@ -51,4 +57,4 @@ def from_json(self, claims: Union[str, bytes, Dict[str, Any]]) -> Dict[int, Any]


# export
claims = Claims()
claims = ClaimsBuilder()
91 changes: 88 additions & 3 deletions cwt/cose.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,18 @@

class COSE(CBORProcessor):
"""
A COSE (CBOR Object Signing and Encryption) Implementaion.
A COSE (CBOR Object Signing and Encryption) Implementaion built on top of
`cbor2 <https://cbor2.readthedocs.io/en/stable/>`_.
``cwt.cose_key`` is a global object of this class initialized with default settings.
"""

def __init__(self, options: Optional[Dict[str, Any]] = None):
"""
Constructor.
At the current implementation, any ``options`` will be ignored.
"""
self._options = options
self._recipients_builder = RecipientsBuilder()

Expand All @@ -23,8 +31,29 @@ def encode_and_mac(
payload: Union[Dict[int, Any], bytes],
key: COSEKey,
recipients: Optional[List[Recipient]] = None,
out: Optional[str] = None,
out: str = "",
) -> Union[bytes, CBORTag]:
"""
Encode data and add MAC to it.
Args:
protected (Dict[int, Any]): Parameters that are to be cryptographically
protected.
unprotected (Dict[int, Any]): Parameters that are not cryptographically
protected.
payload (Union[Dict[int, Any], bytes]): A content to be MACed.
key (COSEKey): A COSE key as a MAC Authentication key.
recipients (Optional[List[Recipient]]): A list of recipient information structures.
out(str): An output format. Only ``"cbor2/CBORTag"`` can be used. If ``"cbor2/CBORTag"``
is specified. This function will return encoded data as
`cbor2 <https://cbor2.readthedocs.io/en/stable/>`_'s ``CBORTag`` object.
If any other value is specified, it will return encoded data as bytes.
Returns:
Union[bytes, CBORTag]: A byte string of the encoded COSE or a cbor2.CBORTag object.
Raises:
ValueError: Invalid arguments.
EncodeError: Failed to encode data.
"""

ctx = "MAC0" if not recipients else "MAC"

Expand Down Expand Up @@ -59,8 +88,28 @@ def encode_and_sign(
unprotected: Dict[int, Any],
payload: Union[Dict[int, Any], bytes],
key: Union[COSEKey, List[COSEKey]],
out: Optional[str] = None,
out: str = "",
) -> Union[bytes, CBORTag]:
"""
Encode data and sign it.
Args:
protected (Dict[int, Any]): Parameters that are to be cryptographically
protected.
unprotected (Dict[int, Any]): Parameters that are not cryptographically
protected.
payload (Union[Dict[int, Any], bytes]): A content to be signed.
key (Union[COSEKey, List[COSEKey]]): One or more COSE keys as signing keys.
out(str): An output format. Only ``"cbor2/CBORTag"`` can be used. If ``"cbor2/CBORTag"``
is specified. This function will return encoded data as
`cbor2 <https://cbor2.readthedocs.io/en/stable/>`_'s ``CBORTag`` object.
If any other value is specified, it will return encoded data as bytes.
Returns:
Union[bytes, CBORTag]: A byte string of the encoded COSE or a cbor2.CBORTag object.
Raises:
ValueError: Invalid arguments.
EncodeError: Failed to encode data.
"""

ctx = "Signature" if not isinstance(key, COSEKey) else "Signature1"
if isinstance(key, COSEKey):
Expand Down Expand Up @@ -99,6 +148,28 @@ def encode_and_encrypt(
recipients: Optional[List[Recipient]] = None,
out: str = "",
) -> bytes:
"""
Encode data and encrypt it.
Args:
protected (Dict[int, Any]): Parameters that are to be cryptographically
protected.
unprotected (Dict[int, Any]): Parameters that are not cryptographically
protected.
payload (Union[Dict[int, Any], bytes]): A content to be encrypted.
key (COSEKey): A COSE key as an encryption key.
nonce (bytes): A nonce for encryption.
recipients (Optional[List[Recipient]]): A list of recipient information structures.
out(str): An output format. Only ``"cbor2/CBORTag"`` can be used. If ``"cbor2/CBORTag"``
is specified. This function will return encoded data as
`cbor2 <https://cbor2.readthedocs.io/en/stable/>`_'s ``CBORTag`` object.
If any other value is specified, it will return encoded data as bytes.
Returns:
Union[bytes, CBORTag]: A byte string of the encoded COSE or a cbor2.CBORTag object.
Raises:
ValueError: Invalid arguments.
EncodeError: Failed to encode data.
"""

ctx = "Encrypt0" if not recipients else "Encrypt"

Expand Down Expand Up @@ -127,6 +198,20 @@ def encode_and_encrypt(
return res if out == "cbor2/CBORTag" else self._dumps(res)

def decode(self, data: Union[bytes, CBORTag], key: COSEKey) -> Dict[int, Any]:
"""
Verify and decode COSE data.
Args:
data (Union[bytes, CBORTag]): A byte string or cbor2.CBORTag of an
encoded data.
key (COSEKey): A COSE key to verify and decrypt the encoded data.
Returns:
Dict[int, Any]: A decoded CBOR-like object.
Raises:
ValueError: Invalid arguments.
DecodeError: Failed to decode data.
VerifyError: Failed to verify data.
"""

if isinstance(data, bytes):
data = self._loads(data)
Expand Down
64 changes: 63 additions & 1 deletion cwt/cose_key.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,16 @@

class COSEKey:
"""
The interface class for a COSE Key used for mac, signing/verifying and encryption/decryption.
The interface class for a COSE Key used for MAC, signing/verifying and encryption/decryption.
"""

def __init__(self, cose_key: Dict[int, Any]):
"""
Constructor.
Args:
cose_key (Dict[int, Any]): A COSE key formatted to a CBOR-like dictionary.
"""
# Validate COSE Key common parameters.
if 1 not in cose_key:
raise ValueError("kty(1) not found.")
Expand Down Expand Up @@ -36,46 +42,102 @@ def __init__(self, cose_key: Dict[int, Any]):

@property
def kty(self) -> int:
"""
Identification of the key type.
"""
return self._kty

@property
def kid(self) -> bytes:
"""
A key identification value.
"""
return self._object.get(2, None)

@property
def alg(self) -> int:
"""
An algorithm that is used with the key.
"""
return self._object.get(3, None)

@property
def key_ops(self) -> list:
"""
Restrict set of permissible operations.
"""
return self._object.get(4, None)

@property
def base_iv(self) -> bytes:
"""
Base IV to be xor-ed with Partial IVs.
"""
return self._object.get(5, None)

def sign(self, msg: bytes) -> bytes:
"""
Returns a digital signature for the specified message
using the specified key value.
Args:
msg (bytes): A message to be signed.
Returns:
bytes: A byte string of the encoded CWT.
Raises:
NotImplementedError: Not implemented.
ValueError: Invalid arguments.
EncodeError: Failed to sign the message.
"""
raise NotImplementedError

def verify(self, msg: bytes, sig: bytes):
"""
Verifies that the specified digital signature is valid
for the specified message.
Args:
msg (bytes): A message to be verified.
sig (bytes): A digital signature of the message.
Returns:
bytes: A byte string of the encoded CWT.
Raises:
NotImplementedError: Not implemented.
ValueError: Invalid arguments.
VerifyError: Failed to verify.
"""
raise NotImplementedError

def encrypt(self, msg: bytes, nonce: bytes, aad: bytes) -> bytes:
"""
Encrypts the specified message.
Args:
msg (bytes): A message to be encrypted.
nonce (bytes): A nonce for encryption.
aad (bytes): Additional authenticated data.
Returns:
bytes: A byte string of encrypted data.
Raises:
NotImplementedError: Not implemented.
ValueError: Invalid arguments.
EncodeError: Failed to encrypt the message.
"""
raise NotImplementedError

def decrypt(self, msg: bytes, nonce: bytes, aad: bytes) -> bytes:
"""
Decrypts the specified message.
Args:
msg (bytes): An encrypted message.
nonce (bytes): A nonce for encryption.
aad (bytes): Additional authenticated data.
Returns:
bytes: A byte string of the decrypted data.
Raises:
NotImplementedError: Not implemented.
ValueError: Invalid arguments.
DecodeError: Failed to decrypt the message.
"""
raise NotImplementedError
36 changes: 28 additions & 8 deletions cwt/cwt.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,34 @@
class CWT(CBORProcessor):
"""
A CWT (CBOR Web Token) Implementaion, which is built on top of
a COSE (CBOR Object Signing and Encryption) implementation.
:class:`COSE <cwt.COSE>`
``cwt.cwt`` is a global object of this class initialized with default settings.
"""

CBOR_TAG = 61

def __init__(self, options: Optional[Dict[str, Any]] = None):
"""
Constructor.
Args:
options (Optional[Dict[str, Any]]): Options for the initial
configuration of CWT. At this time, ``expires_in`` (default
value: ``3600`` ) and ``leaway`` (default value: ``60``) are
only supported. See also :func:`expires_in <cwt.CWT.expires_in>`,
:func:`leeway <cwt.CWT.leeway>`.
Examples:
>>> from cwt import CWT, claims, cose_key
>>> ctx = CWT({"expires_in": 3600*24, "leeway": 10})
>>> key = cose_key.from_symmetric_key("mysecret")
>>> token = ctx.encode_and_mac(
... claims.from_json({"iss": "coaps://as.example", "sub": "dajiaji", "cti": "123"}),
... key,
... )
"""
self._expires_in = _CWT_DEFAULT_EXPIRES_IN
self._leeway = _CWT_DEFAULT_LEEWAY
self._cose = COSE(options)
Expand Down Expand Up @@ -53,15 +75,15 @@ def expires_in(self) -> int:
@property
def leeway(self) -> int:
"""
The default leeway in seconds for validating `exp` and `nbf`.
The default leeway in seconds for validating ``exp`` and ``nbf``.
"""
return self._leeway

def encode_and_mac(
self,
claims: Union[Dict[int, Any], bytes],
key: COSEKey,
tagged: Optional[bool] = False,
tagged: bool = False,
recipients: Optional[List[Recipient]] = None,
) -> bytes:
"""
Expand Down Expand Up @@ -94,7 +116,7 @@ def encode_and_sign(
self,
claims: Union[Dict[int, Any], bytes],
key: Union[COSEKey, List[COSEKey]],
tagged: Optional[bool] = False,
tagged: bool = False,
) -> bytes:
"""
Encode CWT claims and sign it.
Expand Down Expand Up @@ -127,15 +149,15 @@ def encode_and_encrypt(
claims: Union[Dict[int, Any], bytes],
key: COSEKey,
nonce: bytes,
tagged: Optional[bool] = False,
tagged: bool = False,
recipients: Optional[List[Recipient]] = None,
) -> bytes:
"""
Encode CWT claims and encrypt it.
Args:
claims (Union[Dict[int, Any], bytes]): CWT claims.
key (COSEKey): A COSE key used to sign the claims.
key (COSEKey): A COSE key used to encrypt the claims.
nonce (bytes): A nonce for encryption.
recipients (List[Recipient]): A list of recipient information structures.
tagged (bool): An indicator whether the response is wrapped by CWT tag(61)
Expand Down Expand Up @@ -188,7 +210,6 @@ def decode(
return cwt

def _validate(self, claims: Union[Dict[int, Any], bytes]):
""""""
if isinstance(claims, bytes):
nested = self._loads(claims)
if not isinstance(nested, CBORTag):
Expand Down Expand Up @@ -229,7 +250,6 @@ def _validate(self, claims: Union[Dict[int, Any], bytes]):
return

def _verify(self, claims: Dict[int, Any]):
""""""
now = timegm(datetime.utcnow().utctimetuple())

if 4 in claims: # exp
Expand Down
Loading

0 comments on commit 4029174

Please sign in to comment.