diff --git a/examples/auth_examples.py b/examples/auth_examples.py index acf6c9f..7398ea8 100644 --- a/examples/auth_examples.py +++ b/examples/auth_examples.py @@ -1,7 +1,8 @@ import os from getpass import getpass -from otf_api import Otf, OtfAuth, OtfAuthConfig, OtfUser +from otf_api import Otf, OtfUser +from otf_api.auth.auth import OtfAuth USERNAME = os.getenv("OTF_EMAIL") or input("Enter your OTF email: ") PASSWORD = os.getenv("OTF_PASSWORD") or getpass("Enter your OTF password: ") @@ -10,52 +11,37 @@ def main(): """ OtfAuthConfig provides three options currently: - - cache_tokens_plaintext: bool - Whether to cache the tokens in plaintext in the config file - this is an obvious - security risk, but it's useful for development purposes. If you want to do this, it is at your own risk. The - benefit is that after you log in with your username/password once, you can use the cached tokens to log in - without providing your password again. """ - # This is the most configurable way to access the API but also the most verbose - - auth_config = OtfAuthConfig(cache_tokens_plaintext=True) - - auth = OtfAuth.create(USERNAME, PASSWORD, config=auth_config) - user = OtfUser(auth=auth) - print(user.otf_auth.auth_type) - - # You can also use `login` to log in with a username and password, with the config being optional - user = OtfUser.login(USERNAME, PASSWORD, config=auth_config) + # You can use `login` to log in with a username and password + user = OtfUser.login(USERNAME, PASSWORD) print(user.otf_auth.auth_type) # If you have tokens available you can provide them using `from_tokens` instead of `login` - - user = OtfUser.from_tokens( - user.cognito.access_token, user.cognito.id_token, user.cognito.refresh_token, config=auth_config - ) + user = OtfUser.from_tokens(**user.cognito.tokens) print(user.otf_auth.auth_type) # Likewise, if you have a Cognito instance you can provide that as well - - user = OtfUser.from_cognito(cognito=user.cognito, config=auth_config) + user = OtfUser.from_cognito(cognito=user.cognito) print(user.otf_auth.auth_type) # If you have already logged in and you cached your tokens, you can use `from_cache` to create a user object # without providing any tokens - - user = OtfUser.from_cache(config=auth_config) + user = OtfUser.from_cache() print(user.otf_auth.auth_type) - # if you want to clear the cached tokens, you can use `clear_cached_tokens` - # if you want to clear the cached device data, you can use `clear_cached_device_data` # if you want to clear both, you can use `clear_cache` - user.clear_cached_tokens() - user.clear_cached_device_data() user.clear_cache() + # for more granular control you can access the `otf_auth` attribute + # if you want to clear the cached tokens, you can use `clear_cached_tokens` + # if you want to clear the cached device data, you can use `clear_cached_device_data` + user.otf_auth.clear_cached_tokens() + user.otf_auth.clear_cached_device_data() + # now if we tried to log in from cache, it would raise a ValueError try: - user = OtfUser.from_cache(config=auth_config) + user = OtfUser.from_cache() except ValueError as e: print(e) @@ -66,6 +52,8 @@ def main(): print(otf_from_user.member_uuid) # whereas if you provide an auth object, authenticate will be called + auth = OtfAuth.create(USERNAME, PASSWORD) + otf_from_auth = Otf(auth=auth) print(otf_from_auth.member_uuid) diff --git a/src/otf_api/__init__.py b/src/otf_api/__init__.py index a982967..de7c9a2 100644 --- a/src/otf_api/__init__.py +++ b/src/otf_api/__init__.py @@ -2,9 +2,9 @@ from otf_api.api import Otf from otf_api import models -from otf_api.auth import OtfUser, OtfAuth, OtfAuthConfig +from otf_api.auth import OtfUser, OtfAuth __version__ = "0.9.0" -__all__ = ["Otf", "OtfAuth", "OtfAuthConfig", "OtfUser", "models"] +__all__ = ["Otf", "OtfAuth", "OtfUser", "models"] diff --git a/src/otf_api/auth/__init__.py b/src/otf_api/auth/__init__.py index ed8e8bb..bd05a55 100644 --- a/src/otf_api/auth/__init__.py +++ b/src/otf_api/auth/__init__.py @@ -1,6 +1,5 @@ from .auth import OTF_AUTH_TYPE, OtfAuth -from .config import OtfAuthConfig from .user import OtfUser from .utils import HttpxCognitoAuth -__all__ = ["OTF_AUTH_TYPE", "HttpxCognitoAuth", "OtfAuth", "OtfAuthConfig", "OtfUser"] +__all__ = ["OTF_AUTH_TYPE", "HttpxCognitoAuth", "OtfAuth", "OtfUser"] diff --git a/src/otf_api/auth/auth.py b/src/otf_api/auth/auth.py index 828ec10..aa411b9 100644 --- a/src/otf_api/auth/auth.py +++ b/src/otf_api/auth/auth.py @@ -1,6 +1,7 @@ import socket import typing from logging import getLogger +from pathlib import Path from typing import ClassVar, Literal import attrs @@ -9,8 +10,8 @@ from botocore.config import Config from pycognito import AWSSRP, Cognito -from otf_api.auth.config import OtfAuthConfig from otf_api.auth.utils import CognitoTokens +from otf_api.utils import CacheableData if typing.TYPE_CHECKING: from mypy_boto3_cognito_idp import CognitoIdentityProviderClient @@ -22,6 +23,11 @@ REGION = "us-east-1" BOTO_CONFIG = Config(region_name=REGION, signature_version=UNSIGNED) +CRED_CACHE = CacheableData("creds", Path("~/.otf-api")) + +DEVICE_KEYS = ["device_key", "device_group_key", "device_password"] +DEVICE_KEYS_NO_PWD = ["device_key", "device_group_key"] +TOKEN_KEYS = ["access_token", "id_token", "refresh_token"] class OtfCognito(Cognito): @@ -36,7 +42,7 @@ def renew_access_token(self) -> None: auth_params = {"REFRESH_TOKEN": self.refresh_token} self._add_secret_hash(auth_params, "SECRET_HASH") - if dd := self.auth.config.dd_cache.get_cached_data(): + if dd := CRED_CACHE.get_cached_data(): auth_params["DEVICE_KEY"] = dd["device_key"] refresh_response = self.client.initiate_auth( @@ -44,16 +50,22 @@ def renew_access_token(self) -> None: ) self._set_tokens(refresh_response) + CRED_CACHE.write_to_cache(self.tokens) + + @property + def tokens(self) -> dict[str, str]: + tokens = { + "access_token": self.access_token, + "id_token": self.id_token, + "refresh_token": self.refresh_token, + } + return {k: v for k, v in tokens.items() if v} + class OtfAuth: auth_type: ClassVar[Literal["basic", "token", "cognito"]] cognito: OtfCognito - config: OtfAuthConfig - - def __attrs_post_init__(self) -> None: - if not hasattr(self, "config"): - self.config = OtfAuthConfig() @property def access_token(self) -> str: @@ -74,41 +86,31 @@ def refresh_token(self) -> str: raise AttributeError("No refresh token found") @staticmethod - def has_cached_credentials(config: OtfAuthConfig | None = None) -> bool: + def has_cached_credentials() -> bool: """Check if there are cached credentials. - Args: - config (OtfAuthConfig, optional): The configuration. Defaults to None. - Returns: bool: True if there are cached credentials, False otherwise. """ - config = config or OtfAuthConfig() - return bool(config.token_cache.get_cached_data()) + + return bool(CRED_CACHE.get_cached_data()) @staticmethod - def from_cache(config: OtfAuthConfig | None = None) -> "OtfTokenAuth": + def from_cache() -> "OtfTokenAuth": """Attempt to get an authentication object from the cache. If no tokens are found, raise a ValueError. If config is not provided the default configuration will be used, with plaintext token caching enabled. - Args: - config (OtfAuthConfig, optional): The configuration. Defaults to None. - Raises: ValueError: If no tokens are found in the cache. """ - config = config or OtfAuthConfig(cache_tokens_plaintext=True) - tokens = config.token_cache.get_cached_data() + tokens = CRED_CACHE.get_cached_data() if not tokens: raise ValueError("No tokens found in cache") return OtfTokenAuth( - access_token=tokens["access_token"], - id_token=tokens["id_token"], - refresh_token=tokens.get("refresh_token"), - auth_config=config, + access_token=tokens["access_token"], id_token=tokens["id_token"], refresh_token=tokens.get("refresh_token") ) @staticmethod @@ -119,7 +121,6 @@ def create( access_token: str | None = None, refresh_token: str | None = None, cognito: OtfCognito | None = None, - config: OtfAuthConfig | None = None, ) -> "OTF_AUTH_TYPE": """Create an authentication object. @@ -127,16 +128,13 @@ def create( Raises a ValueError if none of the required parameters are provided. """ - config = config or OtfAuthConfig() if username and password: - return OtfBasicAuth(username=username, password=password, config=config) + return OtfBasicAuth(username=username, password=password) if id_token and access_token: - return OtfTokenAuth( - _id_token=id_token, _access_token=access_token, _refresh_token=refresh_token, auth_config=config - ) + return OtfTokenAuth(id_token=id_token, access_token=access_token, refresh_token=refresh_token) if cognito: - return OtfCognitoAuth(cognito=cognito, auth_config=config) + return OtfCognitoAuth(cognito=cognito) raise ValueError("Must provide username/password or id/access tokens or cognito object") @@ -147,11 +145,11 @@ def clear_cache(self) -> None: def clear_cached_tokens(self) -> None: """Clear the cached tokens.""" - self.config.token_cache.clear_cache() + CRED_CACHE.clear_cache(TOKEN_KEYS) def clear_cached_device_data(self) -> None: """Clear the cached device data.""" - self.config.dd_cache.clear_cache() + CRED_CACHE.clear_cache(DEVICE_KEYS) def authenticate(self) -> None: raise NotImplementedError @@ -173,13 +171,7 @@ def validate_cognito_tokens(self) -> None: self.cognito.check_token() self.cognito.verify_tokens() - if self.config.cache_tokens_plaintext: - tokens = { - "access_token": self.cognito.access_token, - "id_token": self.cognito.id_token, - "refresh_token": self.cognito.refresh_token, - } - self.config.token_cache.write_to_cache(tokens) + CRED_CACHE.write_to_cache(self.cognito.tokens) # ensure the cognito instance has the auth object # we'll need this for the device key during refresh @@ -192,7 +184,6 @@ class OtfBasicAuth(OtfAuth): username: str password: str - config: OtfAuthConfig = attrs.field(factory=OtfAuthConfig) def get_awssrp(self) -> AWSSRP: # this is to avoid boto3 trying to find credentials in the environment/local machine @@ -205,7 +196,7 @@ def get_awssrp(self) -> AWSSRP: "client": boto3.client("cognito-idp", config=BOTO_CONFIG), } - dd = self.config.dd_cache.get_cached_data() + dd = CRED_CACHE.get_cached_data(DEVICE_KEYS) kwargs = kwargs | dd | {"username": self.username, "password": self.password} @@ -235,7 +226,7 @@ def handle_device_setup(self, tokens: CognitoTokens) -> None: Args: tokens (dict): The tokens from the AWS SRP instance. """ - if self.config.dd_cache.get_cached_data(): + if CRED_CACHE.get_cached_data(): LOGGER.debug("Skipping device setup") try: @@ -245,13 +236,8 @@ def handle_device_setup(self, tokens: CognitoTokens) -> None: LOGGER.exception("Failed to get device name") return - try: - if not tokens.device_key: - LOGGER.debug("No new device metadata") - return - - except KeyError: - LOGGER.error("Failed to get device key and group key") + if not tokens.device_key: + LOGGER.debug("No new device metadata") return aws = self.get_awssrp() @@ -270,7 +256,7 @@ def handle_device_setup(self, tokens: CognitoTokens) -> None: "device_group_key": tokens.device_group_key, "device_password": device_password, } - self.config.dd_cache.write_to_cache(dd) + CRED_CACHE.write_to_cache(dd) except Exception: LOGGER.exception("Failed to write device key cache") return @@ -279,7 +265,7 @@ def forget_device(self) -> None: """Forget the device from AWS and delete the device key cache.""" access_token = self.cognito.access_token - dd = self.config.dd_cache.get_cached_data() + dd = CRED_CACHE.get_cached_data() if not dd: LOGGER.debug("No device data to forget") return @@ -299,11 +285,9 @@ class OtfTokenAuth(OtfAuth): _access_token: str _id_token: str _refresh_token: str | None = None - auth_config: OtfAuthConfig = attrs.field(factory=OtfAuthConfig) def authenticate(self) -> None: - dd = self.auth_config.dd_cache.get_cached_data() - dd.pop("device_password", None) # remove device password, not attribute of CognitoTokens + dd = CRED_CACHE.get_cached_data(DEVICE_KEYS_NO_PWD) tokens = CognitoTokens( access_token=self._access_token, id_token=self._id_token, refresh_token=self._refresh_token, **dd ) @@ -337,7 +321,6 @@ class OtfCognitoAuth(OtfAuth): auth_type: ClassVar[Literal["basic", "token", "cognito"]] = "cognito" cognito: OtfCognito - auth_config: OtfAuthConfig = attrs.field(factory=OtfAuthConfig) def authenticate(self) -> None: self.cognito.check_token(renew=True) diff --git a/src/otf_api/auth/config.py b/src/otf_api/auth/config.py deleted file mode 100644 index 66a0dcb..0000000 --- a/src/otf_api/auth/config.py +++ /dev/null @@ -1,34 +0,0 @@ -import os -from pathlib import Path -from typing import ClassVar - -import attrs - -from otf_api.utils import CacheableData - - -@attrs.define -class OtfAuthConfig: - DEVICE_DATA_KEYS: ClassVar[list[str]] = ["device_key", "device_group_key", "device_password"] - TOKEN_KEYS: ClassVar[list[str]] = ["access_token", "id_token", "refresh_token"] - - cache_tokens_plaintext: bool = False - - cache_path: Path = attrs.field(init=False) - dd_cache: CacheableData = attrs.field(init=False) - token_cache: CacheableData = attrs.field(init=False) - - def __attrs_post_init__(self) -> None: - self.cache_path = self._get_cache_path() - self.dd_cache = CacheableData("device_key", self.DEVICE_DATA_KEYS, self.cache_path) - self.token_cache = CacheableData("tokens", self.TOKEN_KEYS, self.cache_path) - - def _get_cache_path(self) -> Path: - if "OTF_API_CACHE_PATH" in os.environ: - cache_path = Path(os.environ["OTF_API_CACHE_PATH"]) - else: - cache_path = Path("~/.otf-api") - - cache_path = cache_path.expanduser() - cache_path.mkdir(parents=True, exist_ok=True) - return cache_path diff --git a/src/otf_api/auth/user.py b/src/otf_api/auth/user.py index 59f4950..0130749 100644 --- a/src/otf_api/auth/user.py +++ b/src/otf_api/auth/user.py @@ -2,7 +2,7 @@ import attrs -from otf_api.auth.auth import OTF_AUTH_TYPE, OtfAuth, OtfAuthConfig, OtfBasicAuth, OtfCognito +from otf_api.auth.auth import OTF_AUTH_TYPE, OtfAuth, OtfBasicAuth, OtfCognito from otf_api.auth.utils import HttpxCognitoAuth LOGGER = getLogger(__name__) @@ -57,83 +57,64 @@ def clear_cache(self) -> None: """Clear the cache.""" self.otf_auth.clear_cache() - def clear_cached_tokens(self) -> None: - """Clear the cached tokens.""" - self.otf_auth.clear_cached_tokens() - - def clear_cached_device_data(self) -> None: - """Clear the cached device data.""" - self.otf_auth.clear_cached_device_data() - @classmethod - def login(cls, username: str, password: str, config: OtfAuthConfig | None = None) -> "OtfUser": + def login(cls, username: str, password: str) -> "OtfUser": """Create a User instance from a username and password. Args: username (str): The username. password (str): The password. - config (OtfAuthConfig, optional): The configuration. Defaults to None. Returns: OtfUser: The User instance """ - auth = OtfAuth.create(username=username, password=password, config=config) + auth = OtfAuth.create(username=username, password=password) return cls(auth) @classmethod - def from_tokens( - cls, access_token: str, id_token: str, refresh_token: str | None = None, config: OtfAuthConfig | None = None - ) -> "OtfUser": + def from_tokens(cls, access_token: str, id_token: str, refresh_token: str | None = None) -> "OtfUser": """Create a User instance from tokens. Args: access_token (str): The access token. id_token (str): The id token. refresh_token (str, optional): The refresh token. Defaults to None. - config (OtfAuthConfig, optional): The configuration. Defaults to None. Returns: OtfUser: The User instance """ - auth = OtfAuth.create(access_token=access_token, id_token=id_token, refresh_token=refresh_token, config=config) + auth = OtfAuth.create(access_token=access_token, id_token=id_token, refresh_token=refresh_token) return cls(auth) @classmethod - def from_cognito(cls, cognito: OtfCognito, config: OtfAuthConfig | None = None) -> "OtfUser": + def from_cognito(cls, cognito: OtfCognito) -> "OtfUser": """Create a User instance from a OtfCognito instance. Args: cognito (OtfCognito): The OtfCognito instance. - config (OtfAuthConfig, optional): The configuration. Defaults to None. Returns: OtfUser: The User instance """ - auth = OtfAuth.create(cognito=cognito, config=config) + auth = OtfAuth.create(cognito=cognito) return cls(auth) @classmethod - def from_cache(cls, config: OtfAuthConfig | None = None) -> "OtfUser": + def from_cache(cls) -> "OtfUser": """Create a User instance from cached tokens. - Args: - config (OtfAuthConfig, optional): The configuration. Defaults to None. - Returns: OtfUser: The User instance """ - auth = OtfAuth.from_cache(config) + auth = OtfAuth.from_cache() return cls(auth) @classmethod - def has_cached_credentials(cls, config: OtfAuthConfig | None = None) -> bool: + def has_cached_credentials(cls) -> bool: """Check if there are cached credentials. - Args: - config (OtfAuthConfig, optional): The configuration. Defaults to None. - Returns: bool: True if there are cached credentials, False otherwise. """ - return OtfAuth.has_cached_credentials(config) + return OtfAuth.has_cached_credentials() diff --git a/src/otf_api/utils.py b/src/otf_api/utils.py index 246746a..a40e4a2 100644 --- a/src/otf_api/utils.py +++ b/src/otf_api/utils.py @@ -12,15 +12,14 @@ class CacheableData: """Represents a cacheable data object, with methods to read and write to cache.""" name: str - keys: list[str] cache_dir: Path @property def cache_path(self) -> Path: """The path to the cache file.""" - return self.cache_dir.joinpath(f"{self.name}_cache.json") + return self.cache_dir.expanduser().joinpath(f"{self.name}_cache.json") - def get_cached_data(self) -> dict[str, str]: + def get_cached_data(self, keys: list[str] | None = None) -> dict[str, str]: """Reads the cache file and returns the data if it exists and is valid. Returns: @@ -35,10 +34,12 @@ def get_cached_data(self) -> dict[str, str]: return {} data: dict[str, str] = json.loads(self.cache_path.read_text()) - if set(data.keys()).issuperset(set(self.keys)): - return {k: v for k, v in data.items() if k in self.keys} + if not keys: + return data - return {} + if set(data.keys()).issuperset(set(keys)): + return {k: v for k, v in data.items() if k in keys} + raise ValueError(f"Data must contain all keys: {keys}") except Exception: LOGGER.exception(f"Failed to read {self.cache_path.name}") return {} @@ -47,18 +48,24 @@ def write_to_cache(self, data: dict[str, str]) -> None: """Writes the data to the cache file.""" LOGGER.debug(f"Writing {self.name} to cache ({self.cache_path})") - if not all(k in data for k in self.keys): - raise ValueError(f"Data must contain all keys: {self.keys}") - - data = {k: v for k, v in data.items() if k in self.keys} + existing_data = self.get_cached_data() + data = {**existing_data, **data} - try: - self.cache_path.write_text(json.dumps(data, indent=4, default=str)) - except Exception: - LOGGER.exception(f"Failed to write {self.cache_path.name}") + self.cache_path.write_text(json.dumps(data, indent=4, default=str)) - def clear_cache(self) -> None: + def clear_cache(self, keys: list[str] | None = None) -> None: """Deletes the cache file if it exists.""" - if self.cache_path.exists(): + if not self.cache_path.exists(): + return + + if not keys: self.cache_path.unlink() LOGGER.debug(f"{self.name} cache deleted") + + assert isinstance(keys, list), "Keys must be a list" + + data = self.get_cached_data() + for key in keys: + data.pop(key, None) + + self.write_to_cache(data)