diff --git a/application/cmd/cre_main.py b/application/cmd/cre_main.py index 04a987d9d..476749941 100644 --- a/application/cmd/cre_main.py +++ b/application/cmd/cre_main.py @@ -89,18 +89,12 @@ def register_cre(cre: defs.CRE, collection: db.Node_collection) -> db.CRE: collection.add_internal_link( dbcre, register_cre(link.document, collection), type=link.ltype ) - elif type(link.document) == defs.Standard: + else: collection.add_link( cre=dbcre, node=register_node(node=link.document, collection=collection), type=link.ltype, ) - elif type(link.document) == defs.Tool: - collection.add_link( - cre=dbcre, - tool=register_tool(tool=link.document, collection=collection), - type=link.ltype, - ) return dbcre @@ -327,7 +321,7 @@ def print_graph() -> None: raise NotImplementedError -def run(args: argparse.Namespace) -> None: +def run(args: argparse.Namespace) -> None: # pragma: no cover script_path = os.path.dirname(os.path.realpath(__file__)) os.path.join(script_path, "../cres") diff --git a/application/tests/cre_main_test.py b/application/tests/cre_main_test.py index e0a0348f3..69bce5b20 100644 --- a/application/tests/cre_main_test.py +++ b/application/tests/cre_main_test.py @@ -39,6 +39,7 @@ def test_register_node_with_links(self) -> None: id="", description="", name="standard_with_links", + section="Standard With Links", links=[ defs.Link( document=defs.Standard( @@ -54,9 +55,15 @@ def test_register_node_with_links(self) -> None: name="CodemcCodeFace", ) ), + defs.Link( + document=defs.Tool( + description="awesome hacking tool", + name="ToolmcToolFace", + ) + ), ], - section="Standard With Links", ) + ret = main.register_node(node=standard_with_links, collection=self.collection) # assert returned value makes sense self.assertEqual(ret.name, "standard_with_links") @@ -68,46 +75,51 @@ def test_register_node_with_links(self) -> None: self.assertIsNone(thing.cre) self.assertEqual(self.collection.session.query(db.Links).all(), []) - # 3 cre-less nodes in the db - 
self.assertEqual(len(self.collection.session.query(db.Node).all()), 3) + + # 4 cre-less nodes in the db + self.assertEqual(len(self.collection.session.query(db.Node).all()), 4) def test_register_node_with_cre(self) -> None: + known_standard_with_cre = defs.Standard( + name="CWE", + section="598", + links=[ + defs.Link(document=defs.CRE(id="101-202", name="crename")), + ], + ) standard_with_cre = defs.Standard( - doctype=defs.Credoctypes.Standard, id="", description="", name="standard_with_cre", links=[ - defs.Link( - document=defs.CRE( - doctype=defs.Credoctypes.CRE, - id="101-202", - description="cre desc", - name="crename", - links=[], - tags=[], - metadata={}, - ) - ), defs.Link( document=defs.Tool( - doctype=defs.Credoctypes.Tool, tooltype=defs.ToolTypes.Offensive, name="zap", ) ), + defs.Link( + document=defs.Standard( + name="CWE", + section="598", + links=[ + defs.Link(document=defs.CRE(id="101-202", name="crename")), + ], + ) + ), ], section="standard_with_cre", ) + main.register_node(node=known_standard_with_cre, collection=self.collection) main.register_node(node=standard_with_cre, collection=self.collection) # assert db structure makes sense self.assertEqual( - len(self.collection.session.query(db.Links).all()), 2 - ) # 2 links in the db + len(self.collection.session.query(db.Links).all()), 3 + ) # 3 links in the db self.assertEqual( - len(self.collection.session.query(db.Node).all()), 2 - ) # 2 standards in the db + len(self.collection.session.query(db.Node).all()), 3 + ) # 3 standards in the db self.assertEqual( len(self.collection.session.query(db.CRE).all()), 1 ) # 1 cre in the db @@ -181,16 +193,16 @@ def test_register_standard_with_groupped_cre_links(self) -> None: def test_register_cre(self) -> None: standard = defs.Standard( - doctype=defs.Credoctypes.Standard, name="ASVS", section="SESSION-MGT-TOKEN-DIRECTIVES-DISCRETE-HANDLING", subsection="3.1.1", ) + tool = defs.Tool(name="Tooly", tooltype=defs.ToolTypes.Defensive) cre = defs.CRE( id="100", 
description="CREdesc", name="CREname", - links=[defs.Link(document=standard)], + links=[defs.Link(document=standard), defs.Link(document=tool)], tags=["CREt1", "CREt2"], metadata={"tags": ["CREl1", "CREl2"]}, ) @@ -199,6 +211,9 @@ def test_register_cre(self) -> None: self.assertEqual( len(self.collection.session.query(db.CRE).all()), 1 ) # 1 cre in the db + self.assertEqual( + len(self.collection.session.query(db.Node).all()), 2 + ) # 2 nodes in the db def test_parse_file(self) -> None: file: List[Dict[str, Any]] = [ diff --git a/application/utils/external_project_parsers/cheatsheets_parser.py b/application/utils/external_project_parsers/cheatsheets_parser.py index f105fd898..405126a7f 100644 --- a/application/utils/external_project_parsers/cheatsheets_parser.py +++ b/application/utils/external_project_parsers/cheatsheets_parser.py @@ -18,13 +18,18 @@ def cheatsheet(section: str, hyperlink: str, tags: List[str]) -> defs.Standard: def parse_cheatsheets(cache: db.Node_collection): c_repo = "https://github.com/OWASP/CheatSheetSeries.git" - cheasheets_path = "cheatsheets/" + cheatsheets_path = "cheatsheets/" + repo = git.clone(c_repo) + register_cheatsheets(repo=repo, cache=cache, cheatsheets_path=cheatsheets_path) + + +def register_cheatsheets(cache: db.Node_collection, repo, cheatsheets_path, repo_path): + title_regexp = r"# (?P
.+)"
+ zap_md_top10_regexp = r"OWASP_(?P<year>\d\d\d\d)_A(?P<num>\d\d?)"
- repo = git.clone(zaproxy_website)
for mdfile in os.listdir(os.path.join(repo.working_dir, alerts_path)):
pth = os.path.join(repo.working_dir, alerts_path, mdfile)
name = None
@@ -72,20 +77,52 @@ def parse_zap_alerts(cache: db.Node_collection):
)
continue
cwe = re.search(zap_md_cwe_regexp, mdtext)
+ alert = zap_alert(
+ name=name,
+ id=externalId,
+ description=description,
+ tags=[tag],
+ code=code,
+ )
+ dbnode = cache.add_node(alert)
+
+ top10 = re.finditer(zap_md_top10_regexp, mdtext)
+ if top10:
+ for match in top10:
+ year = match.group("year")
+ num = match.group("num")
+ entries = cache.get_nodes(name=f"Top10 {year}", ntype="Standard")
+ entry = [e for e in entries if str(int(num)) in e.section]
+ if entry:
+ logger.info(
+ f"Found zap alert {name} linking to {entry[0].name}{entry[0].section}"
+ )
+ for cre in [
+ nl
+ for nl in entry[0].links
+ if nl.document.doctype == defs.Credoctypes.CRE
+ ]:
+ cache.add_link(
+ cre=db.dbCREfromCRE(cre.document),
+ node=dbnode,
+ type=defs.LinkTypes.LinkedTo,
+ )
+ else:
+ logger.error(
+ f"Zap Alert {name} links to OWASP top 10 {year}:{num} but CRE doesn't know about it, incomplete data?"
+ )
if cwe:
cweId = cwe.group("cweId")
+ logger.info(f"Found zap alert {name} linking to CWE {cweId}")
cwe_nodes = cache.get_nodes(name="CWE", section=cweId)
for node in cwe_nodes:
for link in node.links:
if link.document.doctype == defs.Credoctypes.CRE:
- alert = zap_alert(
- name=name,
- id=externalId,
- description=description,
- tags=[tag],
- code=code,
- )
- dbnode = cache.add_node(alert)
+
cache.add_link(
- cre=db.dbCREfromCRE(link.document), node=dbnode
+ cre=db.dbCREfromCRE(link.document),
+ node=dbnode,
+ type=defs.LinkTypes.LinkedTo,
)
+ else:
+ logger.info(f"CWE id not found in alert {externalId}, skipping linking")
diff --git a/cre.py b/cre.py
index a8ef6ab74..0e5ab39e4 100644
--- a/cre.py
+++ b/cre.py
@@ -9,7 +9,6 @@
from flask_migrate import Migrate # type: ignore
from application import create_app, sqla # type: ignore
-from application.cmd import cre_main
# Hacky solutions to make this both a command line application with argparse and a flask application
@@ -34,6 +33,9 @@ def test(cover: coverage.Coverage, test_names: List[str]) -> None:
config_file="application/tests/.coveragerc",
)
COV.start()
+ # Hack to get coverage to cover method and class defs
+ from application import create_app, sqla # type: ignore
+ from application.cmd import cre_main
if test_names:
tests = unittest.TestLoader().loadTestsFromNames(test_names)
@@ -146,6 +148,9 @@ def main() -> None:
help="used with --compare_datasets, dataset2",
)
args = parser.parse_args()
+
+ from application.cmd import cre_main
+
cre_main.run(args)