diff --git a/src/azure-cli-core/azure/cli/core/auth/credential_adaptor.py b/src/azure-cli-core/azure/cli/core/auth/credential_adaptor.py index 15963bf44f6..69365cb45a8 100644 --- a/src/azure-cli-core/azure/cli/core/auth/credential_adaptor.py +++ b/src/azure-cli-core/azure/cli/core/auth/credential_adaptor.py @@ -24,15 +24,24 @@ def __init__(self, credential, auxiliary_credentials=None): self._auxiliary_credentials = auxiliary_credentials def get_token(self, *scopes, **kwargs): - """Get an access token from the main credential.""" + """Implement the old SDK token protocol azure.core.credentials.TokenCredential + Return azure.core.credentials.AccessToken + """ logger.debug("CredentialAdaptor.get_token: scopes=%r, kwargs=%r", scopes, kwargs) - # Discard unsupported kwargs: tenant_id, enable_cae - filtered_kwargs = {} - if 'data' in kwargs: - filtered_kwargs['data'] = kwargs['data'] + msal_kwargs = _prepare_msal_kwargs(kwargs) + msal_result = self._credential.acquire_token(list(scopes), **msal_kwargs) + return build_sdk_access_token(msal_result) + + def get_token_info(self, *scopes, options=None): + """Implement the new SDK token protocol azure.core.credentials.SupportsTokenInfo + Return azure.core.credentials.AccessTokenInfo + """ + logger.debug("CredentialAdaptor.get_token_info: scopes=%r, options=%r", scopes, options) - return build_sdk_access_token(self._credential.acquire_token(list(scopes), **filtered_kwargs)) + msal_kwargs = _prepare_msal_kwargs(options) + msal_result = self._credential.acquire_token(list(scopes), **msal_kwargs) + return _build_sdk_access_token_info(msal_result) def get_auxiliary_tokens(self, *scopes, **kwargs): """Get access tokens from auxiliary credentials.""" @@ -41,3 +50,33 @@ def get_auxiliary_tokens(self, *scopes, **kwargs): return [build_sdk_access_token(cred.acquire_token(list(scopes), **kwargs)) for cred in self._auxiliary_credentials] return None + + +def _prepare_msal_kwargs(options=None): + # Preserve supported options and discard unsupported options (tenant_id, enable_cae). + # Both get_token's kwargs and get_token_info's options are accepted as their schema is the same (at least for now). + msal_kwargs = {} + if options: + # For VM SSH. 'data' support is a CLI-specific extension. + # SDK doesn't support 'data': https://github.com/Azure/azure-sdk-for-python/pull/16397 + if 'data' in options: + msal_kwargs['data'] = options['data'] + # For CAE + if 'claims' in options: + msal_kwargs['claims_challenge'] = options['claims'] + return msal_kwargs + + +def _build_sdk_access_token_info(token_entry): + # MSAL token entry sample: + # { + # 'access_token': 'eyJ0eXAiOiJKV...', + # 'token_type': 'Bearer', + # 'expires_in': 1618, + # 'token_source': 'cache' + # } + from .constants import ACCESS_TOKEN, EXPIRES_IN + from .util import _now_timestamp + from azure.core.credentials import AccessTokenInfo + + return AccessTokenInfo(token_entry[ACCESS_TOKEN], _now_timestamp() + token_entry[EXPIRES_IN]) diff --git a/src/azure-cli-core/azure/cli/core/auth/msal_credentials.py b/src/azure-cli-core/azure/cli/core/auth/msal_credentials.py index c8f568df2c3..01467188291 100644 --- a/src/azure-cli-core/azure/cli/core/auth/msal_credentials.py +++ b/src/azure-cli-core/azure/cli/core/auth/msal_credentials.py @@ -43,22 +43,23 @@ def __init__(self, client_id, username, **kwargs): self._account = accounts[0] - def acquire_token(self, scopes, claims=None, **kwargs): + def acquire_token(self, scopes, claims_challenge=None, **kwargs): # scopes must be a list. # For acquiring SSH certificate, scopes is ['https://pas.windows.net/CheckMyAccess/Linux/.default'] # kwargs is already sanitized by CredentialAdaptor, so it can be safely passed to MSAL - logger.debug("UserCredential.acquire_token: scopes=%r, claims=%r, kwargs=%r", scopes, claims, kwargs) + logger.debug("UserCredential.acquire_token: scopes=%r, claims_challenge=%r, kwargs=%r", + scopes, claims_challenge, kwargs) - if claims: + if claims_challenge: logger.warning('Acquiring new access token silently for tenant %s with claims challenge: %s', - self._msal_app.authority.tenant, claims) - result = self._msal_app.acquire_token_silent_with_error(scopes, self._account, claims_challenge=claims, - **kwargs) + self._msal_app.authority.tenant, claims_challenge) + result = self._msal_app.acquire_token_silent_with_error( + scopes, self._account, claims_challenge=claims_challenge, **kwargs) from azure.cli.core.azclierror import AuthenticationError try: # Check if an access token is returned. - check_result(result, scopes=scopes, claims=claims) + check_result(result, scopes=scopes, claims_challenge=claims_challenge) except AuthenticationError as ex: # For VM SSH ('data' is passed), if getting access token fails because # Conditional Access MFA step-up or compliance check is required, re-launch diff --git a/src/azure-cli-core/azure/cli/core/auth/tests/test_credential_adaptor.py b/src/azure-cli-core/azure/cli/core/auth/tests/test_credential_adaptor.py new file mode 100644 index 00000000000..e9b4829d63c --- /dev/null +++ b/src/azure-cli-core/azure/cli/core/auth/tests/test_credential_adaptor.py @@ -0,0 +1,90 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + + +import unittest +from unittest import mock + +from ..credential_adaptor import CredentialAdaptor + + +MOCK_ACCESS_TOKEN = "mock_access_token" +MOCK_DATA = { + 'key_id': 'test', + 'req_cnf': 'test', + 'token_type': 'ssh-cert' +} +MOCK_CLAIMS = {"test_claims": "value2"} + +class MsalCredentialStub: + + def __init__(self, *args, **kwargs): + self.acquire_token_scopes = None + self.acquire_token_claims_challenge = None + self.acquire_token_kwargs = None + super().__init__() + + def acquire_token(self, scopes, claims_challenge=None, **kwargs): + self.acquire_token_scopes = scopes + self.acquire_token_claims_challenge = claims_challenge + self.acquire_token_kwargs = kwargs + return { + 'access_token': MOCK_ACCESS_TOKEN, + 'token_type': 'Bearer', + 'expires_in': 1800, + 'token_source': 'cache' + } + +def _now_timestamp_mock(): + # 2021-09-06 08:55:23 + return 1630918523 + + +class TestCredentialAdaptor(unittest.TestCase): + + @mock.patch('azure.cli.core.auth.util._now_timestamp', new=_now_timestamp_mock) + def test_get_token(self): + msal_cred = MsalCredentialStub() + sdk_cred = CredentialAdaptor(msal_cred) + access_token = sdk_cred.get_token('https://management.core.windows.net//.default') + assert msal_cred.acquire_token_scopes == ['https://management.core.windows.net//.default'] + + from ..util import AccessToken + assert isinstance(access_token, AccessToken) + assert access_token.token == MOCK_ACCESS_TOKEN + assert access_token.expires_on == 1630920323 + + # Note that SDK doesn't support 'data'. This is a CLI-specific extension. + sdk_cred.get_token('https://management.core.windows.net//.default', data=MOCK_DATA) + assert msal_cred.acquire_token_kwargs['data'] == MOCK_DATA + + sdk_cred.get_token('https://management.core.windows.net//.default', claims=MOCK_CLAIMS) + assert msal_cred.acquire_token_claims_challenge == MOCK_CLAIMS + + + @mock.patch('azure.cli.core.auth.util._now_timestamp', new=_now_timestamp_mock) + def test_get_token_info(self): + msal_cred = MsalCredentialStub() + sdk_cred = CredentialAdaptor(msal_cred) + access_token_info = sdk_cred.get_token_info('https://management.core.windows.net//.default') + + from azure.core.credentials import AccessTokenInfo + assert isinstance(access_token_info, AccessTokenInfo) + assert access_token_info.token == MOCK_ACCESS_TOKEN + assert access_token_info.expires_on == 1630920323 + assert access_token_info.token_type == 'Bearer' + + assert msal_cred.acquire_token_scopes == ['https://management.core.windows.net//.default'] + + # Note that SDK doesn't support 'data'. If 'data' were supported, it should be tested with: + sdk_cred.get_token_info('https://management.core.windows.net//.default', options={'data': MOCK_DATA}) + assert msal_cred.acquire_token_kwargs['data'] == MOCK_DATA + + sdk_cred.get_token_info('https://management.core.windows.net//.default', options={'claims': MOCK_CLAIMS}) + assert msal_cred.acquire_token_claims_challenge == MOCK_CLAIMS + + +if __name__ == '__main__': + unittest.main() diff --git a/src/azure-cli-core/azure/cli/core/auth/util.py b/src/azure-cli-core/azure/cli/core/auth/util.py index 43679c34616..ecbe67627d5 100644 --- a/src/azure-cli-core/azure/cli/core/auth/util.py +++ b/src/azure-cli-core/azure/cli/core/auth/util.py @@ -53,7 +53,7 @@ def aad_error_handler(error, **kwargs): raise AuthenticationError(error_description, msal_error=error, recommendation=recommendation) -def _generate_login_command(scopes=None, claims=None): +def _generate_login_command(scopes=None, claims_challenge=None): login_command = ['az login'] # Rejected by Conditional Access policy, like MFA @@ -61,7 +61,7 @@ def _generate_login_command(scopes=None, claims=None): login_command.append('--scope {}'.format(' '.join(scopes))) # Rejected by CAE - if claims: + if claims_challenge: # Explicit logout is needed: https://github.com/AzureAD/microsoft-authentication-library-for-python/issues/335 return 'az logout\n' + ' '.join(login_command)