-
Notifications
You must be signed in to change notification settings - Fork 1
/
check_dns_serials.py
executable file
·87 lines (81 loc) · 2.94 KB
/
check_dns_serials.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
#!/usr/bin/env python3
# Copyright 2016 Andrej Rode.
#
# This is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3, or (at your option)
# any later version.
#
# This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this software; see the file COPYING. If not, write to
# the Free Software Foundation, Inc., 51 Franklin Street,
# Boston, MA 02110-1301, USA.
import argparse
import logging as log
import sys

import dns.exception
import dns.message
import dns.query
import dns.rdatatype
import dns.resolver
def _resolve_addresses(hostname):
    """Return the addresses of *hostname*: A records first, then AAAA.

    A failed lookup for one record type is logged at debug level and
    skipped, because a nameserver commonly has only one address family.
    """
    addresses = []
    for rdtype in ('A', 'AAAA'):
        try:
            answer = dns.resolver.query(hostname, rdtype)
        except dns.exception.DNSException as err:
            log.debug('No %s record for %s: %s', rdtype, hostname, err)
            continue
        addresses.extend(rdata.address for rdata in answer)
    return addresses


def _query_serial(request, addresses, timeout):
    """Return the SOA serial from the first address that answers, else None."""
    for address in addresses:
        try:
            response = dns.query.udp(request, address, timeout=timeout)
        except (dns.exception.DNSException, OSError) as err:
            # OSError covers e.g. an IPv6 address on a host without IPv6.
            log.warning('SOA query to %s failed: %s', address, err)
            continue
        for rrset in response.answer:
            if rrset.rdtype == dns.rdatatype.SOA:
                return rrset[0].serial
        log.warning('No SOA record in the answer from %s', address)
    return None


def check_serials(mydomain, timeout=5.0):
    """Compare the SOA serial served by every nameserver of *mydomain*.

    The NS records of the domain are looked up through the default
    resolver, each nameserver name is resolved to its addresses, and an SOA
    query is sent over UDP to those addresses until one of them answers.

    Args:
        mydomain: Name of the zone to check.
        timeout: Seconds to wait for each UDP reply; passed to
            ``dns.query.udp``.

    Returns:
        ``0`` when every nameserver returned the same serial. Otherwise a
        dict mapping each nameserver name to its serial; the value is
        ``None`` for a nameserver that could not be resolved or did not
        return an SOA record, and any such entry makes the check fail.

    Raises:
        dns.exception.DNSException: If the NS lookup for *mydomain* fails.
    """
    nameservers = [rdata.to_text() for rdata in dns.resolver.query(mydomain, 'NS')]
    request = dns.message.make_query(mydomain, dns.rdatatype.SOA)

    serial_dict = {}
    for ns in nameservers:
        serial_dict[ns] = _query_serial(request, _resolve_addresses(ns), timeout)

    log.info('Serials returned for domain %s: \n%s', mydomain, serial_dict)

    serials = set(serial_dict.values())
    # A missing serial counts as a mismatch so that an unreachable
    # nameserver is reported instead of being silently ignored.
    if len(serials) > 1 or None in serials:
        return serial_dict
    return 0
def main():
    """Check every domain given on the command line.

    Prints an OK line for each domain whose check returns 0. If any domain
    returned something else, raises an ``Exception`` whose message lists
    each such domain together with the value returned for it.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument('domains', metavar='d', type=str, nargs='+',
                     help='domains which to check')
    cli.add_argument('--verbose', action='store_true')
    options = cli.parse_args()

    # Only --verbose lowers the threshold; otherwise logging keeps its default.
    log_config = {'format': "%(levelname)s: %(message)s"}
    if options.verbose:
        log_config['level'] = log.DEBUG
    log.basicConfig(**log_config)

    mismatches = {}
    for name in options.domains:
        outcome = check_serials(name)
        if outcome == 0:
            print('Serials in domain: {} are OK'.format(name))
            continue
        mismatches[name] = outcome

    if not mismatches:
        return

    template = 'Serials in domain: {} are unequal. Returned serials from NS: {}\n'
    report = ''.join(
        template.format(name, serials) for name, serials in mismatches.items()
    )
    raise Exception(report)
# Run the check only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()