forked from mozilla/foundation-security-advisories
-
Notifications
You must be signed in to change notification settings - Fork 0
/
common_cve.py
397 lines (356 loc) · 14.5 KB
/
common_cve.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
#!/usr/bin/env python3
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import os
import sys
import subprocess
from datetime import datetime, timezone
from json import dumps
import difflib
from bisect import insort
import requests
from cvelib.cve_api import CveApi
from requests import HTTPError
from foundation_security_advisories.common import (
get_all_files,
parse_yml_file,
CVEAdvisory,
CVEAdvisoryInstance,
)
# Shared CVE Services client used by the helpers in this module. The account,
# organisation, API key and target environment are all read from environment
# variables at import time; os.getenv yields None for any that are unset.
cve_api = CveApi(
    username=os.getenv("CVE_USER"),
    org=os.getenv("CVE_ORG"),
    api_key=os.getenv("CVE_API_KEY"),
    env=os.getenv("CVE_ENV"),
)
# CVE-IDs for which a "-> CVE-..." heading has already been printed.
announced_cve_steps: list[str] = []


def print_cve_step(cve_id: str):
    """
    Print a heading line for the given CVE-ID the first time it is seen.

    Subsequent calls with the same id print nothing, so the heading shows up
    at most once per run no matter how many steps touch that CVE.
    """
    if cve_id in announced_cve_steps:
        return
    print(f"\n-> {cve_id}")
    announced_cve_steps.append(cve_id)
def publish_cve(cve_id: str, cve_json: dict):
    """
    CVE Services: Publish the given CVE JSON content for an already existing
    CVE-ID.

    The CNA references inside `cve_json` are sorted by URL in place, the whole
    record is printed as a unified diff against an empty "remote", and the
    user is asked to confirm before anything is sent.

    Returns False if the user declines and None after a successful publish.
    Raises Exception if the API responds with an HTTP error.
    """
    references = cve_json["containers"]["cna"]["references"]
    references.sort(key=lambda reference: reference["url"])

    # Nothing is published yet, so every local line shows up as an addition.
    local_lines = dumps(cve_json, indent=2, sort_keys=True).split("\n")
    for line in difflib.unified_diff(
        "",
        local_lines,
        lineterm="",
        fromfile=f"Remote (not yet published)",
        tofile=f"Local",
    ):
        print(line)

    confirmed = prompt_yes_no(f"\nShould this content be published for {cve_id}?")
    if not confirmed:
        print(f"Skipping {cve_id}")
        return False

    print(f"Publishing {cve_id}")
    try:
        cve_api.publish(cve_id, cve_json)
        # Bump the timestamp of the CVE-ID afterwards: it has to be younger
        # than the commit timestamp, otherwise the file would be registered
        # as modified.
        touch_cve_id(cve_id)
    except HTTPError as e:
        raise Exception(f"Failed to publish {cve_id}, {e.response.text}")
def get_cve(cve_id: str):
    """
    CVE Services: Fetch and return the CVE record for the given CVE-ID.

    Raises Exception (chained to the original HTTPError) if the API responds
    with an HTTP error.
    """
    try:
        return cve_api.show_cve_record(cve_id)
    except HTTPError as e:
        # This is a read, not a publish; say so in the error message.
        raise Exception(f"Failed to fetch {cve_id}, {e.response.text}") from e
def touch_cve_id(cve_id: str):
    """
    CVE Services: Update the timestamp of the given CVE-ID to the current date.

    Announces the update on stdout, issues a PUT against the `cve-id/<id>`
    endpoint through the API client and returns the decoded JSON response.
    """
    print(
        f"Updating timestamp on {cve_id} to current date {pretty_date(datetime.now(tz=timezone.utc).timestamp())}"
    )
    # The PUT carries no payload of its own.
    # NOTE(review): presumably the server bumps the modification time as a
    # side effect of the request; confirm against CVE Services.
    response = cve_api._put(f"cve-id/{cve_id}")
    return response.json()
def update_published_cve(cve_id: str, cve_json: dict):
    """
    CVE Services: Replace the published content of the given CVE-ID with the
    given data in CVE JSON 5.1 format.

    Returns None. Raises Exception if either the update or the follow-up
    timestamp refresh fails with an HTTP error.
    """
    print(f"Updating {cve_id}")
    try:
        cve_api.update_published(cve_id, cve_json)
        # The timestamp on the CVE-ID itself is what is used later to detect
        # modified files, so it has to be refreshed together with the record.
        touch_cve_id(cve_id)
    except HTTPError as e:
        raise Exception(f"Failed to update {cve_id}, {e.response.text}")
