diff --git a/backend/usage_v2/helper.py b/backend/usage_v2/helper.py index 8cefb3b403..0b211514ab 100644 --- a/backend/usage_v2/helper.py +++ b/backend/usage_v2/helper.py @@ -2,6 +2,7 @@ from datetime import date, datetime, time, timedelta from typing import Any +from account_usage.models import PageUsage from django.db.models import Count, QuerySet, Sum from django.utils import timezone from rest_framework.exceptions import APIException, ValidationError @@ -23,6 +24,33 @@ class UsageHelper: + @staticmethod + def get_aggregated_pages_processed( + run_id: str | None = None, + run_ids: list[str] | None = None, + ) -> int | None: + """Retrieve aggregated pages processed for given run_id(s). + + Provide either a single run_id or a list of run_ids. + + Args: + run_id: Single file execution ID. + run_ids: List of file execution IDs. + + Returns: + int | None: Total pages processed, or None if no records found. + """ + if run_id: + queryset = PageUsage.objects.filter(run_id=run_id) + elif run_ids: + queryset = PageUsage.objects.filter(run_id__in=run_ids) + else: + return None + if not queryset.exists(): + return None + result = queryset.aggregate(total_pages=Sum("pages_processed")) + return result.get("total_pages") + @staticmethod def get_aggregated_token_count(run_id: str) -> dict: """Retrieve aggregated token counts for the given run_id. 
def get_failed_files(self, obj: WorkflowExecution) -> int:
    """Count the file executions of ``obj`` whose status is ERROR."""
    failed_executions = obj.file_executions.filter(
        status=ExecutionStatus.ERROR.value
    )
    return failed_executions.count()

def get_aggregated_total_pages_processed(self, obj: WorkflowExecution) -> int | None:
    """Expose ``obj.aggregated_total_pages_processed`` as a serializer field.

    Returns:
        int | None: Whatever the ``aggregated_total_pages_processed``
            attribute of ``obj`` evaluates to.
    """
    total_pages = obj.aggregated_total_pages_processed
    return total_pages
b/backend/workflow_manager/workflow_v2/file_execution_tasks.py @@ -9,6 +9,7 @@ from tool_instance_v2.constants import ToolInstanceKey from tool_instance_v2.models import ToolInstance from tool_instance_v2.tool_instance_helper import ToolInstanceHelper +from usage_v2.helper import UsageHelper from utils.constants import Account from utils.local_context import StateStore @@ -1035,6 +1036,11 @@ def _process_final_output( if destination.is_api: execution_metadata = destination.get_metadata(file_history) + if execution_metadata is not None: + total_pages = UsageHelper.get_aggregated_pages_processed( + run_id=file_execution_id + ) + execution_metadata["total_pages_processed"] = total_pages if cls._should_create_file_history( destination=destination, file_history=file_history, diff --git a/backend/workflow_manager/workflow_v2/models/execution.py b/backend/workflow_manager/workflow_v2/models/execution.py index 11b5fdb37f..1ce6162942 100644 --- a/backend/workflow_manager/workflow_v2/models/execution.py +++ b/backend/workflow_manager/workflow_v2/models/execution.py @@ -258,6 +258,24 @@ def aggregated_usage_cost(self) -> float | None: return total_cost + @property + def aggregated_total_pages_processed(self) -> int | None: + """Retrieve aggregated total pages processed for this execution. + + Returns: + int | None: Total pages processed across all file executions, + or None if no page usage data exists. + """ + from usage_v2.helper import UsageHelper + + file_execution_ids = list(self.file_executions.values_list("id", flat=True)) + if not file_execution_ids: + return None + + return UsageHelper.get_aggregated_pages_processed( + run_ids=[str(fid) for fid in file_execution_ids] + ) + @property def is_completed(self) -> bool: return ExecutionStatus.is_completed(self.status)