Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactored coverage processor in to class hierarchy #230

Merged

Conversation

coderustic
Copy link
Contributor

@coderustic coderustic commented Nov 20, 2024

User description

  • Expose a unified interface for different formats
    ** CoverageData & CoverageReport
  • Class hierarchy of coverage processors that are initialized
    via a factory
  • CoverageProcessors are abstracted from feature flag
  • Filtering of report based on feature flag set

PR Type

enhancement, tests


Description

  • Introduced a new class hierarchy for processing coverage reports, including CoverageData and CoverageReport data classes.
  • Implemented specific processors for Cobertura, Lcov, and Jacoco coverage formats.
  • Added a factory class to create appropriate coverage processors based on the tool type.
  • Developed tests for the new coverage processor classes, ensuring correct functionality and coverage.
  • Made minor formatting adjustments in existing files.

Changes walkthrough 📝

Relevant files
Enhancement
processor.py
Implement coverage processing with a new class hierarchy 

cover_agent/coverage/processor.py

  • Introduced a new class hierarchy for coverage processing.
  • Added CoverageData and CoverageReport data classes.
  • Implemented multiple coverage processors for different formats
    (Cobertura, Lcov, Jacoco).
  • Added a factory class for creating coverage processors.
  • +300/-0 
    Tests
    test_processor.py
    Add tests for coverage processor classes                                 

    tests/coverage/test_processor.py

  • Added tests for CoverageProcessorFactory and CoverageProcessor.
  • Included tests for different coverage processors (Cobertura, Jacoco,
    Lcov).
  • Used pytest fixtures and mock for testing.
  • +97/-0   
    Formatting
    CoverageProcessor.py
    Minor formatting adjustment                                                           

    cover_agent/CoverageProcessor.py

    • Minor formatting change to ensure no newline at end of file.
    +1/-1     
    test_CoverageProcessor.py
    Remove unnecessary newline                                                             

    tests/test_CoverageProcessor.py

    • Removed an unnecessary newline at the end of the file.
    +0/-1     

    💡 PR-Agent usage: Comment /help "your question" on any pull request to receive relevant information

    Copy link
    Contributor

    PR-Agent was enabled for this repository. To continue using it, please link your git user with your CodiumAI identity here.

    PR Reviewer Guide 🔍

    Here are some key observations to aid the review process:

    ⏱️ Estimated effort to review: 4 🔵🔵🔵🔵⚪
    🧪 PR contains tests
    🔒 No security concerns identified
    ⚡ Recommended focus areas for review

    Potential Bug
    The coverage percentage calculation in CoverageReportFilter.filter_report() may raise a ZeroDivisionError if all files are filtered out

    Type Error
    The JacocoProcessor.parse_coverage_report() method creates CoverageData with incorrect parameter name 'coverage_percentag' (missing 'e')

    Code Smell
    The JacocoProcessor._parse_jacoco_csv() method returns a tuple but its type hint indicates it returns a Dict[str, CoverageData]

    Documentation Issue
    Typo in CoverageData class docstring - 'nubmers' instead of 'numbers' in missed_lines description

    Copy link
    Contributor

    qodo-merge-pro bot commented Nov 20, 2024

    PR-Agent was enabled for this repository. To continue using it, please link your git user with your CodiumAI identity here.

    PR Code Suggestions ✨

    Explore these optional code suggestions:

    CategorySuggestion                                                                                                                                    Score
    Possible issue
    ✅ Fix incorrect parameter name and provide all required arguments for dataclass instantiation
    Suggestion Impact:The suggestion corrected the parameter name from 'coverage_percentag' to 'coverage_percentage' and added the required arguments 'covered_lines' and 'missed_lines' for the CoverageData instantiation.

    code diff:

    -        coverage[class_name] = CoverageData(covered=covered, missed=missed, coverage_percentag=coverage_percentage)
    +        coverage[class_name] = CoverageData(covered_lines=[], covered=covered, missed_lines=[], missed=missed, coverage=coverage_percentage)

    Fix the typo in the parameter name 'coverage_percentag' to 'coverage_percentage'
    when creating CoverageData in JacocoProcessor.parse_coverage_report()

    cover_agent/coverage/processor.py [167]

    -coverage[class_name] = CoverageData(covered=covered, missed=missed, coverage_percentag=coverage_percentage)
    +coverage[class_name] = CoverageData(covered_lines=[], covered=covered, missed_lines=[], missed=missed, coverage=coverage_percentage)
    • Apply this suggestion
    Suggestion importance[1-10]: 9

    Why: The suggestion fixes a critical bug where the code would fail at runtime due to incorrect parameter name and missing required arguments for the CoverageData dataclass, which is marked as frozen.

    9
    ✅ Add protection against division by zero when calculating coverage percentages
    Suggestion Impact:The suggestion was implemented by adding a check for division by zero when calculating total_coverage, using total_lines to ensure it is greater than zero before performing the division.

    code diff:

    -            total_coverage=(sum(cov.covered_lines for cov in filtered_coverage.values()) / 
    -                         sum(cov.covered_lines + cov.missed_lines for cov in filtered_coverage.values()))
    -                         if filtered_coverage else 0.0,
    +        total_lines = sum(len(cov.covered_lines) + len(cov.missed_lines) for cov in filtered_coverage.values())
    +        total_coverage = (sum(len(cov.covered_lines) for cov in filtered_coverage.values()) / total_lines) if total_lines > 0 else 0.0,

    Add error handling for division by zero when calculating total_coverage in
    CoverageReportFilter.filter_report()

    cover_agent/coverage/processor.py [244-246]

    -total_coverage=(sum(cov.covered_lines for cov in filtered_coverage.values()) / 
    -                     sum(cov.covered_lines + cov.missed_lines for cov in filtered_coverage.values()))
    -                     if filtered_coverage else 0.0,
    +total_lines = sum(len(cov.covered_lines) + len(cov.missed_lines) for cov in filtered_coverage.values())
    +total_coverage = (sum(len(cov.covered_lines) for cov in filtered_coverage.values()) / total_lines) if total_lines > 0 else 0.0,
    • Apply this suggestion
    Suggestion importance[1-10]: 8

    Why: The suggestion prevents potential runtime errors by adding proper handling for division by zero, and improves the calculation logic by using len() for list counts.

    8
    ✅ Prevent race condition by capturing file modification time before processing
    Suggestion Impact:The commit implements the suggestion by moving the file modification time calculation to be passed as a parameter to process_coverage()

    code diff:

         tool_type: str,
    +    time_of_test_command: int,
         report_path: str,
         src_file_path: str,
         is_global_coverage_enabled: bool = True,
    -    file_pattern: Optional[str] = None
    +    file_pattern: Optional[str] = None,
    +    diff_coverage_report_path: Optional[str] = None
     ) -> CoverageReport:
         # Create appropriate processor
    -    processor = CoverageProcessorFactory.create_processor(tool_type, report_path, src_file_path)
    +    processor = CoverageProcessorFactory.create_processor(tool_type, report_path, src_file_path, diff_coverage_report_path)
         
         # Process full report
    -    report = processor.process_coverage_report(time_of_test_command=int(os.path.getmtime(report_path) * 1000))
    +    report = processor.process_coverage_report(time_of_test_command=time_of_test_command)

    Fix potential race condition in process_coverage() by capturing file modification
    time before processing

    cover_agent/coverage/processor.py [293]

    -report = processor.process_coverage_report(time_of_test_command=int(os.path.getmtime(report_path) * 1000))
    +report_mtime = int(os.path.getmtime(report_path) * 1000)
    +report = processor.process_coverage_report(time_of_test_command=report_mtime)
    • Apply this suggestion
    Suggestion importance[1-10]: 7

    Why: The suggestion prevents a potential race condition where the file modification time could change between when it's checked and when it's used, improving code reliability.

    7

    💡 Need additional feedback ? start a PR chat

    @coderustic coderustic force-pushed the feature/improve-coverage-processor branch from 8491408 to a60c58e Compare November 24, 2024 15:17
    @coderustic coderustic force-pushed the feature/improve-coverage-processor branch from a60c58e to cdd3a21 Compare December 26, 2024 02:33
    @coderustic
    Copy link
    Contributor Author

    /review

    Copy link
    Contributor

    qodo-merge-pro bot commented Dec 27, 2024

    PR Reviewer Guide 🔍

    (Review updated until commit d6ec168)

    Here are some key observations to aid the review process:

    ⏱️ Estimated effort to review: 4 🔵🔵🔵🔵⚪
    🧪 PR contains tests
    🔒 No security concerns identified
    ⚡ Recommended focus areas for review

    Code Complexity

    The new coverage processor implementation introduces significant complexity with multiple classes and inheritance hierarchies. While this provides better abstraction, it may make the code harder to maintain and debug.

    from abc import ABC, abstractmethod
    from dataclasses import dataclass
    from cover_agent.CustomLogger import CustomLogger
    from typing import Dict, Optional, List, Tuple, Union
    import csv
    import os
    import re
    import json
    import xml.etree.ElementTree as ET
    
    @dataclass(frozen=True)
    class CoverageData:
        """
        A class to represent coverage data.
    
        Attributes:
            covered_lines (int): The line numbers that are covered by tests.
            covered (int)      : The number of lines that are covered by tests.
            missed_lines (int) : The line numbers that are not covered by tests.
            missed (int)       : The number of lines that are not covered by tests.
            coverage (float)   : The coverage percentage of the file or class.
        """
        covered_lines: List[int]
        covered: int
        missed_lines: List[int]
        missed: int
        coverage: float
    
    @dataclass
    class CoverageReport:
        """
        A class to represent the coverage report of a project.
    
        Attributes:
        ----------
        total_coverage : float
            The total coverage percentage of the project.
        file_coverage : Dict[str, CoverageData]
            A dictionary mapping file names to their respective coverage data.
        """
        total_coverage: float
        file_coverage: Dict[str, CoverageData]
    
    class CoverageProcessor(ABC):
        """
        Abstract base class for processing coverage reports.
    
        Attributes:
            file_path (str): The path to the coverage report file.
            src_file_path (str): The path to the source file.
            logger (Logger): The logger object for logging messages.
        Methods:
            parse_coverage_report() -> Union[Tuple[list, list, float], dict]:
                Abstract method to parse the coverage report.
    
            process_coverage_report(time_of_test_command: int) -> Union[Tuple[list, list, float], dict]:
                Processes the coverage report and returns the coverage data.
    
            _is_report_exist():
                Checks if the coverage report file exists.
    
            _is_report_obsolete(time_of_test_command: int):
                Checks if the coverage report file is obsolete based on the test command time.
        """
        def __init__(
            self,
            file_path: str,
            src_file_path: str,
        ):
            self.file_path = file_path
            self.src_file_path = src_file_path
            self.logger = CustomLogger.get_logger(__name__)
    
        @abstractmethod
        def parse_coverage_report(self) -> Dict[str, CoverageData]:
            pass
    
        def process_coverage_report(self, time_of_test_command: int) -> CoverageReport:
            self._is_coverage_valid(time_of_test_command=time_of_test_command)
            coverage = self.parse_coverage_report()
            report = CoverageReport(0.0, coverage)
            if coverage:
                total_covered = sum(cov.covered for cov in coverage.values())
                total_missed = sum(cov.missed for cov in coverage.values())
                total_lines = total_covered + total_missed
                report.total_coverage = (float(total_covered) / float(total_lines)) if total_lines > 0 else 0.0
            return report
    
        def _is_coverage_valid(
            self, time_of_test_command: int
        ) ->  None:
            if not self._is_report_exist():
                raise FileNotFoundError(f'Coverage report "{self.file_path}" not found')
            if self._is_report_obsolete(time_of_test_command):
                raise ValueError("Coverage report is outdated")
    
        def _is_report_exist(self) -> bool:
            return os.path.exists(self.file_path)
    
        def _is_report_obsolete(self, time_of_test_command: int) -> bool:
            return int(round(os.path.getmtime(self.file_path) * 1000)) < time_of_test_command
    
    class CoberturaProcessor(CoverageProcessor):
        def parse_coverage_report(self) -> Dict[str, CoverageData]:
            tree = ET.parse(self.file_path)
            root = tree.getroot()
            coverage = {}
            for cls in root.findall(".//class"):
                cls_filename = cls.get("filename")
                if cls_filename:
                    coverage[cls_filename] = self._parse_coverage_data_for_class(cls)
            return coverage
    
        def _parse_coverage_data_for_class(self, cls) -> CoverageData:
            lines_covered, lines_missed = [], []
            for line in cls.findall(".//line"):
                line_number = int(line.get("number"))
                hits = int(line.get("hits"))
                if hits > 0:
                    lines_covered.append(line_number)
                else:
                    lines_missed.append(line_number)
            total_lines = len(lines_covered) + len(lines_missed)
            coverage_percentage = (float(len(lines_covered)) / total_lines) if total_lines > 0 else 0.0
            return CoverageData(lines_covered, len(lines_covered), lines_missed, len(lines_missed), coverage_percentage)
    
    class LcovProcessor(CoverageProcessor):
        def parse_coverage_report(self) -> Dict[str, CoverageData]:
            coverage = {}
            try:
                with open(self.file_path, "r") as file:
                    for line in file:
                        line = line.strip()
                        if line.startswith("SF:"):
                            filename = line[3:]
                            lines_covered, lines_missed = [], []
                            for line in file:
                                line = line.strip()
                                if line.startswith("DA:"):
                                    line_number, hits = map(int, line[3:].split(","))
                                    if hits > 0:
                                        lines_covered.append(int(line_number))
                                    else:
                                        lines_missed.append(int(line_number))
                                elif line.startswith("end_of_record"):
                                    break
                            total_lines = len(lines_covered) + len(lines_missed)
                            coverage_percentage = (float(len(lines_covered)) / total_lines) if total_lines > 0 else 0.0
                            coverage[filename] = CoverageData(lines_covered, len(lines_covered), lines_missed, len(lines_missed), coverage_percentage)
            except (FileNotFoundError, IOError) as e:
                self.logger.error(f"Error reading file {self.file_path}: {e}")
                raise
            return coverage
    
    class JacocoProcessor(CoverageProcessor):
        def parse_coverage_report(self) -> Dict[str, CoverageData]:
            coverage = {}
            package_name, class_name = self._extract_package_and_class_java()
            file_extension = self._get_file_extension(self.file_path)
            if file_extension == 'xml':
                missed, covered = self._parse_jacoco_xml(class_name=class_name)
            elif file_extension == 'csv':
                missed, covered = self._parse_jacoco_csv(package_name=package_name, class_name=class_name)
            else:
                raise ValueError(f"Unsupported JaCoCo code coverage report format: {file_extension}")
            total_lines = missed + covered
            coverage_percentage = (float(covered) / total_lines) if total_lines > 0 else 0.0
            coverage[class_name] = CoverageData(covered_lines=[], covered=covered, missed_lines=[], missed=missed, coverage=coverage_percentage)
            return coverage
    
        def _get_file_extension(self, filename: str) -> str | None:
            """Get the file extension from a given filename."""
            return os.path.splitext(filename)[1].lstrip(".")
    
        def _extract_package_and_class_java(self):
            package_pattern = re.compile(r"^\s*package\s+([\w\.]+)\s*;.*$")
            class_pattern = re.compile(r"^\s*public\s+class\s+(\w+).*")
    
            package_name = ""
            class_name = ""
            try:
                with open(self.src_file_path, "r") as file:
                    for line in file:
                        if not package_name:  # Only match package if not already found
                            package_match = package_pattern.match(line)
                            if package_match:
                                package_name = package_match.group(1)
    
                        if not class_name:  # Only match class if not already found
                            class_match = class_pattern.match(line)
                            if class_match:
                                class_name = class_match.group(1)
    
                        if package_name and class_name:  # Exit loop if both are found
                            break
            except (FileNotFoundError, IOError) as e:
                self.logger.error(f"Error reading file {self.src_file_path}: {e}")
                raise
    
            return package_name, class_name
    
        def _parse_jacoco_xml(
            self, class_name: str
        ) -> tuple[int, int]:
            """Parses a JaCoCo XML code coverage report to extract covered and missed line numbers for a specific file."""
            tree = ET.parse(self.file_path)
            root = tree.getroot()
            sourcefile = root.find(f".//sourcefile[@name='{class_name}.java']")
    
            if sourcefile is None:
                return 0, 0
    
            missed, covered = 0, 0
            for counter in sourcefile.findall('counter'):
                if counter.attrib.get('type') == 'LINE':
                    missed += int(counter.attrib.get('missed', 0))
                    covered += int(counter.attrib.get('covered', 0))
                    break
    
            return missed, covered
        def _parse_jacoco_csv(self, package_name, class_name) -> Dict[str, CoverageData]:
            with open(self.file_path, "r") as file:
                reader = csv.DictReader(file)
                missed, covered = 0, 0
                for row in reader:
                    if row["PACKAGE"] == package_name and row["CLASS"] == class_name:
                        try:
                            missed = int(row["LINE_MISSED"])
                            covered = int(row["LINE_COVERED"])
                            break
                        except KeyError as e:
                            self.logger.error(f"Missing expected column in CSV: {e}")
                            raise
    
            return missed, covered
    
    class DiffCoverageProcessor(CoverageProcessor):
        def __init__(
            self,
            diff_coverage_report_path: str,
            file_path: str,
            src_file_path: str,
        ):
            super().__init__(file_path, src_file_path)
            self.diff_coverage_report_path = diff_coverage_report_path
    
        def parse_coverage_report(self) -> Dict[str, CoverageData]:
            """
            Parses a JSON-formatted diff coverage report to extract covered lines, missed lines,
            and the coverage percentage for the specified src_file_path.
            Returns:
                Tuple[List[int], List[int], float]: A tuple containing lists of covered and missed lines,
                                                    and the coverage percentage.
            """
            with open(self.diff_coverage_report_path, "r") as file:
                report_data = json.load(file)
    
            # Create relative path components of `src_file_path` for matching
            src_relative_path = os.path.relpath(self.src_file_path)
            src_relative_components = src_relative_path.split(os.sep)
    
            # Initialize variables for covered and missed lines
            relevant_stats = None
            coverage = {}
            for file_path, stats in report_data["src_stats"].items():
                # Split the JSON's file path into components
                file_path_components = file_path.split(os.sep)
    
                # Match if the JSON path ends with the same components as `src_file_path`
                if (
                    file_path_components[-len(src_relative_components) :]
                    == src_relative_components
                ):
                    relevant_stats = stats
                    break
    
            # If a match is found, extract the data
            if relevant_stats:
                covered_lines = relevant_stats["covered_lines"]
                violation_lines = relevant_stats["violation_lines"]
                coverage_percentage = (
                    relevant_stats["percent_covered"] / 100
                )  # Convert to decimal
            else:
                # Default values if the file isn't found in the report
                covered_lines = []
                violation_lines = []
                coverage_percentage = 0.0
    
            coverage[self.file_path] = CoverageData(covered_lines=covered_lines, covered=len(covered_lines), missed_lines=violation_lines,missed=len(violation_lines), coverage=coverage_percentage)
            return coverage
    
    class CoverageReportFilter:
        def filter_report(self, report: CoverageReport, file_pattern: str) -> CoverageReport:
            filtered_coverage = {
                file: coverage 
                for file, coverage in report.file_coverage.items()
                if file_pattern in file
            }
            total_lines = sum(len(cov.covered_lines) + len(cov.missed_lines) for cov in filtered_coverage.values())
            total_coverage = (sum(len(cov.covered_lines) for cov in filtered_coverage.values()) / total_lines) if total_lines > 0 else 0.0
            return CoverageReport(total_coverage = total_coverage, file_coverage=filtered_coverage)
    
    class CoverageProcessorFactory:
        """Factory for creating coverage processors based on tool type."""
    
        @staticmethod
        def create_processor(
            tool_type: str,
            report_path: str, 
            src_file_path: str,
            diff_coverage_report_path: Optional[str] = None
        ) -> CoverageProcessor:
            """
            Creates appropriate coverage processor instance.
    
            Args:
                tool_type: Coverage tool type (cobertura/jacoco/lcov)
                report_path: Path to coverage report
                src_file_path: Path to source file
    
            Returns:
                CoverageProcessor instance
    
            Raises:
                ValueError: If invalid tool type specified
            """
            processors = {
                'cobertura': CoberturaProcessor,
                'jacoco': JacocoProcessor,
                'lcov': LcovProcessor,
                'diff_cover_json': DiffCoverageProcessor
            }
            if tool_type.lower() not in processors:
                raise ValueError(f"Invalid coverage type specified: {tool_type}")
            if tool_type.lower() == 'diff_cover_json':
                return DiffCoverageProcessor(diff_coverage_report_path, report_path, src_file_path)
            return processors[tool_type.lower()](report_path, src_file_path)
    
    def process_coverage(
        tool_type: str,
        time_of_test_command: int,
        report_path: str,
        src_file_path: str,
        is_global_coverage_enabled: bool = True,
        file_pattern: Optional[str] = None,
        diff_coverage_report_path: Optional[str] = None
    ) -> CoverageReport:
        # Create appropriate processor
        processor = CoverageProcessorFactory.create_processor(tool_type, report_path, src_file_path, diff_coverage_report_path)
    
        # Process full report
        report = processor.process_coverage_report(time_of_test_command=time_of_test_command)
    
        # Apply filtering if needed
        if not is_global_coverage_enabled and file_pattern:
            filter = CoverageReportFilter()
            report = filter.filter_report(report, file_pattern)
    
        return report
    Potential Bug

    The transition from direct coverage percentage access to using CoverageReport objects could introduce edge cases where coverage data is not properly initialized or accessed.

    def get_coverage(self):
        """
        Run code coverage and build the prompt to be used for generating tests.
    
        Returns:
            None
        """
        # Run coverage and build the prompt
        self.run_coverage()
        # Run diff coverage if enabled
        if self.diff_coverage:
            self.generate_diff_coverage_report()
        return self.failed_test_runs, self.language, self.testing_framework, self.code_coverage_report

    @coderustic coderustic force-pushed the feature/improve-coverage-processor branch 2 times, most recently from f2fa10a to d6ec168 Compare December 27, 2024 22:07
    @coderustic coderustic changed the title Draft: Initial Proposal for redesign of coverage processor Refactored coverage processor in to class hierarchy Dec 27, 2024
    @coderustic
    Copy link
    Contributor Author

    /review

    Copy link
    Contributor

    Persistent review updated to latest commit d6ec168

    @coderustic
    Copy link
    Contributor Author

    @mrT23 @EmbeddedDevops1 This is ready for a review. I would appreciate a review and meanwhile I might add more tests.

    * A new class hierarchy for processing coverage from different tools
    
    * New interface representing CoverageReport and CoverageData
    
    * A factory to create the appropriate coverage processor
    @coderustic coderustic force-pushed the feature/improve-coverage-processor branch from d6ec168 to 7c6a7c8 Compare December 29, 2024 16:47
    @EmbeddedDevops1 EmbeddedDevops1 self-assigned this Dec 30, 2024
    Comment on lines 74 to 338
    self,
    diff_coverage_report_path: str,
    file_path: str,
    src_file_path: str,
    ):
    super().__init__(file_path, src_file_path)
    self.diff_coverage_report_path = diff_coverage_report_path

    def parse_coverage_report(self) -> Dict[str, CoverageData]:
    """
    Parses a JSON-formatted diff coverage report to extract covered lines, missed lines,
    and the coverage percentage for the specified src_file_path.
    Returns:
    Tuple[List[int], List[int], float]: A tuple containing lists of covered and missed lines,
    and the coverage percentage.
    """
    with open(self.diff_coverage_report_path, "r") as file:
    report_data = json.load(file)

    # Create relative path components of `src_file_path` for matching
    src_relative_path = os.path.relpath(self.src_file_path)
    src_relative_components = src_relative_path.split(os.sep)

    # Initialize variables for covered and missed lines
    relevant_stats = None
    coverage = {}
    for file_path, stats in report_data["src_stats"].items():
    # Split the JSON's file path into components
    file_path_components = file_path.split(os.sep)

    # Match if the JSON path ends with the same components as `src_file_path`
    if (
    file_path_components[-len(src_relative_components) :]
    == src_relative_components
    ):
    relevant_stats = stats
    break

    # If a match is found, extract the data
    if relevant_stats:
    covered_lines = relevant_stats["covered_lines"]
    violation_lines = relevant_stats["violation_lines"]
    coverage_percentage = (
    relevant_stats["percent_covered"] / 100
    ) # Convert to decimal
    else:
    # Default values if the file isn't found in the report
    covered_lines = []
    violation_lines = []
    coverage_percentage = 0.0

    coverage[self.file_path] = CoverageData(covered_lines=covered_lines, covered=len(covered_lines), missed_lines=violation_lines,missed=len(violation_lines), coverage=coverage_percentage)
    return coverage

    class CoverageReportFilter:
    def filter_report(self, report: CoverageReport, file_pattern: str) -> CoverageReport:
    filtered_coverage = {
    file: coverage
    for file, coverage in report.file_coverage.items()
    if file_pattern in file
    }
    total_lines = sum(len(cov.covered_lines) + len(cov.missed_lines) for cov in filtered_coverage.values())
    total_coverage = (sum(len(cov.covered_lines) for cov in filtered_coverage.values()) / total_lines) if total_lines > 0 else 0.0
    return CoverageReport(total_coverage = total_coverage, file_coverage=filtered_coverage)

    class CoverageProcessorFactory:
    """Factory for creating coverage processors based on tool type."""

    @staticmethod
    def create_processor(
    tool_type: str,
    report_path: str,
    src_file_path: str,
    diff_coverage_report_path: Optional[str] = None
    ) -> CoverageProcessor:
    """
    Creates appropriate coverage processor instance.

    Args:
    tool_type: Coverage tool type (cobertura/jacoco/lcov)
    report_path: Path to coverage report
    src_file_path: Path to source file

    Returns:
    CoverageProcessor instance

    Raises:
    ValueError: If invalid tool type specified
    """
    processors = {
    'cobertura': CoberturaProcessor,
    'jacoco': JacocoProcessor,
    'lcov': LcovProcessor,
    'diff_cover_json': DiffCoverageProcessor
    }
    if tool_type.lower() not in processors:
    raise ValueError(f"Invalid coverage type specified: {tool_type}")
    if tool_type.lower() == 'diff_cover_json':
    return DiffCoverageProcessor(diff_coverage_report_path, report_path, src_file_path)
    return processors[tool_type.lower()](report_path, src_file_path)
    Copy link
    Collaborator

    Choose a reason for hiding this comment

    The reason will be displayed to describe this comment to others. Learn more.

    @coderustic Can you add some inline comments to this code?

    Copy link
    Contributor Author

    Choose a reason for hiding this comment

    The reason will be displayed to describe this comment to others. Learn more.

    I added documentation for the methods. Let me know if anything specific is required.

    @coderustic
    Copy link
    Contributor Author

    @qododavid @EmbeddedDevops1 If everything looks like appreciate if you can merge this.

    @EmbeddedDevops1
    Copy link
    Collaborator

    Sorry - last request: Need to update the version.txt so this change goes into an official release.

    @EmbeddedDevops1 EmbeddedDevops1 merged commit 3496069 into qodo-ai:main Jan 3, 2025
    7 checks passed
    coderustic added a commit to coderustic/cover-agent that referenced this pull request Jan 5, 2025
    * While earlier PR[qodo-ai#230] managed to breakdown processing
      code into a class hierarechy, there wasnt any changes
      made to the code. This PR brings in enhancements to
      coverage processing where coverage data is stored by
      entity (Class or File).
    
    * Coverage data is stored using a FQDN so that conflicts
      are taken care. This closes[qodo-ai#251]
    
    * Earlier PR broke the behaviour of the agent that only
      target file coverage is considered if the global coverage
      flag is not set by the user, this PR fixes it to bring
      back the original behaviour.
    coderustic added a commit to coderustic/cover-agent that referenced this pull request Jan 7, 2025
    * While earlier PR[qodo-ai#230] managed to breakdown processing
      code into a class hierarechy, there wasnt any changes
      made to the code. This PR brings in enhancements to
      coverage processing where coverage data is stored by
      entity (Class or File).
    
    * Coverage data is stored using a FQDN so that conflicts
      are taken care. This closes[qodo-ai#251]
    
    * Earlier PR broke the behaviour of the agent that only
      target file coverage is considered if the global coverage
      flag is not set by the user, this PR fixes it to bring
      back the original behaviour.
    coderustic added a commit to coderustic/cover-agent that referenced this pull request Jan 7, 2025
    * While earlier PR[qodo-ai#230] managed to breakdown processing
      code into a class hierarechy, there wasnt any changes
      made to the code. This PR brings in enhancements to
      coverage processing where coverage data is stored by
      entity (Class or File).
    
    * Coverage data is stored using a FQDN so that conflicts
      are taken care. This closes[qodo-ai#251]
    
    * Earlier PR broke the behaviour of the agent that only
      target file coverage is considered if the global coverage
      flag is not set by the user, this PR fixes it to bring
      back the original behaviour.
    EmbeddedDevops1 pushed a commit that referenced this pull request Jan 7, 2025
    * Enhanced coverage processing (#2)
    
    * While earlier PR[#230] managed to breakdown processing
      code into a class hierarechy, there wasnt any changes
      made to the code. This PR brings in enhancements to
      coverage processing where coverage data is stored by
      entity (Class or File).
    
    * Coverage data is stored using a FQDN so that conflicts
      are taken care. This closes[#251]
    
    * Earlier PR broke the behaviour of the agent that only
      target file coverage is considered if the global coverage
      flag is not set by the user, this PR fixes it to bring
      back the original behaviour.
    
    * removed sample-reports
    
    * bump version
    Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
    Projects
    None yet
    Development

    Successfully merging this pull request may close these issues.

    3 participants