-
Notifications
You must be signed in to change notification settings - Fork 0
/
VulnSaver.py
84 lines (67 loc) · 2.85 KB
/
VulnSaver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
#!/usr/bin/python3.9
""" A script for storing known Vulnerabilities in a database """
__author__ = "Malik Giafar"
from datetime import datetime, timedelta
import requests,psycopg2,json
# Read the database settings from the JSON config file. The path is relative
# to the current working directory. The `with` block closes the file handle
# as soon as the content has been parsed.
with open("./setup/config.json") as configFile:
    jsonConfigFile = json.load(configFile)

#==============LOGIN DB===================
db_host = jsonConfigFile["db_host"]
db_port = jsonConfigFile["db_port"]
db_user = jsonConfigFile["db_user"]
db_password = jsonConfigFile["db_password"]
db_name = jsonConfigFile["db_name"]
#==============LOGIN DB===================

# Module-level connection and cursor shared by the functions and the script
# body below.
conn = psycopg2.connect(host=db_host, port=db_port, user=db_user,
                        password=db_password, database=db_name)
cursor = conn.cursor()
def insertRow(cveid, cvss, summary, impact, lastUpdate, cpeDifficultJson):
    """Insert one CVE record into the ``cves`` table and commit it.

    Args:
        cveid: Value stored in the ``cveid`` column.
        cvss: Value stored in the ``cvss`` column.
        summary: Value stored in the ``summary`` column.
        impact: Value stored in the ``impact`` column.
        lastUpdate: Value stored in the ``last_update`` column.
        cpeDifficultJson: Value stored in the ``cpe_multiple_json`` column.

    The statement runs on the module-level ``cursor`` and the transaction is
    committed on the module-level ``conn`` right after the insert.
    """
    statement = "INSERT INTO cves (cveid, cvss, summary, impact, last_update, cpe_multiple_json) VALUES(%s, %s, %s, %s, %s, %s)"
    # The values are handed to the driver separately from the SQL text.
    rowValues = (cveid, cvss, summary, impact, lastUpdate, cpeDifficultJson)
    cursor.execute(statement, rowValues)
    conn.commit()
def isPresentInDB(cveid, lastUpdate):
    """Return True if ``cves`` already holds this CVE at this revision.

    A row matches only when both ``cveid`` and ``last_update`` are equal to
    the given values, so a CVE stored with a different ``last_update`` is
    reported as absent.

    Args:
        cveid: CVE identifier to look for.
        lastUpdate: Value compared against the ``last_update`` column.

    Returns:
        bool: True when at least one matching row exists, False otherwise.
    """
    # Bind the values as query parameters instead of formatting them into the
    # SQL text: the caller passes data taken from an external API response,
    # and a quote character in either value would otherwise break or alter
    # the statement.
    cursor.execute(
        "SELECT 1 FROM cves WHERE cveid = %s AND last_update = %s LIMIT 1",
        (cveid, lastUpdate),
    )
    # Only existence matters, so a single row is enough to decide.
    return cursor.fetchone() is not None
# Ask NVD for every CVE modified since midnight of the previous day.
# NOTE(review): the variable is called "aWeekAgo" but the window is one day;
# confirm which of the two is intended before changing either.
aWeekAgo = datetime.now() - timedelta(days=1)
url = "https://services.nvd.nist.gov/rest/json/cves/1.0?modStartDate=" + aWeekAgo.strftime("%Y-%m-%dT00:00:00:000 UTC-05:00")+"&resultsPerPage=2000"

try:
    print("retrieving cves list")
    r = requests.get(url=url)
    # Stop here on an HTTP error instead of failing later with a JSON
    # decoding error or a missing "result" key.
    r.raise_for_status()
    vulns = r.json()

    print("inserting cves into database, this may take some time")
    for singleVuln in vulns["result"]["CVE_Items"]:
        cveid = singleVuln["cve"]["CVE_data_meta"]["ID"]

        # CVSS v3 metrics are optional: fall back to -1 / "N/A" when the
        # entry has none. Only lookup failures are absorbed here.
        try:
            cvssV3Metrics = singleVuln["impact"]["baseMetricV3"]["cvssV3"]
        except (KeyError, TypeError):
            cvssV3Metrics = None
        if not isinstance(cvssV3Metrics, dict):
            cvssV3Metrics = {}
        cvss3 = cvssV3Metrics.get("baseScore", -1)
        impact = cvssV3Metrics.get("baseSeverity", "N/A")

        # Only the first description entry is stored; single quotes are
        # replaced by double quotes before saving.
        summary = str(singleVuln["cve"]["description"]["description_data"][0]["value"])
        summary = summary.replace("\'","\"")
        lastUpdate = str(singleVuln["lastModifiedDate"])
        cpeDifficultJson = json.dumps(singleVuln["configurations"])

        # Skip entries already stored with the same modification date.
        if not isPresentInDB(cveid, lastUpdate):
            insertRow(cveid, cvss3, summary, impact, lastUpdate, cpeDifficultJson)
    print("done")
finally:
    # Release the database resources even when the download or an insert
    # fails part-way through.
    cursor.close()
    conn.close()