Skip to content

Commit

Permalink
Merge pull request #316 from pipecat-ai/aleix/metrics-improvements
Browse files Browse the repository at this point in the history
metrics improvements
  • Loading branch information
aconchillo authored Jul 23, 2024
2 parents 33f0865 + 08e0722 commit 0a6ddbf
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 9 deletions.
10 changes: 9 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unrelease]

### Added

- Added `send_initial_empty_metrics` flag to `PipelineParams` to request for
initial empty metrics (zero values). True by default.

### Fixed

- Fixed initial metrics format. It was using the wrong keys name/time instead of
processor/value.

- STT services should be using ISO 8601 time format for transcription frames.

- Fix an issue that would cause Daily transport to show a stop transcription
- Fixed an issue that would cause Daily transport to show a stop transcription
error when actually none occurred.

## [0.0.37] - 2024-07-22
Expand Down
2 changes: 1 addition & 1 deletion src/pipecat/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def processors_with_metrics(self):
services = []
for p in self._processors:
if isinstance(p, BasePipeline):
services += p.processors_with_metrics()
services.extend(p.processors_with_metrics())
elif p.can_generate_metrics():
services.append(p)
return services
Expand Down
9 changes: 6 additions & 3 deletions src/pipecat/pipeline/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
class PipelineParams(BaseModel):
allow_interruptions: bool = False
enable_metrics: bool = False
send_initial_empty_metrics: bool = True
report_only_initial_ttfb: bool = False


Expand Down Expand Up @@ -95,8 +96,8 @@ async def queue_frames(self, frames: Iterable[Frame] | AsyncIterable[Frame]):

def _initial_metrics_frame(self) -> MetricsFrame:
processors = self._pipeline.processors_with_metrics()
ttfb = [{"name": p.name, "time": 0.0} for p in processors]
processing = [{"name": p.name, "time": 0.0} for p in processors]
ttfb = [{"processor": p.name, "value": 0.0} for p in processors]
processing = [{"processor": p.name, "value": 0.0} for p in processors]
return MetricsFrame(ttfb=ttfb, processing=processing)

async def _process_down_queue(self):
Expand All @@ -106,7 +107,9 @@ async def _process_down_queue(self):
report_only_initial_ttfb=self._params.report_only_initial_ttfb
)
await self._source.process_frame(start_frame, FrameDirection.DOWNSTREAM)
await self._source.process_frame(self._initial_metrics_frame(), FrameDirection.DOWNSTREAM)

if self._params.send_initial_empty_metrics:
await self._source.process_frame(self._initial_metrics_frame(), FrameDirection.DOWNSTREAM)

running = True
should_cleanup = True
Expand Down
8 changes: 8 additions & 0 deletions src/pipecat/processors/frameworks/rtvi.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
LLMMessagesAppendFrame,
LLMMessagesUpdateFrame,
LLMModelUpdateFrame,
MetricsFrame,
StartFrame,
SystemFrame,
TTSSpeakFrame,
Expand Down Expand Up @@ -456,6 +457,13 @@ async def _handle_setup(self, setup: RTVISetup | None):
start_frame = dataclasses.replace(self._start_frame)
await self.push_frame(start_frame)

# Send new initial metrics with the new processors
processors = parent.processors_with_metrics()
processors.extend(self._pipeline.processors_with_metrics())
ttfb = [{"processor": p.name, "value": 0.0} for p in processors]
processing = [{"processor": p.name, "value": 0.0} for p in processors]
await self.push_frame(MetricsFrame(ttfb=ttfb, processing=processing))

message = RTVIBotReady()
frame = TransportMessageFrame(message=message.model_dump(exclude_none=True))
await self.push_frame(frame)
Expand Down
11 changes: 7 additions & 4 deletions src/pipecat/transports/services/daily.py
Original file line number Diff line number Diff line change
Expand Up @@ -678,12 +678,15 @@ async def send_message(self, frame: TransportMessageFrame):
await self._client.send_message(frame)

async def send_metrics(self, frame: MetricsFrame):
metrics = {}
if frame.ttfb:
metrics["ttfb"] = frame.ttfb
if frame.processing:
metrics["processing"] = frame.processing

message = DailyTransportMessageFrame(message={
"type": "pipecat-metrics",
"metrics": {
"ttfb": frame.ttfb or [],
"processing": frame.processing or [],
},
"metrics": metrics
})
await self._client.send_message(message)

Expand Down

0 comments on commit 0a6ddbf

Please sign in to comment.