Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x OAuth improvements #2559

Merged
merged 6 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 25 additions & 3 deletions picard/browser/browser.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
# Copyright (C) 2006-2007, 2011 Lukáš Lalinský
# Copyright (C) 2011-2013 Michael Wiencek
# Copyright (C) 2012 Chad Wilson
# Copyright (C) 2012-2013, 2018, 2021 Philipp Wolfer
# Copyright (C) 2013, 2018, 2020-2021 Laurent Monin
# Copyright (C) 2012-2013, 2018, 2021-2022, 2024 Philipp Wolfer
# Copyright (C) 2013, 2018, 2020-2021, 2024 Laurent Monin
# Copyright (C) 2016 Suhas
# Copyright (C) 2016-2017 Sambhav Kothari
# Copyright (C) 2018 Vishal Choudhary
Expand All @@ -25,7 +25,6 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.


from http.server import (
BaseHTTPRequestHandler,
HTTPServer,
Expand All @@ -47,6 +46,7 @@
)
from picard.browser import addrelease
from picard.config import get_config
from picard.oauth import OAuthInvalidStateError
from picard.util import mbid_validate
from picard.util.thread import to_main

Expand Down Expand Up @@ -184,6 +184,8 @@ def _handle_get(self):
self._load_mbid('nat', args)
elif action == '/add' and addrelease.is_available():
self._add_release(args)
elif action == '/auth':
self._auth(args)
else:
self._response(404, 'Unknown action.')

Expand Down Expand Up @@ -211,6 +213,26 @@ def _add_release(self, args):
else:
self._response(400, 'Missing parameter "token".')

def _auth(self, args):
if 'code' in args and args['code']:
tagger = QtCore.QCoreApplication.instance()
oauth_manager = tagger.webservice.oauth_manager
try:
state = args.get('state', [''])[0]
callback = oauth_manager.verify_state(state)
except OAuthInvalidStateError:
self._response(400, 'Invalid "state" parameter.')
return
to_main(
oauth_manager.exchange_authorization_code,
authorization_code=args['code'][0],
scopes='profile tag rating collection submit_isrc submit_barcode',
callback=callback,
)
self._response(200, "Authentication successful, you can close this window now.", 'text/html')
else:
self._response(400, 'Missing parameter "code".')

def _response(self, code, content='', content_type='text/plain'):
self.server_version = SERVER_VERSION
self.send_response(code)
Expand Down
152 changes: 143 additions & 9 deletions picard/oauth.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
# Copyright (C) 2014 Lukáš Lalinský
# Copyright (C) 2015 Sophist-UK
# Copyright (C) 2015 Wieland Hoffmann
# Copyright (C) 2015, 2018, 2021 Philipp Wolfer
# Copyright (C) 2015, 2018, 2021-2022, 2024 Philipp Wolfer
# Copyright (C) 2016-2017 Sambhav Kothari
# Copyright (C) 2017 Frederik “Freso” S. Olesen
# Copyright (C) 2018-2022 Laurent Monin
Expand All @@ -24,9 +24,11 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.


from base64 import urlsafe_b64encode
from functools import partial
from hashlib import sha256
from json.decoder import JSONDecodeError
import secrets
import time
import urllib.parse

Expand All @@ -42,10 +44,35 @@
)


OOB_URI = 'urn:ietf:wg:oauth:2.0:oob'


class OAuthInvalidStateError(Exception):
pass


class OAuthManager(object):

def __init__(self, webservice):
self.webservice = webservice
# Associates state tokens with callbacks
self.__states = {}
self._redirect_uri = OOB_URI

@property
def redirect_uri(self):
return self._redirect_uri

@redirect_uri.setter
def redirect_uri(self, redirect_uri):
if not redirect_uri:
self._redirect_uri = OOB_URI
else:
self._redirect_uri = redirect_uri

@property
def is_oob(self):
return self.redirect_uri == OOB_URI

@property
def setting(self):
Expand Down Expand Up @@ -127,10 +154,48 @@ def is_authorized(self):
def is_logged_in(self):
return self.is_authorized() and bool(self.username)

