Skip to content

Commit 32e2e47

Browse files
committed
Refactor audio storage to MongoDB chunks and enhance cleanup settings management
- Replaced the legacy AudioFile model with AudioChunkDocument for storing audio data in MongoDB, optimizing storage and retrieval.
- Introduced the CleanupSettings dataclass for managing soft-deletion configuration, including auto-cleanup and retention days.
- Added admin API routes for retrieving and saving cleanup settings, giving better control over data retention policies.
- Updated audio processing workflows to use MongoDB chunks, removing dependencies on disk-based audio files.
- Enhanced tests to validate the new audio chunk storage and cleanup functionality, ensuring robust integration with existing systems.
1 parent 387385f commit 32e2e47

33 files changed

+3078
-539
lines changed

backends/advanced/src/advanced_omi_backend/app_factory.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,12 +54,12 @@ async def lifespan(app: FastAPI):
5454
try:
5555
from beanie import init_beanie
5656
from advanced_omi_backend.models.conversation import Conversation
57-
from advanced_omi_backend.models.audio_file import AudioFile
57+
from advanced_omi_backend.models.audio_chunk import AudioChunkDocument
5858
from advanced_omi_backend.models.user import User
5959

6060
await init_beanie(
6161
database=config.db,
62-
document_models=[User, Conversation, AudioFile],
62+
document_models=[User, Conversation, AudioChunkDocument],
6363
)
6464
application_logger.info("Beanie initialized for all document models")
6565
except Exception as e:

backends/advanced/src/advanced_omi_backend/config.py

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@
99
import logging
1010
import os
1111
import shutil
12+
from dataclasses import dataclass, asdict
1213
from pathlib import Path
14+
from typing import Optional
1315

1416
logger = logging.getLogger(__name__)
1517

@@ -131,6 +133,101 @@ def save_diarization_settings_to_file(settings):
131133
return False
132134

133135

136+
# ============================================================================
137+
# Cleanup Settings (JSON file-based with in-memory caching)
138+
# ============================================================================
139+
140+
@dataclass
141+
class CleanupSettings:
142+
"""Cleanup configuration for soft-deleted conversations."""
143+
auto_cleanup_enabled: bool = False
144+
retention_days: int = 30
145+
146+
# Global cache for cleanup settings
147+
_cleanup_settings: Optional[CleanupSettings] = None
148+
149+
150+
def get_cleanup_config_path() -> Path:
    """Get path to cleanup settings JSON file.

    The data directory is taken from the ``DATA_DIR`` environment variable
    (falling back to ``/app/data``) and is created, including missing
    parents, if it does not exist yet. The JSON file itself is not created.

    Returns:
        Path to ``cleanup_config.json`` inside the data directory.
    """
    data_dir = Path(os.getenv("DATA_DIR", "/app/data"))
    data_dir.mkdir(parents=True, exist_ok=True)
    return data_dir / "cleanup_config.json"
155+
156+
157+
def load_cleanup_settings_from_file() -> CleanupSettings:
    """
    Load cleanup settings from JSON file or return defaults.

    Returns cached settings if available, otherwise loads from file.
    If the file doesn't exist or cannot be loaded, default settings are
    returned. Whatever is returned is also stored in the module-level
    cache, so later calls skip the file until the cache is replaced.

    Returns:
        The cached, freshly loaded, or default CleanupSettings.
    """
    global _cleanup_settings

    # Return cached settings if available
    if _cleanup_settings is not None:
        return _cleanup_settings

    config_path = get_cleanup_config_path()

    # Try to load from file
    if config_path.exists():
        try:
            with open(config_path, "r", encoding="utf-8") as f:
                data = json.load(f)
            _cleanup_settings = CleanupSettings(**data)
            logger.info(f"✅ Loaded cleanup settings: auto_cleanup={_cleanup_settings.auto_cleanup_enabled}, retention={_cleanup_settings.retention_days}d")
            return _cleanup_settings
        except Exception as e:
            # Deliberate best effort: an unreadable or malformed file is
            # logged and we fall through to the defaults below.
            logger.error(f"❌ Failed to load cleanup settings from {config_path}: {e}")

    # Return defaults if file doesn't exist or failed to load
    _cleanup_settings = CleanupSettings()
    logger.info("Using default cleanup settings (auto_cleanup_enabled=False, retention_days=30)")
    return _cleanup_settings