def try_update_published_cve(local_cve: CVEAdvisory, local_date: int, remote_date):
    """
    Compare the local CVE with its published remote record and push an update
    if they differ.

    Nothing happens when the remote record is newer than the local one, unless
    the FORCE_UPDATE environment variable is set. Otherwise both records are
    normalised, diffed and printed. Without a diff only the remote timestamp
    is refreshed. With a diff the user is asked for confirmation; for CVEs
    before 2023 the prompt defaults to "no".

    Returns False if the user declines the update and None in every other
    case.
    """
    remote_date_str = pretty_date(remote_date)
    local_date_str = pretty_date(local_date)
    force_update = os.getenv("FORCE_UPDATE")
    if remote_date > local_date and not force_update:
        return

    print_cve_step(local_cve.id)

    # Strip the parts of the remote record that never exist locally, so that
    # a diff is only detected if something actually changed.
    remote_cve_json = get_cve(local_cve.id)
    remote_cve_json.pop("cveMetadata")
    remote_cna = remote_cve_json["containers"]["cna"]
    remote_cna.pop("providerMetadata")
    remote_cna.pop("x_legacyV4Record", None)

    local_cve_json = local_cve.to_json()
    local_cna = local_cve_json["containers"]["cna"]

    local_reference_urls = [
        reference[0]
        for instance in local_cve.instances
        for reference in instance.references
    ]

    # Remote references that were not added automatically are probably
    # wanted, so they are carried over into the to-be-published object.
    # Anything under these Mozilla prefixes counts as automatically added.
    managed_prefixes = (
        "https://bugzilla.mozilla.org",
        "https://www.bugzilla.mozilla.org",
        "https://mozilla.org",
        "https://www.mozilla.org",
    )
    remote_extra_references = [
        reference
        for reference in remote_cna["references"]
        if reference["url"] not in local_reference_urls
        and not reference["url"].startswith(managed_prefixes)
    ]
    local_cna["references"].extend(remote_extra_references)

    # Same ordering on both sides, so that order alone never causes a diff.
    remote_cna["references"].sort(key=lambda reference: reference["url"])
    local_cna["references"].sort(key=lambda reference: reference["url"])

    # Keep any other remote containers we do not know about (like "adp").
    local_containers = local_cve_json["containers"]
    for container_name, container in remote_cve_json["containers"].items():
        local_containers.setdefault(container_name, container)

    diff_lines = list(
        difflib.unified_diff(
            dumps(remote_cve_json, indent=2, sort_keys=True).split("\n"),
            dumps(local_cve_json, indent=2, sort_keys=True).split("\n"),
            lineterm="",
            fromfile=f"Remote",
            fromfiledate=remote_date_str,
            tofile=f"Local ",
            tofiledate=local_date_str,
        )
    )
    for line in diff_lines:
        print(line)

    if not diff_lines:
        # No actual difference: refresh the timestamp so this CVE is not
        # picked up again next time.
        print(f"--- Remote\t{remote_date_str}")
        print(f"+++ Local \t{local_date_str}")
        print(f"Not actual difference found for {local_cve.id}")
        touch_cve_id(local_cve.id)
        return

    if local_cve.year < 2023:
        if not prompt_yes_no(
            f"\nThis CVE lies before the cutoff year 2023. Should the content still be updated for {local_cve.id}?",
            default=False,
        ):
            print(f"Skipping {local_cve.id} because it lies before the cutoff year")
            touch_cve_id(local_cve.id)
            return False
    elif not prompt_yes_no(f"\nShould this content be updated for {local_cve.id}?"):
        print(f"Skipping {local_cve.id}")
        return False

    update_published_cve(local_cve.id, local_cve_json)
def reserve_cve_id(year: str):
    """
    CVE Services: Reserve a new CVE-ID for the given year and return that id.

    Raises Exception if the API responds with an HTTP error and ValueError if
    the response does not contain exactly one reserved id.
    """
    print(f"Reserving CVE-ID for year {year}")
    try:
        # NOTE(review): the positional arguments are presumably
        # (count, random, year) -- confirm against the cvelib API.
        response = cve_api.reserve(1, False, year)
    except HTTPError as e:
        raise Exception(f"Failed to reserve CVE-ID, {e.response.text}")

    has_single_id = (
        "cve_ids" in response
        and len(response["cve_ids"]) == 1
        and "cve_id" in response["cve_ids"][0]
    )
    if not has_single_id:
        raise ValueError(f"API did not respond with valid CVE-ID")
    return response["cve_ids"][0]["cve_id"]
