forked from Nathanwoodburn/hns_doh_loadbalancer
-
Notifications
You must be signed in to change notification settings - Fork 0
/
cert.py
135 lines (115 loc) · 4.27 KB
/
cert.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#!/usr/bin/env python3
import json
import os
import requests
import sys
import time
# Shared secret sent as the "auth" field of every acme-dns update request.
AUTH = ""
# File the secret is read from.
TOKEN_PATH = "/root/hns_doh_loadbalancer/token"

# Abort with a clear message when the token file is missing. The previous
# check was inverted (it reported "not found" when the file *did* exist) and
# never stopped the script, so a missing file surfaced as a raw traceback
# from open() below.
if not os.path.isfile(TOKEN_PATH):
    print("ERROR: Token file not found")
    sys.exit(1)

# Read token from file; surrounding whitespace/newlines are not part of it
with open(TOKEN_PATH, "r") as fh:
    AUTH = fh.read().strip()

# An empty token can never authenticate, so stop before any request is made
if not AUTH:
    print("ERROR: Token is empty")
    sys.exit(1)
# Endpoint of the acme-dns instance that TXT updates are POSTed to
ACMEDNS_URL = "https://nathan.woodburn.au/hnsdoh-acme"

# Location on disk of the acme-dns credential storage file
STORAGE_PATH = "/etc/letsencrypt/acmedns.json"

# Address ranges that updates are allowed from (empty by default)
# Example: ALLOW_FROM = ["192.168.10.0/24", "::1/128"]
ALLOW_FROM = list()

# When true, re-registration is forced and already existing acme-dns
# accounts are overwritten.
FORCE_REGISTER = False

# Domain being validated, taken from the environment. A wildcard request
# ("*.example.com") is reduced to its base domain.
DOMAIN = os.environ["CERTBOT_DOMAIN"]
DOMAIN = DOMAIN[2:] if DOMAIN.startswith("*.") else DOMAIN

# Name of the challenge record for DOMAIN
VALIDATION_DOMAIN = "_acme-challenge.{}".format(DOMAIN)

# Challenge value to publish, taken from the environment
VALIDATION_TOKEN = os.environ["CERTBOT_VALIDATION"]
class AcmeDnsClient(object):
    """
    Handles the communication with ACME-DNS API
    """

    # Seconds to wait for the acme-dns server before giving up. Without a
    # timeout requests.post() may block indefinitely and stall the hook.
    REQUEST_TIMEOUT = 30

    def __init__(self, acmedns_url):
        """Remember the URL that TXT updates are POSTed to."""
        self.acmedns_url = acmedns_url

    def update_txt_record(self, txt):
        """Updates the TXT challenge record to ACME-DNS subdomain.

        POSTs a JSON body containing ``txt`` and the module-level ``AUTH``
        token to ``self.acmedns_url``. Returns ``None`` when the server
        answers with HTTP 200. On a transport failure or any other HTTP
        status a diagnostic message is printed and the process exits with
        status 1.
        """
        update = {"txt": txt, "auth": AUTH}
        headers = {"Content-Type": "application/json"}
        try:
            res = requests.post(self.acmedns_url,
                                headers=headers,
                                data=json.dumps(update),
                                timeout=self.REQUEST_TIMEOUT)
        except requests.RequestException as e:
            # Connection errors, timeouts, TLS failures, ...: report them in
            # the same print-and-exit style as the other errors of this script
            print("ERROR: Could not send update to acme-dns at {}: {}".format(
                self.acmedns_url, e))
            sys.exit(1)

        if res.status_code == 200:
            # Successful update
            return

        msg = ("Encountered an error while trying to update TXT record in "
               "acme-dns. \n"
               "------- Request headers:\n{}\n"
               "------- Request body:\n{}\n"
               "------- Response HTTP status: {}\n"
               "------- Response body: {}")
        s_headers = json.dumps(headers, indent=2, sort_keys=True)
        # Never echo the secret token into the hook output / logs
        s_update = json.dumps(dict(update, auth="<redacted>"),
                              indent=2, sort_keys=True)
        try:
            s_body = json.dumps(res.json(), indent=2, sort_keys=True)
        except ValueError:
            # Error responses are not guaranteed to be JSON (e.g. an HTML
            # page from a reverse proxy); fall back to the raw text so the
            # real failure is still reported instead of a decode traceback.
            s_body = res.text
        print(msg.format(s_headers, s_update, res.status_code, s_body))
        sys.exit(1)
class Storage(object):
    """Small JSON-file backed key/value store.

    The whole file is read into a dict when the object is created and is
    written back only when :meth:`save` is called.
    """

    def __init__(self, storagepath):
        """Remember ``storagepath`` and load its current content."""
        self.storagepath = storagepath
        self._data = self.load()

    @staticmethod
    def _normalize_key(key):
        """Return ``key`` without a leading ``*.`` wildcard label."""
        if key.startswith("*."):
            return key[2:]
        return key

    def load(self):
        """Reads the storage content from the disk to a dict structure

        Returns an empty dict when the file does not exist or is empty.
        Prints an error and exits with status 1 when the file exists but
        cannot be read, or when its content is not a JSON object.
        """
        try:
            with open(self.storagepath, 'r') as fh:
                filedata = fh.read()
        except IOError:
            if os.path.isfile(self.storagepath):
                # Only error out if file exists, but cannot be read
                print("ERROR: Storage file exists but cannot be read")
                sys.exit(1)
            return dict()

        if not filedata:
            # An empty file is treated like a missing one
            return dict()

        try:
            data = json.loads(filedata)
        except ValueError:
            # Storage file is corrupted
            print("ERROR: Storage JSON is corrupted")
            sys.exit(1)
        if not isinstance(data, dict):
            # Valid JSON but not an object: put()/fetch() need a mapping, so
            # report it now instead of failing later with a TypeError
            print("ERROR: Storage JSON is corrupted")
            sys.exit(1)
        return data

    def save(self):
        """Saves the storage content to disk

        The file is created with mode 0o600 when it does not exist yet and
        is truncated before the new content is written. Prints an error and
        exits with status 1 when the file cannot be written.
        """
        serialized = json.dumps(self._data)
        try:
            fd = os.open(self.storagepath,
                         os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
            with os.fdopen(fd, 'w') as fh:
                fh.write(serialized)
        except IOError:
            print("ERROR: Could not write storage file.")
            sys.exit(1)

    def put(self, key, value):
        """Puts the configuration value to storage and sanitize it"""
        # If wildcard domain, remove the wildcard part as this will use the
        # same validation record name as the base domain
        self._data[self._normalize_key(key)] = value

    def fetch(self, key):
        """Gets configuration value from storage

        Returns ``None`` when nothing is stored for ``key``. A wildcard key
        (``*.example.com``) falls back to the base domain entry, mirroring
        the normalization done by :meth:`put`.
        """
        if key in self._data:
            # Exact match first, so keys already present in the file keep
            # resolving exactly as before
            return self._data[key]
        return self._data.get(self._normalize_key(key))
def main():
    """Publish the challenge value to acme-dns, then wait for propagation."""
    acme_client = AcmeDnsClient(ACMEDNS_URL)
    # Constructing Storage reads STORAGE_PATH; the resulting object is not
    # used any further in this entry point.
    credential_store = Storage(STORAGE_PATH)

    # Push the validation token as the TXT record of the acme-dns instance
    acme_client.update_txt_record(VALIDATION_TOKEN)

    # Give the DNS 60 seconds to propagate before returning
    time.sleep(60)


if __name__ == "__main__":
    main()