Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Name.com #507

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ lexicon/providers/localzone.py @ags-slc
lexicon/providers/luadns.py @analogj
lexicon/providers/memset.py @tnwhitwell
lexicon/providers/namecheap.py @pschmitt @rbelnap
lexicon/providers/namecom.py @Jamim
lexicon/providers/namesilo.py @analogj
lexicon/providers/netcup.py @coldfix
lexicon/providers/nfsn.py @tersers
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ The current supported providers are:
- Linode v4 ([docs](https://developers.linode.com/api/docs/v4#tag/Domains))
- LuaDNS ([docs](http://www.luadns.com/api.html))
- Memset ([docs](https://www.memset.com/apidocs/methods_dns.html))
- Name.com ([docs](https://www.name.com/api-docs/DNS))
- Namecheap ([docs](https://www.namecheap.com/support/api/methods.aspx))
- Namesilo ([docs](https://www.namesilo.com/api_reference.php))
- Netcup ([docs](https://ccp.netcup.net/run/webservice/servers/endpoint.php))
Expand Down
2 changes: 1 addition & 1 deletion lexicon/providers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def authenticate(self):
Make any requests required to get the domain's id for this provider,
so it can be used in subsequent calls.
Should throw an error if authentication fails for any reason,
of if the domain does not exist.
or if the domain does not exist.
"""
return self._authenticate()

Expand Down
212 changes: 212 additions & 0 deletions lexicon/providers/namecom.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
"""Module provider for Name.com"""
from __future__ import absolute_import

import logging

from requests import HTTPError, Session
from requests.auth import HTTPBasicAuth

from lexicon.providers.base import Provider as BaseProvider

LOGGER = logging.getLogger(__name__)

NAMESERVER_DOMAINS = ['name.com']

DUPLICATE_ERROR = {
'message': 'Invalid Argument',
'details': 'Parameter Value Error - Duplicate Record'
}


def provider_parser(subparser):
"""Configure a subparser for Name.com."""

subparser.add_argument('--auth-username', help='specify a username')
subparser.add_argument('--auth-token', help='specify an API token')


class NamecomLoader(object): # pylint: disable=useless-object-inheritance,too-few-public-methods
"""Loader that handles pagination for the Name.com provider."""

def __init__(self, get, url, data_key, next_page=1):
self.get = get
self.url = url
self.data_key = data_key
self.next_page = next_page

def __iter__(self):
while self.next_page:
response = self.get(self.url, {'page': self.next_page})
for data in response[self.data_key]:
yield data
self.next_page = response.get('next_page')


class NamecomProvider(BaseProvider):
"""Provider implementation for Name.com."""

def __init__(self, config):
super(Provider, self).__init__(config)
self.api_endpoint = 'https://api.name.com/v4'
self.session = Session()

def _authenticate(self):
self.session.auth = HTTPBasicAuth(
username=self._get_provider_option('auth_username'),
password=self._get_provider_option('auth_token')
)

# checking domain existence
domain_name = self.domain
for domain in NamecomLoader(self._get, '/domains', 'domains'):
if domain['domainName'] == domain_name:
self.domain_id = domain_name
return

raise Exception('{} domain does not exist'.format(domain_name))

def _create_record(self, rtype, name, content):
data = {
'type': rtype,
'host': self._relative_name(name),
'answer': content,
'ttl': self._get_lexicon_option('ttl')
}

if rtype in ('MX', 'SRV'):
# despite the documentation says a priority is
# required for MX and SRV, it's actually optional
priority = self._get_lexicon_option('priority')
if priority:
data['priority'] = priority

url = '/domains/{}/records'.format(self.domain)
try:
record_id = self._post(url, data)['id']
except HTTPError as error:
response = error.response
if response.status_code == 400 and \
response.json() == DUPLICATE_ERROR:
LOGGER.warning(
'create_record: duplicate record has been skipped'
)
return True
raise

LOGGER.debug('create_record: record %s has been created', record_id)

return record_id

def _list_records(self, rtype=None, name=None, content=None):
url = '/domains/{}/records'.format(self.domain)
records = []

for raw in NamecomLoader(self._get, url, 'records'):
record = {
'id': raw['id'],
'type': raw['type'],
'name': raw['fqdn'][:-1],
'ttl': raw['ttl'],
'content': raw['answer'],
}
records.append(record)

LOGGER.debug('list_records: retrieved %s records', len(records))

if rtype:
records = (record for record in records if record['type'] == rtype)
if name:
name = self._full_name(name)
records = (record for record in records if record['name'] == name)
if content:
records = (record for record in records
if record['content'] == content)

if not isinstance(records, list):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you have a specific reason to use comprehension tuples above which then need to cast back tuples into list here ? From what I see, you could use comprehension lists directly ([record for record in records if ...]) above, and avoid this cast.

records = list(records)
LOGGER.debug('list_records: filtered %s records', len(records))

return records

def _update_record(self, identifier, rtype=None, name=None, content=None):
if not identifier:
if not (rtype and name):
raise ValueError(
'Record identifier or rtype+name must be specified'
)
records = self._list_records(rtype, name)
if not records:
raise Exception('There is no record to update')

if len(records) > 1:
filtered_records = [record for record in records
if record['content'] == content]
if filtered_records:
records = filtered_records

if len(records) > 1:
raise Exception(
'There are multiple records to update: {}'.format(
', '.join(record['id'] for record in records)
)
)

record_id = records[0]['id']
else:
record_id = identifier

data = {'ttl': self._get_lexicon_option('ttl')}

# even though the documentation says a type and an answer
# are required, they are not required actually
if rtype:
data['type'] = rtype
if name:
data['host'] = self._relative_name(name)
if content:
data['answer'] = content

url = '/domains/{}/records/{}'.format(self.domain, record_id)
record_id = self._put(url, data)['id']
logging.debug('update_record: record %s has been updated', record_id)

return record_id

def _delete_record(self, identifier=None,
rtype=None, name=None, content=None):
if not identifier:
if not (rtype and name):
raise ValueError(
'Record identifier or rtype+name must be specified'
)
records = self._list_records(rtype, name, content)
if not records:
LOGGER.warning('delete_record: there is no record to delete')
return None
record_ids = tuple(record['id'] for record in records)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A comprehension list would be a better fit here, since it will be iterated after (record_ids = [record['id'] for record in records]).

else:
record_ids = (identifier,)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here: records_ids = [identifier,]


for record_id in record_ids:
url = '/domains/{}/records/{}'.format(self.domain, record_id)
self._delete(url)
LOGGER.debug(
'delete_record: record %s has been deleted', record_id
)

return record_ids if len(record_ids) > 1 else record_ids[0]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Our specification about delete is to return True if the delete was successful.


def _get_raw_record(self, record_id):
url = '/domains/{}/records/{}'.format(self.domain, record_id)
return self._get(url)

def _request(self, action='GET', url='/', data=None, query_params=None):
response = self.session.request(method=action,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it strictly required to use an HTTP session with pre-authentication? I would prefer if possible to use plain request each time because I saw from my experience that HTTP sessions are raising their own problems. However if it speeds up the requests, I am ok with it if it works for Name.com.

url=self.api_endpoint + url,
json=data,
params=query_params)
response.raise_for_status()
return response.json()


Provider = NamecomProvider
148 changes: 148 additions & 0 deletions lexicon/tests/providers/test_namecom.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
"""Integration tests for Name.com"""
import json
from unittest import TestCase

import pytest
from mock import ANY, Mock, patch
from requests import HTTPError

from lexicon.config import DictConfigSource
from lexicon.providers.namecom import provider_parser
from lexicon.tests.providers.integration_tests import (
IntegrationTests, _vcr_integration_test
)


# Hook into testing framework by inheriting unittest.TestCase and reuse
# the tests which *each and every* implementation of the interface must
# pass, by inheritance from integration_tests.IntegrationTests
class NamecomProviderTests(TestCase, IntegrationTests):
"""TestCase for Name.com"""

# I don't think we really need some docstrings here.
# pylint: disable=missing-function-docstring
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact, most of the time I even put this pylint disable directive at the top of the test module ;)


provider_name = 'namecom'
domain = 'mim.pw'

def _filter_headers(self):
return ['Authorization', 'Cookie']

def _filter_response(self, response):
headers = response['headers']
headers.pop('Set-Cookie', None)
headers.pop('content-length', None)

if response['status']['code'] == 200:
try:
data = json.loads(response['body']['string'].decode())
except ValueError:
pass
else:
if 'records' in data:
min_id = 10 ** 8
data['records'] = [
record for record in data['records']
if record['id'] > min_id
]
response['body']['string'] = json.dumps(data).encode()

return response

###########################
# Provider.authenticate() #
###########################
@_vcr_integration_test
def test_provider_authenticate(self):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you rename this test to not shadow the existing one in integration_test?

provider = self._construct_authenticated_provider()
assert provider.session.auth

############################
# Provider.create_record() #
############################
@_vcr_integration_test
def test_provider_when_calling_create_record_for_MX_with_priority(self): # pylint: disable=invalid-name
priority = 42
config = self._test_config()
config.add_config_source(DictConfigSource({'priority': priority}), 0)
provider = self.provider_module.Provider(config)
provider.authenticate()

record_id = provider.create_record('MX', 'mx.test1', self.domain)
assert provider._get_raw_record(record_id)['priority'] == priority # pylint: disable=protected-access

@_vcr_integration_test
def test_provider_when_calling_create_record_for_MX_with_no_priority(self): # pylint: disable=invalid-name
provider = self._construct_authenticated_provider()
record_id = provider.create_record('MX', 'mx.test2', self.domain)
assert 'priority' not in provider._get_raw_record(record_id) # pylint: disable=protected-access

@_vcr_integration_test
def test_provider_when_calling_create_record_should_fail_on_http_error(self):
provider = self._construct_authenticated_provider()
error = HTTPError(response=Mock())
with patch.object(provider, '_request', side_effect=error):
with pytest.raises(HTTPError):
provider.create_record('TXT', 'httperror', 'HTTPError')

############################
# Provider.update_record() #
############################
@_vcr_integration_test
def test_provider_when_calling_update_record_with_no_identifier_or_rtype_and_name_should_fail(self): # pylint: disable=line-too-long
provider = self._construct_authenticated_provider()
with pytest.raises(ValueError):
provider.update_record(None)

@_vcr_integration_test
def test_provider_when_calling_update_record_should_fail_if_no_record_to_update(self):
provider = self._construct_authenticated_provider()
with pytest.raises(Exception):
provider.update_record(None, 'TXT', 'missingrecord')

@_vcr_integration_test
def test_provider_when_calling_update_record_should_fail_if_multiple_records_to_update(self):
provider = self._construct_authenticated_provider()
provider.create_record('TXT', 'multiple.test', 'foo')
provider.create_record('TXT', 'multiple.test', 'bar')
with pytest.raises(Exception):
provider.update_record(None, 'TXT', 'multiple.test', 'updated')

@_vcr_integration_test
def test_provider_when_calling_update_record_filter_by_content_should_pass(self):
provider = self._construct_authenticated_provider()
provider.create_record('TXT', 'multiple.test', 'foo')
provider.create_record('TXT', 'multiple.test', 'bar')
assert provider.update_record(None, 'TXT', 'multiple.test', 'foo')

@_vcr_integration_test
def test_provider_when_calling_update_record_by_identifier_with_no_other_args_should_pass(self):
provider = self._construct_authenticated_provider()
record_id = provider.create_record('TXT', 'update.test', 'foo')
assert provider.update_record(record_id)

############################
# Provider.delete_record() #
############################
@_vcr_integration_test
def test_provider_when_calling_delete_record_with_no_identifier_or_rtype_and_name_should_fail(self): # pylint: disable=line-too-long
provider = self._construct_authenticated_provider()
with pytest.raises(ValueError):
provider.delete_record()

@_vcr_integration_test
@patch('lexicon.providers.namecom.LOGGER.warning')
def test_provider_when_calling_delete_record_should_pass_if_no_record_to_delete(self, warning):
provider = self._construct_authenticated_provider()
provider.delete_record(None, 'TXT', 'missingrecord')
warning.assert_called_once()
assert 'no record' in warning.call_args.args[0]


def test_subparser_configuration():
"""Tests the provider_parser method."""

subparser = Mock()
provider_parser(subparser)
subparser.add_argument.assert_any_call('--auth-username', help=ANY)
subparser.add_argument.assert_any_call('--auth-token', help=ANY)
Loading