187+
188+
189+
def save_cleanup_settings_to_file(settings: CleanupSettings) -> None:
    """
    Save cleanup settings to JSON file and update in-memory cache.

    The cache is only updated after the file has been written, so a failed
    save leaves the previously cached settings in place.

    Args:
        settings: CleanupSettings to persist

    Raises:
        Exception: If serialization or the file write fails (logged, then
            re-raised unchanged)
    """
    global _cleanup_settings

    config_path = get_cleanup_config_path()

    try:
        # Serialize before opening the file: mode "w" truncates on open, so
        # a serialization error must not wipe the existing config.
        payload = json.dumps(asdict(settings), indent=2)

        # Save to JSON file
        with open(config_path, "w", encoding="utf-8") as f:
            f.write(payload)

        # Update in-memory cache
        _cleanup_settings = settings

        logger.info(f"✅ Saved cleanup settings: auto_cleanup={settings.auto_cleanup_enabled}, retention={settings.retention_days}d")
    except Exception as e:
        logger.error(f"❌ Failed to save cleanup settings to {config_path}: {e}")
        raise
215+
216+
217+
def get_cleanup_settings() -> dict:
    """
    Get current cleanup settings as dict (for API responses).

    The settings come from load_cleanup_settings_from_file().

    Returns:
        Dict with auto_cleanup_enabled and retention_days
    """
    settings = load_cleanup_settings_from_file()
    return {
        "auto_cleanup_enabled": settings.auto_cleanup_enabled,
        "retention_days": settings.retention_days,
    }
229+
230+
134231
def get_speech_detection_settings():
135232
"""Get speech detection settings from environment or defaults."""
136233

backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py

Lines changed: 31 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,9 @@
1717

