Skip to content

Commit

Permalink
feat(insights): Dynamically query the device auth and token endpoints
Browse files Browse the repository at this point in the history
- While these have not changed for a long time, it is always a good idea
  to fetch the device_authorization_endpoint and token_endpoint from
  the Keycloak OpenID well-known configuration json.
  • Loading branch information
abellotti committed Nov 28, 2023
1 parent 275f2fe commit 15281c6
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 65 deletions.
60 changes: 40 additions & 20 deletions qpc/insights/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,29 @@
INSIGHTS_REALM = "redhat-external"
INSIGHTS_SCOPE = "api.console"
GRANT_TYPE = "urn:ietf:params:oauth:grant-type:device_code"
DEVICE_AUTH_ENDPOINT = (
f"/auth/realms/{INSIGHTS_REALM}/protocol/openid-connect/auth/device"
OPENID_CONFIG_ENDPOINT = (
f"/auth/realms/{INSIGHTS_REALM}/.well-known/openid-configuration"
)
TOKEN_ENDPOINT = f"/auth/realms/{INSIGHTS_REALM}/protocol/openid-connect/token"
DEVICE_AUTH_ENDPOINT_KEY = "device_authorization_endpoint"
TOKEN_ENDPOINT_KEY = "token_endpoint"


def get_sso_endpoint(endpoint):
"""Get the SSO OpenID Configuration endpoint."""
insights_sso_server = read_insights_config().get(CONFIG_SSO_HOST_KEY)
url = f"https://{insights_sso_server}{OPENID_CONFIG_ENDPOINT}" # Always SSL
try:
logger.info(_(messages.INSIGHTS_SSO_CONFIG_QUERY), url, endpoint)
response = requests.get(url)
except ConnectionError as err:
raise err
except BaseHTTPError as err:
raise err

config = response.json()
if endpoint not in config:
raise InsightsAuthError(_(messages.INSIGHTS_SSO_QUERY_FAILED % endpoint))
return config[endpoint]


class InsightsAuth:
Expand All @@ -39,18 +58,16 @@ def request_auth(self):
"""
self.auth_request = None

config = read_insights_config()
insights_sso_server = config.get(CONFIG_SSO_HOST_KEY)
url = f"https://{insights_sso_server}{DEVICE_AUTH_ENDPOINT}" # Always SSL
headers = {"Content-Type": "application/x-www-form-urlencoded"}
params = {
"grant_type": GRANT_TYPE,
"scope": INSIGHTS_SCOPE,
"client_id": DISCOVERY_CLIENT_ID,
}
try:
logger.info(_(messages.INSIGHTS_LOGIN_REQUEST), url)
response = requests.post(url, headers=headers, data=params)
device_auth_endpoint = get_sso_endpoint(DEVICE_AUTH_ENDPOINT_KEY)
logger.info(_(messages.INSIGHTS_LOGIN_REQUEST), device_auth_endpoint)
response = requests.post(device_auth_endpoint, headers=headers, data=params)
except ConnectionError as err:
raise InsightsAuthError(_(messages.INSIGHTS_LOGIN_REQUEST_FAILED % err))
except BaseHTTPError as err:
Expand All @@ -59,19 +76,19 @@ def request_auth(self):
if response.status_code == http.HTTPStatus.OK:
self.auth_request = response.json()
logger.debug(
_(messages.INSIGHTS_RESPONSE), insights_sso_server, self.auth_request
_(messages.INSIGHTS_RESPONSE), device_auth_endpoint, self.auth_request
)
else:
logger.debug(
_(messages.INSIGHTS_RESPONSE), insights_sso_server, response.text
_(messages.INSIGHTS_RESPONSE), device_auth_endpoint, response.text
)
raise InsightsAuthError(
_(messages.INSIGHTS_LOGIN_REQUEST_FAILED % response.reason)
)

return self.auth_request

def wait_for_authorization(self):
def wait_for_authorization(self): # noqa: C901 PLR0912
"""Wait for the user to log in and authorize the request.
:returns: user JWT token
Expand All @@ -83,19 +100,22 @@ def wait_for_authorization(self):

elapsed_time = 0
self.auth_token = None

token_endpoint = None
while not self.auth_token:
config = read_insights_config()
insights_sso_server = config.get(CONFIG_SSO_HOST_KEY)
url = f"https://{insights_sso_server}{TOKEN_ENDPOINT}" # Always SSL
headers = {"Content-Type": "application/x-www-form-urlencoded"}
params = {
"grant_type": GRANT_TYPE,
"client_id": DISCOVERY_CLIENT_ID,
"device_code": device_code,
}
try:
logger.debug(_(messages.INSIGHTS_LOGIN_VERIFYING), url)
response = requests.post(url, headers=headers, data=params)
if not token_endpoint:
token_endpoint = get_sso_endpoint(TOKEN_ENDPOINT_KEY)
logger.debug(_(messages.INSIGHTS_LOGIN_VERIFYING), token_endpoint)
response = requests.post(
token_endpoint, headers=headers, data=params
)
except ConnectionError as err:
raise InsightsAuthError(
_(messages.INSIGHTS_LOGIN_VERIFICATION_FAILED % err)
Expand All @@ -115,7 +135,7 @@ def wait_for_authorization(self):
if response_error == "expired_token":
logger.debug(
_(messages.INSIGHTS_RESPONSE),
insights_sso_server,
token_endpoint,
response.text,
)
raise InsightsAuthError(
Expand All @@ -124,7 +144,7 @@ def wait_for_authorization(self):
if response_error != "authorization_pending":
logger.debug(
_(messages.INSIGHTS_RESPONSE),
insights_sso_server,
token_endpoint,
response.text,
)
raise InsightsAuthError(
Expand All @@ -136,13 +156,13 @@ def wait_for_authorization(self):
else:
logger.debug(
_(messages.INSIGHTS_RESPONSE),
insights_sso_server,
token_endpoint,
self.token_response,
)
else:
logger.debug(
_(messages.INSIGHTS_RESPONSE),
insights_sso_server,
token_endpoint,
response.text,
)
raise InsightsAuthError(
Expand Down
118 changes: 73 additions & 45 deletions qpc/insights/test_insights_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,25 @@
from unittest.mock import patch

import pytest

from qpc.insights.auth import DEVICE_AUTH_ENDPOINT, TOKEN_ENDPOINT, InsightsAuth
from faker import Faker

from qpc.insights.auth import (
DEVICE_AUTH_ENDPOINT_KEY,
OPENID_CONFIG_ENDPOINT,
TOKEN_ENDPOINT_KEY,
InsightsAuth,
)
from qpc.insights.exceptions import InsightsAuthError
from qpc.utils import CONFIG_SSO_HOST_KEY, read_insights_config, write_insights_config

fake = Faker()


def get_sso_config_url():
"""Return the Insights SSO Configuration endpoint."""
sso_host = read_insights_config().get(CONFIG_SSO_HOST_KEY)
return f"https://{sso_host}{OPENID_CONFIG_ENDPOINT}"


class TestInsightsAuth:
"""Class for testing Insights Device Auth support."""
Expand Down Expand Up @@ -38,19 +52,20 @@ def stage_sso_host(self, faker):
"""Return an alternate staging sso host."""
return f"stage-{faker.hostname()}"

def test_insights_request_auth(self, login_auth_response, faker, requests_mock):
"""Testing that insights auth targets the right endpoint."""
sso_host = read_insights_config().get(CONFIG_SSO_HOST_KEY)
sso_url = f"https://{sso_host}{DEVICE_AUTH_ENDPOINT}"
requests_mock.post(sso_url, json=login_auth_response)
assert InsightsAuth().request_auth() == login_auth_response

def test_insights_request_auth_alt_sso_host(
self, stage_sso_host, login_auth_response, faker, requests_mock
@pytest.mark.parametrize(
"alt_sso_host",
[None, f"stage-{fake.hostname()}"],
ids=["default_sso_host", "stage_sso_host"],
)
def test_insights_request_auth(
self, login_auth_response, faker, requests_mock, alt_sso_host
):
"""Testing that insights auth targets alternate sso hosts."""
write_insights_config({CONFIG_SSO_HOST_KEY: stage_sso_host})
sso_url = f"https://{stage_sso_host}{DEVICE_AUTH_ENDPOINT}"
"""Testing that insights auth targets the right endpoint."""
if alt_sso_host:
write_insights_config({CONFIG_SSO_HOST_KEY: alt_sso_host})
sso_url = faker.url()
sso_config = {DEVICE_AUTH_ENDPOINT_KEY: sso_url}
requests_mock.get(get_sso_config_url(), json=sso_config)
requests_mock.post(sso_url, json=login_auth_response)
assert InsightsAuth().request_auth() == login_auth_response

Expand All @@ -59,28 +74,35 @@ def test_insights_request_auth_connection_error(self, stage_sso_host):
write_insights_config({CONFIG_SSO_HOST_KEY: stage_sso_host})
with pytest.raises(InsightsAuthError) as err:
InsightsAuth().request_auth()
assert "Failed to request login authorization" in err.value
assert "Failed to request login authorization" in str(err.value)

def test_insights_request_auth_invalid_sso_config(self, faker, requests_mock):
"""Testing that insights auth handles invalid SSO configuration errors."""
with pytest.raises(InsightsAuthError) as err:
sso_config = {faker.slug(): faker.hostname()}
requests_mock.get(get_sso_config_url(), json=sso_config)
InsightsAuth().request_auth()
assert (
"Failed to query the Insights SSO configuration:"
f" missing {DEVICE_AUTH_ENDPOINT_KEY}" in str(err.value)
)

@pytest.mark.parametrize(
"alt_sso_host",
[None, f"stage-{fake.hostname()}"],
ids=["default_sso_host", "stage_sso_host"],
)
def test_insights_wait_for_authorization_authorized(
self, login_auth_response, auth_token, requests_mock
self, login_auth_response, auth_token, faker, requests_mock, alt_sso_host
):
"""Testing that insights wait for authorization returns the auth token."""
if alt_sso_host:
write_insights_config({CONFIG_SSO_HOST_KEY: alt_sso_host})
insights_auth = InsightsAuth()
insights_auth.auth_request = login_auth_response
sso_host = read_insights_config().get(CONFIG_SSO_HOST_KEY)
sso_token_url = f"https://{sso_host}{TOKEN_ENDPOINT}"
token_response = {"access_token": auth_token}
requests_mock.post(sso_token_url, json=token_response)
assert insights_auth.wait_for_authorization() == auth_token

def test_insights_wait_for_authorization_authorized_alt_sso_host(
self, stage_sso_host, login_auth_response, auth_token, requests_mock
):
"""Testing that insights wait for authorization handles connection errors."""
write_insights_config({CONFIG_SSO_HOST_KEY: stage_sso_host})
insights_auth = InsightsAuth()
insights_auth.auth_request = login_auth_response
sso_token_url = f"https://{stage_sso_host}{TOKEN_ENDPOINT}"
sso_token_url = faker.url()
sso_config = {TOKEN_ENDPOINT_KEY: sso_token_url}
requests_mock.get(get_sso_config_url(), json=sso_config)
token_response = {"access_token": auth_token}
requests_mock.post(sso_token_url, json=token_response)
assert insights_auth.wait_for_authorization() == auth_token
Expand All @@ -94,38 +116,43 @@ def test_insights_wait_for_authorization_connection_error(
insights_auth.auth_request = login_auth_response
with pytest.raises(InsightsAuthError) as err:
insights_auth.wait_for_authorization()
assert "Failed to verify Login authorization" in err.value
assert "Failed to verify Login authorization" in str(err.value)

def test_insights_wait_for_authorization_bad_request(
self, login_auth_response, auth_token, requests_mock
self, login_auth_response, auth_token, faker, requests_mock
):
"""Testing that wait for authorization fails with failed checks."""
insights_auth = InsightsAuth()
insights_auth.auth_request = login_auth_response
sso_host = read_insights_config().get(CONFIG_SSO_HOST_KEY)
sso_token_url = f"https://{sso_host}{TOKEN_ENDPOINT}"
sso_token_url = faker.url()
sso_config = {TOKEN_ENDPOINT_KEY: sso_token_url}
requests_mock.get(get_sso_config_url(), json=sso_config)
token_response = {"error": "unknown_error"}
requests_mock.post(
sso_token_url, status_code=http.HTTPStatus.BAD_REQUEST, json=token_response
)
with pytest.raises(InsightsAuthError) as err:
insights_auth.wait_for_authorization()
assert "Failed to verify Login authorization" in err.value
assert "Failed to verify Login authorization" in str(err.value)

@patch("time.sleep")
def test_insights_wait_for_authorization_expired(self, faker, requests_mock):
def test_insights_wait_for_authorization_expired(
self, mock_sleep, faker, requests_mock
):
"""Testing that authorization pending checks for expired time."""
insights_auth = InsightsAuth()
insights_auth.auth_request = {
"device_code": faker.slug(),
"user_code": faker.slug(),
"verification_uri": faker.url(),
"verification_uri_complete": faker.url(),
"expires_in": 1,
"interval": 0,
"expires_in": 0,
"interval": 1,
}
sso_host = read_insights_config().get(CONFIG_SSO_HOST_KEY)
sso_token_url = f"https://{sso_host}{TOKEN_ENDPOINT}"
mock_sleep.return_value = None
sso_token_url = faker.url()
sso_config = {TOKEN_ENDPOINT_KEY: sso_token_url}
requests_mock.get(get_sso_config_url(), json=sso_config)
token_response = {
"error": "authorization_pending",
"error_description": "The authorization request is still pending",
Expand All @@ -135,16 +162,17 @@ def test_insights_wait_for_authorization_expired(self, faker, requests_mock):
)
with pytest.raises(InsightsAuthError) as err:
insights_auth.wait_for_authorization()
assert "Time-out while waiting for Login authorization" in err.value
assert "Time-out while waiting for Login authorization" in str(err.value)

def test_insights_wait_for_authorization_expired_from_sso(
self, login_auth_response, requests_mock
self, login_auth_response, faker, requests_mock
):
"""Testing that token authorization expired from the sso server."""
insights_auth = InsightsAuth()
insights_auth.auth_request = login_auth_response
sso_host = read_insights_config().get(CONFIG_SSO_HOST_KEY)
sso_token_url = f"https://{sso_host}{TOKEN_ENDPOINT}"
sso_token_url = faker.url()
sso_config = {TOKEN_ENDPOINT_KEY: sso_token_url}
requests_mock.get(get_sso_config_url(), json=sso_config)
token_response = {
"error": "expired_token",
"error_description": "Device code is expired",
Expand All @@ -154,4 +182,4 @@ def test_insights_wait_for_authorization_expired_from_sso(
)
with pytest.raises(InsightsAuthError) as err:
insights_auth.wait_for_authorization()
assert "Time-out while waiting for Login authorization" in err.value
assert "Time-out while waiting for Login authorization" in str(err.value)
2 changes: 2 additions & 0 deletions qpc/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,8 @@
INSIGHTS_TOKEN_EXPIRED = "Authorization token expired, please re-login to Insights"
INSIGHTS_TOKEN_CORRUPT = "Corrupt Authorization token, please re-login to Insights"
INSIGHTS_TOKEN_INVALID = "Invalid Authorization token, please re-login to Insights"
INSIGHTS_SSO_CONFIG_QUERY = "Querying Insights SSO configuration at %s for %s"
INSIGHTS_SSO_QUERY_FAILED = "Failed to query the Insights SSO configuration: missing %s"
INSIGHTS_AUTH_ERROR = "Authorization failed, please re-login to Insights"
INSIGHTS_LOGIN_REQUEST = "Requesting Login authorization from %s"
INSIGHTS_LOGIN_REQUEST_FAILED = "Failed to request login authorization: %s"
Expand Down

0 comments on commit 15281c6

Please sign in to comment.