def get_owned_cve_ids():
    """
    CVE Services: Get all the CVE-IDs owned by the current CNA. Returns a tuple containing:
    - A list of all the owned IDs, regardless of their state
    - A dictionary of all the IDs with the state `PUBLISHED` or `REJECTED`,
      mapped to the time they were last modified (as a unix timestamp).

    IDs in the state `RESERVED` only appear in the list. Any other state
    raises ValueError.
    """
    owned_ids = []
    published_dates: dict[str, float] = {}
    print("-> Fetching already owned CVE-IDs")
    for cve_advisory in cve_api.list_cves():
        cve_id = cve_advisory["cve_id"]
        owned_ids.append(cve_id)
        state = cve_advisory["state"]
        if state in ("PUBLISHED", "REJECTED"):
            modified = cve_advisory["time"]["modified"]
            published_dates[cve_id] = parse_iso_date(modified)
        elif state != "RESERVED":
            raise ValueError(f"Invalid CVE state '{cve_advisory['state']}'")
    return owned_ids, published_dates
def replace_cve_id(cve: CVEAdvisory):
    """
    Replace the id of a given `CVEAdvisory` with a newly reserved CVE-ID.

    After confirmation by the user a new id is reserved for the advisory's
    year, stored on the advisory, and every occurrence of the old id is
    replaced in the files of all its instances. When the CI environment
    variable is set, the rewritten files are also staged with `git add`.

    Returns True if the id of the given advisory has been changed and False
    if it hasn't.
    """
    old_id = cve.id
    confirmed = prompt_yes_no(f"Should a new CVE-ID be reserved to replace {old_id}?")
    if not confirmed:
        print(f"Skipping {old_id}")
        return False

    print(f"Replacing CVE-ID for {old_id}")
    new_id = reserve_cve_id(cve.year)
    print(f"Reserved {new_id}")
    cve.id = new_id

    for instance in cve.instances:
        path = instance.file_name
        # Read the file completely before reopening it for writing.
        with open(path) as source:
            original_text = source.read()
        updated_text = original_text.replace(old_id, new_id)
        with open(path, "w") as target:
            target.write(updated_text)
        if os.getenv("CI"):
            subprocess.run(["git", "add", path])

    print(f"Renamed {old_id} to {new_id}")
    return True
def parse_iso_date(date_string: str) -> float:
    """
    Parse the given date string in the format used by CVE Services
    and return the corresponding date as a unix timestamp.

    A trailing "Z" (UTC designator) is accepted on every Python version:
    `datetime.fromisoformat` only understands it from Python 3.11 on, so it is
    rewritten to the equivalent "+00:00" offset first.

    Strings without any UTC offset are interpreted in the local timezone, as
    `datetime.timestamp()` does for naive datetimes.

    Raises ValueError if the string is not a valid ISO 8601 date.
    """
    if date_string.endswith("Z"):
        date_string = date_string[:-1] + "+00:00"
    return datetime.fromisoformat(date_string).timestamp()
def pretty_date(utc_timestamp: float) -> str:
    """
    Return the given Unix UTC timestamp as a string in the following format:
    %Y-%m-%d %H:%M:%S UTC

    The timestamp is a number of seconds since the epoch (int or float), not
    a string; it is always rendered in UTC regardless of the local timezone.
    """
    moment = datetime.fromtimestamp(utc_timestamp, timezone.utc)
    return moment.strftime("%Y-%m-%d %H:%M:%S UTC")
def parse_bug(bug: dict):
    """
    Turn a single bug entry from the advisory YAML into a `(url, desc)` tuple.

    `bug["url"]` may already be a full URL (anything starting with "http"),
    which is returned as is. Otherwise it is treated as a bug id, or as a
    comma separated list of bug ids, and expanded to the matching
    bugzilla.mozilla.org page. `desc` is the stringified `bug["desc"]`, or
    None if the entry has no description.
    """
    url = str(bug["url"])
    desc = None
    if "desc" in bug:
        desc = str(bug["desc"])

    if url.startswith("http"):
        return url, desc

    if "," in url:
        # Several ids: link to a bug list, with blanks dropped and the
        # separating commas percent-encoded.
        bug_ids = url.replace(" ", "").replace(",", "%2C")
        return "https://bugzilla.mozilla.org/buglist.cgi?bug_id=" + bug_ids, desc

    return "https://bugzilla.mozilla.org/show_bug.cgi?id=" + url, desc
def prompt_yes_no(question: str, default=True):
    """
    Ask the user a yes/no question on the terminal and return the answer.

    When either the CI or the PROMPT_CHOOSE_DEFAULT environment variable is
    set, no question is asked and `default` is returned directly.

    Otherwise "y" and "yes" (case-insensitive, surrounding whitespace ignored)
    count as yes; an empty answer counts as yes only if `default` is truthy.
    Everything else counts as no. Pressing Ctrl+C at the prompt terminates the
    program with exit status 0.
    """
    if os.getenv("CI") or os.getenv("PROMPT_CHOOSE_DEFAULT"):
        return default
    try:
        response = input(question + (" (Y/n)" if default else " (y/N)"))
    except KeyboardInterrupt:
        # sys.exit rather than the bare `exit` helper: the latter is injected
        # by the `site` module for interactive use and is not always present.
        sys.exit(0)
    accepted = ["", "y", "yes"] if default else ["y", "yes"]
    return response.strip().lower() in accepted
