Skip to content

Commit

Permalink
refactor(auth): remove OtfAuthConfig to simplify authentication confi…
Browse files Browse the repository at this point in the history
…guration

Remove the OtfAuthConfig class and its usage across the codebase to
simplify the authentication configuration process. This change reduces
complexity by eliminating unnecessary configuration options and
streamlining the caching mechanism using a unified CacheableData class.

This refactor aims to improve code maintainability and readability
while maintaining the same functionality.
  • Loading branch information
NodeJSmith committed Jan 8, 2025
1 parent b31a008 commit fdf1383
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 167 deletions.
44 changes: 16 additions & 28 deletions examples/auth_examples.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import os
from getpass import getpass

from otf_api import Otf, OtfAuth, OtfAuthConfig, OtfUser
from otf_api import Otf, OtfUser
from otf_api.auth.auth import OtfAuth

USERNAME = os.getenv("OTF_EMAIL") or input("Enter your OTF email: ")
PASSWORD = os.getenv("OTF_PASSWORD") or getpass("Enter your OTF password: ")
Expand All @@ -10,52 +11,37 @@
def main():
"""
OtfAuthConfig provides three options currently:
- cache_tokens_plaintext: bool - Whether to cache the tokens in plaintext in the config file - this is an obvious
security risk, but it's useful for development purposes. If you want to do this, it is at your own risk. The
benefit is that after you log in with your username/password once, you can use the cached tokens to log in
without providing your password again.
"""

# This is the most configurable way to access the API but also the most verbose

auth_config = OtfAuthConfig(cache_tokens_plaintext=True)

auth = OtfAuth.create(USERNAME, PASSWORD, config=auth_config)
user = OtfUser(auth=auth)
print(user.otf_auth.auth_type)

# You can also use `login` to log in with a username and password, with the config being optional
user = OtfUser.login(USERNAME, PASSWORD, config=auth_config)
# You can use `login` to log in with a username and password
user = OtfUser.login(USERNAME, PASSWORD)
print(user.otf_auth.auth_type)

# If you have tokens available you can provide them using `from_tokens` instead of `login`

user = OtfUser.from_tokens(
user.cognito.access_token, user.cognito.id_token, user.cognito.refresh_token, config=auth_config
)
user = OtfUser.from_tokens(**user.cognito.tokens)
print(user.otf_auth.auth_type)

# Likewise, if you have a Cognito instance you can provide that as well

user = OtfUser.from_cognito(cognito=user.cognito, config=auth_config)
user = OtfUser.from_cognito(cognito=user.cognito)
print(user.otf_auth.auth_type)

# If you have already logged in and you cached your tokens, you can use `from_cache` to create a user object
# without providing any tokens

user = OtfUser.from_cache(config=auth_config)
user = OtfUser.from_cache()
print(user.otf_auth.auth_type)

# if you want to clear the cached tokens, you can use `clear_cached_tokens`
# if you want to clear the cached device data, you can use `clear_cached_device_data`
# if you want to clear both, you can use `clear_cache`
user.clear_cached_tokens()
user.clear_cached_device_data()
user.clear_cache()

# for more granular control you can access the `otf_auth` attribute
# if you want to clear the cached tokens, you can use `clear_cached_tokens`
# if you want to clear the cached device data, you can use `clear_cached_device_data`
user.otf_auth.clear_cached_tokens()
user.otf_auth.clear_cached_device_data()

# now if we tried to log in from cache, it would raise a ValueError
try:
user = OtfUser.from_cache(config=auth_config)
user = OtfUser.from_cache()
except ValueError as e:
print(e)

Expand All @@ -66,6 +52,8 @@ def main():
print(otf_from_user.member_uuid)

# whereas if you provide an auth object, authenticate will be called
auth = OtfAuth.create(USERNAME, PASSWORD)

otf_from_auth = Otf(auth=auth)
print(otf_from_auth.member_uuid)

Expand Down
4 changes: 2 additions & 2 deletions src/otf_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

from otf_api.api import Otf
from otf_api import models
from otf_api.auth import OtfUser, OtfAuth, OtfAuthConfig
from otf_api.auth import OtfUser, OtfAuth

__version__ = "0.9.0"


__all__ = ["Otf", "OtfAuth", "OtfAuthConfig", "OtfUser", "models"]
__all__ = ["Otf", "OtfAuth", "OtfUser", "models"]
3 changes: 1 addition & 2 deletions src/otf_api/auth/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from .auth import OTF_AUTH_TYPE, OtfAuth
from .config import OtfAuthConfig
from .user import OtfUser
from .utils import HttpxCognitoAuth

