diff --git a/qpc/insights/auth.py b/qpc/insights/auth.py new file mode 100644 index 00000000..9f6818d1 --- /dev/null +++ b/qpc/insights/auth.py @@ -0,0 +1,111 @@ +"""Insights Auth is used to handle the Insights authentication workflow.""" + +import http +import time + +import requests +from requests.exceptions import BaseHTTPError + +from qpc.insights.exceptions import InsightsAuthError +from qpc.translation import _ + +DISCOVERY_CLIENT_ID = "discovery-client-id" +INSIGHTS_SSO_SERVER = "sso.redhat.com" +INSIGHTS_REALM = "redhat-external" +INSIGHTS_SCOPE = "api.console" +GRANT_TYPE = "urn:ietf:params:oauth:grant-type:device_code" +DEVICE_AUTH_ENDPOINT = ( + f"/auth/realms/{INSIGHTS_REALM}/protocol/openid-connect/auth/device" +) +TOKEN_ENDPOINT = f"/auth/realms/{INSIGHTS_REALM}/protocol/openid-connect/token" + + +class InsightsAuth: + """Implement the Insights Device Authorization workflow.""" + + def __init__(self): + self.auth_request = None + self.token_response = None + self.auth_token = None + + def request_auth(self): + """Initialize a device authorization workflow request. + + :returns: authorization request object + """ + self.auth_request = None + + url = f"https://{INSIGHTS_SSO_SERVER}/{DEVICE_AUTH_ENDPOINT}" # Always SSL + headers = {"Content-Type": "application/x-www-form-urlencoded"} + params = { + "grant_type": GRANT_TYPE, + "scope": INSIGHTS_SCOPE, + "client_id": DISCOVERY_CLIENT_ID, + } + try: + response = requests.post(url, headers=headers, data=params) + except BaseHTTPError as err: + raise InsightsAuthError( + _("Failed to request login authorization - %s"), err.message + ) + + if response.status_code == http.HTTPStatus.OK: + self.auth_request = response.json() + else: + raise InsightsAuthError( + _("Failed to request login authorization - %s"), response.reason + ) + + return self.auth_request + + def wait_for_authorization(self): + """Wait for the user to log in and authorize the request. + + :returns: user JWT token + """ + if self.auth_request: + device_code = self.auth_request["device_code"] + interval = self.auth_request.get("interval") or 5 # SSO default + expires_in = self.auth_request.get("expires_in") or 600 # SSO default + + elapsed_time = 0 + self.auth_token = None + while not self.auth_token: + url = f"https://{INSIGHTS_SSO_SERVER}/{TOKEN_ENDPOINT}" # Always SSL + headers = {"Content-Type": "application/x-www-form-urlencoded"} + params = { + "grant_type": GRANT_TYPE, + "client_id": DISCOVERY_CLIENT_ID, + "device_code": device_code, + } + try: + response = requests.post(url, headers=headers, data=params) + except BaseHTTPError as err: + raise InsightsAuthError( + _("Failed to verify login authorization - %s"), err.message + ) + + if response.status_code == http.HTTPStatus.OK: + self.token_response = response.json() + self.auth_token = self.token_response["access_token"] + break + if response.status_code == http.HTTPStatus.BAD_REQUEST: + self.token_response = response.json() + if self.token_response.get("error") != "authorization_pending": + raise InsightsAuthError( + _("Failed to verify login authorization - %s"), + response.reason, + ) + else: + raise InsightsAuthError( + _("Failed to verify login authorization - %s"), response.reason + ) + + time.sleep(interval) + elapsed_time += interval + if elapsed_time > expires_in: + raise InsightsAuthError( + _("Time-out while waiting for login authorization") + ) + + return self.auth_token diff --git a/qpc/insights/exceptions.py b/qpc/insights/exceptions.py index 02fc0f32..4a50e680 100644 --- a/qpc/insights/exceptions.py +++ b/qpc/insights/exceptions.py @@ -4,6 +4,10 @@ from qpc.exceptions import QPCError +class InsightsAuthError(QPCError): + """Class for Insights device authorization errors.""" + + class QPCLoginConfigError(QPCError): """Class for errors in the login config file manipulation.""" diff --git a/qpc/insights/login.py b/qpc/insights/login.py index 8a7e8bf4..a5db7a74 100644 --- a/qpc/insights/login.py +++ b/qpc/insights/login.py @@ -2,11 +2,12 @@ from logging import getLogger -from qpc import insights, messages +from qpc import insights from qpc.clicommand import CliCommand -from qpc.exceptions import QPCError +from qpc.insights.auth import InsightsAuth +from qpc.insights.exceptions import InsightsAuthError from qpc.translation import _ -from qpc.utils import write_insights_auth_token +from qpc.utils import clear_insights_auth_token, write_insights_auth_token logger = getLogger(__name__) @@ -32,11 +33,19 @@ def __init__(self, subparsers): ) def _do_command(self): - """Persist insights login configuration.""" - user_token = None + """Request Insights login authorization.""" + auth_token = None try: - write_insights_auth_token(user_token) - except QPCError as err: + clear_insights_auth_token() + insights_auth = InsightsAuth() + auth_request = insights_auth.request_auth() + print("Insights login authorization requested") + print(f"User Code: {auth_request['user_code']}") + print(f"Authorization URL: {auth_request['verification_uri_complete']}") + print("Waiting for login authorization ...") + auth_token = insights_auth.wait_for_authorization() + print("Login authorization successful.") + write_insights_auth_token(auth_token) + except InsightsAuthError as err: logger.error(_(err.message)) SystemExit(1) - logger.info(_(messages.INSIGHTS_LOGIN_CONFIG_SUCCESS)) diff --git a/qpc/insights/tests_insights_login.py b/qpc/insights/tests_insights_login.py index b79aa27b..77610d64 100644 --- a/qpc/insights/tests_insights_login.py +++ b/qpc/insights/tests_insights_login.py @@ -1,17 +1,5 @@ """Test the CLI module.""" -import sys - -from qpc.cli import CLI - class TestInsightsLogin: """Class for testing insights login command.""" - - def test_insight_login(self, capsys): - """Testing if insights login command requires args.""" - sys.argv = ["/bin/qpc", "insights", "login"] - CLI().main() - out, err = capsys.readouterr() - assert out == "" - assert err == "" diff --git a/qpc/utils.py b/qpc/utils.py index 9858916d..bb47c3a1 100644 --- a/qpc/utils.py +++ b/qpc/utils.py @@ -325,16 +325,16 @@ def read_insights_auth_token(): return decrypt_password(INSIGHTS_AUTH_TOKEN.read_text()) -def write_insights_auth_token(insights_auth_token): +def write_insights_auth_token(auth_token): """Write insights user authentication. - :param insights_auth_token: Insight's user JWT auth token + :param auth_token: Insight's user JWT auth token """ ensure_config_dir_exists() - if insights_auth_token: + if auth_token: with Path(INSIGHTS_AUTH_TOKEN).open("w", encoding="utf-8") as auth_token_file: - os.write(auth_token_file, encrypt_password(insights_auth_token)) + auth_token_file.write(encrypt_password(auth_token)) else: clear_insights_auth_token()