1818
from advanced_omi_backend.utils.audio_utils import (
1919
AudioValidationError,
20-
write_audio_file,
20+
validate_and_prepare_audio,
2121
)
22+
from advanced_omi_backend.utils.audio_chunk_utils import convert_audio_to_chunks
2223
from advanced_omi_backend.models.job import JobPriority
2324
from advanced_omi_backend.models.user import User
2425
from advanced_omi_backend.models.conversation import create_conversation
@@ -86,33 +87,19 @@ async def upload_and_process_audio_files(
8687
# Generate audio UUID and timestamp
8788
if source == "gdrive":
8889
audio_uuid = getattr(file, "audio_uuid", None)
89-
if not audio_uuid:
90+
if not audio_uuid:
9091
audio_logger.error(f"Missing audio_uuid for gdrive file: {file.filename}")
91-
audio_uuid = str(uuid.uuid4())
92-
else:
92+
audio_uuid = str(uuid.uuid4())
93+
else:
9394
audio_uuid = str(uuid.uuid4())
9495
timestamp = int(time.time() * 1000)
9596

96-
# Determine output directory (with optional subfolder)
97-
from advanced_omi_backend.config import CHUNK_DIR
98-
if folder:
99-
chunk_dir = CHUNK_DIR / folder
100-
chunk_dir.mkdir(parents=True, exist_ok=True)
101-
else:
102-
chunk_dir = CHUNK_DIR
103-
104-
# Validate, write audio file and create AudioSession (all in one)
97+
# Validate and prepare audio (read format from WAV file)
10598
try:
106-
relative_audio_path, file_path, duration = await write_audio_file(
107-
raw_audio_data=content,
108-
audio_uuid=audio_uuid,
109-
source=source,
110-
client_id=client_id,
111-
user_id=user.user_id,
112-
user_email=user.email,
113-
timestamp=timestamp,
114-
chunk_dir=chunk_dir,
115-
validate=True, # Validate WAV format, convert stereo→mono
99+
audio_data, sample_rate, sample_width, channels, duration = await validate_and_prepare_audio(
100+
audio_data=content,
101+
expected_sample_rate=16000, # Expecting 16kHz
102+
convert_to_mono=True # Convert stereo to mono
116103
)
117104
except AudioValidationError as e:
118105
processed_files.append({
@@ -123,7 +110,7 @@ async def upload_and_process_audio_files(
123110
continue
124111

125112
audio_logger.info(
126-
f"📊 {file.filename}: {duration:.1f}s {relative_audio_path}"
113+
f"📊 {file.filename}: {duration:.1f}s ({sample_rate}Hz, {channels}ch, {sample_width} bytes/sample)"
127114
)
128115

129116
# Create conversation immediately for uploaded files (conversation_id auto-generated)
@@ -139,20 +126,37 @@ async def upload_and_process_audio_files(
139126
title=title,
140127
summary="Processing uploaded audio file..."
141128
)
142-
# Use the relative path returned by write_audio_file (already includes folder prefix if applicable)
143-
conversation.audio_path = relative_audio_path
144129
await conversation.insert()
145130
conversation_id = conversation.conversation_id # Get the auto-generated ID
146131

147132
audio_logger.info(f"📝 Created conversation {conversation_id} for uploaded file")
148133

134+
# Convert audio directly to MongoDB chunks
135+
try:
136+
num_chunks = await convert_audio_to_chunks(
137+
conversation_id=conversation_id,
138+
audio_data=audio_data,
139+
sample_rate=sample_rate,
140+
channels=channels,
141+
sample_width=sample_width,
142+
)
143+
audio_logger.info(
144+
f"📦 Converted uploaded file to {num_chunks} MongoDB chunks "
145+
f"(conversation {conversation_id[:12]})"
146+
)
147+
except Exception as chunk_error:
148+
audio_logger.error(
149+
f"Failed to convert uploaded file to chunks: {chunk_error}",
150+
exc_info=True
151+
)
152+
149153
# Enqueue post-conversation processing job chain
150154
from advanced_omi_backend.controllers.queue_controller import start_post_conversation_jobs
151155

152156
job_ids = start_post_conversation_jobs(
153157
conversation_id=conversation_id,
154158
audio_uuid=audio_uuid,
155-
audio_file_path=file_path,
159+
audio_file_path=None, # No file path - using MongoDB chunks
156160
user_id=user.user_id,
157161
post_transcription=True, # Run batch transcription for uploads
158162
client_id=client_id # Pass client_id for UI tracking
@@ -217,45 +221,3 @@ async def upload_and_process_audio_files(
217221
return JSONResponse(
218222
status_code=500, content={"error": f"File upload failed: {str(e)}"}
219223
)
220-
221-
222-
async def get_conversation_audio_path(conversation_id: str, user: User) -> Path:
    """
    Get the file path for a conversation's audio file.

    Args:
        conversation_id: The conversation ID
        user: The authenticated user

    Returns:
        Path object for the audio file

    Raises:
        ValueError: If conversation not found, access denied, or audio file not available
    """
    # Get conversation by conversation_id (UUID field, not _id)
    conversation = await Conversation.find_one(Conversation.conversation_id == conversation_id)

    if not conversation:
        raise ValueError("Conversation not found")

    # Check ownership (admins can access all files)
    if not user.is_superuser and conversation.user_id != str(user.user_id):
        raise ValueError("Access denied")

    # Get the audio path
    audio_path = conversation.audio_path

    if not audio_path:
        raise ValueError("No audio file available for this conversation")

    # Build full file path
    # NOTE(review): imported locally as in the original — presumably to
    # avoid an import cycle; confirm before moving to module level.
    from advanced_omi_backend.app_config import get_audio_chunk_dir
    audio_dir = get_audio_chunk_dir()
    file_path = audio_dir / audio_path

    # is_file() is already False for a missing path, so it covers both the
    # "does not exist" and "is not a regular file" cases.
    if not file_path.is_file():
        raise ValueError("Audio file not found on disk")

    return file_path

0 commit comments

Comments
 (0)