def revoke_tokens(self):
# TODO actually revoke the tokens on MB (I think it's not implementented there)
self.forget_refresh_token()
self.forget_access_token()
def revoke_tokens(self, callback):
# Actually revoke the tokens on MB.
# From https://musicbrainz.org/doc/Development/OAuth2#Revoking_a_token :
# "If your application is installed or offline and token is a
# refresh token, we'll revoke the entire authorization grant associated
# with that token."
log.debug("OAuth: Revoking authorization grant")
self._revoke_token(self.refresh_token, callback)

def _revoke_token(self, token, callback):
params = {
'token': token,
'client_id': MUSICBRAINZ_OAUTH_CLIENT_ID,
'client_secret': MUSICBRAINZ_OAUTH_CLIENT_SECRET,
}
self.webservice.post_url(
url=self.url(path="/oauth2/revoke"),
data=self._query_data(params),
handler=partial(self._on_revoke_token_finished, callback),
mblogin=True,
priority=True,
important=True,
request_mimetype='application/x-www-form-urlencoded',
parse_response_type=False,
)

def _on_revoke_token_finished(self, callback, data, http, error):
successful = False
error_msg = None
try:
if error:
log.error("OAuth: revoking token failed: %s", error)
error_msg = self._extract_error_description(http, data)
else:
self.forget_refresh_token()
self.forget_access_token()
successful = True
except Exception as e:
log.error("OAuth: Unexpected error handling token revocation response: %r", e)
error_msg = _("Unexpected token revocation error")
finally:
callback(successful=successful, error_msg=error_msg)

def forget_refresh_token(self):
del self.refresh_token
Expand All @@ -156,13 +221,45 @@ def url(self, path=None, params=None):
queryargs=params
)

def get_authorization_url(self, scopes):
def _create_code_challenge(self):
# see https://datatracker.ietf.org/doc/html/rfc7636#section-4.1
# and https://datatracker.ietf.org/doc/html/rfc7636#appendix-B
code_verifier = base64url_encode(secrets.token_bytes(32))
self.__code_verifier = code_verifier.decode('ASCII')
code_challenge = s256_encode(code_verifier) # code_challenge_method=S256
return code_challenge.decode('ASCII')

def _create_auth_state(self, callback):
state = secrets.token_urlsafe(16)
self.__states[state] = callback
return state

def verify_state(self, state):
"""Verifies a state variable used in an authorization URL.

On success returns a callback associated with this state.
If the state is invalid raises OAuthInvalidStateError. Can only be
called once on a state, the state itself will be revoked afterwards.
"""
try:
callback = self.__states[state]
del self.__states[state]
return callback
except KeyError:
raise OAuthInvalidStateError

def get_authorization_url(self, scopes, callback: callable):
params = {
'response_type': 'code',
'client_id': MUSICBRAINZ_OAUTH_CLIENT_ID,
'redirect_uri': "urn:ietf:wg:oauth:2.0:oob",
'redirect_uri': self.redirect_uri,
'code_challenge_method': 'S256',
'code_challenge': self._create_code_challenge(),
'scope': scopes,
'access_type': 'offline',
}
if not self.is_oob:
params['state'] = self._create_auth_state(callback)
return bytes(self.url(path="/oauth2/authorize", params=params).toEncoded()).decode()