def get_local_cve_advisories():
    """
    Collect all the CVE advisories located in this repository.

    Every `.yml` file returned by `get_all_files()` is parsed; for each CVE
    listed under its `advisories` key, one `CVEAdvisoryInstance` is created
    per entry in the file's `fixed_in` list and attached to the `CVEAdvisory`
    of that CVE-ID. Each instance also records the timestamp of the last git
    commit that touched the file.

    Returns a dictionary of all the local CVE-IDs mapped to their respective
    `CVEAdvisory` objects.
    """
    local_advisories: dict[str, CVEAdvisory] = {}
    print("\n-> Checking local files")
    for file_name in get_all_files():
        if not file_name.endswith(".yml"):
            continue
        file_data: dict = parse_yml_file(file_name)

        # Author timestamp (%at) of the most recent commit touching the file.
        git_log = subprocess.run(
            ["git", "log", "--pretty=format:%at", "-1", "HEAD", "--", file_name],
            capture_output=True,
        )
        file_last_modified = int(git_log.stdout.strip())

        if "advisories" not in file_data:
            continue

        advisories = file_data["advisories"]
        for cve_id in advisories:
            cve_data = advisories[cve_id]
            if cve_id not in local_advisories:
                # The year is the second to last dash separated part of the id.
                year = int(cve_id.split("-")[-2])
                local_advisories[cve_id] = CVEAdvisory(id=cve_id, year=year)
            advisory = local_advisories[cve_id]

            for fixed_in in file_data["fixed_in"]:
                # The last whitespace separated token is the version, the
                # rest is the product name.
                product, version_fixed = fixed_in.rsplit(None, 1)
                references = [parse_bug(bug) for bug in cve_data["bugs"]]
                cve_instance = CVEAdvisoryInstance(
                    parent=advisory,
                    title=cve_data["title"],
                    description=cve_data["description"].strip(),
                    reporter=cve_data["reporter"],
                    references=references,
                    mfsa_id=file_data["mfsa_id"],
                    product=product,
                    version_fixed=version_fixed,
                    file_name=file_name,
                    file_last_modified=file_last_modified,
                )
                # Keep the instances sorted by MFSA id to avoid pushing
                # updates to the API where the only thing that changes is the
                # order of the instances.
                insort(advisory.instances, cve_instance, key=lambda x: x.mfsa_id)
    return local_advisories
def try_set_bugzilla_alias(bug: str, cve_id: str):
    """
    Try to set the alias of the given bugzilla bug to the given CVE-ID.

    `bug` is expected to be the `{id}` part of a temporary reserved advisory
    id, which potentially is a bugzilla bug number. Numbers smaller than
    100000 are ignored, as is everything when no BUGZILLA_API_KEY environment
    variable is set or when the user declines the prompt.

    This is strictly best-effort: every failure (non-numeric bug, network
    error, timeout, HTTP error status) is reported on stdout and swallowed.
    Always returns None.
    """
    try:
        # Check if we have a bugzilla API key available
        api_key = os.getenv("BUGZILLA_API_KEY")
        if not api_key:
            print(
                f"Skipping alias assignment for {cve_id} (bug {bug}) as no BUGZILLA_API_KEY was provided"
            )
            return

        # Make sure this is actually a number
        bug_number = int(bug)

        # Skip smaller numbers as there is a high chance these aren't any
        # actual bugzilla bug numbers
        if bug_number < 100000:
            print(
                f"Skipping alias assignment for {cve_id} as '{bug_number}' does not seem to be a bug number"
            )
            return

        if not prompt_yes_no(
            f"Should '{cve_id}' be set as an alias for bug {bug_number} on bugzilla?"
        ):
            print(f"Skipping alias assignment for {cve_id} (bug {bug})")
            return

        # Try to update the alias for the given bug number. requests does not
        # raise on 4xx/5xx responses by itself, so the status is checked
        # explicitly; the timeout keeps a hanging server from blocking the
        # run. Either failure is caught by the surrounding try block.
        response = requests.put(
            f"https://bugzilla.mozilla.org/rest/bug/{bug_number}",
            data={"alias": cve_id},
            headers={"X-BUGZILLA-API-KEY": api_key},
            timeout=30,
        )
        response.raise_for_status()
        print(f"Assigned alias {cve_id} to bug {bug}")
    except Exception as e:
        print(f"Failed to assign alias {cve_id} to bug {bug} - {e}")