
Commit a2295b6

Merge pull request #91 from daily-co/pipeline-logging
Add logging for pipeline
2 parents (fef1366 + 5c0ba1b), commit a2295b6

4 files changed: +97 -6 lines changed

examples/foundational/04-utterance-and-speech.py (+1 -1)

@@ -65,7 +65,7 @@ async def main(room_url: str):
     simple_tts_pipeline = Pipeline([azure_tts])
     await simple_tts_pipeline.queue_frames(
         [
-            TextFrame("My friend the LLM is going to tell a joke about llamas"),
+            TextFrame("My friend the LLM is going to tell a joke about llamas."),
             EndPipeFrame(),
         ]
     )

src/dailyai/pipeline/frame_processor.py (+3)

@@ -29,3 +29,6 @@ async def process_frame(
     async def interrupted(self) -> None:
         """Handle any cleanup if the pipeline was interrupted."""
         pass
+
+    def __str__(self):
+        return self.__class__.__name__
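
For context, a minimal sketch (not part of this commit) of a custom processor that overrides the new __str__ hook so the pipeline log lines show a friendly label instead of the class name. The process_frame signature is assumed from the hunk header above, and the pass-through behavior is purely illustrative:

from typing import AsyncGenerator

from dailyai.pipeline.frame_processor import FrameProcessor
from dailyai.pipeline.frames import Frame


class PassThroughProcessor(FrameProcessor):
    """Hypothetical processor used only to illustrate the __str__ hook."""

    def __str__(self):
        # Log lines built by Pipeline._log_frame will now read
        # "... -> SomeFrame -> passthrough" instead of "PassThroughProcessor".
        return "passthrough"

    async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
        # Forward every frame unchanged; a real processor would transform it.
        yield frame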

src/dailyai/pipeline/pipeline.py (+41 -5)

@@ -1,8 +1,9 @@
 import asyncio
+import logging
 from typing import AsyncGenerator, AsyncIterable, Iterable, List
 from dailyai.pipeline.frame_processor import FrameProcessor

-from dailyai.pipeline.frames import EndPipeFrame, EndFrame, Frame
+from dailyai.pipeline.frames import AudioFrame, EndPipeFrame, EndFrame, Frame


 class Pipeline:
@@ -17,7 +18,8 @@ def __init__(
         self,
         processors: List[FrameProcessor],
         source: asyncio.Queue | None = None,
-        sink: asyncio.Queue[Frame] | None = None
+        sink: asyncio.Queue[Frame] | None = None,
+        name: str | None = None,
     ):
         """Create a new pipeline. By default we create the sink and source queues
         if they're not provided, but these can be overridden to point to other
@@ -29,6 +31,11 @@ def __init__(
         self.source: asyncio.Queue[Frame] = source or asyncio.Queue()
         self.sink: asyncio.Queue[Frame] = sink or asyncio.Queue()

+        self._logger = logging.getLogger("dailyai.pipeline")
+        self._last_log_line = ""
+        self._shown_repeated_log = False
+        self._name = name or str(id(self))
+
     def set_source(self, source: asyncio.Queue[Frame]):
         """Set the source queue for this pipeline. Frames from this queue
         will be processed by each frame_processor in the pipeline, or order
@@ -85,6 +92,7 @@ async def run_pipeline(self):
            async for frame in self._run_pipeline_recursively(
                initial_frame, self._processors
            ):
+                self._log_frame(frame, len(self._processors) + 1)
                await self.sink.put(frame)

            if isinstance(initial_frame, EndFrame) or isinstance(
@@ -96,18 +104,46 @@ async def run_pipeline(self):
                # here.
                for processor in self._processors:
                    await processor.interrupted()
-                pass

     async def _run_pipeline_recursively(
-        self, initial_frame: Frame, processors: List[FrameProcessor]
+        self, initial_frame: Frame, processors: List[FrameProcessor], depth=1
     ) -> AsyncGenerator[Frame, None]:
         """Internal function to add frames to the pipeline as they're yielded
         by each processor."""
         if processors:
+            self._log_frame(initial_frame, depth)
             async for frame in processors[0].process_frame(initial_frame):
                 async for final_frame in self._run_pipeline_recursively(
-                    frame, processors[1:]
+                    frame, processors[1:], depth + 1
                 ):
                     yield final_frame
         else:
             yield initial_frame
+
+    def _log_frame(self, frame: Frame, depth: int):
+        """Log a frame as it moves through the pipeline. This is useful for debugging.
+        Note that this function inherits the logging level from the "dailyai" logger.
+        If you want debug output from dailyai in general but not this function (it is
+        noisy) you can silence this function by doing something like this:
+
+        # enable debug logging for the dailyai package.
+        logger = logging.getLogger("dailyai")
+        logger.setLevel(logging.DEBUG)
+
+        # silence the pipeline logging
+        logger = logging.getLogger("dailyai.pipeline")
+        logger.setLevel(logging.WARNING)
+        """
+        source = str(self._processors[depth - 2]) if depth > 1 else "source"
+        dest = str(self._processors[depth - 1]) if depth < (len(self._processors) + 1) else "sink"
+        prefix = self._name + " " * depth
+        logline = prefix + " -> ".join([source, frame.__class__.__name__, dest])
+        if logline == self._last_log_line:
+            if self._shown_repeated_log:
+                return
+            self._shown_repeated_log = True
+            self._logger.debug(prefix + "... repeated")
+        else:
+            self._shown_repeated_log = False
+            self._last_log_line = logline
+            self._logger.debug(logline)
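
A rough usage sketch (not part of this commit) tying the pieces together: it constructs a Pipeline with the new name argument, enables debug logging for the "dailyai" package, and shows how to silence the per-frame logger as the docstring above suggests. SentenceAggregator and the queue_frames/run_pipeline calls follow the existing tests and examples, and the sketch assumes run_pipeline returns once it processes the EndFrame, as those tests suggest:

import asyncio
import logging

from dailyai.pipeline.aggregators import SentenceAggregator
from dailyai.pipeline.frames import EndFrame, TextFrame
from dailyai.pipeline.pipeline import Pipeline

logging.basicConfig()
logging.getLogger("dailyai").setLevel(logging.DEBUG)
# Uncomment to keep package-level debug output but drop the per-frame lines:
# logging.getLogger("dailyai.pipeline").setLevel(logging.WARNING)


async def main():
    # With name="demo", log lines are prefixed with "demo"; without a name
    # the pipeline falls back to str(id(self)).
    pipeline = Pipeline([SentenceAggregator()], name="demo")
    await pipeline.queue_frames([TextFrame("Hello, "), TextFrame("world."), EndFrame()])
    await pipeline.run_pipeline()
    while not pipeline.sink.empty():
        print(await pipeline.sink.get())


asyncio.run(main())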

tests/test_pipeline.py (+52)

@@ -1,6 +1,9 @@
 import asyncio
 import unittest
+from unittest.mock import Mock
+
 from dailyai.pipeline.aggregators import SentenceAggregator, StatelessTextTransformer
+from dailyai.pipeline.frame_processor import FrameProcessor
 from dailyai.pipeline.frames import EndFrame, TextFrame

 from dailyai.pipeline.pipeline import Pipeline
@@ -57,3 +60,52 @@ async def test_pipeline_multiple_stages(self):
             TextFrame(" "),
         )
         self.assertIsInstance(await outgoing_queue.get(), EndFrame)
+
+
+class TestLogFrame(unittest.TestCase):
+    class MockProcessor(FrameProcessor):
+        def __init__(self, name):
+            self.name = name
+
+        def __str__(self):
+            return self.name
+
+    def setUp(self):
+        self.processor1 = self.MockProcessor('processor1')
+        self.processor2 = self.MockProcessor('processor2')
+        self.pipeline = Pipeline(
+            processors=[self.processor1, self.processor2])
+        self.pipeline._name = 'MyClass'
+        self.pipeline._logger = Mock()
+
+    def test_log_frame_from_source(self):
+        frame = Mock(__class__=Mock(__name__='MyFrame'))
+        self.pipeline._log_frame(frame, depth=1)
+        self.pipeline._logger.debug.assert_called_once_with(
+            'MyClass source -> MyFrame -> processor1')
+
+    def test_log_frame_to_sink(self):
+        frame = Mock(__class__=Mock(__name__='MyFrame'))
+        self.pipeline._log_frame(frame, depth=3)
+        self.pipeline._logger.debug.assert_called_once_with(
+            'MyClass processor2 -> MyFrame -> sink')
+
+    def test_log_frame_repeated_log(self):
+        frame = Mock(__class__=Mock(__name__='MyFrame'))
+        self.pipeline._log_frame(frame, depth=2)
+        self.pipeline._logger.debug.assert_called_once_with(
+            'MyClass processor1 -> MyFrame -> processor2')
+        self.pipeline._log_frame(frame, depth=2)
+        self.pipeline._logger.debug.assert_called_with('MyClass ... repeated')
+
+    def test_log_frame_reset_repeated_log(self):
+        frame1 = Mock(__class__=Mock(__name__='MyFrame1'))
+        frame2 = Mock(__class__=Mock(__name__='MyFrame2'))
+        self.pipeline._log_frame(frame1, depth=2)
+        self.pipeline._logger.debug.assert_called_once_with(
+            'MyClass processor1 -> MyFrame1 -> processor2')
+        self.pipeline._log_frame(frame1, depth=2)
+        self.pipeline._logger.debug.assert_called_with('MyClass ... repeated')
+        self.pipeline._log_frame(frame2, depth=2)
+        self.pipeline._logger.debug.assert_called_with(
+            'MyClass processor1 -> MyFrame2 -> processor2')
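
These tests swap the pipeline's _logger for a Mock and call _log_frame directly, so they run synchronously without an event loop. Assuming the repository layout above, they can be run on their own with a standard unittest invocation such as python -m unittest tests.test_pipeline.TestLogFrame.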