def set_refresh_token(self, refresh_token, scopes):
Expand Down Expand Up @@ -221,7 +318,8 @@ def exchange_authorization_code(self, authorization_code, scopes, callback):
'code': authorization_code,
'client_id': MUSICBRAINZ_OAUTH_CLIENT_ID,
'client_secret': MUSICBRAINZ_OAUTH_CLIENT_SECRET,
'redirect_uri': "urn:ietf:wg:oauth:2.0:oob",
'redirect_uri': self.redirect_uri,
'code_verifier': self.__code_verifier,
}
self.webservice.post_url(
url=self.url(path="/oauth2/token"),
Expand Down Expand Up @@ -286,3 +384,39 @@ def _extract_error_description(self, http, data):
return response['error_description']
except (JSONDecodeError, KeyError, TypeError):
return _("Unexpected request error (HTTP code %s)") % self._http_code(http)


def s256_encode(input: bytes) -> bytes:
"""Implements the S256 code challenge encoding as defined for PKCE in RFC 7636.

The input data gets hashed by SHA256 and Base64url encoded.

See also https://datatracker.ietf.org/doc/html/rfc7636#section-4.2

Args:
input (bytes): Input bytes to encode. Is expected to consist only of ASCII characters.

Returns:
bytes: encoded data
"""
return base64url_encode(sha256(input).digest())


def base64url_encode(input: bytes) -> bytes:
"""Implements the Base64url Encoding as defined for PKCE in RFC 7636.

Base64 encoding using the URL- and filename-safe character set
defined in Section 5 of [RFC4648], with all trailing '='
characters omitted (as permitted by Section 3.2 of [RFC4648]) and
without the inclusion of any line breaks, whitespace, or other
additional characters.

See also https://datatracker.ietf.org/doc/html/rfc7636#appendix-A

Args:
s (bytes): Input bytes to encode.

Returns:
bytes: Base64url encoded data
"""
return urlsafe_b64encode(input).rstrip(b'=')
48 changes: 34 additions & 14 deletions picard/tagger.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# Copyright (C) 2006-2009, 2011-2014, 2017 Lukáš Lalinský
# Copyright (C) 2008 Gary van der Merwe
# Copyright (C) 2008 amckinle
# Copyright (C) 2008-2010, 2014-2015, 2018-2023 Philipp Wolfer
# Copyright (C) 2008-2010, 2014-2015, 2018-2024 Philipp Wolfer
# Copyright (C) 2009 Carlin Mangar
# Copyright (C) 2010 Andrew Barnert
# Copyright (C) 2011-2014 Michael Wiencek
Expand Down Expand Up @@ -357,7 +357,7 @@ def __init__(self, picard_args, localedir, autoupdate, pipe_handler=None):
self.pluginmanager.load_plugins_from_directory(plugin_dir)

self.browser_integration = BrowserIntegration()
self.browser_integration.listen_port_changed.connect(self.listen_port_changed)
self.browser_integration.listen_port_changed.connect(self.on_listen_port_changed)

self._pending_files_count = 0
self.files = {}
Expand Down Expand Up @@ -638,6 +638,10 @@ def set_log_level(self, level):
self._debug = level == logging.DEBUG
log.set_level(level)

def on_listen_port_changed(self, port):
self.webservice.oauth_manager.redirect_uri = self._mb_login_redirect_uri()
self.listen_port_changed.emit(port)

def _mb_login_dialog(self, parent):
if not parent:
parent = self.window
Expand All @@ -651,17 +655,28 @@ def _mb_login_dialog(self, parent):
else:
return None

def _mb_login_redirect_uri(self):
if self.browser_integration and self.browser_integration.is_running:
return f'http://localhost:{self.browser_integration.port}/auth'
else:
# If browser integration is disabled or not running on the standard
# port use out-of-band flow (with manual copying of the token).
return None

def mb_login(self, callback, parent=None):
oauth_manager = self.webservice.oauth_manager
scopes = 'profile tag rating collection submit_isrc submit_barcode'
authorization_url = self.webservice.oauth_manager.get_authorization_url(scopes)
authorization_url = oauth_manager.get_authorization_url(
scopes, partial(self.on_mb_authorization_finished, callback))
webbrowser2.open(authorization_url)
authorization_code = self._mb_login_dialog(parent)
if authorization_code is not None:
self.webservice.oauth_manager.exchange_authorization_code(
authorization_code, scopes,
partial(self.on_mb_authorization_finished, callback))
else:
callback(False, None)
if oauth_manager.is_oob:
authorization_code = self._mb_login_dialog(parent)
if authorization_code is not None:
self.webservice.oauth_manager.exchange_authorization_code(
authorization_code, scopes,
partial(self.on_mb_authorization_finished, callback))
else:
callback(False, None)

def on_mb_authorization_finished(self, callback, successful=False, error_msg=None):
if successful:
Expand All @@ -670,15 +685,20 @@ def on_mb_authorization_finished(self, callback, successful=False, error_msg=Non
else:
callback(False, error_msg)

@classmethod
def on_mb_login_finished(self, callback, successful, error_msg):
if successful:
load_user_collections()
callback(successful, error_msg)

def mb_logout(self):
self.webservice.oauth_manager.revoke_tokens()
load_user_collections()
def mb_logout(self, callback):
self.webservice.oauth_manager.revoke_tokens(
partial(self.on_mb_logout_finished, callback)
)

def on_mb_logout_finished(self, callback, successful, error_msg):
if successful:
load_user_collections()
callback(successful, error_msg)

def move_files_to_album(self, files, albumid=None, album=None):
"""Move `files` to tracks on album `albumid`."""
Expand Down
Loading
Loading