Description
Profile-Agent Assignment Tracking
Objective
Implement a comprehensive tracking system that maintains real-time visibility of OctoBrowser profile assignments to Skyvern agents, enabling efficient resource management, conflict resolution, and operational monitoring.
Background
As multiple Skyvern agents operate concurrently with OctoBrowser profiles, we need robust tracking to prevent conflicts, optimize resource utilization, and provide operational visibility. This system serves as the foundation for orchestration and debugging.
Requirements
Functional Requirements
- Real-Time Assignment Tracking: Monitor active profile-agent mappings
- Conflict Detection: Identify and prevent assignment conflicts
- Resource Utilization: Track profile usage patterns and efficiency
- Historical Analytics: Maintain assignment history for optimization
- Operational Visibility: Provide dashboards and monitoring interfaces
Technical Requirements
- Thread-safe assignment operations
- Distributed tracking for multi-instance deployments
- Real-time event streaming
- Persistent state management
- High-availability design
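For the thread-safe requirement, here is a minimal single-process sketch that assumes an in-memory store. The names `InMemoryAssignmentTracker`, `SimpleAssignment`, and `ProfileAlreadyAssigned` are hypothetical. One lock makes the check-and-assign step atomic, so two agents cannot both claim a free profile:

```python
import threading
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Dict, Optional


class ProfileAlreadyAssigned(Exception):
    """Raised when a profile is already held by a different agent."""


@dataclass
class SimpleAssignment:
    assignment_id: str
    profile_id: str
    agent_id: str
    assigned_at: datetime


class InMemoryAssignmentTracker:
    """Single-process tracker; one lock guards every read-modify-write."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._by_profile: Dict[str, SimpleAssignment] = {}

    def assign_profile(self, profile_id: str, agent_id: str) -> SimpleAssignment:
        with self._lock:
            current = self._by_profile.get(profile_id)
            if current is not None:
                if current.agent_id == agent_id:
                    return current  # idempotent re-request by the holder
                raise ProfileAlreadyAssigned(f"{profile_id} held by {current.agent_id}")
            assignment = SimpleAssignment(
                assignment_id=str(uuid.uuid4()),
                profile_id=profile_id,
                agent_id=agent_id,
                assigned_at=datetime.now(timezone.utc),
            )
            self._by_profile[profile_id] = assignment
            return assignment

    def release_profile(self, profile_id: str, agent_id: str) -> bool:
        with self._lock:
            current = self._by_profile.get(profile_id)
            if current is None or current.agent_id != agent_id:
                return False  # nothing to release, or caller is not the holder
            del self._by_profile[profile_id]
            return True

    def get_assignment_by_profile(self, profile_id: str) -> Optional[SimpleAssignment]:
        with self._lock:
            return self._by_profile.get(profile_id)
```

A process-local lock only covers one instance. The multi-instance requirement still needs a shared lock, such as the Redis lease described under Redis Integration.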
Implementation Plan
Phase 1: Core Tracking System
- AssignmentTracker Class
from typing import List, Optional

class AssignmentTracker:
    def __init__(self, storage_backend, event_publisher): ...
    def assign_profile(self, profile_id: str, agent_id: str) -> AssignmentResult: ...
    def release_profile(self, profile_id: str, agent_id: str) -> bool: ...
    def get_assignments(self, filters: AssignmentFilters) -> List[Assignment]: ...
    def get_assignment_by_profile(self, profile_id: str) -> Optional[Assignment]: ...
- Assignment Data Model
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict

@dataclass
class Assignment:
    assignment_id: str
    profile_id: str
    agent_id: str
    workflow_id: str
    assigned_at: datetime
    last_activity: datetime
    status: AssignmentStatus
    metadata: Dict[str, Any]
Phase 2: Event System
- Real-Time Events
  - Assignment creation/release events
  - Status change notifications
  - Conflict alerts
  - Usage metrics updates
- Event Streaming
  - WebSocket connections for real-time updates
  - Event history for replay and debugging
  - Subscription-based filtering
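The history, replay, and filtering points above can be sketched in-process. This assumes a single process; a real deployment would fan out over WebSockets or a broker. `InProcessEventBus` and this trimmed `AssignmentEvent` are hypothetical stand-ins:

```python
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Callable, Deque, List, Optional, Set, Tuple


@dataclass
class AssignmentEvent:
    event_type: str  # e.g. "assignment.created"
    profile_id: str
    agent_id: str
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


Handler = Callable[[AssignmentEvent], None]


class InProcessEventBus:
    """Delivers events to filtered subscribers and keeps a bounded replay history."""

    def __init__(self, history_size: int = 1000) -> None:
        self._history: Deque[AssignmentEvent] = deque(maxlen=history_size)
        self._subscribers: List[Tuple[Handler, Optional[Set[str]]]] = []

    def subscribe(self, handler: Handler, event_types: Optional[Set[str]] = None) -> None:
        # event_types=None means "receive every event type"
        self._subscribers.append((handler, event_types))

    def publish(self, event: AssignmentEvent) -> None:
        self._history.append(event)  # oldest events drop off once maxlen is reached
        for handler, event_types in self._subscribers:
            if event_types is None or event.event_type in event_types:
                handler(event)

    def replay(self, since: datetime) -> List[AssignmentEvent]:
        # Retained events at or after `since`, oldest first
        return [e for e in self._history if e.timestamp >= since]
```

A bounded `deque` keeps memory flat, at the cost of losing events older than `history_size`. Longer audit trails belong in persistent storage.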
Phase 3: Analytics & Monitoring
- Usage Analytics
  - Profile utilization rates
  - Agent efficiency metrics
  - Resource contention analysis
  - Historical trends
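The issue does not define "utilization rate". One plausible definition is the fraction of a time window during which a profile was assigned. The sketch below uses that definition; `utilization_rate` is a hypothetical helper. It clips each assignment interval to the window and treats a still-open assignment as running until the window end:

```python
from datetime import datetime
from typing import Iterable, Optional, Tuple

# (assigned_at, released_at), with released_at=None while the assignment is still open
Interval = Tuple[datetime, Optional[datetime]]


def utilization_rate(intervals: Iterable[Interval], window_start: datetime, window_end: datetime) -> float:
    """Fraction of [window_start, window_end) during which the profile was assigned.

    Assumes one profile's assignments never overlap, which exclusive assignment guarantees.
    """
    window = (window_end - window_start).total_seconds()
    if window <= 0:
        raise ValueError("window_end must be after window_start")
    busy = 0.0
    for assigned_at, released_at in intervals:
        end = released_at if released_at is not None else window_end
        start = max(assigned_at, window_start)  # clip to the window
        end = min(end, window_end)
        if end > start:
            busy += (end - start).total_seconds()
    return busy / window
```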
- Operational Dashboards
  - Real-time assignment overview
  - Resource utilization visualizations
  - Alert management interface
  - Performance metrics display
Technical Specifications
Assignment Status Model
from enum import Enum

class AssignmentStatus(Enum):
    PENDING = "pending"        # Assignment requested
    ACTIVE = "active"          # Profile actively in use
    IDLE = "idle"              # Profile assigned but inactive
    RELEASING = "releasing"    # Profile being released
    RELEASED = "released"      # Assignment completed
    FAILED = "failed"          # Assignment failed
    EXPIRED = "expired"        # Assignment timed out
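The issue lists the states but not which transitions between them are legal. The table below is an assumed set of rules, for illustration only. It treats RELEASED, FAILED, and EXPIRED as terminal, and `ALLOWED_TRANSITIONS` and `transition` are hypothetical names. Rejecting illegal transitions in one place keeps concurrent updaters from, for example, reviving a released assignment:

```python
from enum import Enum
from typing import Dict, FrozenSet


class AssignmentStatus(Enum):
    PENDING = "pending"
    ACTIVE = "active"
    IDLE = "idle"
    RELEASING = "releasing"
    RELEASED = "released"
    FAILED = "failed"
    EXPIRED = "expired"


S = AssignmentStatus
# Assumed rules: EXPIRED is reachable only from PENDING (request timeout) or IDLE (idle timeout)
ALLOWED_TRANSITIONS: Dict[AssignmentStatus, FrozenSet[AssignmentStatus]] = {
    S.PENDING: frozenset({S.ACTIVE, S.FAILED, S.EXPIRED}),
    S.ACTIVE: frozenset({S.IDLE, S.RELEASING, S.FAILED}),
    S.IDLE: frozenset({S.ACTIVE, S.RELEASING, S.EXPIRED}),
    S.RELEASING: frozenset({S.RELEASED, S.FAILED}),
    S.RELEASED: frozenset(),  # terminal
    S.FAILED: frozenset(),    # terminal
    S.EXPIRED: frozenset(),   # terminal
}


def transition(current: AssignmentStatus, target: AssignmentStatus) -> AssignmentStatus:
    """Return target if the move is allowed, otherwise raise ValueError."""
    if target not in ALLOWED_TRANSITIONS[current]:
        raise ValueError(f"illegal transition {current.value} -> {target.value}")
    return target
```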
Storage Schema
# Primary assignment table
class AssignmentRecord:
    assignment_id: UUID
    profile_id: str
    agent_id: str
    workflow_id: str
    status: AssignmentStatus
    assigned_at: datetime
    released_at: Optional[datetime]
    last_activity: datetime
    metadata: JSON
Redis Integration
# Active assignments (fast lookup)
assignments:active:{profile_id} -> {agent_id, workflow_id, assigned_at}
assignments:by_agent:{agent_id} -> [profile_id1, profile_id2, ...]
# Assignment locks (prevent conflicts)
locks:assignment:{profile_id} -> {agent_id, expires_at}
# Real-time metrics
metrics:profiles -> {profile_id: {usage_count, total_time, ...}}
metrics:agents -> {agent_id: {profiles_used, success_rate, ...}}
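The `locks:assignment:{profile_id}` key can be implemented with the single-instance Redis locking pattern. `SET key value NX PX ttl` takes the lock only if it is free, and Redis expires it automatically, which gives the lease. A compare-and-delete Lua script ensures only the current holder can release it. This sketch assumes a redis-py `Redis` client using `set(..., nx=True, px=...)` and `eval(script, numkeys, *keys_and_args)`. The helper names and the `agent_id:uuid` token format are hypothetical. It assumes one Redis primary and is not a multi-node Redlock:

```python
import uuid
from typing import Any, Optional

# Compare-and-delete: release only if the stored token is still ours
RELEASE_SCRIPT = """
if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("del", KEYS[1])
else
    return 0
end
"""


def lock_key(profile_id: str) -> str:
    return f"locks:assignment:{profile_id}"


def acquire_profile_lock(client: Any, profile_id: str, agent_id: str, ttl_ms: int = 30_000) -> Optional[str]:
    """Try to take the lease; return the token on success, or None if someone else holds it.

    `client` is expected to be a redis-py Redis instance.
    """
    token = f"{agent_id}:{uuid.uuid4()}"
    # NX: set only if absent; PX: lease expires after ttl_ms even if the holder crashes
    if client.set(lock_key(profile_id), token, nx=True, px=ttl_ms):
        return token
    return None


def release_profile_lock(client: Any, profile_id: str, token: str) -> bool:
    # eval(script, numkeys, key, arg) -> 1 if deleted, 0 if the lock was not ours
    return client.eval(RELEASE_SCRIPT, 1, lock_key(profile_id), token) == 1
```

An agent that holds a profile longer than `ttl_ms` would need to renew the lease before it expires. Otherwise another agent may acquire the profile while the first one is still using it.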
Integration Points
Skyvern Agent Integration
class SkyvernAgent:
    def __init__(self, agent_id: str, assignment_tracker: AssignmentTracker):
        self.agent_id = agent_id
        self.assignment_tracker = assignment_tracker

    def request_profile(self, profile_requirements) -> AssignmentResult:
        # select_profile is a hypothetical helper: how a profile is chosen from
        # profile_requirements is not specified in this issue
        selected_profile = self.select_profile(profile_requirements)
        return self.assignment_tracker.assign_profile(
            profile_id=selected_profile.id,
            agent_id=self.agent_id,
        )

    def release_profile(self, profile_id: str) -> bool:
        return self.assignment_tracker.release_profile(profile_id, self.agent_id)
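To keep an agent from forgetting to release a profile, the assign/release pair can be wrapped in a context manager. `profile_assignment` is a hypothetical helper. It assumes only that the tracker exposes the `assign_profile`/`release_profile` methods from the issue:

```python
from contextlib import contextmanager
from typing import Any, Iterator


@contextmanager
def profile_assignment(tracker: Any, profile_id: str, agent_id: str) -> Iterator[Any]:
    """Hold a profile for the duration of a `with` block and always release it."""
    assignment = tracker.assign_profile(profile_id=profile_id, agent_id=agent_id)
    try:
        yield assignment
    finally:
        # Runs on normal exit and when the block raises, so an exception cannot leak the profile
        tracker.release_profile(profile_id, agent_id)
```

This covers exceptions raised inside the process. A hard crash of the agent process still depends on lease expiry or zombie-assignment cleanup.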
OctoBrowser Profile Manager
class ProfileManager:
    def __init__(self, assignment_tracker: AssignmentTracker):
        self.assignment_tracker = assignment_tracker

    def get_profile_status(self, profile_id: str) -> ProfileStatus:
        assignment = self.assignment_tracker.get_assignment_by_profile(profile_id)
        return ProfileStatus(
            available=assignment is None,
            assigned_to=assignment.agent_id if assignment else None,
            last_activity=assignment.last_activity if assignment else None,
        )
Conflict Resolution
Assignment Conflicts
class ConflictResolver:
    def resolve_assignment_conflict(self, profile_id: str, requesting_agents: List[str]) -> Resolution:
        # Priority-based resolution
        if self.has_priority_agent(requesting_agents):
            return self.assign_to_priority_agent(profile_id, requesting_agents)
        # Time-based resolution (first-come-first-served)
        return self.assign_to_first_requester(profile_id, requesting_agents)

    def handle_zombie_assignments(self) -> List[str]:
        # Clean up assignments with no activity
        expired_assignments = self.find_expired_assignments()
        for assignment in expired_assignments:
            self.force_release_assignment(assignment.assignment_id)
        return [a.assignment_id for a in expired_assignments]
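The priority-then-first-come rule in `resolve_assignment_conflict` reduces to a single sort key. In this sketch, `ProfileRequest`, `pick_winner`, and the "higher integer wins" priority convention are assumptions. An `agent_id` tiebreak keeps the result deterministic when timestamps are equal:

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List


@dataclass(frozen=True)
class ProfileRequest:
    agent_id: str
    requested_at: datetime
    priority: int = 0  # assumed convention: higher wins, 0 = no special priority


def pick_winner(requests: List[ProfileRequest]) -> ProfileRequest:
    """Highest priority wins; ties fall back to first-come-first-served."""
    if not requests:
        raise ValueError("no competing requests")
    # Key: priority descending, then earliest request, then agent_id for determinism
    return min(requests, key=lambda r: (-r.priority, r.requested_at, r.agent_id))
```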
Deadlock Prevention
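from datetime import datetime, timezone

class DeadlockPrevention:
    def check_circular_dependencies(self, assignment_request: AssignmentRequest) -> bool:
        # Implement cycle detection in assignment graph
        raise NotImplementedError

    def apply_timeout_policies(self, assignment: Assignment):
        # Automatic release after timeout; idle time is derived from last_activity
        # (assumes last_activity is a timezone-aware UTC datetime and max_idle_time a timedelta)
        idle_time = datetime.now(timezone.utc) - assignment.last_activity
        if idle_time > self.max_idle_time:
            self.force_release(assignment)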
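The issue does not define the "assignment graph". One common model is a wait-for graph, with an edge A → B when agent A waits on a profile held by agent B. Under that assumption, a new wait closes a cycle exactly when the requester is already reachable from the holder. `would_create_cycle` is a hypothetical helper that does this check with an iterative depth-first search:

```python
from typing import Dict, Set


def would_create_cycle(waits_for: Dict[str, Set[str]], requester: str, holder: str) -> bool:
    """Return True if adding the edge requester -> holder closes a cycle.

    waits_for maps each agent to the agents holding profiles it is waiting on.
    """
    if requester == holder:
        return False  # re-requesting a profile you already hold is not a wait
    stack = [holder]
    seen: Set[str] = set()
    while stack:
        agent = stack.pop()
        if agent == requester:
            return True  # holder (transitively) waits on requester
        if agent in seen:
            continue
        seen.add(agent)
        stack.extend(waits_for.get(agent, ()))
    return False
```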
Event System
Event Publishing
from datetime import datetime, timezone

class AssignmentEventPublisher:
    def publish_assignment_created(self, assignment: Assignment):
        event = AssignmentEvent(
            event_type="assignment.created",
            assignment_id=assignment.assignment_id,
            profile_id=assignment.profile_id,
            agent_id=assignment.agent_id,
            # Timezone-aware UTC; datetime.utcnow() is deprecated since Python 3.12
            timestamp=datetime.now(timezone.utc),
        )
        self.event_bus.publish(event)
Event Consumers
class AssignmentEventHandler:
    def handle_assignment_created(self, event: AssignmentEvent):
        # Update metrics
        self.metrics_collector.increment_profile_usage(event.profile_id)
        # Send notifications
        self.notification_service.notify_assignment_created(event)
        # Update dashboards
        self.dashboard_updater.refresh_assignment_view()
Monitoring & Analytics
Metrics Collection
@dataclass
class AssignmentMetrics:
    total_assignments: int
    active_assignments: int
    average_assignment_duration: timedelta
    profile_utilization_rates: Dict[str, float]
    agent_efficiency_scores: Dict[str, float]
    conflict_frequency: float
    successful_assignments: int
    failed_assignments: int
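Several `AssignmentMetrics` fields can be derived directly from assignment records. This sketch makes some assumptions: "successful" means `released`, "active" counts both `active` and `idle` (IDLE is still an assignment), and `expired` counts as neither success nor failure. `RecordSummary` and `summarize` are hypothetical names. Average duration uses only records that have a `released_at`:

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List, Optional


@dataclass
class RecordSummary:
    status: str  # AssignmentStatus value, e.g. "released", "failed"
    assigned_at: datetime
    released_at: Optional[datetime]


def summarize(records: List[RecordSummary]) -> Dict[str, object]:
    durations = [r.released_at - r.assigned_at for r in records if r.released_at is not None]
    # timedelta / int yields a timedelta; default to zero when nothing has finished yet
    average = sum(durations, timedelta()) / len(durations) if durations else timedelta()
    return {
        "total_assignments": len(records),
        "active_assignments": sum(r.status in ("active", "idle") for r in records),
        "average_assignment_duration": average,
        "successful_assignments": sum(r.status == "released" for r in records),
        "failed_assignments": sum(r.status == "failed" for r in records),
    }
```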
Dashboard Data
class AssignmentDashboard:
    def get_real_time_overview(self) -> DashboardData:
        return DashboardData(
            active_assignments=self.tracker.count_active_assignments(),
            profile_utilization=self.calculate_utilization_rates(),
            recent_activities=self.get_recent_activities(),
            alerts=self.get_active_alerts(),
        )
API Endpoints
REST API
# GET /assignments
# GET /assignments/{assignment_id}
# POST /assignments (create assignment)
# DELETE /assignments/{assignment_id} (release assignment)
# GET /profiles/{profile_id}/assignment
# GET /agents/{agent_id}/assignments
# GET /assignments/metrics
WebSocket API
# Real-time assignment updates
ws://localhost:8000/assignments/events
# Event types:
# - assignment.created
# - assignment.released
# - assignment.status_changed
# - assignment.conflict
# - assignment.expired
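The issue lists the WebSocket event types but not the frame format. The sketch below assumes one JSON text frame per event, with a `type`/`timestamp`/`data` envelope. That envelope shape, `encode_event`, and `decode_event` are hypothetical. Unknown event types are rejected at encode time:

```python
import json
from datetime import datetime, timezone
from typing import Any, Dict, Optional

EVENT_TYPES = {
    "assignment.created",
    "assignment.released",
    "assignment.status_changed",
    "assignment.conflict",
    "assignment.expired",
}


def encode_event(event_type: str, payload: Dict[str, Any], timestamp: Optional[datetime] = None) -> str:
    """Serialize one event as a JSON text frame (assumed envelope shape)."""
    if event_type not in EVENT_TYPES:
        raise ValueError(f"unknown event type: {event_type}")
    ts = timestamp or datetime.now(timezone.utc)
    return json.dumps({"type": event_type, "timestamp": ts.isoformat(), "data": payload}, sort_keys=True)


def decode_event(frame: str) -> Dict[str, Any]:
    message = json.loads(frame)
    # isoformat() output, including the UTC offset, round-trips through fromisoformat()
    message["timestamp"] = datetime.fromisoformat(message["timestamp"])
    return message
```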
Testing Strategy
Unit Tests
- Assignment creation/release logic
- Conflict detection algorithms
- Event publishing/handling
- Metrics calculation
Integration Tests
- End-to-end assignment workflows
- Multi-agent concurrent assignments
- Event system integration
- Dashboard data accuracy
Load Tests
- High-frequency assignment operations
- Concurrent conflict resolution
- Event system throughput
- Dashboard performance under load
Chaos Tests
- Network partitions
- Database failures
- Redis connectivity issues
- Agent crashes during assignments
Performance Optimization
Caching Strategy
- In-memory cache for active assignments
- Redis cache for frequently accessed data
- Cache invalidation on state changes
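The in-memory cache with invalidation on state changes can be sketched as a read-through cache keyed by `profile_id`, assuming an assignment-event handler calls `invalidate` for the affected profile. `InvalidatingCache` is a hypothetical name. A per-key version counter prevents a load that raced with an invalidation from caching a stale value:

```python
import threading
from typing import Callable, Dict, Generic, Optional, TypeVar

V = TypeVar("V")


class InvalidatingCache(Generic[V]):
    """Read-through cache; entries are dropped whenever a profile's assignment changes."""

    def __init__(self, loader: Callable[[str], Optional[V]]) -> None:
        self._loader = loader  # reads from the backing store (e.g. Redis or the database)
        self._entries: Dict[str, Optional[V]] = {}
        self._versions: Dict[str, int] = {}
        self._lock = threading.Lock()

    def get(self, profile_id: str) -> Optional[V]:
        with self._lock:
            if profile_id in self._entries:
                return self._entries[profile_id]
            version = self._versions.get(profile_id, 0)
        value = self._loader(profile_id)  # load outside the lock so slow reads don't block others
        with self._lock:
            # Cache only if no invalidation happened while we were loading
            if self._versions.get(profile_id, 0) == version:
                self._entries[profile_id] = value
        return value

    def invalidate(self, profile_id: str) -> None:
        with self._lock:
            self._entries.pop(profile_id, None)
            self._versions[profile_id] = self._versions.get(profile_id, 0) + 1
```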
Database Optimization
- Proper indexing on lookup fields
- Partitioning of historical data
- Query optimization for analytics
Success Criteria
- Zero assignment conflicts under normal load
- <50ms assignment operation latency
- 99.9% assignment tracking accuracy
- Real-time dashboard updates (<1s delay)
- Complete historical audit trail
Risks & Mitigation
Risk: Split-Brain Scenarios
Mitigation: Implement distributed locking with lease expiration
Risk: Event System Overload
Mitigation: Event batching and backpressure mechanisms
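Batching and backpressure can be sketched with a bounded standard-library `queue.Queue`. Producers wait, then give up, when the buffer is full, and the consumer drains events in batches. `BatchingEventQueue` and its default sizes are hypothetical:

```python
import queue
from typing import Any, List


class BatchingEventQueue:
    """Bounded buffer: a full queue pushes back on producers; the consumer drains in batches."""

    def __init__(self, max_pending: int = 10_000, batch_size: int = 100) -> None:
        self._queue: "queue.Queue[Any]" = queue.Queue(maxsize=max_pending)
        self._batch_size = batch_size

    def put(self, event: Any, timeout: float = 1.0) -> bool:
        """Enqueue an event; return False if the buffer stayed full for `timeout` seconds."""
        try:
            self._queue.put(event, timeout=timeout)
            return True
        except queue.Full:
            return False  # caller decides: drop, retry, or shed load

    def next_batch(self, wait: float = 0.5) -> List[Any]:
        """Wait up to `wait` seconds for one event, then take up to batch_size without blocking."""
        batch: List[Any] = []
        try:
            batch.append(self._queue.get(timeout=wait))
        except queue.Empty:
            return batch
        while len(batch) < self._batch_size:
            try:
                batch.append(self._queue.get_nowait())
            except queue.Empty:
                break
        return batch
```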
Risk: Data Inconsistency
Mitigation: Eventual consistency with reconciliation processes
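A reconciliation pass can start as a diff between the durable store's active assignments and the Redis `assignments:active:*` view, each reduced to a `profile_id -> agent_id` map. This sketch assumes the database is the source of truth and only reports discrepancies; repairing them is left to the caller. `ReconciliationReport` and `reconcile` are hypothetical names:

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class ReconciliationReport:
    missing_in_cache: List[str] = field(default_factory=list)  # active in DB, absent from Redis
    stale_in_cache: List[str] = field(default_factory=list)    # in Redis, not active in DB
    agent_mismatch: List[str] = field(default_factory=list)    # both active, different agent


def reconcile(db_active: Dict[str, str], cache_active: Dict[str, str]) -> ReconciliationReport:
    """Diff profile_id -> agent_id maps from the durable store and the Redis cache."""
    report = ReconciliationReport()
    for profile_id in sorted(db_active.keys() | cache_active.keys()):
        in_db, in_cache = profile_id in db_active, profile_id in cache_active
        if in_db and not in_cache:
            report.missing_in_cache.append(profile_id)
        elif in_cache and not in_db:
            report.stale_in_cache.append(profile_id)
        elif db_active[profile_id] != cache_active[profile_id]:
            report.agent_mismatch.append(profile_id)
    return report
```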
Dependencies
- Task 002: Profile Management System (profile identification)
- Task 003: Browser Control Integration (agent identification)
Deliverables
- AssignmentTracker implementation
- Event publishing/subscription system
- Conflict resolution mechanisms
- Real-time monitoring dashboard
- REST and WebSocket APIs
- Comprehensive test suite
- Performance monitoring tools
- Operational runbooks
- Analytics and reporting system