Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for OAuth2 #284

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 144 additions & 19 deletions musicbrainzngs/musicbrainz.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
from musicbrainzngs import util
from musicbrainzngs import compat

from authlib.integrations.requests_client import OAuth2Session
from authlib.oauth2 import OAuth2Error

_version = "0.7.1"
_log = logging.getLogger("musicbrainzngs")
_max_retries = 8
Expand Down Expand Up @@ -81,6 +84,7 @@
'isrc': ["artists", "releases", "isrcs"],
'iswc': ["artists"],
'collection': ['releases'],
'userinfo': [],
}
VALID_BROWSE_INCLUDES = {
'artist': ["aliases"] + TAG_INCLUDES + RATING_INCLUDES + RELATION_INCLUDES,
Expand Down Expand Up @@ -303,6 +307,8 @@ def _decorator(func):
# Global authentication and endpoint details.

user = password = ""
client_id = client_secret = ""
access_token = ""
hostname = "musicbrainz.org"
https = True
_client = ""
Expand All @@ -316,6 +322,106 @@ def auth(u, p):
user = u
password = p

def oauth(oauth_id, oauth_secret):
"""Set the client id and its secret to be used in the OAuth2 authorization
request.
"""
global client_id, client_secret
client_id = oauth_id
client_secret = oauth_secret

def set_token(token):
"""Set the access token for OAuth to be used in subsequent queries to the MusicBrainz XML API that require authentication.
"""
global access_token
access_token = token

class OAuth():
"""Implement the OAuth2 API as described in MusicBrainz documentation
"""
def __init__(self, scope=[]):
self.redirect_uri = "urn:ietf:wg:oauth:2.0:oob"
self.auth_path = "/oauth2/authorize"
self.token_path ="/oauth2/token"
self.revoke_path ="/oauth2/revoke"
self.service = OAuth2Session(client_id,
redirect_uri=self.redirect_uri,
scope=scope)
def get_authorization_url(self):
"""Request an authorization URL
"""
url = compat.urlunparse((
'https',
hostname,
self.auth_path,
'',
'',
''))
return self.service.create_authorization_url(url)

def get_authorization(self, code):
"""Exchange the code against an authorization
"""
try:
url = compat.urlunparse((
'https',
hostname,
self.token_path,
'',
'',
''))
authorization = self.service.fetch_token(url,
grant_type='authorization_code',
redirect_uri=self.redirect_uri,
code=code,
client_secret=client_secret)
except OAuth2Error:
_log.debug("OAuth2 exception: code exchange failed")
return {}
set_token(authorization['access_token'])
return authorization

def refresh_token(self, refresh_token):
"""Get a new authorization with the refresh token
"""
try:
url = compat.urlunparse((
'https',
hostname,
self.token_path,
'',
'',
''))
authorization = self.service.refresh_token(url,
refresh_token=refresh_token,
client_secret=client_secret)
except OAuth2Error:
_log.debug("OAuth2 exception: refresh token failed")
return None
set_token(authorization['access_token'])
return authorization

def revoke_token(self, token):
"""Revoke an access token or a refresh token
"""
try:
url = compat.urlunparse((
'https',
hostname,
self.revoke_path,
'',
'',
''))
token = self.service.revoke_token(url,
token=token,
client_id=client_id,
client_secret=client_secret,
body="client_secret={}".format(client_secret))
except OAuth2Error:
_log.debug("OAuth2 exception: failed to revoke token")
set_token("")


def set_useragent(app, version, contact=None):
"""Set the User-Agent to be used for requests to the MusicBrainz webservice.
This must be set before requests are made."""
Expand Down Expand Up @@ -644,41 +750,53 @@ def _mb_request(path, method='GET', auth_required=AUTH_NO,
value = value.encode('utf8')
newargs.append((key, value))

# Construct the full URL for the request, including hostname and
# query string.
url = compat.urlunparse((
'https' if https else 'http',
hostname,
'/ws/2/%s' % path,
'',
compat.urlencode(newargs),
''
))
_log.debug("%s request for %s" % (method, url))

# Set up HTTP request handler and URL opener.
httpHandler = compat.HTTPHandler(debuglevel=0)
handlers = [httpHandler]

# Add credentials if required.
add_auth = False
if auth_required == AUTH_YES:
_log.debug("Auth required for %s" % url)
if not user:
_log.debug("Auth required for %s" % path)
if not (user or client_id):
raise UsageError("authorization required; "
"use auth(user, pass) first")
"use auth(user, pass) or oauth(client_id, client_secret) first")
add_auth = True

if auth_required == AUTH_IFSET and user:
_log.debug("Using auth for %s because user and pass is set" % url)
if auth_required == AUTH_IFSET and (user or client_id):
_log.debug("Using auth for %s because user and pass is set" % path)
add_auth = True

if add_auth:
if add_auth and user:
passwordMgr = _RedirectPasswordMgr()
authHandler = _DigestAuthHandler(passwordMgr)
authHandler.add_password("musicbrainz.org", (), user, password)
handlers.append(authHandler)

if add_auth and client_id:
if not access_token:
raise UsageError("The app must be authenticated; "
"use the OAuth class first")
path_prefix = '/oauth2'
# Save the previous format because OAuth requests are answered in json
# only
old_format = ws_format
set_format("json")
else:
path_prefix = '/ws/2'

# Construct the full URL for the request, including hostname and
# query string.
url = compat.urlunparse((
'https' if https else 'http',
hostname,
'%s/%s' % (path_prefix, path),
'',
compat.urlencode(newargs),
''
))
_log.debug("%s request for %s" % (method, url))

opener = compat.build_opener(*handlers)

# Make request.
Expand All @@ -691,15 +809,22 @@ def _mb_request(path, method='GET', auth_required=AUTH_NO,
# Explicitly indicate zero content length if no request data
# will be sent (avoids HTTP 411 error).
req.add_header('Content-Length', '0')
if add_auth and client_id:
req.add_header('Authorization', 'Bearer %s' % access_token)
resp = _safe_read(opener, req, body)

return parser_fun(resp)
parser_res = parser_fun(resp)
# Restore previous format
set_format(old_format)
return parser_res


def _get_auth_type(entity, id, includes):
""" Some calls require authentication. This returns
a constant (Yes, No, IfSet) for the auth status of the call.
"""
if entity == "userinfo":
return AUTH_YES
if "user-tags" in includes or "user-ratings" in includes or "user-genres" in includes:
return AUTH_YES
elif entity.startswith("collection"):
Expand Down