__all__ = ["OTF_AUTH_TYPE", "HttpxCognitoAuth", "OtfAuth", "OtfAuthConfig", "OtfUser"]
__all__ = ["OTF_AUTH_TYPE", "HttpxCognitoAuth", "OtfAuth", "OtfUser"]
93 changes: 38 additions & 55 deletions src/otf_api/auth/auth.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import socket
import typing
from logging import getLogger
from pathlib import Path
from typing import ClassVar, Literal

import attrs
Expand All @@ -9,8 +10,8 @@
from botocore.config import Config
from pycognito import AWSSRP, Cognito

from otf_api.auth.config import OtfAuthConfig
from otf_api.auth.utils import CognitoTokens
from otf_api.utils import CacheableData

if typing.TYPE_CHECKING:
from mypy_boto3_cognito_idp import CognitoIdentityProviderClient
Expand All @@ -22,6 +23,11 @@
REGION = "us-east-1"

BOTO_CONFIG = Config(region_name=REGION, signature_version=UNSIGNED)
CRED_CACHE = CacheableData("creds", Path("~/.otf-api"))

DEVICE_KEYS = ["device_key", "device_group_key", "device_password"]
DEVICE_KEYS_NO_PWD = ["device_key", "device_group_key"]
TOKEN_KEYS = ["access_token", "id_token", "refresh_token"]


class OtfCognito(Cognito):
Expand All @@ -36,24 +42,30 @@ def renew_access_token(self) -> None:
auth_params = {"REFRESH_TOKEN": self.refresh_token}
self._add_secret_hash(auth_params, "SECRET_HASH")

if dd := self.auth.config.dd_cache.get_cached_data():
if dd := CRED_CACHE.get_cached_data():
auth_params["DEVICE_KEY"] = dd["device_key"]

refresh_response = self.client.initiate_auth(
ClientId=self.client_id, AuthFlow="REFRESH_TOKEN_AUTH", AuthParameters=auth_params
)
self._set_tokens(refresh_response)

CRED_CACHE.write_to_cache(self.tokens)

@property
def tokens(self) -> dict[str, str]:
tokens = {
"access_token": self.access_token,
"id_token": self.id_token,
"refresh_token": self.refresh_token,
}
return {k: v for k, v in tokens.items() if v}


class OtfAuth:
auth_type: ClassVar[Literal["basic", "token", "cognito"]]

cognito: OtfCognito
config: OtfAuthConfig

def __attrs_post_init__(self) -> None:
if not hasattr(self, "config"):
self.config = OtfAuthConfig()

@property
def access_token(self) -> str:
Expand All @@ -74,41 +86,31 @@ def refresh_token(self) -> str:
raise AttributeError("No refresh token found")

@staticmethod
def has_cached_credentials(config: OtfAuthConfig | None = None) -> bool:
def has_cached_credentials() -> bool:
"""Check if there are cached credentials.
Args:
config (OtfAuthConfig, optional): The configuration. Defaults to None.
Returns:
bool: True if there are cached credentials, False otherwise.
"""
config = config or OtfAuthConfig()
return bool(config.token_cache.get_cached_data())

return bool(CRED_CACHE.get_cached_data())

@staticmethod
def from_cache(config: OtfAuthConfig | None = None) -> "OtfTokenAuth":
def from_cache() -> "OtfTokenAuth":
"""Attempt to get an authentication object from the cache. If no tokens are found, raise a ValueError.
If config is not provided the default configuration will be used, with plaintext token caching enabled.
Args:
config (OtfAuthConfig, optional): The configuration. Defaults to None.
Raises:
ValueError: If no tokens are found in the cache.
"""

config = config or OtfAuthConfig(cache_tokens_plaintext=True)
tokens = config.token_cache.get_cached_data()
tokens = CRED_CACHE.get_cached_data()
if not tokens:
raise ValueError("No tokens found in cache")

return OtfTokenAuth(
access_token=tokens["access_token"],
id_token=tokens["id_token"],
refresh_token=tokens.get("refresh_token"),
auth_config=config,
access_token=tokens["access_token"], id_token=tokens["id_token"], refresh_token=tokens.get("refresh_token")
)

@staticmethod
Expand All @@ -119,24 +121,20 @@ def create(
access_token: str | None = None,
refresh_token: str | None = None,
cognito: OtfCognito | None = None,
config: OtfAuthConfig | None = None,
) -> "OTF_AUTH_TYPE":
"""Create an authentication object.
Accepts either username/password, id/access tokens, or a Cognito object.
Raises a ValueError if none of the required parameters are provided.
"""
config = config or OtfAuthConfig()

if username and password:
return OtfBasicAuth(username=username, password=password, config=config)
return OtfBasicAuth(username=username, password=password)
if id_token and access_token:
return OtfTokenAuth(
_id_token=id_token, _access_token=access_token, _refresh_token=refresh_token, auth_config=config
)
return OtfTokenAuth(id_token=id_token, access_token=access_token, refresh_token=refresh_token)
if cognito:
return OtfCognitoAuth(cognito=cognito, auth_config=config)
return OtfCognitoAuth(cognito=cognito)

