Skip to content

Commit

Permalink
Merge branch 'main' into data-equivalency
Browse files Browse the repository at this point in the history
  • Loading branch information
northdpole authored Apr 25, 2022
2 parents 2bffe99 + bf1ad26 commit 122e22e
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 49 deletions.
10 changes: 2 additions & 8 deletions application/cmd/cre_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,18 +89,12 @@ def register_cre(cre: defs.CRE, collection: db.Node_collection) -> db.CRE:
collection.add_internal_link(
dbcre, register_cre(link.document, collection), type=link.ltype
)
elif type(link.document) == defs.Standard:
else:
collection.add_link(
cre=dbcre,
node=register_node(node=link.document, collection=collection),
type=link.ltype,
)
elif type(link.document) == defs.Tool:
collection.add_link(
cre=dbcre,
tool=register_tool(tool=link.document, collection=collection),
type=link.ltype,
)
return dbcre


Expand Down Expand Up @@ -327,7 +321,7 @@ def print_graph() -> None:
raise NotImplementedError


def run(args: argparse.Namespace) -> None:
def run(args: argparse.Namespace) -> None: # pragma: no cover
script_path = os.path.dirname(os.path.realpath(__file__))
os.path.join(script_path, "../cres")

Expand Down
59 changes: 37 additions & 22 deletions application/tests/cre_main_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def test_register_node_with_links(self) -> None:
id="",
description="",
name="standard_with_links",
section="Standard With Links",
links=[
defs.Link(
document=defs.Standard(
Expand All @@ -54,9 +55,15 @@ def test_register_node_with_links(self) -> None:
name="CodemcCodeFace",
)
),
defs.Link(
document=defs.Tool(
description="awesome hacking tool",
name="ToolmcToolFace",
)
),
],
section="Standard With Links",
)

ret = main.register_node(node=standard_with_links, collection=self.collection)
# assert returned value makes sense
self.assertEqual(ret.name, "standard_with_links")
Expand All @@ -68,46 +75,51 @@ def test_register_node_with_links(self) -> None:
self.assertIsNone(thing.cre)

self.assertEqual(self.collection.session.query(db.Links).all(), [])
# 3 cre-less nodes in the db
self.assertEqual(len(self.collection.session.query(db.Node).all()), 3)

# 4 cre-less nodes in the db
self.assertEqual(len(self.collection.session.query(db.Node).all()), 4)

def test_register_node_with_cre(self) -> None:
known_standard_with_cre = defs.Standard(
name="CWE",
section="598",
links=[
defs.Link(document=defs.CRE(id="101-202", name="crename")),
],
)
standard_with_cre = defs.Standard(
doctype=defs.Credoctypes.Standard,
id="",
description="",
name="standard_with_cre",
links=[
defs.Link(
document=defs.CRE(
doctype=defs.Credoctypes.CRE,
id="101-202",
description="cre desc",
name="crename",
links=[],
tags=[],
metadata={},
)
),
defs.Link(
document=defs.Tool(
doctype=defs.Credoctypes.Tool,
tooltype=defs.ToolTypes.Offensive,
name="zap",
)
),
defs.Link(
document=defs.Standard(
name="CWE",
section="598",
links=[
defs.Link(document=defs.CRE(id="101-202", name="crename")),
],
)
),
],
section="standard_with_cre",
)

main.register_node(node=known_standard_with_cre, collection=self.collection)
main.register_node(node=standard_with_cre, collection=self.collection)
# assert db structure makes sense
self.assertEqual(
len(self.collection.session.query(db.Links).all()), 2
) # 2 links in the db
len(self.collection.session.query(db.Links).all()), 3
) # 3 links in the db
self.assertEqual(
len(self.collection.session.query(db.Node).all()), 2
) # 2 standards in the db
len(self.collection.session.query(db.Node).all()), 3
) # 3 standards in the db
self.assertEqual(
len(self.collection.session.query(db.CRE).all()), 1
) # 1 cre in the db
Expand Down Expand Up @@ -181,16 +193,16 @@ def test_register_standard_with_groupped_cre_links(self) -> None:

def test_register_cre(self) -> None:
standard = defs.Standard(
doctype=defs.Credoctypes.Standard,
name="ASVS",
section="SESSION-MGT-TOKEN-DIRECTIVES-DISCRETE-HANDLING",
subsection="3.1.1",
)
tool = defs.Tool(name="Tooly", tooltype=defs.ToolTypes.Defensive)
cre = defs.CRE(
id="100",
description="CREdesc",
name="CREname",
links=[defs.Link(document=standard)],
links=[defs.Link(document=standard), defs.Link(document=tool)],
tags=["CREt1", "CREt2"],
metadata={"tags": ["CREl1", "CREl2"]},
)
Expand All @@ -199,6 +211,9 @@ def test_register_cre(self) -> None:
self.assertEqual(
len(self.collection.session.query(db.CRE).all()), 1
) # 1 cre in the db
self.assertEqual(
len(self.collection.session.query(db.Node).all()), 2
) # 2 nodes in the db

