Skip to content

Commit

Permalink
fix: add logging and error handling for issue #721 (#755)
Browse files Browse the repository at this point in the history
  • Loading branch information
jamsea authored Nov 29, 2024
1 parent c5324df commit 5e22ef2
Showing 1 changed file with 19 additions and 7 deletions.
26 changes: 19 additions & 7 deletions src/pipecat/processors/frame_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@

import asyncio
import inspect

from enum import Enum
from typing import Awaitable, Callable, Optional

from loguru import logger

from pipecat.clocks.base_clock import BaseClock
from pipecat.frames.frames import (
EndFrame,
Expand All @@ -24,8 +25,6 @@
from pipecat.processors.metrics.frame_processor_metrics import FrameProcessorMetrics
from pipecat.utils.utils import obj_count, obj_id

from loguru import logger


class FrameDirection(Enum):
DOWNSTREAM = 1
Expand Down Expand Up @@ -220,11 +219,16 @@ def _register_event_handler(self, event_name: str):
#

async def _start_interruption(self):
# Cancel the push frame task. This will stop pushing frames downstream.
await self.__cancel_push_task()
try:
# Cancel the push frame task. This will stop pushing frames downstream.
await self.__cancel_push_task()

# Cancel the input task. This will stop processing queued frames.
await self.__cancel_input_task()
# Cancel the input task. This will stop processing queued frames.
await self.__cancel_input_task()
except Exception as e:
logger.exception(f"Uncaught exception in {self}: {e}")
await self.push_error(ErrorFrame(str(e)))
raise

# Create a new input queue and task.
self.__create_input_task()
Expand Down Expand Up @@ -281,7 +285,11 @@ async def __input_frame_task_handler(self):

self.__input_queue.task_done()
except asyncio.CancelledError:
logger.trace(f"Cancelled input task in {self}")
break
except Exception as e:
logger.exception(f"Uncaught exception in {self}: {e}")
await self.push_error(ErrorFrame(str(e)))

def __create_push_task(self):
self.__push_queue = asyncio.Queue()
Expand All @@ -300,7 +308,11 @@ async def __push_frame_task_handler(self):
running = not isinstance(frame, EndFrame)
self.__push_queue.task_done()
except asyncio.CancelledError:
logger.trace(f"Cancelled push task in {self}")
break
except Exception as e:
logger.exception(f"Uncaught exception in {self}: {e}")
await self.push_error(ErrorFrame(str(e)))

async def _call_event_handler(self, event_name: str, *args, **kwargs):
try:
Expand Down

0 comments on commit 5e22ef2

Please sign in to comment.