raise ValueError("Must provide username/password or id/access tokens or cognito object")

Expand All @@ -147,11 +145,11 @@ def clear_cache(self) -> None:

def clear_cached_tokens(self) -> None:
"""Clear the cached tokens."""
self.config.token_cache.clear_cache()
CRED_CACHE.clear_cache(TOKEN_KEYS)

def clear_cached_device_data(self) -> None:
"""Clear the cached device data."""
self.config.dd_cache.clear_cache()
CRED_CACHE.clear_cache(DEVICE_KEYS)

def authenticate(self) -> None:
raise NotImplementedError
Expand All @@ -173,13 +171,7 @@ def validate_cognito_tokens(self) -> None:
self.cognito.check_token()
self.cognito.verify_tokens()

if self.config.cache_tokens_plaintext:
tokens = {
"access_token": self.cognito.access_token,
"id_token": self.cognito.id_token,
"refresh_token": self.cognito.refresh_token,
}
self.config.token_cache.write_to_cache(tokens)
CRED_CACHE.write_to_cache(self.cognito.tokens)

# ensure the cognito instance has the auth object
# we'll need this for the device key during refresh
Expand All @@ -192,7 +184,6 @@ class OtfBasicAuth(OtfAuth):

username: str
password: str
config: OtfAuthConfig = attrs.field(factory=OtfAuthConfig)

def get_awssrp(self) -> AWSSRP:
# this is to avoid boto3 trying to find credentials in the environment/local machine
Expand All @@ -205,7 +196,7 @@ def get_awssrp(self) -> AWSSRP:
"client": boto3.client("cognito-idp", config=BOTO_CONFIG),
}

dd = self.config.dd_cache.get_cached_data()
dd = CRED_CACHE.get_cached_data(DEVICE_KEYS)

kwargs = kwargs | dd | {"username": self.username, "password": self.password}

Expand Down Expand Up @@ -235,7 +226,7 @@ def handle_device_setup(self, tokens: CognitoTokens) -> None:
Args:
tokens (dict): The tokens from the AWS SRP instance.
"""
if self.config.dd_cache.get_cached_data():
if CRED_CACHE.get_cached_data():
LOGGER.debug("Skipping device setup")

try:
Expand All @@ -245,13 +236,8 @@ def handle_device_setup(self, tokens: CognitoTokens) -> None:
LOGGER.exception("Failed to get device name")
return

try:
if not tokens.device_key:
LOGGER.debug("No new device metadata")
return

except KeyError:
LOGGER.error("Failed to get device key and group key")
if not tokens.device_key:
LOGGER.debug("No new device metadata")
return

aws = self.get_awssrp()
Expand All @@ -270,7 +256,7 @@ def handle_device_setup(self, tokens: CognitoTokens) -> None:
"device_group_key": tokens.device_group_key,
"device_password": device_password,
}
self.config.dd_cache.write_to_cache(dd)
CRED_CACHE.write_to_cache(dd)
except Exception:
LOGGER.exception("Failed to write device key cache")
return
Expand All @@ -279,7 +265,7 @@ def forget_device(self) -> None:
"""Forget the device from AWS and delete the device key cache."""

access_token = self.cognito.access_token
dd = self.config.dd_cache.get_cached_data()
dd = CRED_CACHE.get_cached_data()
if not dd:
LOGGER.debug("No device data to forget")
return
Expand All @@ -299,11 +285,9 @@ class OtfTokenAuth(OtfAuth):
_access_token: str
_id_token: str
_refresh_token: str | None = None
auth_config: OtfAuthConfig = attrs.field(factory=OtfAuthConfig)

def authenticate(self) -> None:
dd = self.auth_config.dd_cache.get_cached_data()
dd.pop("device_password", None) # remove device password, not attribute of CognitoTokens
dd = CRED_CACHE.get_cached_data(DEVICE_KEYS_NO_PWD)
tokens = CognitoTokens(
access_token=self._access_token, id_token=self._id_token, refresh_token=self._refresh_token, **dd
)
Expand Down Expand Up @@ -337,7 +321,6 @@ class OtfCognitoAuth(OtfAuth):
auth_type: ClassVar[Literal["basic", "token", "cognito"]] = "cognito"

cognito: OtfCognito
auth_config: OtfAuthConfig = attrs.field(factory=OtfAuthConfig)

def authenticate(self) -> None:
self.cognito.check_token(renew=True)
Expand Down
34 changes: 0 additions & 34 deletions src/otf_api/auth/config.py

This file was deleted.

Loading

0 comments on commit fdf1383

Please sign in to comment.