Skip to content

Commit

Permalink
Provide ability to run test generation without source or test files (#…
Browse files Browse the repository at this point in the history
…143)

* Adding expanded functionality for cobertura report processing.

Added project level coverage checking and more tests.

Added more tests.

Increment version.

Setting arg.

Fixed args.

Adding arg.

* Incrementing version.
  • Loading branch information
EmbeddedDevops1 authored Aug 12, 2024
1 parent 18a5d71 commit 9634b99
Show file tree
Hide file tree
Showing 9 changed files with 295 additions and 57 deletions.
1 change: 1 addition & 0 deletions cover_agent/CoverAgent.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def __init__(self, args):
additional_instructions=args.additional_instructions,
llm_model=args.model,
api_base=args.api_base,
use_report_coverage_feature_flag=args.use_report_coverage_feature_flag,
)

def _validate_paths(self):
Expand Down
111 changes: 75 additions & 36 deletions cover_agent/CoverageProcessor.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
from typing import Literal, Tuple
from cover_agent.CustomLogger import CustomLogger
from typing import Literal, Tuple, Union
import csv
import os
import time
import re
import csv
import xml.etree.ElementTree as ET
from cover_agent.CustomLogger import CustomLogger


class CoverageProcessor:
Expand All @@ -13,6 +12,7 @@ def __init__(
file_path: str,
src_file_path: str,
coverage_type: Literal["cobertura", "lcov", "jacoco"],
use_report_coverage_feature_flag: bool = False,
):
"""
Initializes a CoverageProcessor object.
Expand All @@ -35,6 +35,7 @@ def __init__(
self.src_file_path = src_file_path
self.coverage_type = coverage_type
self.logger = CustomLogger.get_logger(__name__)
self.use_report_coverage_feature_flag = use_report_coverage_feature_flag

def process_coverage_report(
self, time_of_test_command: int
Expand Down Expand Up @@ -74,51 +75,89 @@ def verify_report_update(self, time_of_test_command: int):
), f"Fatal: The coverage report file was not updated after the test command. file_mod_time_ms: {file_mod_time_ms}, time_of_test_command: {time_of_test_command}. {file_mod_time_ms > time_of_test_command}"

def parse_coverage_report(self) -> Tuple[list, list, float]:
"""
Parses a code coverage report to extract covered and missed line numbers for a specific file,
and calculates the coverage percentage, based on the specified coverage report type.
Returns:
Tuple[list, list, float]: A tuple containing lists of covered and missed line numbers, and the coverage percentage.
"""
if self.use_report_coverage_feature_flag:
if self.coverage_type == "cobertura":
return self.parse_coverage_report_cobertura()
elif self.coverage_type == "lcov":
return self.parse_coverage_report_lcov()
elif self.coverage_type == "jacoco":
return self.parse_coverage_report_jacoco()
else:
raise ValueError(f"Unsupported coverage report type: {self.coverage_type}")
else:
if self.coverage_type == "cobertura":
# Default behavior is to parse out a single file from the report
return self.parse_coverage_report_cobertura(filename=os.path.basename(self.src_file_path))
elif self.coverage_type == "lcov":
return self.parse_coverage_report_lcov()
elif self.coverage_type == "jacoco":
return self.parse_coverage_report_jacoco()
else:
raise ValueError(f"Unsupported coverage report type: {self.coverage_type}")

def parse_coverage_report_cobertura(self, filename: str = None) -> Union[Tuple[list, list, float], dict]:
"""
Parses a code coverage report to extract covered and missed line numbers for a specific file,
and calculates the coverage percentage, based on the specified coverage report type.
Parses a Cobertura XML code coverage report to extract covered and missed line numbers for a specific file
or all files, and calculates the coverage percentage.
Args:
filename (str, optional): The name of the file to process. If None, processes all files.
Returns:
Tuple[list, list, float]: A tuple containing lists of covered and missed line numbers, and the coverage percentage.
Union[Tuple[list, list, float], dict]: If filename is provided, returns a tuple
containing lists of covered and missed line numbers, and the coverage percentage.
If filename is None, returns a dictionary with filenames as keys and a tuple
containing lists of covered and missed line numbers, and the coverage percentage
as values.
"""
if self.coverage_type == "cobertura":
return self.parse_coverage_report_cobertura()
elif self.coverage_type == "lcov":
return self.parse_coverage_report_lcov()
elif self.coverage_type == "jacoco":
return self.parse_coverage_report_jacoco()
else:
raise ValueError(f"Unsupported coverage report type: {self.coverage_type}")
tree = ET.parse(self.file_path)
root = tree.getroot()

def parse_coverage_report_cobertura(self) -> Tuple[list, list, float]:
if filename:
for cls in root.findall(".//class"):
name_attr = cls.get("filename")
if name_attr and name_attr.endswith(filename):
return self.parse_coverage_data_for_class(cls)
return [], [], 0.0 # Return empty lists if the file is not found
else:
coverage_data = {}
for cls in root.findall(".//class"):
cls_filename = cls.get("filename")
if cls_filename:
lines_covered, lines_missed, coverage_percentage = self.parse_coverage_data_for_class(cls)
coverage_data[cls_filename] = (lines_covered, lines_missed, coverage_percentage)
return coverage_data

def parse_coverage_data_for_class(self, cls) -> Tuple[list, list, float]:
"""
Parses a Cobertura XML code coverage report to extract covered and missed line numbers for a specific file,
and calculates the coverage percentage.
Parses coverage data for a single class.
Args:
cls (Element): XML element representing the class.
Returns:
Tuple[list, list, float]: A tuple containing lists of covered and missed line numbers, and the coverage percentage.
Tuple[list, list, float]: A tuple containing lists of covered and missed line numbers,
and the coverage percentage.
"""
tree = ET.parse(self.file_path)
root = tree.getroot()
lines_covered, lines_missed = [], []
filename = os.path.basename(self.src_file_path)

for cls in root.findall(".//class"):
name_attr = cls.get("filename")
if name_attr and name_attr.endswith(filename):
for line in cls.findall(".//line"):
line_number = int(line.get("number"))
hits = int(line.get("hits"))
if hits > 0:
lines_covered.append(line_number)
else:
lines_missed.append(line_number)
break # Assuming filename is unique, break after finding and processing it
for line in cls.findall(".//line"):
line_number = int(line.get("number"))
hits = int(line.get("hits"))
if hits > 0:
lines_covered.append(line_number)
else:
lines_missed.append(line_number)

total_lines = len(lines_covered) + len(lines_missed)
coverage_percentage = (
(len(lines_covered) / total_lines) if total_lines > 0 else 0
)
coverage_percentage = (len(lines_covered) / total_lines) if total_lines > 0 else 0

return lines_covered, lines_missed, coverage_percentage

Expand Down
91 changes: 85 additions & 6 deletions cover_agent/UnitTestGenerator.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def __init__(
coverage_type="cobertura",
desired_coverage: int = 90, # Default to 90% coverage if not specified
additional_instructions: str = "",
use_report_coverage_feature_flag: bool = False,
):
"""
Initialize the UnitTestGenerator class with the provided parameters.
Expand All @@ -45,6 +46,9 @@ def __init__(
coverage_type (str, optional): The type of coverage report. Defaults to "cobertura".
desired_coverage (int, optional): The desired coverage percentage. Defaults to 90.
additional_instructions (str, optional): Additional instructions for test generation. Defaults to an empty string.
use_report_coverage_feature_flag (bool, optional): Setting this to True considers the coverage of all the files in the coverage report.
This means we consider a test as good if it increases coverage for a different
file other than the source file. Defaults to False.
Returns:
None
Expand All @@ -60,6 +64,8 @@ def __init__(
self.desired_coverage = desired_coverage
self.additional_instructions = additional_instructions
self.language = self.get_code_language(source_file_path)
self.use_report_coverage_feature_flag = use_report_coverage_feature_flag
self.last_coverage_percentages = {}

# Objects to instantiate
self.ai_caller = AICaller(model=llm_model, api_base=api_base)
Expand Down Expand Up @@ -138,15 +144,47 @@ def run_coverage(self):
file_path=self.code_coverage_report_path,
src_file_path=self.source_file_path,
coverage_type=self.coverage_type,
use_report_coverage_feature_flag=self.use_report_coverage_feature_flag
)

# Use the process_coverage_report method of CoverageProcessor, passing in the time the test command was executed
try:
lines_covered, lines_missed, percentage_covered = (
coverage_processor.process_coverage_report(
if self.use_report_coverage_feature_flag:
self.logger.info(
"Using the report coverage feature flag to process the coverage report"
)
file_coverage_dict = coverage_processor.process_coverage_report(
time_of_test_command=time_of_test_command
)
)
total_lines_covered = 0
total_lines_missed = 0
total_lines = 0
for key in file_coverage_dict:
lines_covered, lines_missed, percentage_covered = (
file_coverage_dict[key]
)
total_lines_covered += len(lines_covered)
total_lines_missed += len(lines_missed)
total_lines += len(lines_covered) + len(lines_missed)
if key == self.source_file_path:
self.last_source_file_coverage = percentage_covered
if key not in self.last_coverage_percentages:
self.last_coverage_percentages[key] = 0
self.last_coverage_percentages[key] = percentage_covered
percentage_covered = total_lines_covered / total_lines

self.logger.info(
f"Total lines covered: {total_lines_covered}, Total lines missed: {total_lines_missed}, Total lines: {total_lines}"
)
self.logger.info(
f"coverage: Percentage {round(percentage_covered * 100, 2)}%"
)
else:
lines_covered, lines_missed, percentage_covered = (
coverage_processor.process_coverage_report(
time_of_test_command=time_of_test_command
)
)

# Process the extracted coverage metrics
self.current_coverage = percentage_covered
Expand Down Expand Up @@ -538,12 +576,38 @@ def validate_test(self, generated_test: dict, generated_tests_dict: dict, num_at
file_path=self.code_coverage_report_path,
src_file_path=self.source_file_path,
coverage_type=self.coverage_type,
use_report_coverage_feature_flag=self.use_report_coverage_feature_flag,
)
_, _, new_percentage_covered = (
new_coverage_processor.process_coverage_report(
coverage_percentages = {}

if self.use_report_coverage_feature_flag:
self.logger.info(
"Using the report coverage feature flag to process the coverage report"
)
file_coverage_dict = new_coverage_processor.process_coverage_report(
time_of_test_command=time_of_test_command
)
)
total_lines_covered = 0
total_lines_missed = 0
total_lines = 0
for key in file_coverage_dict:
lines_covered, lines_missed, percentage_covered = (
file_coverage_dict[key]
)
total_lines_covered += len(lines_covered)
total_lines_missed += len(lines_missed)
total_lines += len(lines_covered) + len(lines_missed)
if key not in coverage_percentages:
coverage_percentages[key] = 0
coverage_percentages[key] = percentage_covered

new_percentage_covered = total_lines_covered / total_lines
else:
_, _, new_percentage_covered = (
new_coverage_processor.process_coverage_report(
time_of_test_command=time_of_test_command
)
)

if new_percentage_covered <= self.current_coverage:
# Coverage has not increased, rollback the test by removing it from the test file
Expand Down Expand Up @@ -610,6 +674,21 @@ def validate_test(self, generated_test: dict, generated_tests_dict: dict, num_at
) # this is important, otherwise the next test will be inserted at the wrong line

self.current_coverage = new_percentage_covered


for key in coverage_percentages:
if key not in self.last_coverage_percentages:
self.last_coverage_percentages[key] = 0
if coverage_percentages[key] > self.last_coverage_percentages[key] and key == self.source_file_path.split("/")[-1]:
self.logger.info(
f"Coverage for provided source file: {key} increased from {round(self.last_coverage_percentages[key] * 100, 2)} to {round(coverage_percentages[key] * 100, 2)}"
)
elif coverage_percentages[key] > self.last_coverage_percentages[key]:
self.logger.info(
f"Coverage for non-source file: {key} increased from {round(self.last_coverage_percentages[key] * 100, 2)} to {round(coverage_percentages[key] * 100, 2)}"
)
self.last_coverage_percentages[key] = coverage_percentages[key]

self.logger.info(
f"Test passed and coverage increased. Current coverage: {round(new_percentage_covered * 100, 2)}%"
)
Expand Down
6 changes: 5 additions & 1 deletion cover_agent/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,11 @@ def parse_args():
default=1,
help="Number of times to run the tests generated by Cover Agent. Default: %(default)s.",
)

parser.add_argument(
"--use-report-coverage-feature-flag",
action="store_true",
help="Setting this to True considers the coverage of all the files in the coverage report. This means we consider a test as good if it increases coverage for a different file other than the source file. Default: False.",
)
return parser.parse_args()


Expand Down
2 changes: 1 addition & 1 deletion cover_agent/version.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.1.47
0.1.48
56 changes: 56 additions & 0 deletions tests/test_CoverAgent.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,59 @@ def test_agent_test_file_not_found(
agent = CoverAgent(args)

assert str(exc_info.value) == f"Test file not found at {args.test_file_path}"

@patch("cover_agent.CoverAgent.shutil.copy")
@patch("cover_agent.CoverAgent.os.path.isfile", return_value=True)
def test_duplicate_test_file_with_output_path(self, mock_isfile, mock_copy):
args = argparse.Namespace(
source_file_path="test_source.py",
test_file_path="test_file.py",
test_file_output_path="output_test_file.py",
code_coverage_report_path="coverage_report.xml",
test_command="echo hello",
test_command_dir=os.getcwd(),
included_files=None,
coverage_type="cobertura",
report_filepath="test_results.html",
desired_coverage=90,
max_iterations=10,
additional_instructions="",
model="openai/test-model",
api_base="openai/test-api",
use_report_coverage_feature_flag=False
)

with pytest.raises(AssertionError) as exc_info:
agent = CoverAgent(args)
agent._duplicate_test_file()

assert "Fatal: Coverage report" in str(exc_info.value)
mock_copy.assert_called_once_with(args.test_file_path, args.test_file_output_path)

@patch("cover_agent.CoverAgent.os.path.isfile", return_value=True)
def test_duplicate_test_file_without_output_path(self, mock_isfile):
args = argparse.Namespace(
source_file_path="test_source.py",
test_file_path="test_file.py",
test_file_output_path="",
code_coverage_report_path="coverage_report.xml",
test_command="echo hello",
test_command_dir=os.getcwd(),
included_files=None,
coverage_type="cobertura",
report_filepath="test_results.html",
desired_coverage=90,
max_iterations=10,
additional_instructions="",
model="openai/test-model",
api_base="openai/test-api",
use_report_coverage_feature_flag=False
)

with pytest.raises(AssertionError) as exc_info:
agent = CoverAgent(args)
agent._duplicate_test_file()

assert "Fatal: Coverage report" in str(exc_info.value)
assert args.test_file_output_path == args.test_file_path

Loading

0 comments on commit 9634b99

Please sign in to comment.