diff --git a/tidalapi/session.py b/tidalapi/session.py index 91e3cb2..4720334 100644 --- a/tidalapi/session.py +++ b/tidalapi/session.py @@ -81,12 +81,21 @@ class LinkLogin: verification_uri: str #: The link the user has to visit, with the code already included verification_uri_complete: str + #: After how much time the uri expires. + expires_in: float + #: The interval for authorization checks against the backend. + interval: float + #: The unique device code necessary for authorization. + device_code: str def __init__(self, json: JsonObj): self.expires_in = int(json["expiresIn"]) self.user_code = str(json["userCode"]) self.verification_uri = str(json["verificationUri"]) self.verification_uri_complete = str(json["verificationUriComplete"]) + self.expires_in = float(json["expiresIn"]) + self.interval = float(json["interval"]) + self.device_code = str(json["deviceCode"]) class Config: @@ -600,14 +609,17 @@ def login_oauth_simple(self, fn_print: Callable[[str], None] = print) -> None: def login_oauth(self) -> Tuple[LinkLogin, concurrent.futures.Future[Any]]: """Login to TIDAL with a remote link for limited input devices. The function will return everything you need to log in through a web browser, and will return - an future that will run until login. + a future that will run until login. :return: A :class:`LinkLogin` object containing all the data needed to log in remotely, and - a :class:`concurrent.futures.Future` that will poll until the login is completed, or until the link expires. + a :class:`concurrent.futures.Future` object that will poll until the login is completed, or until the link expires. + :rtype: :class:`LinkLogin` :raises: TimeoutError: If the login takes too long """ - login, future = self._login_with_link() - return login, future + link_login: LinkLogin = self.get_link_login() + executor = concurrent.futures.ThreadPoolExecutor() + + return link_login, executor.submit(self.process_link_login, link_login) def save_session_to_file(self, session_file: Path): # create a new session @@ -644,7 +656,13 @@ def load_session_from_file(self, session_file: Path): return self.load_oauth_session(**args) - def _login_with_link(self) -> Tuple[LinkLogin, concurrent.futures.Future[Any]]: + def get_link_login(self) -> LinkLogin: + """Return information required to login into TIDAL using a device authorization + link. + + :return: Login information for device authorization retrieved from the TIDAL backend. + :rtype: :class:`LinkLogin` + """ url = "https://auth.tidal.com/v1/oauth2/device_authorization" params = {"client_id": self.config.client_id, "scope": "r_usr w_usr w_sub"} @@ -655,16 +673,31 @@ def _login_with_link(self) -> Tuple[LinkLogin, concurrent.futures.Future[Any]]: request.raise_for_status() json = request.json() - executor = concurrent.futures.ThreadPoolExecutor() - return LinkLogin(json), executor.submit(self._process_link_login, json) - def _process_link_login(self, json: JsonObj) -> None: - json = self._wait_for_link_login(json) - self.process_auth_token(json, is_pkce_token=False) + return LinkLogin(json) + + def process_link_login( + self, link_login: LinkLogin, until_expiry: bool = True + ) -> bool: + """Checks if device authorization was successful and processes the retrieved + OAuth token from the Backend. + + :param link_login: Link login information containing the necessary device authorization information. + :type link_login: :class:`LinkLogin` + :param until_expiry: If `True` device authorization check is running until the link expires. If `False`check is running only once. + :type until_expiry: :class:`bool` + + :return: `True` if login was successful. + :rtype: bool + """ + result: JsonObj = self._check_link_login(link_login, until_expiry) + result_process: bool = self.process_auth_token(result, is_pkce_token=False) + + return result_process def process_auth_token( self, json: dict[str, Union[str, int]], is_pkce_token: bool = True - ) -> None: + ) -> bool: """Parses the authorization response and sets the token values to the specific variables for further usage. @@ -672,7 +705,8 @@ def process_auth_token( :type json: dict[str, str | int] :param is_pkce_token: Set true if current token is obtained using PKCE :type is_pkce_token: bool - :return: None + :return: `True` if no error occurs. + :rtype: bool """ self.access_token = json["access_token"] self.expiry_time = datetime.datetime.utcnow() + datetime.timedelta( @@ -687,28 +721,45 @@ def process_auth_token( self.user = user.User(self, user_id=json["userId"]).factory() self.is_pkce = is_pkce_token - def _wait_for_link_login(self, json: JsonObj) -> Any: - expiry = float(json["expiresIn"]) - interval = float(json["interval"]) - device_code = json["deviceCode"] + return True + + def _check_link_login( + self, link_login: LinkLogin, until_expiry: bool = True + ) -> TimeoutError | JsonObj: + """Checks if device authorization was successful and retrieves OAuth data. Can + check the backend for successful device authrization until the link expires + (with the given interval) or just once. + + :param link_login: Link login information containing the necessary device authorization information. + :type link_login: :class:`LinkLogin` + :param until_expiry: If `True` device authorization check is running until the link expires. If `False`check is running only once. + :type until_expiry: :class:`bool` + :return: Raise :class:`TimeoutError` if the link has expired otherwise returns retrieved OAuth information. + :rtype: :class:`TimeoutError` | :class:`JsonObj` + """ + expiry: float = link_login.expires_in if until_expiry else 1 url = self.config.api_oauth2_token params = { "client_id": self.config.client_id, "client_secret": self.config.client_secret, - "device_code": device_code, + "device_code": link_login.device_code, "grant_type": "urn:ietf:params:oauth:grant-type:device_code", "scope": "r_usr w_usr w_sub", } + while expiry > 0: request = self.request_session.post(url, params) - json = request.json() + result: JsonObj = request.json() + if request.ok: - return json + return result + # Because the requests take time, the expiry variable won't be accurate, so stop if TIDAL says it's expired - if json["error"] == "expired_token": + if result["error"] == "expired_token": break - time.sleep(interval) - expiry = expiry - interval + + time.sleep(link_login.interval) + expiry = expiry - link_login.interval raise TimeoutError("You took too long to log in")