From fdf13838a2030d029559a3ec995c54214f2fa664 Mon Sep 17 00:00:00 2001 From: Jessica Smith <12jessicasmith34@gmail.com> Date: Tue, 7 Jan 2025 21:32:12 -0600 Subject: [PATCH] refactor(auth): remove OtfAuthConfig to simplify authentication configuration Remove the OtfAuthConfig class and its usage across the codebase to simplify the authentication configuration process. This change reduces complexity by eliminating unnecessary configuration options and streamlining the caching mechanism using a unified CacheableData class. This refactor aims to improve code maintainability and readability while maintaining the same functionality. --- examples/auth_examples.py | 44 +++++++---------- src/otf_api/__init__.py | 4 +- src/otf_api/auth/__init__.py | 3 +- src/otf_api/auth/auth.py | 93 +++++++++++++++--------------------- src/otf_api/auth/config.py | 34 ------------- src/otf_api/auth/user.py | 41 +++++----------- src/otf_api/utils.py | 39 ++++++++------- 7 files changed, 91 insertions(+), 167 deletions(-) delete mode 100644 src/otf_api/auth/config.py diff --git a/examples/auth_examples.py b/examples/auth_examples.py index acf6c9f..7398ea8 100644 --- a/examples/auth_examples.py +++ b/examples/auth_examples.py @@ -1,7 +1,8 @@ import os from getpass import getpass -from otf_api import Otf, OtfAuth, OtfAuthConfig, OtfUser +from otf_api import Otf, OtfUser +from otf_api.auth.auth import OtfAuth USERNAME = os.getenv("OTF_EMAIL") or input("Enter your OTF email: ") PASSWORD = os.getenv("OTF_PASSWORD") or getpass("Enter your OTF password: ") @@ -10,52 +11,37 @@ def main(): """ OtfAuthConfig provides three options currently: - - cache_tokens_plaintext: bool - Whether to cache the tokens in plaintext in the config file - this is an obvious - security risk, but it's useful for development purposes. If you want to do this, it is at your own risk. The - benefit is that after you log in with your username/password once, you can use the cached tokens to log in - without providing your password again. """ - # This is the most configurable way to access the API but also the most verbose - - auth_config = OtfAuthConfig(cache_tokens_plaintext=True) - - auth = OtfAuth.create(USERNAME, PASSWORD, config=auth_config) - user = OtfUser(auth=auth) - print(user.otf_auth.auth_type) - - # You can also use `login` to log in with a username and password, with the config being optional - user = OtfUser.login(USERNAME, PASSWORD, config=auth_config) + # You can use `login` to log in with a username and password + user = OtfUser.login(USERNAME, PASSWORD) print(user.otf_auth.auth_type) # If you have tokens available you can provide them using `from_tokens` instead of `login` - - user = OtfUser.from_tokens( - user.cognito.access_token, user.cognito.id_token, user.cognito.refresh_token, config=auth_config - ) + user = OtfUser.from_tokens(**user.cognito.tokens) print(user.otf_auth.auth_type) # Likewise, if you have a Cognito instance you can provide that as well - - user = OtfUser.from_cognito(cognito=user.cognito, config=auth_config) + user = OtfUser.from_cognito(cognito=user.cognito) print(user.otf_auth.auth_type) # If you have already logged in and you cached your tokens, you can use `from_cache` to create a user object # without providing any tokens - - user = OtfUser.from_cache(config=auth_config) + user = OtfUser.from_cache() print(user.otf_auth.auth_type) - # if you want to clear the cached tokens, you can use `clear_cached_tokens` - # if you want to clear the cached device data, you can use `clear_cached_device_data` # if you want to clear both, you can use `clear_cache` - user.clear_cached_tokens() - user.clear_cached_device_data() user.clear_cache() + # for more granular control you can access the `otf_auth` attribute + # if you want to clear the cached tokens, you can use `clear_cached_tokens` + # if you want to clear the cached device data, you can use `clear_cached_device_data` + user.otf_auth.clear_cached_tokens() + user.otf_auth.clear_cached_device_data() + # now if we tried to log in from cache, it would raise a ValueError try: - user = OtfUser.from_cache(config=auth_config) + user = OtfUser.from_cache() except ValueError as e: print(e) @@ -66,6 +52,8 @@ def main(): print(otf_from_user.member_uuid) # whereas if you provide an auth object, authenticate will be called + auth = OtfAuth.create(USERNAME, PASSWORD) + otf_from_auth = Otf(auth=auth) print(otf_from_auth.member_uuid) diff --git a/src/otf_api/__init__.py b/src/otf_api/__init__.py index a982967..de7c9a2 100644 --- a/src/otf_api/__init__.py +++ b/src/otf_api/__init__.py @@ -2,9 +2,9 @@ from otf_api.api import Otf from otf_api import models -from otf_api.auth import OtfUser, OtfAuth, OtfAuthConfig +from otf_api.auth import OtfUser, OtfAuth __version__ = "0.9.0" -__all__ = ["Otf", "OtfAuth", "OtfAuthConfig", "OtfUser", "models"] +__all__ = ["Otf", "OtfAuth", "OtfUser", "models"] diff --git a/src/otf_api/auth/__init__.py b/src/otf_api/auth/__init__.py index ed8e8bb..bd05a55 100644 --- a/src/otf_api/auth/__init__.py +++ b/src/otf_api/auth/__init__.py @@ -1,6 +1,5 @@ from .auth import OTF_AUTH_TYPE, OtfAuth -from .config import OtfAuthConfig from .user import OtfUser from .utils import HttpxCognitoAuth -__all__ = ["OTF_AUTH_TYPE", "HttpxCognitoAuth", "OtfAuth", "OtfAuthConfig", "OtfUser"] +__all__ = ["OTF_AUTH_TYPE", "HttpxCognitoAuth", "OtfAuth", "OtfUser"] diff --git a/src/otf_api/auth/auth.py b/src/otf_api/auth/auth.py index 828ec10..aa411b9 100644 --- a/src/otf_api/auth/auth.py +++ b/src/otf_api/auth/auth.py @@ -1,6 +1,7 @@ import socket import typing from logging import getLogger +from pathlib import Path from typing import ClassVar, Literal import attrs @@ -9,8 +10,8 @@ from botocore.config import Config from pycognito import AWSSRP, Cognito -from otf_api.auth.config import OtfAuthConfig from otf_api.auth.utils import CognitoTokens +from otf_api.utils import CacheableData if typing.TYPE_CHECKING: from mypy_boto3_cognito_idp import CognitoIdentityProviderClient @@ -22,6 +23,11 @@ REGION = "us-east-1" BOTO_CONFIG = Config(region_name=REGION, signature_version=UNSIGNED) +CRED_CACHE = CacheableData("creds", Path("~/.otf-api")) + +DEVICE_KEYS = ["device_key", "device_group_key", "device_password"] +DEVICE_KEYS_NO_PWD = ["device_key", "device_group_key"] +TOKEN_KEYS = ["access_token", "id_token", "refresh_token"] class OtfCognito(Cognito): @@ -36,7 +42,7 @@ def renew_access_token(self) -> None: auth_params = {"REFRESH_TOKEN": self.refresh_token} self._add_secret_hash(auth_params, "SECRET_HASH") - if dd := self.auth.config.dd_cache.get_cached_data(): + if dd := CRED_CACHE.get_cached_data(): auth_params["DEVICE_KEY"] = dd["device_key"] refresh_response = self.client.initiate_auth( @@ -44,16 +50,22 @@ def renew_access_token(self) -> None: ) self._set_tokens(refresh_response) + CRED_CACHE.write_to_cache(self.tokens) + + @property + def tokens(self) -> dict[str, str]: + tokens = { + "access_token": self.access_token, + "id_token": self.id_token, + "refresh_token": self.refresh_token, + } + return {k: v for k, v in tokens.items() if v} + class OtfAuth: auth_type: ClassVar[Literal["basic", "token", "cognito"]] cognito: OtfCognito - config: OtfAuthConfig - - def __attrs_post_init__(self) -> None: - if not hasattr(self, "config"): - self.config = OtfAuthConfig() @property def access_token(self) -> str: @@ -74,41 +86,31 @@ def refresh_token(self) -> str: raise AttributeError("No refresh token found") @staticmethod - def has_cached_credentials(config: OtfAuthConfig | None = None) -> bool: + def has_cached_credentials() -> bool: """Check if there are cached credentials. - Args: - config (OtfAuthConfig, optional): The configuration. Defaults to None. - Returns: bool: True if there are cached credentials, False otherwise. """ - config = config or OtfAuthConfig() - return bool(config.token_cache.get_cached_data()) + + return bool(CRED_CACHE.get_cached_data()) @staticmethod - def from_cache(config: OtfAuthConfig | None = None) -> "OtfTokenAuth": + def from_cache() -> "OtfTokenAuth": """Attempt to get an authentication object from the cache. If no tokens are found, raise a ValueError. If config is not provided the default configuration will be used, with plaintext token caching enabled. - Args: - config (OtfAuthConfig, optional): The configuration. Defaults to None. - Raises: ValueError: If no tokens are found in the cache. """ - config = config or OtfAuthConfig(cache_tokens_plaintext=True) - tokens = config.token_cache.get_cached_data() + tokens = CRED_CACHE.get_cached_data() if not tokens: raise ValueError("No tokens found in cache") return OtfTokenAuth( - access_token=tokens["access_token"], - id_token=tokens["id_token"], - refresh_token=tokens.get("refresh_token"), - auth_config=config, + access_token=tokens["access_token"], id_token=tokens["id_token"], refresh_token=tokens.get("refresh_token") ) @staticmethod @@ -119,7 +121,6 @@ def create( access_token: str | None = None, refresh_token: str | None = None, cognito: OtfCognito | None = None, - config: OtfAuthConfig | None = None, ) -> "OTF_AUTH_TYPE": """Create an authentication object. @@ -127,16 +128,13 @@ def create( Raises a ValueError if none of the required parameters are provided. """ - config = config or OtfAuthConfig() if username and password: - return OtfBasicAuth(username=username, password=password, config=config) + return OtfBasicAuth(username=username, password=password) if id_token and access_token: - return OtfTokenAuth( - _id_token=id_token, _access_token=access_token, _refresh_token=refresh_token, auth_config=config - ) + return OtfTokenAuth(id_token=id_token, access_token=access_token, refresh_token=refresh_token) if cognito: - return OtfCognitoAuth(cognito=cognito, auth_config=config) + return OtfCognitoAuth(cognito=cognito) raise ValueError("Must provide username/password or id/access tokens or cognito object") @@ -147,11 +145,11 @@ def clear_cache(self) -> None: def clear_cached_tokens(self) -> None: """Clear the cached tokens.""" - self.config.token_cache.clear_cache() + CRED_CACHE.clear_cache(TOKEN_KEYS) def clear_cached_device_data(self) -> None: """Clear the cached device data.""" - self.config.dd_cache.clear_cache() + CRED_CACHE.clear_cache(DEVICE_KEYS) def authenticate(self) -> None: raise NotImplementedError @@ -173,13 +171,7 @@ def validate_cognito_tokens(self) -> None: self.cognito.check_token() self.cognito.verify_tokens() - if self.config.cache_tokens_plaintext: - tokens = { - "access_token": self.cognito.access_token, - "id_token": self.cognito.id_token, - "refresh_token": self.cognito.refresh_token, - } - self.config.token_cache.write_to_cache(tokens) + CRED_CACHE.write_to_cache(self.cognito.tokens) # ensure the cognito instance has the auth object # we'll need this for the device key during refresh @@ -192,7 +184,6 @@ class OtfBasicAuth(OtfAuth): username: str password: str - config: OtfAuthConfig = attrs.field(factory=OtfAuthConfig) def get_awssrp(self) -> AWSSRP: # this is to avoid boto3 trying to find credentials in the environment/local machine @@ -205,7 +196,7 @@ def get_awssrp(self) -> AWSSRP: "client": boto3.client("cognito-idp", config=BOTO_CONFIG), } - dd = self.config.dd_cache.get_cached_data() + dd = CRED_CACHE.get_cached_data(DEVICE_KEYS) kwargs = kwargs | dd | {"username": self.username, "password": self.password} @@ -235,7 +226,7 @@ def handle_device_setup(self, tokens: CognitoTokens) -> None: Args: tokens (dict): The tokens from the AWS SRP instance. """ - if self.config.dd_cache.get_cached_data(): + if CRED_CACHE.get_cached_data(): LOGGER.debug("Skipping device setup") try: @@ -245,13 +236,8 @@ def handle_device_setup(self, tokens: CognitoTokens) -> None: LOGGER.exception("Failed to get device name") return - try: - if not tokens.device_key: - LOGGER.debug("No new device metadata") - return - - except KeyError: - LOGGER.error("Failed to get device key and group key") + if not tokens.device_key: + LOGGER.debug("No new device metadata") return aws = self.get_awssrp() @@ -270,7 +256,7 @@ def handle_device_setup(self, tokens: CognitoTokens) -> None: "device_group_key": tokens.device_group_key, "device_password": device_password, } - self.config.dd_cache.write_to_cache(dd) + CRED_CACHE.write_to_cache(dd) except Exception: LOGGER.exception("Failed to write device key cache") return @@ -279,7 +265,7 @@ def forget_device(self) -> None: """Forget the device from AWS and delete the device key cache.""" access_token = self.cognito.access_token - dd = self.config.dd_cache.get_cached_data() + dd = CRED_CACHE.get_cached_data() if not dd: LOGGER.debug("No device data to forget") return @@ -299,11 +285,9 @@ class OtfTokenAuth(OtfAuth): _access_token: str _id_token: str _refresh_token: str | None = None - auth_config: OtfAuthConfig = attrs.field(factory=OtfAuthConfig) def authenticate(self) -> None: - dd = self.auth_config.dd_cache.get_cached_data() - dd.pop("device_password", None) # remove device password, not attribute of CognitoTokens + dd = CRED_CACHE.get_cached_data(DEVICE_KEYS_NO_PWD) tokens = CognitoTokens( access_token=self._access_token, id_token=self._id_token, refresh_token=self._refresh_token, **dd ) @@ -337,7 +321,6 @@ class OtfCognitoAuth(OtfAuth): auth_type: ClassVar[Literal["basic", "token", "cognito"]] = "cognito" cognito: OtfCognito - auth_config: OtfAuthConfig = attrs.field(factory=OtfAuthConfig) def authenticate(self) -> None: self.cognito.check_token(renew=True) diff --git a/src/otf_api/auth/config.py b/src/otf_api/auth/config.py deleted file mode 100644 index 66a0dcb..0000000 --- a/src/otf_api/auth/config.py +++ /dev/null @@ -1,34 +0,0 @@ -import os -from pathlib import Path -from typing import ClassVar - -import attrs - -from otf_api.utils import CacheableData - - -@attrs.define -class OtfAuthConfig: - DEVICE_DATA_KEYS: ClassVar[list[str]] = ["device_key", "device_group_key", "device_password"] - TOKEN_KEYS: ClassVar[list[str]] = ["access_token", "id_token", "refresh_token"] - - cache_tokens_plaintext: bool = False - - cache_path: Path = attrs.field(init=False) - dd_cache: CacheableData = attrs.field(init=False) - token_cache: CacheableData = attrs.field(init=False) - - def __attrs_post_init__(self) -> None: - self.cache_path = self._get_cache_path() - self.dd_cache = CacheableData("device_key", self.DEVICE_DATA_KEYS, self.cache_path) - self.token_cache = CacheableData("tokens", self.TOKEN_KEYS, self.cache_path) - - def _get_cache_path(self) -> Path: - if "OTF_API_CACHE_PATH" in os.environ: - cache_path = Path(os.environ["OTF_API_CACHE_PATH"]) - else: - cache_path = Path("~/.otf-api") - - cache_path = cache_path.expanduser() - cache_path.mkdir(parents=True, exist_ok=True) - return cache_path diff --git a/src/otf_api/auth/user.py b/src/otf_api/auth/user.py index 59f4950..0130749 100644 --- a/src/otf_api/auth/user.py +++ b/src/otf_api/auth/user.py @@ -2,7 +2,7 @@ import attrs -from otf_api.auth.auth import OTF_AUTH_TYPE, OtfAuth, OtfAuthConfig, OtfBasicAuth, OtfCognito +from otf_api.auth.auth import OTF_AUTH_TYPE, OtfAuth, OtfBasicAuth, OtfCognito from otf_api.auth.utils import HttpxCognitoAuth LOGGER = getLogger(__name__) @@ -57,83 +57,64 @@ def clear_cache(self) -> None: """Clear the cache.""" self.otf_auth.clear_cache() - def clear_cached_tokens(self) -> None: - """Clear the cached tokens.""" - self.otf_auth.clear_cached_tokens() - - def clear_cached_device_data(self) -> None: - """Clear the cached device data.""" - self.otf_auth.clear_cached_device_data() - @classmethod - def login(cls, username: str, password: str, config: OtfAuthConfig | None = None) -> "OtfUser": + def login(cls, username: str, password: str) -> "OtfUser": """Create a User instance from a username and password. Args: username (str): The username. password (str): The password. - config (OtfAuthConfig, optional): The configuration. Defaults to None. Returns: OtfUser: The User instance """ - auth = OtfAuth.create(username=username, password=password, config=config) + auth = OtfAuth.create(username=username, password=password) return cls(auth) @classmethod - def from_tokens( - cls, access_token: str, id_token: str, refresh_token: str | None = None, config: OtfAuthConfig | None = None - ) -> "OtfUser": + def from_tokens(cls, access_token: str, id_token: str, refresh_token: str | None = None) -> "OtfUser": """Create a User instance from tokens. Args: access_token (str): The access token. id_token (str): The id token. refresh_token (str, optional): The refresh token. Defaults to None. - config (OtfAuthConfig, optional): The configuration. Defaults to None. Returns: OtfUser: The User instance """ - auth = OtfAuth.create(access_token=access_token, id_token=id_token, refresh_token=refresh_token, config=config) + auth = OtfAuth.create(access_token=access_token, id_token=id_token, refresh_token=refresh_token) return cls(auth) @classmethod - def from_cognito(cls, cognito: OtfCognito, config: OtfAuthConfig | None = None) -> "OtfUser": + def from_cognito(cls, cognito: OtfCognito) -> "OtfUser": """Create a User instance from a OtfCognito instance. Args: cognito (OtfCognito): The OtfCognito instance. - config (OtfAuthConfig, optional): The configuration. Defaults to None. Returns: OtfUser: The User instance """ - auth = OtfAuth.create(cognito=cognito, config=config) + auth = OtfAuth.create(cognito=cognito) return cls(auth) @classmethod - def from_cache(cls, config: OtfAuthConfig | None = None) -> "OtfUser": + def from_cache(cls) -> "OtfUser": """Create a User instance from cached tokens. - Args: - config (OtfAuthConfig, optional): The configuration. Defaults to None. - Returns: OtfUser: The User instance """ - auth = OtfAuth.from_cache(config) + auth = OtfAuth.from_cache() return cls(auth) @classmethod - def has_cached_credentials(cls, config: OtfAuthConfig | None = None) -> bool: + def has_cached_credentials(cls) -> bool: """Check if there are cached credentials. - Args: - config (OtfAuthConfig, optional): The configuration. Defaults to None. - Returns: bool: True if there are cached credentials, False otherwise. """ - return OtfAuth.has_cached_credentials(config) + return OtfAuth.has_cached_credentials() diff --git a/src/otf_api/utils.py b/src/otf_api/utils.py index 246746a..a40e4a2 100644 --- a/src/otf_api/utils.py +++ b/src/otf_api/utils.py @@ -12,15 +12,14 @@ class CacheableData: """Represents a cacheable data object, with methods to read and write to cache.""" name: str - keys: list[str] cache_dir: Path @property def cache_path(self) -> Path: """The path to the cache file.""" - return self.cache_dir.joinpath(f"{self.name}_cache.json") + return self.cache_dir.expanduser().joinpath(f"{self.name}_cache.json") - def get_cached_data(self) -> dict[str, str]: + def get_cached_data(self, keys: list[str] | None = None) -> dict[str, str]: """Reads the cache file and returns the data if it exists and is valid. Returns: @@ -35,10 +34,12 @@ def get_cached_data(self) -> dict[str, str]: return {} data: dict[str, str] = json.loads(self.cache_path.read_text()) - if set(data.keys()).issuperset(set(self.keys)): - return {k: v for k, v in data.items() if k in self.keys} + if not keys: + return data - return {} + if set(data.keys()).issuperset(set(keys)): + return {k: v for k, v in data.items() if k in keys} + raise ValueError(f"Data must contain all keys: {keys}") except Exception: LOGGER.exception(f"Failed to read {self.cache_path.name}") return {} @@ -47,18 +48,24 @@ def write_to_cache(self, data: dict[str, str]) -> None: """Writes the data to the cache file.""" LOGGER.debug(f"Writing {self.name} to cache ({self.cache_path})") - if not all(k in data for k in self.keys): - raise ValueError(f"Data must contain all keys: {self.keys}") - - data = {k: v for k, v in data.items() if k in self.keys} + existing_data = self.get_cached_data() + data = {**existing_data, **data} - try: - self.cache_path.write_text(json.dumps(data, indent=4, default=str)) - except Exception: - LOGGER.exception(f"Failed to write {self.cache_path.name}") + self.cache_path.write_text(json.dumps(data, indent=4, default=str)) - def clear_cache(self) -> None: + def clear_cache(self, keys: list[str] | None = None) -> None: """Deletes the cache file if it exists.""" - if self.cache_path.exists(): + if not self.cache_path.exists(): + return + + if not keys: self.cache_path.unlink() LOGGER.debug(f"{self.name} cache deleted") + + assert isinstance(keys, list), "Keys must be a list" + + data = self.get_cached_data() + for key in keys: + data.pop(key, None) + + self.write_to_cache(data)