def test_parse_file(self) -> None:
file: List[Dict[str, Any]] = [
Expand Down
17 changes: 10 additions & 7 deletions application/utils/external_project_parsers/cheatsheets_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,18 @@ def cheatsheet(section: str, hyperlink: str, tags: List[str]) -> defs.Standard:

def parse_cheatsheets(cache: db.Node_collection):
c_repo = "https://github.com/OWASP/CheatSheetSeries.git"
cheasheets_path = "cheatsheets/"
cheatsheets_path = "cheatsheets/"
repo = git.clone(c_repo)
register_cheatsheets(repo=repo, cache=cache, cheatsheets_path=cheatsheets_path)


def register_cheatsheets(cache: db.Node_collection, repo, cheatsheets_path, repo_path):

title_regexp = r"# (?P<title>.+)"
cre_link = r"(https://www\.)?opencre.org/cre/(?P<cre>\d+-\d+)"
repo = git.clone(c_repo)
files = os.listdir(os.path.join(repo.working_dir, cheasheets_path))
files = os.listdir(os.path.join(repo.working_dir, cheatsheets_path))
for mdfile in files:
pth = os.path.join(repo.working_dir, cheasheets_path, mdfile)
pth = os.path.join(repo.working_dir, cheatsheets_path, mdfile)
name = None
tag = None
section = None
Expand All @@ -39,9 +44,7 @@ def parse_cheatsheets(cache: db.Node_collection):
name = title.group("title")
cre_id = cre.group("cre")
cres = cache.get_CREs(external_id=cre_id)
hyperlink = (
f"{c_repo.replace('.git','')}/tree/master/{cheasheets_path}{mdfile}"
)
hyperlink = f"{repo_path.replace('.git','')}/tree/master/{cheatsheets_path}{mdfile}"
for dbcre in cres:
cs = cheatsheet(
section=name,
Expand Down
59 changes: 48 additions & 11 deletions application/utils/external_project_parsers/zap_alerts_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
def zap_alert(
name: str, id: str, description: str, tags: List[str], code: str
) -> defs.Tool:
tags.append(id)
return defs.Tool(
tooltype=defs.ToolTypes.Offensive,
name=f"ZAP Rule: {name}",
id=id,
description=description,
tags=tags,
hyperlink=code,
Expand All @@ -30,14 +30,19 @@ def zap_alert(
def parse_zap_alerts(cache: db.Node_collection):
zaproxy_website = "https://github.com/zaproxy/zaproxy-website.git"
alerts_path = "site/content/docs/alerts/"
repo = git.clone(zaproxy_website)
register_alerts(repo=repo, cache=cache, alerts_path=alerts_path)


def register_alerts(cache: db.Node_collection, repo: git.git, alerts_path: str):
zap_md_cwe_regexp = r"cwe: ?(?P<cweId>\d+)"
zap_md_title_regexp = r"title: ?(?P<title>\".+\")"
zap_md_alert_id_regexp = r"alertid: ?(?P<id>\d+)"
zap_md_alert_type_regexp = r"alerttype: ?(?P<type>\".+\")"
zap_md_solution_regexp = r"solution: ?(?P<solution>\".+\")"
zap_md_code_regexp = r"code: ?(?P<code>.+)"
zap_md_top10_regexp = r"OWASP_(?P<year>\d\d\d\d)_A(?P<num>\d\d?)"

repo = git.clone(zaproxy_website)
for mdfile in os.listdir(os.path.join(repo.working_dir, alerts_path)):
pth = os.path.join(repo.working_dir, alerts_path, mdfile)
name = None
Expand Down Expand Up @@ -72,20 +77,52 @@ def parse_zap_alerts(cache: db.Node_collection):
)
continue
cwe = re.search(zap_md_cwe_regexp, mdtext)
alert = zap_alert(
name=name,
id=externalId,
description=description,
tags=[tag],
code=code,
)
dbnode = cache.add_node(alert)

top10 = re.finditer(zap_md_top10_regexp, mdtext)
if top10:
for match in top10:
year = match.group("year")
num = match.group("num")
entries = cache.get_nodes(name=f"Top10 {year}", ntype="Standard")
entry = [e for e in entries if str(int(num)) in e.section]
if entry:
logger.info(
f"Found zap alert {name} linking to {entry[0].name}{entry[0].section}"
)
for cre in [
nl
for nl in entry[0].links
if nl.document.doctype == defs.Credoctypes.CRE
]:
cache.add_link(
cre=db.dbCREfromCRE(cre.document),
node=dbnode,
type=defs.LinkTypes.LinkedTo,
)
else:
logger.error(
f"Zap Alert {name} links to OWASP top 10 {year}:{num} but CRE doesn't know about it, incomplete data?"
)
if cwe:
cweId = cwe.group("cweId")
logger.info(f"Found zap alert {name} linking to CWE {cweId}")
cwe_nodes = cache.get_nodes(name="CWE", section=cweId)
for node in cwe_nodes:
for link in node.links:
if link.document.doctype == defs.Credoctypes.CRE:
alert = zap_alert(
name=name,
id=externalId,
description=description,
tags=[tag],
code=code,
)
dbnode = cache.add_node(alert)

cache.add_link(
cre=db.dbCREfromCRE(link.document), node=dbnode
cre=db.dbCREfromCRE(link.document),
node=dbnode,
type=defs.LinkTypes.LinkedTo,
)
else:
logger.info(f"CWE id not found in alert {externalId}, skipping linking")
7 changes: 6 additions & 1 deletion cre.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from flask_migrate import Migrate # type: ignore

from application import create_app, sqla # type: ignore
from application.cmd import cre_main

# Hacky solutions to make this both a command line application with argparse and a flask application

Expand All @@ -34,6 +33,9 @@ def test(cover: coverage.Coverage, test_names: List[str]) -> None:
config_file="application/tests/.coveragerc",
)
COV.start()
# Hack to get coverage to cover method and class defs
from application import create_app, sqla # type: ignore
from application.cmd import cre_main

if test_names:
tests = unittest.TestLoader().loadTestsFromNames(test_names)
Expand Down Expand Up @@ -146,6 +148,9 @@ def main() -> None:
help="used with --compare_datasets, dataset2",
)
args = parser.parse_args()

from application.cmd import cre_main

cre_main.run(args)


Expand Down

0 comments on commit 122e22e

Please sign in to comment.