-
Notifications
You must be signed in to change notification settings - Fork 10
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: add command to get aggregate report
Relates to JIRA: DISCOVERY-611
- Loading branch information
1 parent
c651843
commit e801026
Showing
6 changed files
with
216 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
"""ReportAggregateCommand retrieves and outputs the aggregate report.""" | ||
|
||
import sys | ||
from logging import getLogger | ||
|
||
from requests import codes | ||
|
||
from qpc import messages, report, scan | ||
from qpc.clicommand import CliCommand | ||
from qpc.request import GET, request | ||
from qpc.translation import _ | ||
from qpc.utils import pretty_format | ||
|
||
logger = getLogger(__name__) | ||
|
||
|
||
class ReportAggregateCommand(CliCommand): | ||
"""Defines the command for showing the aggregate report.""" | ||
|
||
SUBCOMMAND = report.SUBCOMMAND | ||
ACTION = report.AGGREGATE | ||
|
||
def __init__(self, subparsers): | ||
"""Create command.""" | ||
CliCommand.__init__( | ||
self, | ||
self.SUBCOMMAND, | ||
self.ACTION, | ||
subparsers.add_parser(self.ACTION), | ||
GET, | ||
report.REPORT_URI, | ||
[codes.ok], | ||
) | ||
id_group = self.parser.add_mutually_exclusive_group(required=True) | ||
id_group.add_argument( | ||
"--scan-job", | ||
dest="scan_job_id", | ||
metavar="SCAN_JOB_ID", | ||
help=_(messages.REPORT_SCAN_JOB_ID_HELP), | ||
) | ||
id_group.add_argument( | ||
"--report", | ||
dest="report_id", | ||
metavar="REPORT_ID", | ||
help=_(messages.REPORT_REPORT_ID_HELP), | ||
) | ||
self.report_id = None | ||
|
||
def _validate_args(self): | ||
CliCommand._validate_args(self) | ||
|
||
if not (report_id := self.args.report_id): | ||
response = request( | ||
parser=self.parser, | ||
method=GET, | ||
path=f"{scan.SCAN_JOB_URI}{self.args.scan_job_id}", | ||
payload=None, | ||
) | ||
if response.status_code == codes.ok: | ||
json_data = response.json() | ||
if not (report_id := json_data.get("report_id")): | ||
logger.error( | ||
_(messages.REPORT_NO_AGGREGATE_REPORT_FOR_SJ), | ||
self.args.scan_job_id, | ||
) | ||
sys.exit(1) | ||
else: | ||
logger.error( | ||
_(messages.REPORT_SJ_DOES_NOT_EXIST), self.args.scan_job_id | ||
) | ||
sys.exit(1) | ||
|
||
self.report_id = report_id | ||
self.req_path = f"{self.req_path}{self.report_id}{report.AGGREGATE_PATH_SUFFIX}" | ||
|
||
def _handle_response_success(self): | ||
json_data = self.response.json() | ||
data = pretty_format(json_data) | ||
print(data) | ||
|
||
def _handle_response_error(self): | ||
if self.args.report_id is None: | ||
logger.error( | ||
_(messages.REPORT_NO_AGGREGATE_REPORT_FOR_SJ), | ||
self.args.scan_job_id, | ||
) | ||
else: | ||
logger.error( | ||
_(messages.REPORT_NO_AGGREGATE_REPORT_FOR_REPORT_ID), | ||
self.args.report_id, | ||
) | ||
sys.exit(1) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
"""Test the "report aggregate" subcommand.""" | ||
|
||
import logging | ||
from argparse import ArgumentParser, Namespace | ||
from unittest.mock import patch | ||
|
||
import pytest | ||
import requests_mock | ||
|
||
from qpc import messages | ||
from qpc.report import AGGREGATE_PATH_SUFFIX, REPORT_URI | ||
from qpc.report.aggregate import ReportAggregateCommand | ||
from qpc.scan import SCAN_JOB_URI | ||
from qpc.utils import get_server_location | ||
|
||
|
||
def get_scan_job_uri(scan_job_id: int) -> str: | ||
"""Construct a scan job URI for testing.""" | ||
return f"{get_server_location()}{SCAN_JOB_URI}{scan_job_id}" | ||
|
||
|
||
def get_aggregate_report_uri(report_id: int) -> str: | ||
"""Construct an aggregate report URI for testing.""" | ||
return f"{get_server_location()}{REPORT_URI}{report_id}{AGGREGATE_PATH_SUFFIX}" | ||
|
||
|
||
def get_command(): | ||
"""Set up new command for each test.""" | ||
# We can't use a fixture for this, even though that seems reasonable, | ||
# because our command instances modify req_path on the fly. This is | ||
# definitely a bad code smell, but we are choosing to ignore it because | ||
# this design issue affects many of our commands and tests the same way. | ||
argument_parser = ArgumentParser() | ||
subparser = argument_parser.add_subparsers(dest="subcommand") | ||
return ReportAggregateCommand(subparser) | ||
|
||
|
||
@patch("qpc.report.aggregate.pretty_format") | ||
def test_aggregate_report_success_via_report(mock_pretty_format, faker, capsys): | ||
"""Test aggregate report with report id.""" | ||
report_id = faker.pyint() | ||
report_uri = get_aggregate_report_uri(report_id) | ||
report_json_data = {faker.slug(): faker.slug()} | ||
with requests_mock.Mocker() as mocker: | ||
mocker.get(report_uri, status_code=200, json=report_json_data) | ||
args = Namespace(scan_job_id=None, report_id=report_id) | ||
get_command().main(args) | ||
mock_pretty_format.assert_called_once_with(report_json_data) | ||
out, err = capsys.readouterr() | ||
assert str(mock_pretty_format.return_value) in out | ||
assert not err | ||
|
||
|
||
@patch("qpc.report.aggregate.pretty_format") | ||
def test_aggregate_report_success_via_scan_job(mock_pretty_format, faker, capsys): | ||
"""Test aggregate report with scan job id.""" | ||
report_id = faker.pyint() | ||
scan_job_id = faker.pyint() | ||
scan_job_uri = get_scan_job_uri(scan_job_id) | ||
scan_job_json_data = {"id": scan_job_id, "report_id": report_id} | ||
aggregate_report_uri = get_aggregate_report_uri(report_id) | ||
aggregate_report_json_data = {faker.slug(): faker.slug()} | ||
with requests_mock.Mocker() as mocker: | ||
mocker.get(scan_job_uri, status_code=200, json=scan_job_json_data) | ||
mocker.get( | ||
aggregate_report_uri, status_code=200, json=aggregate_report_json_data | ||
) | ||
args = Namespace(scan_job_id=scan_job_id, report_id=None) | ||
get_command().main(args) | ||
mock_pretty_format.assert_called_once_with(aggregate_report_json_data) | ||
out, err = capsys.readouterr() | ||
assert str(mock_pretty_format.return_value) in out | ||
assert not err | ||
|
||
|
||
def test_aggregate_report_but_scan_job_does_not_exist(faker, caplog): | ||
"""Test aggregate report with scan job id that does not exist.""" | ||
scan_job_id = faker.pyint() | ||
scan_job_uri = get_scan_job_uri(scan_job_id) | ||
with requests_mock.Mocker() as mocker: | ||
mocker.get(scan_job_uri, status_code=400) | ||
args = Namespace(scan_job_id=scan_job_id, report_id=None) | ||
with pytest.raises(SystemExit): | ||
get_command().main(args) | ||
|
||
expected_error = messages.REPORT_SJ_DOES_NOT_EXIST % scan_job_id | ||
assert expected_error in caplog.text | ||
|
||
|
||
def test_deployments_report_but_scan_job_has_no_report(faker, caplog): | ||
"""Test aggregate report with scan job id that has no report id.""" | ||
scan_job_id = faker.pyint() | ||
scan_job_uri = get_scan_job_uri(scan_job_id) | ||
scan_job_json_data = {"id": scan_job_id} | ||
with requests_mock.Mocker() as mocker: | ||
mocker.get(scan_job_uri, status_code=200, json=scan_job_json_data) | ||
args = Namespace(scan_job_id=scan_job_id, report_id=None) | ||
with pytest.raises(SystemExit): | ||
get_command().main(args) | ||
expected_error = messages.REPORT_NO_AGGREGATE_REPORT_FOR_SJ % scan_job_id | ||
assert expected_error in caplog.text | ||
|
||
|
||
def test_aggregate_report_but_report_does_not_exist(faker, caplog): | ||
"""Test aggregate report with nonexistent report id.""" | ||
unknown_report_id = faker.pyint() | ||
aggregate_report_uri = get_aggregate_report_uri(unknown_report_id) | ||
with requests_mock.Mocker() as mocker: | ||
mocker.get(aggregate_report_uri, status_code=400) | ||
args = Namespace(scan_job_id=None, report_id=unknown_report_id) | ||
with caplog.at_level(logging.ERROR): | ||
with pytest.raises(SystemExit): | ||
get_command().main(args) | ||
expected_error = ( | ||
messages.REPORT_NO_AGGREGATE_REPORT_FOR_REPORT_ID % unknown_report_id | ||
) | ||
assert expected_error in caplog.text |