Skip to content

Commit

Permalink
Merge pull request #303 from exislow/302-return-raw-values
Browse files Browse the repository at this point in the history
Moved `LinkLogin` and `futures` to `login_oauth`.
  • Loading branch information
tehkillerbee authored Nov 27, 2024
2 parents 6e3a4e6 + 9163394 commit 9c451c7
Showing 1 changed file with 73 additions and 22 deletions.
95 changes: 73 additions & 22 deletions tidalapi/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,21 @@ class LinkLogin:
verification_uri: str
#: The link the user has to visit, with the code already included
verification_uri_complete: str
#: After how much time the uri expires.
expires_in: float
#: The interval for authorization checks against the backend.
interval: float
#: The unique device code necessary for authorization.
device_code: str

def __init__(self, json: JsonObj):
self.expires_in = int(json["expiresIn"])
self.user_code = str(json["userCode"])
self.verification_uri = str(json["verificationUri"])
self.verification_uri_complete = str(json["verificationUriComplete"])
self.expires_in = float(json["expiresIn"])
self.interval = float(json["interval"])
self.device_code = str(json["deviceCode"])


class Config:
Expand Down Expand Up @@ -600,14 +609,17 @@ def login_oauth_simple(self, fn_print: Callable[[str], None] = print) -> None:
def login_oauth(self) -> Tuple[LinkLogin, concurrent.futures.Future[Any]]:
"""Login to TIDAL with a remote link for limited input devices. The function
will return everything you need to log in through a web browser, and will return
an future that will run until login.
a future that will run until login.
:return: A :class:`LinkLogin` object containing all the data needed to log in remotely, and
a :class:`concurrent.futures.Future` that will poll until the login is completed, or until the link expires.
a :class:`concurrent.futures.Future` object that will poll until the login is completed, or until the link expires.
:rtype: :class:`LinkLogin`
:raises: TimeoutError: If the login takes too long
"""
login, future = self._login_with_link()
return login, future
link_login: LinkLogin = self.get_link_login()
executor = concurrent.futures.ThreadPoolExecutor()

return link_login, executor.submit(self.process_link_login, link_login)

def save_session_to_file(self, session_file: Path):
# create a new session
Expand Down Expand Up @@ -644,7 +656,13 @@ def load_session_from_file(self, session_file: Path):

return self.load_oauth_session(**args)

def _login_with_link(self) -> Tuple[LinkLogin, concurrent.futures.Future[Any]]:
def get_link_login(self) -> LinkLogin:
"""Return information required to login into TIDAL using a device authorization
link.
:return: Login information for device authorization retrieved from the TIDAL backend.
:rtype: :class:`LinkLogin`
"""
url = "https://auth.tidal.com/v1/oauth2/device_authorization"
params = {"client_id": self.config.client_id, "scope": "r_usr w_usr w_sub"}

Expand All @@ -655,24 +673,40 @@ def _login_with_link(self) -> Tuple[LinkLogin, concurrent.futures.Future[Any]]:
request.raise_for_status()

json = request.json()
executor = concurrent.futures.ThreadPoolExecutor()
return LinkLogin(json), executor.submit(self._process_link_login, json)

def _process_link_login(self, json: JsonObj) -> None:
json = self._wait_for_link_login(json)
self.process_auth_token(json, is_pkce_token=False)
return LinkLogin(json)

def process_link_login(
self, link_login: LinkLogin, until_expiry: bool = True
) -> bool:
"""Checks if device authorization was successful and processes the retrieved
OAuth token from the Backend.
:param link_login: Link login information containing the necessary device authorization information.
:type link_login: :class:`LinkLogin`
:param until_expiry: If `True` device authorization check is running until the link expires. If `False`check is running only once.
:type until_expiry: :class:`bool`
:return: `True` if login was successful.
:rtype: bool
"""
result: JsonObj = self._check_link_login(link_login, until_expiry)
result_process: bool = self.process_auth_token(result, is_pkce_token=False)

return result_process

def process_auth_token(
self, json: dict[str, Union[str, int]], is_pkce_token: bool = True
) -> None:
) -> bool:
"""Parses the authorization response and sets the token values to the specific
variables for further usage.
:param json: Parsed JSON response after login / authorization.
:type json: dict[str, str | int]
:param is_pkce_token: Set true if current token is obtained using PKCE
:type is_pkce_token: bool
:return: None
:return: `True` if no error occurs.
:rtype: bool
"""
self.access_token = json["access_token"]
self.expiry_time = datetime.datetime.utcnow() + datetime.timedelta(
Expand All @@ -687,28 +721,45 @@ def process_auth_token(
self.user = user.User(self, user_id=json["userId"]).factory()
self.is_pkce = is_pkce_token

def _wait_for_link_login(self, json: JsonObj) -> Any:
expiry = float(json["expiresIn"])
interval = float(json["interval"])
device_code = json["deviceCode"]
return True

def _check_link_login(
self, link_login: LinkLogin, until_expiry: bool = True
) -> TimeoutError | JsonObj:
"""Checks if device authorization was successful and retrieves OAuth data. Can
check the backend for successful device authrization until the link expires
(with the given interval) or just once.
:param link_login: Link login information containing the necessary device authorization information.
:type link_login: :class:`LinkLogin`
:param until_expiry: If `True` device authorization check is running until the link expires. If `False`check is running only once.
:type until_expiry: :class:`bool`
:return: Raise :class:`TimeoutError` if the link has expired otherwise returns retrieved OAuth information.
:rtype: :class:`TimeoutError` | :class:`JsonObj`
"""
expiry: float = link_login.expires_in if until_expiry else 1
url = self.config.api_oauth2_token
params = {
"client_id": self.config.client_id,
"client_secret": self.config.client_secret,
"device_code": device_code,
"device_code": link_login.device_code,
"grant_type": "urn:ietf:params:oauth:grant-type:device_code",
"scope": "r_usr w_usr w_sub",
}

while expiry > 0:
request = self.request_session.post(url, params)
json = request.json()
result: JsonObj = request.json()

if request.ok:
return json
return result

# Because the requests take time, the expiry variable won't be accurate, so stop if TIDAL says it's expired
if json["error"] == "expired_token":
if result["error"] == "expired_token":
break
time.sleep(interval)
expiry = expiry - interval

time.sleep(link_login.interval)
expiry = expiry - link_login.interval

raise TimeoutError("You took too long to log in")

Expand Down

0 comments on commit 9c451c7

Please sign in to comment.