feat: astrbot live chat mode #4534
Conversation
Hey - I've found 9 issues, and left some high level feedback:
- In `StandaloneChat.vue` the `@openLiveMode=""` binding on `ChatInput` is effectively a no-op and may be confusing; either wire it to a handler or remove the event listener if live mode isn’t supported in standalone.
- The WebSocket URL in `LiveMode.vue` is hard-coded to `localhost:6185`, which will break in non-local environments; consider deriving the base URL from `window.location` or a config/env value instead of embedding the host/port.
- In `useVADRecording.ts` the `axios` import is unused and can be removed to avoid dead code and keep the composable lean.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `StandaloneChat.vue` the `@openLiveMode=""` binding on `ChatInput` is effectively a no-op and may be confusing; either wire it to a handler or remove the event listener if live mode isn’t supported in standalone.
- The WebSocket URL in `LiveMode.vue` is hard-coded to `localhost:6185`, which will break in non-local environments; consider deriving the base URL from `window.location` or a config/env value instead of embedding the host/port.
- In `useVADRecording.ts` the `axios` import is unused and can be removed to avoid dead code and keep the composable lean.
## Individual Comments
### Comment 1
<location> `astrbot/core/platform/sources/webchat/webchat_event.py:132` </location>
<code_context>
message_id = self.message_obj.message_id
async for chain in generator:
+ # 处理音频流(Live Mode)
+ if chain.type == "audio_chunk":
+ # 音频流数据,直接发送
+ audio_b64 = chain.get_plain_text()
</code_context>
<issue_to_address>
**issue (bug_risk):** Guard against `chain` being `None` before accessing `chain.type` in streaming loop.
This loop can yield `None` (see other usages of `run_agent`/`run_live_agent`), but this branch assumes `chain` is always non-null and accesses `chain.type`. That will raise when `chain` is `None`. Please add a guard (e.g. `if chain is None: continue` at the top of the loop, or `if chain is not None and chain.type == "audio_chunk":`) to prevent a runtime error.
</issue_to_address>
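A minimal sketch of that guard, keeping the loop shape from the quoted diff (the send logic and the enclosing function are elided; `MessageChain` is astrbot's existing type):
```python
# Sketch only: the rest of the loop body is assumed to match
# webchat_event.py as quoted above.
async for chain in generator:
    if chain is None:
        # run_agent / run_live_agent may yield None between steps
        continue
    if chain.type == "audio_chunk":
        audio_b64 = chain.get_plain_text()
        # ... forward the base64 audio chunk to the client as before ...
```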
### Comment 2
<location> `astrbot/dashboard/routes/live_chat.py:208` </location>
<code_context>
+
+ # 1. STT - 语音转文字
+ ctx = self.plugin_manager.context
+ stt_provider = ctx.provider_manager.stt_provider_insts[0]
+
+ if not stt_provider:
</code_context>
<issue_to_address>
**issue:** Accessing `stt_provider_insts[0]` without checking emptiness may raise when no STT provider is configured.
If `stt_provider_insts` is empty, this line will raise `IndexError` before the `if not stt_provider` check, causing an unhandled exception and closing the websocket. Consider first checking for an empty list (e.g. `if not ctx.provider_manager.stt_provider_insts:`) and returning the "未配置" error in that case.
</issue_to_address>
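A minimal sketch of that check; the `{"t": "error", ...}` payload shape is an assumption based on the other websocket messages in this PR:
```python
# Sketch only: guard the provider list before indexing into it.
stt_insts = ctx.provider_manager.stt_provider_insts
if not stt_insts:
    # No STT provider configured: report it instead of raising IndexError
    await websocket.send_json({"t": "error", "data": "STT Provider 未配置"})
    return
stt_provider = stt_insts[0]
user_text = await stt_provider.get_text(audio_path)
```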
### Comment 3
<location> `astrbot/core/provider/provider.py:274-275` </location>
<code_context>
+ with open(audio_path, "rb") as f:
+ audio_data = f.read()
+ await audio_queue.put(audio_data)
+ except Exception:
+ # 出错时也要发送 None 结束标记
+ pass
+ # 发送结束标记
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Swallowing all exceptions in `get_audio_stream` hides TTS failures and makes debugging difficult.
Catching bare `Exception` and doing nothing here will hide real TTS or IO errors and make production issues very hard to trace. Please at least log the exception (with stack trace) before sending the `None` sentinel, or re-raise after logging so failures are visible to callers/monitoring.
Suggested implementation:
```python
except Exception:
# 记录异常以便排查 TTS 或 IO 问题,随后仍然发送结束标记
logger.exception(
"Error while generating or reading TTS audio for accumulated_text=%r",
accumulated_text,
)
# 发送结束标记
await audio_queue.put(None)
break
```
To make this compile and follow your existing logging conventions, you will also need to:
1. Ensure a module-level logger is defined in `astrbot/core/provider/provider.py`, e.g.:
```python
import logging
logger = logging.getLogger(__name__)
```
2. If you already have a different logging pattern in this file (e.g. `from astrbot.logger import logger` or similar), adjust the `logger.exception(...)` call to use the existing logger instance and style.
</issue_to_address>
### Comment 4
<location> `dashboard/src/components/chat/StandaloneChat.vue:39` </location>
<code_context>
@stopRecording="handleStopRecording"
@pasteImage="handlePaste"
@fileSelect="handleFileSelect"
+ @openLiveMode=""
ref="chatInputRef"
/>
</code_context>
<issue_to_address>
**issue (bug_risk):** Empty `@openLiveMode` handler is likely unintended and will emit warnings.
`@openLiveMode=""` creates an empty event handler and will trigger a Vue warning. If Live Mode isn’t supported here, remove the binding; if it is, connect it to a real handler or emit so the event is properly processed.
</issue_to_address>
### Comment 5
<location> `dashboard/src/components/chat/LiveMode.vue:230` </location>
<code_context>
+ }
+
+ const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
+ const wsUrl = `${protocol}//localhost:6185/api/live_chat/ws?token=${encodeURIComponent(token)}`;
+
+ ws = new WebSocket(wsUrl);
</code_context>
<issue_to_address>
**issue (bug_risk):** Hardcoding `localhost:6185` in the WebSocket URL breaks non-local deployments.
This hardcoded `localhost:6185` will break Live Mode when the dashboard is accessed from a different host/port or via a reverse proxy. Please derive the WebSocket host/port from `window.location` (e.g. `window.location.host`) or a configurable base URL so it works across environments.
</issue_to_address>
### Comment 6
<location> `dashboard/src/components/chat/LiveMode.vue:337-348` </location>
<code_context>
+ }
+}
+
+function stopAudioPlayback() {
+ // TODO: 实现停止当前播放的音频
+ isPlaying.value = false;
+}
+
</code_context>
<issue_to_address>
**issue (bug_risk):** `stopAudioPlayback` does not actually stop any playing audio sources.
The websocket protocol defines a `stop_play` message, but this only flips `isPlaying` to false and leaves the TODO, so any `AudioBufferSourceNode` started in `playAudioChunk` will keep playing. Track active sources (e.g., store them in an array when created) and call `stop()` on them here so playback can actually be interrupted.
</issue_to_address>
### Comment 7
<location> `astrbot/core/astr_agent_run_util.py:137` </location>
<code_context>
return
+
+
+async def run_live_agent(
+ agent_runner: AgentRunner,
+ tts_provider,
</code_context>
<issue_to_address>
**issue (complexity):** Consider refactoring `run_live_agent` and its helpers to stream directly from `run_agent` and delegate TTS details to the provider to reduce buffering and branching logic.
You can simplify `run_live_agent` and remove some state/control flow without losing functionality.
### 1. Remove `llm_stream_chunks` and stream directly from `run_agent`
Right now, `run_live_agent` fully buffers all `MessageChain` instances into `llm_stream_chunks` and then post-processes them. That’s at odds with the “live” behavior and adds complexity (`llm_stream_chunks`, `_process_stream_tts`, `_feed_text_to_tts`, `_process_full_tts`).
You can instead drive TTS directly from the `run_agent` async iterator:
```python
async def run_live_agent(
agent_runner: AgentRunner,
tts_provider,
max_step: int = 30,
show_tool_use: bool = True,
show_reasoning: bool = False,
) -> AsyncGenerator[MessageChain | None, None]:
support_stream = tts_provider.support_stream() if tts_provider else False
# No TTS: just stream the agent output
if not tts_provider:
async for chain in run_agent(
agent_runner,
max_step=max_step,
show_tool_use=show_tool_use,
stream_to_general=False,
show_reasoning=show_reasoning,
):
if chain is not None:
yield chain
return
if support_stream:
async for chain in _run_with_stream_tts(
agent_runner,
tts_provider,
max_step=max_step,
show_tool_use=show_tool_use,
show_reasoning=show_reasoning,
):
yield chain
else:
async for chain in _run_with_full_tts(
agent_runner,
tts_provider,
max_step=max_step,
show_tool_use=show_tool_use,
show_reasoning=show_reasoning,
):
yield chain
```
### 2. Collapse `_process_stream_tts` + `_feed_text_to_tts` into a single streaming pipeline
Instead of buffering into `chunks: list[MessageChain]` and then feeding a queue, you can push text into `text_queue` as each `MessageChain` arrives from `run_agent`, leveraging the provider’s `get_audio_stream` queues directly:
```python
async def _run_with_stream_tts(
agent_runner: AgentRunner,
tts_provider,
max_step: int,
show_tool_use: bool,
show_reasoning: bool,
) -> AsyncGenerator[MessageChain, None]:
import base64
text_queue: asyncio.Queue[str | None] = asyncio.Queue()
audio_queue: asyncio.Queue[bytes | None] = asyncio.Queue()
tts_task = asyncio.create_task(
tts_provider.get_audio_stream(text_queue, audio_queue)
)
chunk_size = 50
async def pump_agent_output():
try:
async for chain in run_agent(
agent_runner,
max_step=max_step,
show_tool_use=show_tool_use,
stream_to_general=False,
show_reasoning=show_reasoning,
):
if chain is None:
continue
text = chain.get_plain_text()
if not text:
continue
# simple chunking
while text:
chunk, text = text[:chunk_size], text[chunk_size:]
await text_queue.put(chunk)
finally:
await text_queue.put(None)
pump_task = asyncio.create_task(pump_agent_output())
try:
while True:
audio_data = await audio_queue.get()
if audio_data is None:
break
audio_b64 = base64.b64encode(audio_data).decode("utf-8")
yield MessageChain(chain=[Plain(audio_b64)], type="audio_chunk")
await pump_task
finally:
try:
await asyncio.wait_for(tts_task, timeout=5.0)
except asyncio.TimeoutError:
logger.warning("[Live TTS] TTS 任务超时,强制取消")
tts_task.cancel()
```
This lets you delete `_process_stream_tts` and `_feed_text_to_tts` entirely and removes the `chunks: list[MessageChain]` buffer.
### 3. Centralize full-TTS logic in the provider (or a shared helper)
`_process_full_tts` re-implements a “collect full text → call `get_audio` → read file → base64 encode” flow that is likely already mirrored in the provider’s non-streaming path.
You can:
1. Accumulate **only text**, not `MessageChain` objects.
2. Delegate the “text → audio bytes → base64” to a shared helper at provider level (reused by `get_audio_stream`’s non-streaming path and `run_live_agent`).
Example shape on the caller side:
```python
async def _run_with_full_tts(
agent_runner: AgentRunner,
tts_provider,
max_step: int,
show_tool_use: bool,
show_reasoning: bool,
) -> AsyncGenerator[MessageChain, None]:
import base64
full_text = []
async for chain in run_agent(
agent_runner,
max_step=max_step,
show_tool_use=show_tool_use,
stream_to_general=False,
show_reasoning=show_reasoning,
):
if chain is None:
continue
text = chain.get_plain_text()
if text:
full_text.append(text)
accumulated_text = "".join(full_text)
if not accumulated_text:
return
# Prefer a provider helper that returns bytes instead of a file path.
audio_bytes = await tts_provider.get_audio_bytes(accumulated_text)
audio_b64 = base64.b64encode(audio_bytes).decode("utf-8")
yield MessageChain(chain=[Plain(audio_b64)], type="audio_chunk")
```
On the provider side, you can keep `get_audio` for backward compatibility by making it a thin wrapper around `get_audio_bytes`, so the text→audio behavior is in one place.
This reduces duplication and keeps the “live” call site focused on orchestration, not low-level audio file handling.
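A minimal sketch of that backward-compatible wrapper, assuming `get_audio_bytes` is the new method proposed above and that the legacy `get_audio` contract is to return a path to a temporary audio file (both are assumptions, not the current provider API):
```python
import os
import tempfile


class TTSProviderBase:
    async def get_audio_bytes(self, text: str) -> bytes:
        # Single source of truth for text -> audio synthesis;
        # concrete providers override this method.
        raise NotImplementedError

    async def get_audio(self, text: str) -> str:
        # Backward-compatible wrapper: delegate synthesis to
        # get_audio_bytes and preserve the old path-returning contract.
        audio_bytes = await self.get_audio_bytes(text)
        fd, audio_path = tempfile.mkstemp(prefix="tts_", suffix=".wav")
        with os.fdopen(fd, "wb") as f:
            f.write(audio_bytes)
        return audio_path
```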
</issue_to_address>
### Comment 8
<location> `astrbot/core/pipeline/process_stage/method/agent_sub_stages/internal.py:687` </location>
<code_context>
)
- if streaming_response and not stream_to_general:
+ # 检测 Live Mode
+ action_type = event.get_extra("action_type")
+ if action_type == "live":
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting the Live Mode handling into a dedicated async helper to keep `process` focused on routing and reduce in-method branching complexity.
The Live Mode branch does increase the complexity of `process` in-place. You can keep the behavior identical while reducing branching by extracting the Live Mode handling into a helper.
One way is to move all Live Mode–specific logic (including the history save) into an async generator helper and delegate from `process`:
```python
# inside the class
async def _handle_live_mode(self, event, req, agent_runner):
logger.info("[Internal Agent] 检测到 Live Mode,启用 TTS 处理")
tts_provider = self.ctx.plugin_manager.context.get_using_tts_provider(
event.unified_msg_origin
)
if not tts_provider:
logger.warning("[Live Mode] TTS Provider 未配置,将使用普通流式模式")
event.set_result(
MessageEventResult()
.set_result_content_type(ResultContentType.STREAMING_RESULT)
.set_async_stream(
run_live_agent(
agent_runner,
tts_provider,
self.max_step,
self.show_tool_use,
show_reasoning=self.show_reasoning,
)
)
)
# mirror the original control flow: emit once, then post-process
yield
if not event.is_stopped() and agent_runner.done():
await self._save_to_history(
event,
req,
agent_runner.get_final_llm_resp(),
agent_runner.run_context.messages,
agent_runner.stats,
)
```
Then in `process`, the Live Mode branch becomes a simple dispatcher, keeping the core method focused on routing:
```python
action_type = event.get_extra("action_type")
if action_type == "live":
async for _ in self._handle_live_mode(event, req, agent_runner):
yield
elif streaming_response and not stream_to_general:
event.set_result(
MessageEventResult()
.set_result_content_type(ResultContentType.STREAMING_RESULT)
.set_async_stream(
run_agent(
agent_runner,
self.max_step,
self.show_tool_use,
show_reasoning=self.show_reasoning,
)
)
)
# existing yield / history handling remains unchanged
```
This keeps all functionality (including logging, TTS provider resolution, streaming behavior, and post-stream history saving) while reducing the cognitive load in `process` and isolating Live Mode–specific concerns.
</issue_to_address>
### Comment 9
<location> `astrbot/dashboard/routes/live_chat.py:19` </location>
<code_context>
+from .route import Route, RouteContext
+
+
+class LiveChatSession:
+ """Live Chat 会话管理器"""
+
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting audio decoding, WAV assembly, interrupt handling, and pipeline orchestration into small helpers to keep `LiveChatSession`, `_handle_message`, and `_process_audio` focused and easier to read.
You can keep the current behavior but reduce complexity by pulling a few responsibilities into small helpers, and by localizing state/interrupt handling.
### 1. Extract base64/audio handling from `_handle_message`
Right now `_handle_message` mixes protocol and binary decoding. A tiny helper keeps protocol handling readable:
```python
# new helper (same file or small utils module)
import base64
def decode_audio_chunk(message: dict) -> bytes | None:
audio_data_b64 = message.get("data")
if not audio_data_b64:
return None
try:
return base64.b64decode(audio_data_b64)
except Exception as e:
logger.error(f"[Live Chat] 解码音频数据失败: {e}")
return None
```
Then `_handle_message` becomes:
```python
elif msg_type == "speaking_part":
audio_data = decode_audio_chunk(message)
if audio_data is not None:
session.add_audio_frame(audio_data)
```
This avoids inline base64 and makes the message protocol switch easier to follow.
### 2. Move WAV assembly out of `LiveChatSession`
`LiveChatSession` currently manages both conversational state and audio file I/O. A small `AudioAssembler` keeps the session simple:
```python
# new helper
class AudioAssembler:
SAMPLE_RATE = 16000
SAMPLE_WIDTH = 2
CHANNELS = 1
@staticmethod
def assemble_wav(frames: list[bytes]) -> str:
temp_dir = os.path.join(get_astrbot_data_path(), "temp")
os.makedirs(temp_dir, exist_ok=True)
audio_path = os.path.join(temp_dir, f"live_audio_{uuid.uuid4()}.wav")
with wave.open(audio_path, "wb") as wav_file:
wav_file.setnchannels(AudioAssembler.CHANNELS)
wav_file.setsampwidth(AudioAssembler.SAMPLE_WIDTH)
wav_file.setframerate(AudioAssembler.SAMPLE_RATE)
for frame in frames:
wav_file.writeframes(frame)
return audio_path
```
Use in `LiveChatSession.end_speaking`:
```python
from .audio import AudioAssembler # wherever you place it
async def end_speaking(self, stamp: str) -> str | None:
...
if not self.audio_frames:
logger.warning("[Live Chat] 没有音频帧数据")
return None
try:
audio_path = AudioAssembler.assemble_wav(self.audio_frames)
self.temp_audio_path = audio_path
logger.info(
f"[Live Chat] 音频文件已保存: {audio_path}, 大小: {os.path.getsize(audio_path)} bytes"
)
return audio_path
...
```
This keeps `LiveChatSession` focused on session state instead of file format details.
### 3. Localize processing state and use an interrupt event
`is_processing` and `should_interrupt` are mostly used inside `_process_audio`. You can keep the external API as `session.request_interrupt()` while hiding internal flags:
```python
class LiveChatSession:
def __init__(...):
...
self._interrupt_event = asyncio.Event()
def request_interrupt(self):
self._interrupt_event.set()
def clear_interrupt(self):
self._interrupt_event.clear()
@property
def interrupt_event(self) -> asyncio.Event:
return self._interrupt_event
```
Use this in `LiveChatRoute`:
```python
elif msg_type == "interrupt":
session.request_interrupt()
logger.info(f"[Live Chat] 用户打断: {session.username}")
```
And in `_process_audio`, replace polling + `should_interrupt` flag:
```python
async def _process_audio(self, session: LiveChatSession, audio_path: str):
try:
session.is_processing = True
session.clear_interrupt()
...
back_queue = webchat_queue_mgr.get_or_create_back_queue(cid)
bot_text = ""
audio_playing = False
while True:
# wait either for new back_queue data or interrupt
done, _ = await asyncio.wait(
{
asyncio.create_task(back_queue.get()),
asyncio.create_task(session.interrupt_event.wait()),
},
return_when=asyncio.FIRST_COMPLETED,
)
if session.interrupt_event.is_set():
logger.info("[Live Chat] 检测到用户打断")
await websocket.send_json({"t": "stop_play"})
await self._save_interrupted_message(session, user_text, bot_text)
while not back_queue.empty():
try:
back_queue.get_nowait()
except asyncio.QueueEmpty:
break
break
result_task = next(iter(done))
result = result_task.result()
if not result:
continue
...
finally:
session.is_processing = False
session.clear_interrupt()
```
This removes the timeout-based polling logic and consolidates interrupt behavior into a single `Event`, while preserving the same semantics.
### 4. Thin `_process_audio` by extracting the pipeline orchestration
You can keep WebSocket concerns in `_process_audio` and move “STT + queue + back_queue” into a separate helper/service:
```python
# new helper class
class LiveChatPipeline:
def __init__(self, plugin_manager, queue_mgr):
self.plugin_manager = plugin_manager
self.queue_mgr = queue_mgr
async def run(self, session: LiveChatSession, audio_path: str) -> tuple[str, asyncio.Queue, str]:
ctx = self.plugin_manager.context
stt_provider = ctx.provider_manager.stt_provider_insts[0]
if not stt_provider:
raise RuntimeError("STT Provider 未配置")
user_text = await stt_provider.get_text(audio_path)
if not user_text:
raise RuntimeError("STT 识别结果为空")
cid = session.conversation_id
queue = self.queue_mgr.get_or_create_queue(cid)
back_queue = self.queue_mgr.get_or_create_back_queue(cid)
message_id = str(uuid.uuid4())
payload = {
"message_id": message_id,
"message": [{"type": "plain", "text": user_text}],
"action_type": "live",
}
await queue.put((session.username, cid, payload))
return user_text, back_queue, message_id
```
Then `_process_audio` is mostly wiring + WebSocket messaging:
```python
async def _process_audio(self, session: LiveChatSession, audio_path: str):
try:
session.is_processing = True
session.clear_interrupt()
pipeline = LiveChatPipeline(self.plugin_manager, webchat_queue_mgr)
user_text, back_queue, message_id = await pipeline.run(session, audio_path)
await websocket.send_json({
"t": "user_msg",
"data": {"text": user_text, "ts": int(time.time() * 1000)},
})
# existing loop over back_queue, but now without STT/queue setup clutter
...
...
```
This keeps the rich behavior intact while making each component smaller and easier to evolve independently.
</issue_to_address>
Modifications
Screenshots or Test Results
Checklist
I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations in `requirements.txt` and `pyproject.toml`.
Summary by Sourcery
Add a new voice-driven Live Chat mode with streaming TTS to the web chat experience and backend, including WebSocket-based live sessions and VAD-powered microphone handling.
New Features:
Enhancements:
Build: