Skip to content

ENHANCEMENT:Queue Worker Inefficient Polling #251

@vaishcodescape

Description

@vaishcodescape

Is there an existing issue for this?

  • I have searched the existing issues

What happened?

Queue workers use inefficient polling with asyncio.sleep(0.1) in a tight loop, causing CPU waste and up to 100ms message processing delays.

Location: backend/app/core/orchestration/queue_manager.py:99-124

Current Behavior

async def _worker(self, worker_name: str):
    queues = [...]
    while self.running:
        for queue in queues:
            message = await queue.get(no_ack=False, fail=False)
            if message:
                # process message...
        await asyncio.sleep(0.1)  # ❌ Polls every 100ms even when empty

Impact:

  • 30 polls/second (10 per worker × 3 workers) when queues are empty
  • Up to 100ms delay before messages are processed
  • Unnecessary CPU usage during idle periods

Expected Behavior

Workers should block and wait for messages instead of polling:

  • Near-instant message processing (eliminate 100ms delay)
  • ~90% reduction in CPU usage during idle
  • Better resource efficiency

Proposed Solution

Use asyncio.wait() to block until messages arrive:

async def _worker(self, worker_name: str):
    queues = [
        await self.channel.declare_queue(self.queues[priority], durable=True)
        for priority in [QueuePriority.HIGH, QueuePriority.MEDIUM, QueuePriority.LOW]
    ]
    
    while self.running:
        try:
            done, pending = await asyncio.wait(
                [queue.get(no_ack=False) for queue in queues],
                return_when=asyncio.FIRST_COMPLETED,
                timeout=1.0
            )
            
            if not self.running:
                break
                
            for task in done:
                message = await task
                if message:
                    item = json.loads(message.body.decode())
                    await self._process_item(item, worker_name)
                    await message.ack()
                    
            for task in pending:
                task.cancel()
                
        except asyncio.CancelledError:
            return
        except Exception as e:
            logger.error(f"Worker {worker_name} error: {e}")
            await asyncio.sleep(1)

Testing Checklist

  • Messages processed immediately when enqueued
  • CPU usage near zero during idle periods
  • Priority handling (HIGH → MEDIUM → LOW) maintained
  • No message duplication with multiple workers
  • Graceful shutdown works correctly

Performance Targets

  • CPU reduction: ~90% during idle
  • Latency: <10ms (from current ~100ms)
  • Throughput: Maintain or improve

Additional Context

This is part of a broader performance optimization effort. The current implementation works but is inefficient.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions