diff --git a/package-lock.json b/package-lock.json index 33b2a4b..649601e 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9684,6 +9684,7 @@ "os": [ "android" ], + "peer": true, "engines": { "node": ">= 12.0.0" }, @@ -9705,6 +9706,7 @@ "os": [ "darwin" ], + "peer": true, "engines": { "node": ">= 12.0.0" }, @@ -9726,6 +9728,7 @@ "os": [ "darwin" ], + "peer": true, "engines": { "node": ">= 12.0.0" }, diff --git a/packages/client/src/components/chat/AsChat/bubble.tsx b/packages/client/src/components/chat/AsChat/bubble.tsx index c1ee50a..c084e5d 100644 --- a/packages/client/src/components/chat/AsChat/bubble.tsx +++ b/packages/client/src/components/chat/AsChat/bubble.tsx @@ -3,15 +3,32 @@ import { ContentType, Reply, TextBlock } from '@shared/types'; import BubbleBlock, { CollapsibleBlockDiv, } from '@/components/chat/bubbles/BubbleBlock'; +import SpeechBar from '@/components/chat/bubbles/SpeechBar'; import { CircleAlertIcon } from 'lucide-react'; import { useTranslation } from 'react-i18next'; +import { ReplySpeechState } from '@/context/RunRoomContext'; interface Props { + /** The reply data to display */ reply: Reply; + /** Avatar component to display */ avatar: ReactNode; + /** Whether to render content as markdown */ markdown: boolean; + /** Callback when bubble is clicked */ onClick: (reply: Reply) => void; + /** Whether to display user avatar on the right side */ userAvatarRight: boolean; + /** Speech state for this reply */ + speechState?: ReplySpeechState; + /** Callback to play speech audio */ + onPlaySpeech?: () => void; + /** Callback to pause speech audio */ + onPauseSpeech?: () => void; + /** Callback to change playback rate */ + onPlaybackRateChange?: (rate: number) => void; + /** Callback to change volume */ + onVolumeChange?: (volume: number) => void; } const AsBubble = ({ @@ -20,6 +37,11 @@ const AsBubble = ({ markdown, onClick, userAvatarRight = false, + speechState, + onPlaySpeech, + onPauseSpeech, + onPlaybackRateChange, + onVolumeChange, }: Props) => { const { t } = useTranslation(); @@ -45,6 +67,9 @@ const AsBubble = ({ )); }; + const hasAudio = (speechState?.fullAudioData?.length || 0) > 0; + const showSpeechBar = speechState?.isStreaming || hasAudio; + return (
+ + {/* Speech bar - shown below the message content */} + {showSpeechBar && ( + <div>
+ <SpeechBar + speechState={speechState} + onPlay={onPlaySpeech || (() => {})} + onPause={onPauseSpeech || (() => {})} + onPlaybackRateChange={ + onPlaybackRateChange || (() => {}) + } + onVolumeChange={onVolumeChange || (() => {})} + /> + </div>
+ )}
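The SpeechBar element rendered above is defined in packages/client/src/components/chat/bubbles/SpeechBar.tsx, added later in this diff. A minimal sketch of the props contract this call site assumes (inferred from the usage here and from ReplySpeechState in RunRoomContext; the concrete interface in SpeechBar.tsx may differ in naming and defaults):

    // Sketch only: SpeechBar props as implied by the AsBubble call site above.
    import { ReplySpeechState } from '@/context/RunRoomContext';

    export interface SpeechBarProps {
        /** Speech state for the reply (audio data, playing/streaming flags, rate, volume) */
        speechState?: ReplySpeechState;
        /** Start or resume playback of the accumulated audio */
        onPlay: () => void;
        /** Pause or stop playback */
        onPause: () => void;
        /** Change the playback rate (RunRoomContext clamps it to 0.25 - 4.0) */
        onPlaybackRateChange: (rate: number) => void;
        /** Change the volume (0.0 - 1.0) */
        onVolumeChange: (volume: number) => void;
    }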
diff --git a/packages/client/src/components/chat/AsChat/index.tsx b/packages/client/src/components/chat/AsChat/index.tsx index 96e79fe..8fa3a29 100644 --- a/packages/client/src/components/chat/AsChat/index.tsx +++ b/packages/client/src/components/chat/AsChat/index.tsx @@ -45,6 +45,8 @@ import Character3Icon from '@/assets/svgs/avatar/character/050-woman.svg?react'; import { Avatar } from '@/components/ui/avatar.tsx'; import { AsAvatar, AvatarSet } from '@/components/chat/AsChat/avatar.tsx'; +import { SpeechStatesRecord } from '@/context/RunRoomContext'; + interface Props { /** List of chat replies to display */ replies: Reply[]; @@ -82,6 +84,16 @@ interface Props { attachAccept: string[]; /** Whether to display user avatar on the right side */ userAvatarRight?: boolean; + /** Speech states for each reply (keyed by replyId) */ + speechStates?: SpeechStatesRecord; + /** Callback to play speech for a specific reply */ + playSpeech?: (replyId: string) => void; + /** Callback to stop/pause speech for a specific reply */ + stopSpeech?: (replyId: string) => void; + /** Callback to set playback rate for a specific reply */ + setPlaybackRate?: (replyId: string, rate: number) => void; + /** Callback to set volume for a specific reply */ + setVolume?: (replyId: string, volume: number) => void; } /** @@ -118,6 +130,11 @@ const AsChat = ({ attachMaxFileSize, onError, userAvatarRight = false, + speechStates, + playSpeech, + stopSpeech, + setPlaybackRate, + setVolume, }: Props) => { // TODO: use a context to manage these settings globally @@ -170,15 +187,20 @@ const AsChat = ({ localStorage.setItem('chat-random-seed', randomSeed.toString()); }, [randomSeed]); + // Extended Reply type that preserves original replyId for speech lookup + interface ExtendedReply extends Reply { + originalReplyId?: string; + } + // Organize replies based on user preference (by reply ID or flattened messages) - const organizedReplies = useMemo(() => { + const organizedReplies = useMemo((): ExtendedReply[] => { if (replies.length === 0) return []; if (byReplyId) { return replies; } - const flattedReplies: Reply[] = []; + const flattedReplies: ExtendedReply[] = []; replies.forEach((reply) => { reply.messages.forEach((msg) => { flattedReplies.push({ @@ -188,7 +210,9 @@ const AsChat = ({ createdAt: msg.timestamp, finishedAt: msg.timestamp, messages: [msg], - } as Reply); + // Preserve original replyId for speech state lookup + originalReplyId: reply.replyId, + } as ExtendedReply); }); }); return flattedReplies; @@ -289,23 +313,51 @@ const AsChat = ({ onScroll={handleScroll} className="flex flex-col gap-y-5 w-full h-full overflow-auto" > - {organizedReplies.map((reply) => ( - - } - key={reply.replyId} - reply={reply} - markdown={renderMarkdown} - onClick={onBubbleClick} - userAvatarRight={userAvatarRight} - /> - ))} + {organizedReplies.map((reply) => { + // Look up speechState using originalReplyId if available (for flattened mode) + const lookupId = reply.originalReplyId || reply.replyId; + const speechState = speechStates?.[lookupId]; + return ( + + } + key={reply.replyId} + reply={reply} + markdown={renderMarkdown} + onClick={onBubbleClick} + userAvatarRight={userAvatarRight} + speechState={speechState} + onPlaySpeech={ + playSpeech + ? () => playSpeech(lookupId) + : undefined + } + onPauseSpeech={ + stopSpeech + ? () => stopSpeech(lookupId) + : undefined + } + onPlaybackRateChange={ + setPlaybackRate + ? (rate: number) => + setPlaybackRate(lookupId, rate) + : undefined + } + onVolumeChange={ + setVolume + ? 
(volume: number) => + setVolume(lookupId, volume) + : undefined + } + /> + ); + })} + + e.stopPropagation()} + > + {playbackRateOptions.map((rate) => ( + { + e.stopPropagation(); + onPlaybackRateChange(rate); + }} + className={cn( + playbackRate === rate && + 'bg-primary-50', + )} + > + {rate}x{playbackRate === rate && ' ✓'} + + ))} + + + + {/* Volume control */} + + + + + e.stopPropagation()} + > + {volumeOptions.map((v) => ( + { + e.stopPropagation(); + onVolumeChange(v); + }} + className={cn( + Math.abs(volume - v) < 0.01 && + 'bg-primary-50', + )} + > + {v === 0 + ? 'Mute' + : `${Math.round(v * 100)}%`} + {Math.abs(volume - v) < 0.01 && ' ✓'} + + ))} + + + + {/* Play/Pause button */} + + + )} + + ); +}; + +export default memo(SpeechBar); diff --git a/packages/client/src/context/RunRoomContext.tsx b/packages/client/src/context/RunRoomContext.tsx index 0de21d7..e26e770 100644 --- a/packages/client/src/context/RunRoomContext.tsx +++ b/packages/client/src/context/RunRoomContext.tsx @@ -1,8 +1,10 @@ import { createContext, ReactNode, + useCallback, useContext, useEffect, + useRef, useState, } from 'react'; import { @@ -12,11 +14,15 @@ import { Reply, RunData, SocketEvents, + SpeechData, } from '../../../shared/src/types/trpc'; import { useSocket } from './SocketContext'; import { useParams } from 'react-router-dom'; -import { ContentBlocks } from '../../../shared/src/types/messageForm'; +import { + AudioBlock, + ContentBlocks, +} from '../../../shared/src/types/messageForm'; import { SpanData, TraceData, @@ -26,6 +32,31 @@ import { getTimeDifferenceNano } from '../../../shared/src/utils/timeUtils'; import { ProjectNotFoundPage } from '../pages/DefaultPage'; import { useMessageApi } from './MessageApiContext.tsx'; +/** + * Speech state for each reply. + * Tracks audio data, playback status, and user preferences. + */ +export interface ReplySpeechState { + /** Full accumulated audio data (base64 string) */ + fullAudioData: string; + /** Media type of the audio (e.g., "audio/pcm") */ + mediaType: string; + /** Whether audio is currently playing */ + isPlaying: boolean; + /** Whether still receiving streaming data */ + isStreaming: boolean; + /** Playback rate (0.25 to 4.0, default 1.0) */ + playbackRate: number; + /** Volume (0.0 to 1.0, default 1.0) */ + volume: number; +} + +/** + * Record of speech states keyed by reply ID. + * Uses Record instead of Map for better React change detection. 
+ */ + export type SpeechStatesRecord = Record<string, ReplySpeechState>; + interface RunRoomContextType { replies: Reply[]; trace: TraceData | null; @@ -39,6 +70,16 @@ blocksInput: ContentBlocks, structuredInput: Record | null, ) => void; + /** Speech states for each reply (keyed by replyId) */ + speechStates: SpeechStatesRecord; + /** Play audio for a specific reply */ + playSpeech: (replyId: string) => void; + /** Stop/pause audio for a specific reply */ + stopSpeech: (replyId: string) => void; + /** Set playback rate for a specific reply */ + setPlaybackRate: (replyId: string, rate: number) => void; + /** Set volume for a specific reply */ + setVolume: (replyId: string, volume: number) => void; } const RunRoomContext = createContext<RunRoomContextType | null>(null); @@ -96,6 +137,553 @@ export function RunRoomContextProvider({ children }: Props) { const [modelInvocationData, setModelInvocationData] = useState(null); + // Speech state management - use Record for better React change detection + const [speechStates, setSpeechStates] = useState<SpeechStatesRecord>({}); + const audioContextRef = useRef<AudioContext | null>(null); + // Store current playing audio source for each reply (for streaming) + const currentSourceRef = useRef< + Record<string, AudioBufferSourceNode | null> + >({}); + // Store HTML Audio elements for replay (supports preservesPitch) + const audioElementRef = useRef<Record<string, HTMLAudioElement | null>>({}); + // Store gain nodes for volume control for each reply + const gainNodeRef = useRef<Record<string, GainNode | null>>({}); + // Store WAV blob URLs for replay with Audio element + const wavBlobUrlRef = useRef<Record<string, string | null>>({}); + // Timer for detecting streaming end + const streamingEndTimeoutRef = useRef< + Record<string, ReturnType<typeof setTimeout> | null> + >({}); + // Track already processed data length for incremental playback + const processedLengthRef = useRef<Record<string, number>>({}); + // Audio queue for each reply + const audioQueueRef = useRef<Record<string, string[]>>({}); + // Track if queue is being processed + const isProcessingQueueRef = useRef<Record<string, boolean>>({}); + // Store playback settings (rate and volume) as refs to avoid stale closure issues + const playbackSettingsRef = useRef< + Record<string, { playbackRate: number; volume: number }> + >({}); + + // Initialize AudioContext on first user interaction + const ensureAudioContext = useCallback(() => { + if (!audioContextRef.current) { + audioContextRef.current = new AudioContext({ sampleRate: 24000 }); + } + if (audioContextRef.current.state === 'suspended') { + audioContextRef.current.resume(); + } + return audioContextRef.current; + }, []); + + // Convert base64 PCM data to WAV blob URL (for HTML Audio element playback with preservesPitch) + const createWavBlobUrl = useCallback( + (base64Data: string): string | null => { + try { + // Decode base64 to binary + const binaryString = atob(base64Data); + const bytes = new Uint8Array(binaryString.length); + for (let i = 0; i < binaryString.length; i++) { + bytes[i] = binaryString.charCodeAt(i); + } + + // PCM parameters + const sampleRate = 24000; + const numChannels = 1; + const bitsPerSample = 16; + const byteRate = sampleRate * numChannels * (bitsPerSample / 8); + const blockAlign = numChannels * (bitsPerSample / 8); + const dataSize = bytes.length; + + // Create WAV header + const headerSize = 44; + const wavBuffer = new ArrayBuffer(headerSize + dataSize); + const view = new DataView(wavBuffer); + + // RIFF header + const writeString = (offset: number, str: string) => { + for (let i = 0; i < str.length; i++) { + view.setUint8(offset + i, str.charCodeAt(i)); + } + }; + + writeString(0, 'RIFF'); + view.setUint32(4, 36 + dataSize, true); + writeString(8, 'WAVE'); + + // fmt chunk + writeString(12, 'fmt '); + view.setUint32(16, 16, true); // fmt chunk size + view.setUint16(20, 1,
true); // audio format (PCM) + view.setUint16(22, numChannels, true); + view.setUint32(24, sampleRate, true); + view.setUint32(28, byteRate, true); + view.setUint16(32, blockAlign, true); + view.setUint16(34, bitsPerSample, true); + + // data chunk + writeString(36, 'data'); + view.setUint32(40, dataSize, true); + + // Copy PCM data + const wavBytes = new Uint8Array(wavBuffer); + wavBytes.set(bytes, headerSize); + + // Create blob and URL + const blob = new Blob([wavBuffer], { type: 'audio/wav' }); + return URL.createObjectURL(blob); + } catch (error) { + console.error('Error creating WAV blob URL:', error); + return null; + } + }, + [], + ); + + // Decode base64 PCM to AudioBuffer + const decodeAudioData = useCallback( + (base64Data: string): AudioBuffer | null => { + try { + const audioContext = ensureAudioContext(); + + // Decode base64 to binary + const binaryString = atob(base64Data); + const bytes = new Uint8Array(binaryString.length); + for (let i = 0; i < binaryString.length; i++) { + bytes[i] = binaryString.charCodeAt(i); + } + + // Convert to 16-bit PCM samples + const samples = new Int16Array(bytes.buffer); + const floatSamples = new Float32Array(samples.length); + for (let i = 0; i < samples.length; i++) { + floatSamples[i] = samples[i] / 32768.0; + } + + // Create audio buffer + const audioBuffer = audioContext.createBuffer( + 1, + floatSamples.length, + 24000, + ); + audioBuffer.getChannelData(0).set(floatSamples); + + return audioBuffer; + } catch (error) { + console.error('Error decoding audio:', error); + return null; + } + }, + [ensureAudioContext], + ); + + // Get or create gain node for a reply + const getOrCreateGainNode = useCallback( + (replyId: string, audioContext: AudioContext): GainNode => { + if (!gainNodeRef.current[replyId]) { + const gainNode = audioContext.createGain(); + gainNode.connect(audioContext.destination); + gainNodeRef.current[replyId] = gainNode; + + // Set initial volume from settings ref + const settings = playbackSettingsRef.current[replyId]; + gainNode.gain.value = settings?.volume ?? 1.0; + } + return gainNodeRef.current[replyId]!; + }, + [], + ); + + // Play a single audio chunk and return a promise + const playAudioChunk = useCallback( + (base64Data: string, replyId: string): Promise => { + return new Promise((resolve) => { + try { + const audioBuffer = decodeAudioData(base64Data); + if (!audioBuffer) { + resolve(); + return; + } + + const audioContext = ensureAudioContext(); + const source = audioContext.createBufferSource(); + source.buffer = audioBuffer; + + // Get playback rate from settings ref (avoids stale closure) + const settings = playbackSettingsRef.current[replyId]; + const playbackRate = settings?.playbackRate ?? 
1.0; + source.playbackRate.value = playbackRate; + + // Connect through gain node for volume control + const gainNode = getOrCreateGainNode(replyId, audioContext); + source.connect(gainNode); + + currentSourceRef.current[replyId] = source; + + source.onended = () => { + currentSourceRef.current[replyId] = null; + resolve(); + }; + + source.start(0); + } catch (error) { + console.error('Error playing audio chunk:', error); + resolve(); + } + }); + }, + [decodeAudioData, ensureAudioContext, getOrCreateGainNode], + ); + + // Process audio queue for a reply + const processAudioQueue = useCallback( + async (replyId: string) => { + if (isProcessingQueueRef.current[replyId]) return; + + const queue = audioQueueRef.current[replyId] || []; + if (queue.length === 0) return; + + isProcessingQueueRef.current[replyId] = true; + + setSpeechStates((prev) => { + const state = prev[replyId]; + if (state) { + return { + ...prev, + [replyId]: { ...state, isPlaying: true }, + }; + } + return prev; + }); + + while (queue.length > 0) { + const chunk = queue.shift()!; + await playAudioChunk(chunk, replyId); + } + + isProcessingQueueRef.current[replyId] = false; + + setSpeechStates((prev) => { + const state = prev[replyId]; + if (state) { + return { + ...prev, + [replyId]: { ...state, isPlaying: false }, + }; + } + return prev; + }); + }, + [playAudioChunk], + ); + + // Play full audio from beginning (for replay) using HTML Audio element + // This supports preservesPitch to keep the voice tone unchanged when changing speed + const playAudio = useCallback( + (replyId: string, base64Data?: string) => { + // Stop current playback if any + if (audioElementRef.current[replyId]) { + audioElementRef.current[replyId]!.pause(); + audioElementRef.current[replyId] = null; + } + if (currentSourceRef.current[replyId]) { + try { + currentSourceRef.current[replyId]!.stop(); + } catch { + // Ignore + } + currentSourceRef.current[replyId] = null; + } + + // Clear any pending queue + audioQueueRef.current[replyId] = []; + isProcessingQueueRef.current[replyId] = false; + + // Get or create WAV blob URL + if (!wavBlobUrlRef.current[replyId] && base64Data) { + wavBlobUrlRef.current[replyId] = createWavBlobUrl(base64Data); + } + + const wavUrl = wavBlobUrlRef.current[replyId]; + if (!wavUrl) return; + + // Get playback settings + const settings = playbackSettingsRef.current[replyId]; + const playbackRate = settings?.playbackRate ?? 1.0; + const volume = settings?.volume ?? 
1.0; + + // Create HTML Audio element for playback (supports preservesPitch) + const audio = new Audio(wavUrl); + audio.playbackRate = playbackRate; + audio.volume = volume; + // @ts-expect-error - preservesPitch is not in TypeScript types but supported by browsers + audio.preservesPitch = true; + // @ts-expect-error - webkitPreservesPitch for older Safari + audio.webkitPreservesPitch = true; + + audioElementRef.current[replyId] = audio; + + audio.onended = () => { + audioElementRef.current[replyId] = null; + + setSpeechStates((prev) => { + const state = prev[replyId]; + if (state) { + return { + ...prev, + [replyId]: { ...state, isPlaying: false }, + }; + } + return prev; + }); + }; + + audio.onerror = (e) => { + console.error('Audio playback error:', e); + audioElementRef.current[replyId] = null; + + setSpeechStates((prev) => { + const state = prev[replyId]; + if (state) { + return { + ...prev, + [replyId]: { ...state, isPlaying: false }, + }; + } + return prev; + }); + }; + + audio.play().catch(console.error); + + setSpeechStates((prev) => { + const state = prev[replyId]; + if (state) { + return { + ...prev, + [replyId]: { ...state, isPlaying: true }, + }; + } + return prev; + }); + }, + [createWavBlobUrl], + ); + + // Handle incoming speech data (streaming) + const handleSpeechData = useCallback( + (speechData: SpeechData) => { + const { replyId, speech } = speechData; + + // Normalize to array + const speechBlocks: AudioBlock[] = Array.isArray(speech) + ? speech + : [speech]; + + for (const block of speechBlocks) { + if (block.source.type !== 'base64') continue; + + const fullData = block.source.data; + const mediaType = block.source.media_type; + + // Calculate incremental data (new data since last time) + const alreadyProcessed = + processedLengthRef.current[replyId] || 0; + const deltaData = fullData.slice(alreadyProcessed); + + if (deltaData.length > 0) { + // Update processed length + processedLengthRef.current[replyId] = fullData.length; + + // Add incremental data to queue + if (!audioQueueRef.current[replyId]) { + audioQueueRef.current[replyId] = []; + } + audioQueueRef.current[replyId].push(deltaData); + + // Start processing queue + processAudioQueue(replyId); + } + + // Initialize playback settings ref if not exists + if (!playbackSettingsRef.current[replyId]) { + playbackSettingsRef.current[replyId] = { + playbackRate: 1.0, + volume: 1.0, + }; + } + + // Update state + setSpeechStates((prev) => { + const currentState = prev[replyId]; + const settings = playbackSettingsRef.current[replyId]; + + return { + ...prev, + [replyId]: { + fullAudioData: fullData, + mediaType: mediaType, + isPlaying: currentState?.isPlaying || false, + isStreaming: true, + playbackRate: + currentState?.playbackRate ?? + settings?.playbackRate ?? + 1.0, + volume: + currentState?.volume ?? settings?.volume ?? 
1.0, + }, + }; + }); + + // Reset streaming end timeout - if no new data for 1.5 seconds, mark streaming as complete + if (streamingEndTimeoutRef.current[replyId]) { + clearTimeout(streamingEndTimeoutRef.current[replyId]!); + } + streamingEndTimeoutRef.current[replyId] = setTimeout(() => { + setSpeechStates((prev) => { + const state = prev[replyId]; + if (state) { + return { + ...prev, + [replyId]: { ...state, isStreaming: false }, + }; + } + return prev; + }); + streamingEndTimeoutRef.current[replyId] = null; + }, 1500); + } + }, + [processAudioQueue], + ); + + // Play audio for a reply + const playSpeech = useCallback( + (replyId: string) => { + const state = speechStates[replyId]; + if (!state || state.fullAudioData.length === 0) return; + if (state.isPlaying) return; + + // Create WAV blob URL if not cached + if (!wavBlobUrlRef.current[replyId]) { + wavBlobUrlRef.current[replyId] = createWavBlobUrl( + state.fullAudioData, + ); + } + + playAudio(replyId, state.fullAudioData); + }, + [speechStates, playAudio, createWavBlobUrl], + ); + + // Stop playing audio for a reply + const stopSpeech = useCallback((replyId: string) => { + // Stop HTML Audio element if playing + const audioElement = audioElementRef.current[replyId]; + if (audioElement) { + audioElement.pause(); + audioElementRef.current[replyId] = null; + } + + // Stop the currently playing audio source (for streaming) + const currentSource = currentSourceRef.current[replyId]; + if (currentSource) { + try { + currentSource.stop(); + } catch { + // Ignore errors if already stopped + } + currentSourceRef.current[replyId] = null; + } + + // Update state + setSpeechStates((prev) => { + const state = prev[replyId]; + if (state) { + return { ...prev, [replyId]: { ...state, isPlaying: false } }; + } + return prev; + }); + }, []); + + // Set playback rate for a reply + const setPlaybackRate = useCallback((replyId: string, rate: number) => { + // Clamp rate between 0.25 and 4.0 + const clampedRate = Math.max(0.25, Math.min(4.0, rate)); + + // Update settings ref (for immediate access without stale closure) + if (!playbackSettingsRef.current[replyId]) { + playbackSettingsRef.current[replyId] = { + playbackRate: 1.0, + volume: 1.0, + }; + } + playbackSettingsRef.current[replyId].playbackRate = clampedRate; + + // Update state (for UI re-render) + setSpeechStates((prev) => { + const state = prev[replyId]; + if (state) { + return { + ...prev, + [replyId]: { ...state, playbackRate: clampedRate }, + }; + } + return prev; + }); + + // Update HTML Audio element if playing + const audioElement = audioElementRef.current[replyId]; + if (audioElement) { + audioElement.playbackRate = clampedRate; + } + + // Update current source if playing (for streaming) + const currentSource = currentSourceRef.current[replyId]; + if (currentSource) { + currentSource.playbackRate.value = clampedRate; + } + }, []); + + // Set volume for a reply + const setVolume = useCallback((replyId: string, volume: number) => { + // Clamp volume between 0.0 and 1.0 + const clampedVolume = Math.max(0.0, Math.min(1.0, volume)); + + // Update settings ref (for immediate access without stale closure) + if (!playbackSettingsRef.current[replyId]) { + playbackSettingsRef.current[replyId] = { + playbackRate: 1.0, + volume: 1.0, + }; + } + playbackSettingsRef.current[replyId].volume = clampedVolume; + + // Update state (for UI re-render) + setSpeechStates((prev) => { + const state = prev[replyId]; + if (state) { + return { + ...prev, + [replyId]: { ...state, volume: clampedVolume }, + }; + } + 
return prev; + }); + + // Update HTML Audio element if playing + const audioElement = audioElementRef.current[replyId]; + if (audioElement) { + audioElement.volume = clampedVolume; + } + + // Update gain node if exists (for streaming) + const gainNode = gainNodeRef.current[replyId]; + if (gainNode) { + gainNode.gain.value = clampedVolume; + } + }, []); + useEffect(() => { if (spans.length > 0) { const traceData = calculateTraceData(spans); @@ -154,6 +742,52 @@ export function RunRoomContextProvider({ children }: Props) { }); return updatedReplies; }); + + // Restore speech states from messages (don't decode yet to avoid AudioContext warning) + newReplies.forEach((reply) => { + reply.messages.forEach((msg) => { + if (msg.speech && msg.speech.length > 0) { + const speechBlocks = msg.speech; + const firstBlock = + speechBlocks[speechBlocks.length - 1]; // Use latest + if (firstBlock?.source?.type === 'base64') { + const fullData = firstBlock.source.data; + const mediaType = firstBlock.source.media_type; + + // Initialize playback settings ref if not exists + if (!playbackSettingsRef.current[reply.replyId]) { + playbackSettingsRef.current[reply.replyId] = { + playbackRate: 1.0, + volume: 1.0, + }; + } + + // Only save the data, decode later when user clicks play + setSpeechStates((prev) => { + const settings = + playbackSettingsRef.current[reply.replyId]; + return { + ...prev, + [reply.replyId]: { + fullAudioData: fullData, + mediaType: mediaType, + isPlaying: false, + isStreaming: false, + playbackRate: + prev[reply.replyId]?.playbackRate ?? + settings?.playbackRate ?? + 1.0, + volume: + prev[reply.replyId]?.volume ?? + settings?.volume ?? + 1.0, + }, + }; + }); + } + } + }); + }); }); socket.on(SocketEvents.server.pushSpans, (newSpans: SpanData[]) => { @@ -206,6 +840,11 @@ export function RunRoomContextProvider({ children }: Props) { setInputRequests([]); }); + // Speech data for real-time audio playback + socket.on(SocketEvents.server.pushSpeech, (speechData: SpeechData) => { + handleSpeechData(speechData); + }); + return () => { if (socket) { // Clear the listeners and leave the room @@ -215,10 +854,11 @@ export function RunRoomContextProvider({ children }: Props) { socket.off(SocketEvents.server.pushRunData); socket.off(SocketEvents.server.clearInputRequests); socket.off(SocketEvents.server.pushModelInvocationData); + socket.off(SocketEvents.server.pushSpeech); socket.emit(SocketEvents.client.leaveRoom, roomName); } }; - }, [socket, runId, roomName]); + }, [socket, runId, roomName, handleSpeechData]); if (!runId) { return ; @@ -267,6 +907,11 @@ export function RunRoomContextProvider({ children }: Props) { runData, sendUserInputToServer, modelInvocationData, + speechStates, + playSpeech, + stopSpeech, + setPlaybackRate, + setVolume, }} > {children} diff --git a/packages/client/src/pages/DashboardPage/RunPage/index.tsx b/packages/client/src/pages/DashboardPage/RunPage/index.tsx index 3b12f0e..8f68235 100644 --- a/packages/client/src/pages/DashboardPage/RunPage/index.tsx +++ b/packages/client/src/pages/DashboardPage/RunPage/index.tsx @@ -18,7 +18,16 @@ import { useMessageApi } from '@/context/MessageApiContext.tsx'; const RunContentPage = () => { const [displayedReply, setDisplayedReply] = useState(null); const [activateTab, setActiveTab] = useState('statistics'); - const { replies, sendUserInputToServer, inputRequests } = useRunRoom(); + const { + replies, + sendUserInputToServer, + inputRequests, + speechStates, + playSpeech, + stopSpeech, + setPlaybackRate, + setVolume, + } = 
useRunRoom(); const [currentInputRequest, setCurrentInputRequest] = useState(null); const { t } = useTranslation(); @@ -134,6 +143,11 @@ const RunContentPage = () => { onError={async (error) => { messageApi.error(error); }} + speechStates={speechStates} + playSpeech={playSpeech} + stopSpeech={stopSpeech} + setPlaybackRate={setPlaybackRate} + setVolume={setVolume} /> { @@ -240,6 +246,14 @@ export const appRouter = t.router({ } as RegisterReplyParams); } + // Normalize speech to array if provided + let speechArray: AudioBlock[] | null = null; + if (input.speech) { + speechArray = Array.isArray(input.speech) + ? (input.speech as AudioBlock[]) + : [input.speech as AudioBlock]; + } + // Save the message to the database const msgFormData = { id: input.msg.id, @@ -252,6 +266,7 @@ export const appRouter = t.router({ metadata: input.msg.metadata, timestamp: input.msg.timestamp, }, + speech: speechArray, } as MessageForm; // Save the message @@ -280,6 +295,17 @@ export const appRouter = t.router({ console.error(error); throw error; }); + + // Broadcast speech data if provided (for real-time audio playback) + if (input.speech !== null && input.speech !== undefined) { + // Type assertion needed because Zod infers literal strings + // while AudioBlock uses SourceType enum + SocketManager.broadcastSpeechToRunRoom( + input.runId, + replyId, + input.speech as AudioBlock | AudioBlock[], + ); + } }), pushMessageToFridayApp: t.procedure diff --git a/packages/server/src/trpc/socket.ts b/packages/server/src/trpc/socket.ts index f1dd087..19f7eb3 100644 --- a/packages/server/src/trpc/socket.ts +++ b/packages/server/src/trpc/socket.ts @@ -1,7 +1,11 @@ import { spawn } from 'child_process'; import { Server as HttpServer } from 'http'; import { Server } from 'socket.io'; -import { ContentBlocks, Status } from '../../../shared/src/types/messageForm'; +import { + AudioBlock, + ContentBlocks, + Status, +} from '../../../shared/src/types/messageForm'; import { BackendResponse, FridayReply, @@ -9,6 +13,7 @@ import { OverviewData, Reply, RunData, + SpeechData, SocketEvents, SocketRoomName, } from '../../../shared/src/types/trpc'; @@ -580,6 +585,24 @@ export class SocketManager { .emit(SocketEvents.server.pushMessages, [reply] as Reply[]); } + /** + * Broadcast speech data to the run room for real-time audio playback + */ + static broadcastSpeechToRunRoom( + runId: string, + replyId: string, + speech: AudioBlock | AudioBlock[], + ) { + const speechData: SpeechData = { + replyId, + speech, + }; + this.io + .of('/client') + .to(`run-${runId}`) + .emit(SocketEvents.server.pushSpeech, speechData); + } + static broadcastSpanDataToRunRoom(spanDataArray: SpanData[]) { // Group spans by runId const groupedSpans: Record = {}; diff --git a/packages/shared/src/types/messageForm.ts b/packages/shared/src/types/messageForm.ts index 3190f72..5a8948e 100644 --- a/packages/shared/src/types/messageForm.ts +++ b/packages/shared/src/types/messageForm.ts @@ -95,4 +95,6 @@ export interface MessageForm { metadata: object; timestamp: string; }; + /** Speech audio data (optional, for TTS playback) */ + speech?: AudioBlock[] | null; } diff --git a/packages/shared/src/types/trpc.ts b/packages/shared/src/types/trpc.ts index 284fe0a..fd64088 100644 --- a/packages/shared/src/types/trpc.ts +++ b/packages/shared/src/types/trpc.ts @@ -1,5 +1,5 @@ import { z } from 'zod'; -import { ContentBlocks, ContentType, Status } from './messageForm'; +import { AudioBlock, ContentBlocks, ContentType, Status } from './messageForm'; import { Usage } from './usage'; 
export const RegisterReplyParamsSchema = z.object({ @@ -34,6 +34,7 @@ export const SocketEvents = { pushInputRequests: 'pushInputRequests', clearInputRequests: 'clearInputRequests', pushMessages: 'pushMessages', + pushSpeech: 'pushSpeech', pushSpans: 'pushSpans', pushModelInvocationData: 'pushModelInvocationData', // Friday app room @@ -119,6 +120,19 @@ export interface Message { content: ContentType; timestamp: string; metadata: object; + /** Speech audio data (optional, for TTS playback) */ + speech?: AudioBlock[] | null; +} + +/** + * Speech data for real-time audio playback. + * Sent via WebSocket when new audio data is available. + */ +export interface SpeechData { + /** The reply ID this speech data belongs to */ + replyId: string; + /** Audio block(s) containing the speech data */ + speech: AudioBlock | AudioBlock[]; } export interface FridayReply {
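End to end, the contract added by this diff is: the tRPC push-message procedure stores the normalized AudioBlock[] on the message and calls SocketManager.broadcastSpeechToRunRoom, which emits a SpeechData payload on SocketEvents.server.pushSpeech to the run room; RunRoomContext listens for that event and queues only the not-yet-played tail of each base64 block, since every event carries the full accumulated audio. A minimal client-side sketch of a consumer of this event (socket setup, import path, and the playPcmChunk helper are illustrative, not part of this diff):

    // Sketch: consuming pushSpeech with the SpeechData payload defined above.
    import { io } from 'socket.io-client';
    import { SocketEvents, SpeechData } from '@shared/types';

    // Illustrative playback helper: decode base64 PCM (24 kHz mono, 16-bit) and play it,
    // as RunRoomContext's playAudioChunk does.
    declare function playPcmChunk(base64Pcm: string): void;

    // '/client' is the namespace SocketManager emits on.
    const socket = io('/client');
    // How much of each reply's accumulated audio has already been handled.
    const processed = new Map<string, number>();

    socket.on(SocketEvents.server.pushSpeech, (data: SpeechData) => {
        const blocks = Array.isArray(data.speech) ? data.speech : [data.speech];
        for (const block of blocks) {
            if (block.source.type !== 'base64') continue;
            // Each event carries the full audio so far; only the unseen tail is played.
            const full = block.source.data;
            const seen = processed.get(data.replyId) ?? 0;
            processed.set(data.replyId, full.length);
            const delta = full.slice(seen);
            if (delta.length > 0) {
                playPcmChunk(delta);
            }
        }
    });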