Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions backend/usage_v2/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from datetime import date, datetime, time, timedelta
from typing import Any

from account_usage.models import PageUsage
from django.db.models import Count, QuerySet, Sum
from django.utils import timezone
from rest_framework.exceptions import APIException, ValidationError
Expand All @@ -23,6 +24,33 @@


class UsageHelper:
@staticmethod
def get_aggregated_pages_processed(
    run_id: str | None = None,
    run_ids: list[str] | None = None,
) -> int | None:
    """Retrieve aggregated pages processed for given run_id(s).

    Provide either a single run_id or a list of run_ids. If both are
    supplied, ``run_id`` takes precedence.

    Args:
        run_id: Single file execution ID.
        run_ids: List of file execution IDs.

    Returns:
        int | None: Total pages processed, or None if no identifier was
        provided or no matching PageUsage records exist.
    """
    if run_id:
        queryset = PageUsage.objects.filter(run_id=run_id)
    elif run_ids:
        queryset = PageUsage.objects.filter(run_id__in=run_ids)
    else:
        return None
    # Sum() over an empty queryset aggregates to None, so the previous
    # queryset.exists() pre-check (an extra DB round trip) is unnecessary.
    result = queryset.aggregate(total_pages=Sum("pages_processed"))
    return result.get("total_pages")

@staticmethod
def get_aggregated_token_count(run_id: str) -> dict:
"""Retrieve aggregated token counts for the given run_id.
Expand Down
3 changes: 3 additions & 0 deletions backend/workflow_manager/endpoint_v2/destination.py
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,9 @@ def get_combined_metadata(self) -> dict[str, Any]:

# Combine both metadata
workflow_metadata["usage"] = usage_metadata
workflow_metadata["total_pages_processed"] = (
UsageHelper.get_aggregated_pages_processed(run_id=file_execution_id)
)

return workflow_metadata

Expand Down
5 changes: 5 additions & 0 deletions backend/workflow_manager/execution/serializer/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ class ExecutionSerializer(serializers.ModelSerializer):
pipeline_name = serializers.SerializerMethodField()
successful_files = serializers.SerializerMethodField()
failed_files = serializers.SerializerMethodField()
aggregated_total_pages_processed = serializers.SerializerMethodField()
execution_time = serializers.ReadOnlyField(source="pretty_execution_time")

class Meta:
Expand All @@ -31,3 +32,7 @@ def get_successful_files(self, obj: WorkflowExecution) -> int:
def get_failed_files(self, obj: WorkflowExecution) -> int:
    """Count the file executions of this workflow execution whose status is ERROR."""
    failed_files = obj.file_executions.filter(status=ExecutionStatus.ERROR.value)
    return failed_files.count()

def get_aggregated_total_pages_processed(self, obj: WorkflowExecution) -> int | None:
    """Expose the execution's aggregated page total for serialization.

    Delegates to ``WorkflowExecution.aggregated_total_pages_processed``,
    which may be None when no page usage data exists.
    """
    total_pages = obj.aggregated_total_pages_processed
    return total_pages
6 changes: 6 additions & 0 deletions backend/workflow_manager/workflow_v2/file_execution_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from tool_instance_v2.constants import ToolInstanceKey
from tool_instance_v2.models import ToolInstance
from tool_instance_v2.tool_instance_helper import ToolInstanceHelper
from usage_v2.helper import UsageHelper
from utils.constants import Account
from utils.local_context import StateStore

Expand Down Expand Up @@ -1035,6 +1036,11 @@ def _process_final_output(

if destination.is_api:
execution_metadata = destination.get_metadata(file_history)
if execution_metadata is not None:
total_pages = UsageHelper.get_aggregated_pages_processed(
run_id=file_execution_id
)
execution_metadata["total_pages_processed"] = total_pages
if cls._should_create_file_history(
destination=destination,
file_history=file_history,
Expand Down
18 changes: 18 additions & 0 deletions backend/workflow_manager/workflow_v2/models/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,24 @@ def aggregated_usage_cost(self) -> float | None:

return total_cost

@property
def aggregated_total_pages_processed(self) -> int | None:
    """Aggregate the pages processed across this execution's file executions.

    Returns:
        int | None: Total pages processed over all file executions, or
        None when the execution has no file executions or no page usage
        records match.

    NOTE(review): each access issues its own queries (one for the file
    execution IDs, one for the PageUsage aggregate) — confirm list
    endpoints that serialize this property batch or tolerate the cost.
    """
    # Imported lazily to avoid a circular import at module load time.
    from usage_v2.helper import UsageHelper

    run_ids = [str(pk) for pk in self.file_executions.values_list("id", flat=True)]
    if not run_ids:
        return None

    return UsageHelper.get_aggregated_pages_processed(run_ids=run_ids)
Comment on lines +261 to +277
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

N+1 query risk when aggregated_total_pages_processed is serialized in a list endpoint.

Each call to this property issues at minimum 2 queries (1 for file_executions.values_list, 1 for PageUsage.aggregate). The AI summary confirms this property is exposed in ExecutionSerializer for the execution list API, adding roughly 2N extra queries for N executions — on top of any existing per-object properties already doing the same.

Unlike aggregated_usage_cost (which filters Usage directly by execution_id), this property must first resolve file_execution_ids and then fan out to PageUsage, because there is no direct execution_id column on PageUsage (only run_id, which maps to a file execution ID). The fan-out is structurally inherent to the data model.

Mitigation options to consider:

  1. Add a DB index on PageUsage.run_id to at least make each aggregate query fast.
  2. Batch-load in the serializer: override the list view's queryset to prefetch/annotate page totals per execution, bypassing the per-object property for list responses.
  3. Accept the cost if the list endpoint is paginated tightly (e.g., page size ≤ 10) and usage is low, but make this explicit.

Run the following to confirm there's no existing index on PageUsage.run_id:

#!/bin/bash
# Confirm PageUsage model's Meta.indexes and any migrations that add an index on run_id
rg -n "run_id" --type py -C3 backend/account_usage/models.py

# Also check migrations for any index on page_usage.run_id
rg -n "page_usage" --type py -g "**/migrations/**" -C2
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/workflow_manager/workflow_v2/models/execution.py` around lines 261 -
277, The aggregated_total_pages_processed property issues per-object DB queries
(file_executions.values_list(...) and
UsageHelper.get_aggregated_pages_processed(...)) causing an N+1 when used in
ExecutionSerializer list endpoints; fix by either adding a DB index on
PageUsage.run_id to speed each aggregate query and adding a migration for that
index, or (preferred) batch the totals in the list view/serializer by computing
aggregated page totals for all execution IDs in one query and attaching them to
the queryset (override the list view or ExecutionSerializer to accept a
precomputed map keyed by execution id and avoid calling
aggregated_total_pages_processed per instance); reference the property
aggregated_total_pages_processed, the call to file_executions.values_list, and
UsageHelper.get_aggregated_pages_processed when implementing the change.


@property
def is_completed(self) -> bool:
    """True when ExecutionStatus classifies this execution's status as completed."""
    current_status = self.status
    return ExecutionStatus.is_completed(current_status)
Expand Down