From ecb88c8ea9ce93f3ace9e43fafa9bab5bd0bc974 Mon Sep 17 00:00:00 2001 From: pk-zipstack Date: Mon, 23 Feb 2026 14:21:30 +0530 Subject: [PATCH 1/3] [FEAT] Expose total_pages_processed in execution API response and metadata Surface page usage data from PageUsage model in API responses to support tracking total pages processed per file execution and per workflow execution. Co-Authored-By: Claude Opus 4.6 (1M context) --- backend/usage_v2/helper.py | 21 ++++++++++++++++ .../endpoint_v2/destination.py | 13 ++++++---- .../execution/serializer/execution.py | 5 ++++ .../workflow_v2/file_execution_tasks.py | 24 ++++++++++++------- .../workflow_v2/models/execution.py | 21 ++++++++++++++++ 5 files changed, 70 insertions(+), 14 deletions(-) diff --git a/backend/usage_v2/helper.py b/backend/usage_v2/helper.py index 8cefb3b403..9a5ae7209a 100644 --- a/backend/usage_v2/helper.py +++ b/backend/usage_v2/helper.py @@ -2,6 +2,7 @@ from datetime import date, datetime, time, timedelta from typing import Any +from account_usage.models import PageUsage from django.db.models import Count, QuerySet, Sum from django.utils import timezone from rest_framework.exceptions import APIException, ValidationError @@ -23,6 +24,26 @@ class UsageHelper: + @staticmethod + def get_aggregated_pages_processed(run_id: str) -> int | None: + """Retrieve aggregated pages processed for the given run_id. + + Args: + run_id (str): The identifier for the page usage (file execution ID). + + Returns: + int | None: Total pages processed, or None if no records found. + """ + try: + queryset = PageUsage.objects.filter(run_id=run_id) + if not queryset.exists(): + return None + result = queryset.aggregate(total_pages=Sum("pages_processed")) + return result.get("total_pages") + except Exception as e: + logger.error(f"Error aggregating pages processed for run_id {run_id}: {e}") + return None + @staticmethod def get_aggregated_token_count(run_id: str) -> dict: """Retrieve aggregated token counts for the given run_id. diff --git a/backend/workflow_manager/endpoint_v2/destination.py b/backend/workflow_manager/endpoint_v2/destination.py index 0e63e129a2..f9a7507d2b 100644 --- a/backend/workflow_manager/endpoint_v2/destination.py +++ b/backend/workflow_manager/endpoint_v2/destination.py @@ -5,9 +5,14 @@ import os from typing import Any +from backend.exceptions import UnstractFSException from connector_v2.models import ConnectorInstance from plugins.workflow_manager.workflow_v2.utils import WorkflowUtil from rest_framework.exceptions import APIException +from unstract.connectors.exceptions import ConnectorError +from unstract.sdk1.constants import ToolExecKey +from unstract.sdk1.tool.mime_types import EXT_MIME_MAP +from unstract.workflow_execution.constants import ToolOutputType from usage_v2.helper import UsageHelper from utils.user_context import UserContext from workflow_manager.endpoint_v2.base_connector import BaseConnector @@ -34,12 +39,7 @@ from workflow_manager.workflow_v2.models.file_history import FileHistory from workflow_manager.workflow_v2.models.workflow import Workflow -from backend.exceptions import UnstractFSException -from unstract.connectors.exceptions import ConnectorError from unstract.filesystem import FileStorageType, FileSystem -from unstract.sdk1.constants import ToolExecKey -from unstract.sdk1.tool.mime_types import EXT_MIME_MAP -from unstract.workflow_execution.constants import ToolOutputType logger = logging.getLogger(__name__) @@ -694,6 +694,9 @@ def get_combined_metadata(self) -> dict[str, Any]: # Combine both metadata workflow_metadata["usage"] = usage_metadata + workflow_metadata["total_pages_processed"] = ( + UsageHelper.get_aggregated_pages_processed(run_id=file_execution_id) + ) return workflow_metadata diff --git a/backend/workflow_manager/execution/serializer/execution.py b/backend/workflow_manager/execution/serializer/execution.py index 04065cdcc5..0f440cc8ae 100644 --- a/backend/workflow_manager/execution/serializer/execution.py +++ b/backend/workflow_manager/execution/serializer/execution.py @@ -10,6 +10,7 @@ class ExecutionSerializer(serializers.ModelSerializer): pipeline_name = serializers.SerializerMethodField() successful_files = serializers.SerializerMethodField() failed_files = serializers.SerializerMethodField() + aggregated_total_pages_processed = serializers.SerializerMethodField() execution_time = serializers.ReadOnlyField(source="pretty_execution_time") class Meta: @@ -31,3 +32,7 @@ def get_successful_files(self, obj: WorkflowExecution) -> int: def get_failed_files(self, obj: WorkflowExecution) -> int: """Return the count of failed executed files""" return obj.file_executions.filter(status=ExecutionStatus.ERROR.value).count() + + def get_aggregated_total_pages_processed(self, obj: WorkflowExecution) -> int | None: + """Return the total pages processed across all file executions.""" + return obj.aggregated_total_pages_processed diff --git a/backend/workflow_manager/workflow_v2/file_execution_tasks.py b/backend/workflow_manager/workflow_v2/file_execution_tasks.py index a9b3ab8717..dc63de51d1 100644 --- a/backend/workflow_manager/workflow_v2/file_execution_tasks.py +++ b/backend/workflow_manager/workflow_v2/file_execution_tasks.py @@ -3,15 +3,6 @@ from uuid import UUID from account_v2.constants import Common -from django.conf import settings -from plugins.workflow_manager.workflow_v2.api_hub_usage_utils import APIHubUsageUtil -from plugins.workflow_manager.workflow_v2.utils import WorkflowUtil -from tool_instance_v2.constants import ToolInstanceKey -from tool_instance_v2.models import ToolInstance -from tool_instance_v2.tool_instance_helper import ToolInstanceHelper -from utils.constants import Account -from utils.local_context import StateStore - from backend.workers.file_processing.constants import ( QueueNames as FileProcessingQueueNames, ) @@ -22,6 +13,12 @@ from backend.workers.file_processing_callback.file_processing_callback import ( app as file_processing_callback_app, ) +from django.conf import settings +from plugins.workflow_manager.workflow_v2.api_hub_usage_utils import APIHubUsageUtil +from plugins.workflow_manager.workflow_v2.utils import WorkflowUtil +from tool_instance_v2.constants import ToolInstanceKey +from tool_instance_v2.models import ToolInstance +from tool_instance_v2.tool_instance_helper import ToolInstanceHelper from unstract.core.file_execution_tracker import ( FileExecutionData, FileExecutionStage, @@ -32,6 +29,10 @@ from unstract.core.tool_execution_status import ToolExecutionData, ToolExecutionTracker from unstract.workflow_execution.enums import LogComponent, LogStage, LogState from unstract.workflow_execution.exceptions import StopExecution +from usage_v2.helper import UsageHelper +from utils.constants import Account +from utils.local_context import StateStore + from workflow_manager.endpoint_v2.destination import DestinationConnector from workflow_manager.endpoint_v2.dto import ( DestinationConfig, @@ -1035,6 +1036,11 @@ def _process_final_output( if destination.is_api: execution_metadata = destination.get_metadata(file_history) + if execution_metadata is not None: + total_pages = UsageHelper.get_aggregated_pages_processed( + run_id=file_execution_id + ) + execution_metadata["total_pages_processed"] = total_pages if cls._should_create_file_history( destination=destination, file_history=file_history, diff --git a/backend/workflow_manager/workflow_v2/models/execution.py b/backend/workflow_manager/workflow_v2/models/execution.py index 11b5fdb37f..f677569acb 100644 --- a/backend/workflow_manager/workflow_v2/models/execution.py +++ b/backend/workflow_manager/workflow_v2/models/execution.py @@ -2,6 +2,7 @@ import uuid from datetime import timedelta +from account_usage.models import PageUsage from api_v2.models import APIDeployment from django.core.exceptions import ObjectDoesNotExist from django.db import models @@ -258,6 +259,26 @@ def aggregated_usage_cost(self) -> float | None: return total_cost + @property + def aggregated_total_pages_processed(self) -> int | None: + """Retrieve aggregated total pages processed for this execution. + + Returns: + int | None: Total pages processed across all file executions, + or None if no page usage data exists. + """ + file_execution_ids = list(self.file_executions.values_list("id", flat=True)) + if not file_execution_ids: + return None + + str_ids = [str(fid) for fid in file_execution_ids] + queryset = PageUsage.objects.filter(run_id__in=str_ids) + if not queryset.exists(): + return None + + result = queryset.aggregate(total_pages=Sum("pages_processed")) + return result.get("total_pages") + @property def is_completed(self) -> bool: return ExecutionStatus.is_completed(self.status) From 25de508a58fe02fcd2f91e0160daafd91540af59 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 23 Feb 2026 08:52:05 +0000 Subject: [PATCH 2/3] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- .../endpoint_v2/destination.py | 10 +++++----- .../workflow_v2/file_execution_tasks.py | 20 +++++++++---------- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/backend/workflow_manager/endpoint_v2/destination.py b/backend/workflow_manager/endpoint_v2/destination.py index f9a7507d2b..093c64b905 100644 --- a/backend/workflow_manager/endpoint_v2/destination.py +++ b/backend/workflow_manager/endpoint_v2/destination.py @@ -5,14 +5,9 @@ import os from typing import Any -from backend.exceptions import UnstractFSException from connector_v2.models import ConnectorInstance from plugins.workflow_manager.workflow_v2.utils import WorkflowUtil from rest_framework.exceptions import APIException -from unstract.connectors.exceptions import ConnectorError -from unstract.sdk1.constants import ToolExecKey -from unstract.sdk1.tool.mime_types import EXT_MIME_MAP -from unstract.workflow_execution.constants import ToolOutputType from usage_v2.helper import UsageHelper from utils.user_context import UserContext from workflow_manager.endpoint_v2.base_connector import BaseConnector @@ -39,7 +34,12 @@ from workflow_manager.workflow_v2.models.file_history import FileHistory from workflow_manager.workflow_v2.models.workflow import Workflow +from backend.exceptions import UnstractFSException +from unstract.connectors.exceptions import ConnectorError from unstract.filesystem import FileStorageType, FileSystem +from unstract.sdk1.constants import ToolExecKey +from unstract.sdk1.tool.mime_types import EXT_MIME_MAP +from unstract.workflow_execution.constants import ToolOutputType logger = logging.getLogger(__name__) diff --git a/backend/workflow_manager/workflow_v2/file_execution_tasks.py b/backend/workflow_manager/workflow_v2/file_execution_tasks.py index dc63de51d1..3c5f5c0a93 100644 --- a/backend/workflow_manager/workflow_v2/file_execution_tasks.py +++ b/backend/workflow_manager/workflow_v2/file_execution_tasks.py @@ -3,6 +3,16 @@ from uuid import UUID from account_v2.constants import Common +from django.conf import settings +from plugins.workflow_manager.workflow_v2.api_hub_usage_utils import APIHubUsageUtil +from plugins.workflow_manager.workflow_v2.utils import WorkflowUtil +from tool_instance_v2.constants import ToolInstanceKey +from tool_instance_v2.models import ToolInstance +from tool_instance_v2.tool_instance_helper import ToolInstanceHelper +from usage_v2.helper import UsageHelper +from utils.constants import Account +from utils.local_context import StateStore + from backend.workers.file_processing.constants import ( QueueNames as FileProcessingQueueNames, ) @@ -13,12 +23,6 @@ from backend.workers.file_processing_callback.file_processing_callback import ( app as file_processing_callback_app, ) -from django.conf import settings -from plugins.workflow_manager.workflow_v2.api_hub_usage_utils import APIHubUsageUtil -from plugins.workflow_manager.workflow_v2.utils import WorkflowUtil -from tool_instance_v2.constants import ToolInstanceKey -from tool_instance_v2.models import ToolInstance -from tool_instance_v2.tool_instance_helper import ToolInstanceHelper from unstract.core.file_execution_tracker import ( FileExecutionData, FileExecutionStage, @@ -29,10 +33,6 @@ from unstract.core.tool_execution_status import ToolExecutionData, ToolExecutionTracker from unstract.workflow_execution.enums import LogComponent, LogStage, LogState from unstract.workflow_execution.exceptions import StopExecution -from usage_v2.helper import UsageHelper -from utils.constants import Account -from utils.local_context import StateStore - from workflow_manager.endpoint_v2.destination import DestinationConnector from workflow_manager.endpoint_v2.dto import ( DestinationConfig, From adf2dc2809373ab618e6d5363044fa74878d9013 Mon Sep 17 00:00:00 2001 From: pk-zipstack Date: Wed, 25 Feb 2026 15:11:41 +0530 Subject: [PATCH 3/3] [FIX] Address review: unify page aggregation logic, remove broad exception handling - Unify UsageHelper.get_aggregated_pages_processed() to accept either run_id or run_ids, eliminating duplicate PageUsage query logic - WorkflowExecution.aggregated_total_pages_processed now delegates to UsageHelper instead of duplicating the aggregation - Remove broad try/except so exceptions bubble up to middleware Co-Authored-By: Claude Opus 4.6 (1M context) --- backend/usage_v2/helper.py | 27 ++++++++++++------- .../workflow_v2/models/execution.py | 13 ++++----- 2 files changed, 22 insertions(+), 18 deletions(-) diff --git a/backend/usage_v2/helper.py b/backend/usage_v2/helper.py index 9a5ae7209a..0b211514ab 100644 --- a/backend/usage_v2/helper.py +++ b/backend/usage_v2/helper.py @@ -25,24 +25,31 @@ class UsageHelper: @staticmethod - def get_aggregated_pages_processed(run_id: str) -> int | None: - """Retrieve aggregated pages processed for the given run_id. + def get_aggregated_pages_processed( + run_id: str | None = None, + run_ids: list[str] | None = None, + ) -> int | None: + """Retrieve aggregated pages processed for given run_id(s). + + Provide either a single run_id or a list of run_ids. Args: - run_id (str): The identifier for the page usage (file execution ID). + run_id: Single file execution ID. + run_ids: List of file execution IDs. Returns: int | None: Total pages processed, or None if no records found. """ - try: + if run_id: queryset = PageUsage.objects.filter(run_id=run_id) - if not queryset.exists(): - return None - result = queryset.aggregate(total_pages=Sum("pages_processed")) - return result.get("total_pages") - except Exception as e: - logger.error(f"Error aggregating pages processed for run_id {run_id}: {e}") + elif run_ids: + queryset = PageUsage.objects.filter(run_id__in=run_ids) + else: + return None + if not queryset.exists(): return None + result = queryset.aggregate(total_pages=Sum("pages_processed")) + return result.get("total_pages") @staticmethod def get_aggregated_token_count(run_id: str) -> dict: diff --git a/backend/workflow_manager/workflow_v2/models/execution.py b/backend/workflow_manager/workflow_v2/models/execution.py index f677569acb..1ce6162942 100644 --- a/backend/workflow_manager/workflow_v2/models/execution.py +++ b/backend/workflow_manager/workflow_v2/models/execution.py @@ -2,7 +2,6 @@ import uuid from datetime import timedelta -from account_usage.models import PageUsage from api_v2.models import APIDeployment from django.core.exceptions import ObjectDoesNotExist from django.db import models @@ -267,17 +266,15 @@ def aggregated_total_pages_processed(self) -> int | None: int | None: Total pages processed across all file executions, or None if no page usage data exists. """ + from usage_v2.helper import UsageHelper + file_execution_ids = list(self.file_executions.values_list("id", flat=True)) if not file_execution_ids: return None - str_ids = [str(fid) for fid in file_execution_ids] - queryset = PageUsage.objects.filter(run_id__in=str_ids) - if not queryset.exists(): - return None - - result = queryset.aggregate(total_pages=Sum("pages_processed")) - return result.get("total_pages") + return UsageHelper.get_aggregated_pages_processed( + run_ids=[str(fid) for fid in file_execution_ids] + ) @property def is_completed(self) -> bool: