Skip to content

Commit 8d85e02

Browse files
Merge pull request #44 from AnkushMalaker/faster-tests
Fix crucial bug in processor status endpoint and also make tests faster
2 parents 106dbbb + 89bc9fa commit 8d85e02

File tree

4 files changed

+177
-10
lines changed

4 files changed

+177
-10
lines changed

backends/advanced-backend/Docs/quickstart.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@ At the moment, the basic functionalities are:
2424

2525
## Quick Start
2626

27+
### 0. Forbidden Knowledge
28+
This is a secret command for the daring. It runs a full end-to-end test with a lot of logging. Following that is probably THE quickest way to understand what's happening end to end.
29+
`source .env && export DEEPGRAM_API_KEY OPENAI_API_KEY && uv run pytest tests/test_integration.py -vv -s --log-cli-level=INFO --log-cli-format='%(asctime)s - %(levelname)s - %(message)s'`
30+
31+
2732
### 1. Environment Setup
2833

2934
Copy the `.env.template` file to `.env` and configure the required values:

backends/advanced-backend/src/advanced_omi_backend/processors.py

Lines changed: 85 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -680,6 +680,15 @@ async def _memory_processor(self):
680680
async def _process_memory_item(self, item: MemoryProcessingItem):
681681
"""Process a single memory item."""
682682
start_time = time.time()
683+
audio_logger.info(f"🚀 MEMORY PROCESSING STARTED for {item.audio_uuid} at {start_time}")
684+
685+
# Track memory processing start
686+
self.track_processing_stage(
687+
item.client_id,
688+
"memory",
689+
"started",
690+
{"audio_uuid": item.audio_uuid, "started_at": start_time},
691+
)
683692

684693
# Debug tracking removed for cleaner architecture
685694
# tracker = get_debug_tracker()
@@ -777,6 +786,18 @@ async def _process_memory_item(self, item: MemoryProcessingItem):
777786
)
778787
except Exception as e:
779788
audio_logger.warning(f"Failed to update memory status: {e}")
789+
790+
# Track memory processing completion
791+
self.track_processing_stage(
792+
item.client_id,
793+
"memory",
794+
"completed",
795+
{
796+
"audio_uuid": item.audio_uuid,
797+
"memories_created": len(created_memory_ids),
798+
"processing_time": time.time() - start_time,
799+
},
800+
)
780801
elif success and not created_memory_ids:
781802
# Successful processing but no memories created (likely empty transcript)
782803
audio_logger.info(
@@ -793,6 +814,19 @@ async def _process_memory_item(self, item: MemoryProcessingItem):
793814
)
794815
except Exception as e:
795816
audio_logger.warning(f"Failed to update memory status: {e}")
817+
818+
# Track memory processing completion (even though no memories created)
819+
self.track_processing_stage(
820+
item.client_id,
821+
"memory",
822+
"completed",
823+
{
824+
"audio_uuid": item.audio_uuid,
825+
"memories_created": 0,
826+
"processing_time": time.time() - start_time,
827+
"status": "skipped",
828+
},
829+
)
796830
else:
797831
# This shouldn't happen, but handle it gracefully
798832
audio_logger.warning(
@@ -813,6 +847,18 @@ async def _process_memory_item(self, item: MemoryProcessingItem):
813847
f"📝 Updated memory processing status to FAILED for {item.audio_uuid}"
814848
)
815849

850+
# Track memory processing failure
851+
self.track_processing_stage(
852+
item.client_id,
853+
"memory",
854+
"failed",
855+
{
856+
"audio_uuid": item.audio_uuid,
857+
"error": f"Unexpected result: success={success}, ids={created_memory_ids}",
858+
"processing_time": time.time() - start_time,
859+
},
860+
)
861+
816862
# Debug tracking removed for cleaner architecture
817863
# tracker.track_event(
818864
# transaction_id,
@@ -836,6 +882,18 @@ async def _process_memory_item(self, item: MemoryProcessingItem):
836882
f"📝 Updated memory processing status to FAILED for {item.audio_uuid}"
837883
)
838884

885+
# Track memory processing failure
886+
self.track_processing_stage(
887+
item.client_id,
888+
"memory",
889+
"failed",
890+
{
891+
"audio_uuid": item.audio_uuid,
892+
"error": "Memory service returned False",
893+
"processing_time": time.time() - start_time,
894+
},
895+
)
896+
839897
# Debug tracking removed for cleaner architecture
840898
# tracker.track_event(
841899
# transaction_id,
@@ -857,6 +915,18 @@ async def _process_memory_item(self, item: MemoryProcessingItem):
857915
except Exception as e:
858916
audio_logger.warning(f"Failed to update memory status: {e}")
859917

918+
# Track memory processing timeout failure
919+
self.track_processing_stage(
920+
item.client_id,
921+
"memory",
922+
"failed",
923+
{
924+
"audio_uuid": item.audio_uuid,
925+
"error": "Processing timeout (5 minutes)",
926+
"processing_time": time.time() - start_time,
927+
},
928+
)
929+
860930
# Debug tracking removed for cleaner architecture
861931
# tracker.track_event(
862932
# transaction_id,
@@ -877,6 +947,18 @@ async def _process_memory_item(self, item: MemoryProcessingItem):
877947
except Exception as repo_e:
878948
audio_logger.warning(f"Failed to update memory status: {repo_e}")
879949

950+
# Track memory processing exception failure
951+
self.track_processing_stage(
952+
item.client_id,
953+
"memory",
954+
"failed",
955+
{
956+
"audio_uuid": item.audio_uuid,
957+
"error": f"Exception: {str(e)}",
958+
"processing_time": time.time() - start_time,
959+
},
960+
)
961+
880962
# Debug tracking removed for cleaner architecture
881963
# tracker.track_event(
882964
# transaction_id,
@@ -886,9 +968,10 @@ async def _process_memory_item(self, item: MemoryProcessingItem):
886968
# metadata={"processing_time": time.time() - start_time},
887969
# )
888970

889-
processing_time_ms = (time.time() - start_time) * 1000
971+
end_time = time.time()
972+
processing_time_ms = (end_time - start_time) * 1000
890973
audio_logger.info(
891-
f"🔄 Completed memory processing for {item.audio_uuid} in {processing_time_ms:.1f}ms"
974+
f"🏁 MEMORY PROCESSING COMPLETED for {item.audio_uuid} in {processing_time_ms:.1f}ms (end time: {end_time})"
892975
)
893976

894977
async def _cropping_processor(self):

backends/advanced-backend/src/advanced_omi_backend/task_manager.py

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,13 @@ def _task_done(self, task_id: str):
111111
except Exception:
112112
task_info.error = "Unknown error"
113113
else:
114-
logger.debug(f"Task completed: {task_info.name}")
114+
# DEBUG: Add more visible logging for memory task completion
115+
if "memory_" in task_info.name:
116+
logger.info(
117+
f"✅ MEMORY TASK COMPLETED: {task_info.name} at {task_info.completed_at}"
118+
)
119+
else:
120+
logger.debug(f"Task completed: {task_info.name}")
115121

116122
# Move to completed list
117123
del self.tasks[task_id]
@@ -181,8 +187,18 @@ def get_active_tasks(self) -> List[TaskInfo]:
181187
return list(self.tasks.values())
182188

183189
def get_task_info(self, task_id: str) -> Optional[TaskInfo]:
184-
"""Get task info by task ID."""
185-
return self.tasks.get(task_id)
190+
"""Get task info by task ID from both active and completed tasks."""
191+
# First check active tasks
192+
task_info = self.tasks.get(task_id)
193+
if task_info:
194+
return task_info
195+
196+
# Then check completed tasks
197+
for completed_task in self.completed_tasks:
198+
if f"{completed_task.name}_{id(completed_task.task)}" == task_id:
199+
return completed_task
200+
201+
return None
186202

187203
def get_task_count_by_type(self) -> Dict[str, int]:
188204
"""Get count of active tasks grouped by type."""

backends/advanced-backend/tests/test_integration.py

Lines changed: 68 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@
5858
# Test constants
5959
BACKEND_URL = "http://localhost:8001" # Test backend port
6060
TEST_AUDIO_PATH = tests_dir.parent.parent.parent / "extras/test-audios/DIY Experts Glass Blowing_16khz_mono_4min.wav"
61-
MAX_STARTUP_WAIT = 120 # seconds
61+
MAX_STARTUP_WAIT = 60 # seconds
6262
PROCESSING_TIMEOUT = 300 # seconds for audio processing (5 minutes)
6363

6464

@@ -851,10 +851,16 @@ def wait_for_memory_processing(self, client_id: str, timeout: int = 120):
851851

852852
if response.status_code == 200:
853853
data = response.json()
854+
855+
# DEBUG: Log full API response to see exactly what we're getting
856+
logger.info(f"🔍 Full processor status API response: {data}")
857+
854858
stages = data.get("stages", {})
855859

856860
# Check if memory stage is complete
857861
memory_stage = stages.get("memory", {})
862+
logger.info(f"🧠 Memory stage data: {memory_stage}")
863+
858864
if memory_stage.get("completed", False):
859865
logger.info(f"✅ Memory processing completed for client_id: {client_id}")
860866
memory_processing_complete = True
@@ -873,6 +879,8 @@ def wait_for_memory_processing(self, client_id: str, timeout: int = 120):
873879
error = stage_info.get("error")
874880
status = "✅" if completed else "❌" if error else "⏳"
875881
logger.info(f" {status} {stage_name}: {'completed' if completed else 'error' if error else 'processing'}")
882+
# DEBUG: Show all fields in memory stage
883+
logger.info(f" All memory stage fields: {stage_info}")
876884

877885
else:
878886
logger.warning(f"❌ Processor status API call failed with status: {response.status_code}")
@@ -937,6 +945,10 @@ def test_runner():
937945
def test_full_pipeline_integration(test_runner):
938946
"""Test the complete audio processing pipeline."""
939947
try:
948+
# Test timing tracking
949+
test_start_time = time.time()
950+
phase_times = {}
951+
940952
# Immediate logging to debug environment
941953
logger.info("=" * 80)
942954
logger.info("🚀 STARTING INTEGRATION TEST")
@@ -946,18 +958,54 @@ def test_full_pipeline_integration(test_runner):
946958
logger.info(f"CI environment: {os.environ.get('CI', 'NOT SET')}")
947959
logger.info(f"GITHUB_ACTIONS: {os.environ.get('GITHUB_ACTIONS', 'NOT SET')}")
948960

949-
# Setup
961+
# Phase 1: Environment setup
962+
phase_start = time.time()
963+
logger.info("📋 Phase 1: Setting up test environment...")
950964
test_runner.setup_environment()
965+
phase_times['env_setup'] = time.time() - phase_start
966+
logger.info(f"✅ Environment setup completed in {phase_times['env_setup']:.2f}s")
967+
968+
# Phase 2: Service startup
969+
phase_start = time.time()
970+
logger.info("🐳 Phase 2: Starting services...")
951971
test_runner.start_services()
972+
phase_times['service_startup'] = time.time() - phase_start
973+
logger.info(f"✅ Service startup completed in {phase_times['service_startup']:.2f}s")
974+
975+
# Phase 3: Wait for services
976+
phase_start = time.time()
977+
logger.info("⏳ Phase 3: Waiting for services to be ready...")
952978
test_runner.wait_for_services()
979+
phase_times['service_readiness'] = time.time() - phase_start
980+
logger.info(f"✅ Service readiness check completed in {phase_times['service_readiness']:.2f}s")
981+
982+
# Phase 4: Authentication
983+
phase_start = time.time()
984+
logger.info("🔑 Phase 4: Authentication...")
953985
test_runner.authenticate()
986+
phase_times['authentication'] = time.time() - phase_start
987+
logger.info(f"✅ Authentication completed in {phase_times['authentication']:.2f}s")
954988

955-
# Test audio processing
989+
# Phase 5: Audio upload and processing
990+
phase_start = time.time()
991+
logger.info("📤 Phase 5: Audio upload...")
956992
client_id = test_runner.upload_test_audio()
993+
phase_times['audio_upload'] = time.time() - phase_start
994+
logger.info(f"✅ Audio upload completed in {phase_times['audio_upload']:.2f}s")
995+
996+
# Phase 6: Transcription processing
997+
phase_start = time.time()
998+
logger.info("🎤 Phase 6: Transcription processing...")
957999
conversation, transcription = test_runner.verify_processing_results(client_id)
1000+
phase_times['transcription_processing'] = time.time() - phase_start
1001+
logger.info(f"✅ Transcription processing completed in {phase_times['transcription_processing']:.2f}s")
9581002

959-
# Validate memory extraction
1003+
# Phase 7: Memory extraction
1004+
phase_start = time.time()
1005+
logger.info("🧠 Phase 7: Memory extraction...")
9601006
memories = test_runner.validate_memory_extraction(client_id)
1007+
phase_times['memory_extraction'] = time.time() - phase_start
1008+
logger.info(f"✅ Memory extraction completed in {phase_times['memory_extraction']:.2f}s")
9611009

9621010
# Basic assertions
9631011
assert conversation is not None
@@ -1003,10 +1051,25 @@ def test_full_pipeline_integration(test_runner):
10031051
"""
10041052
assert False, error_msg
10051053

1006-
# Log success
1054+
# Calculate total test time
1055+
total_test_time = time.time() - test_start_time
1056+
phase_times['total_test'] = total_test_time
1057+
1058+
# Log success with detailed timing
10071059
logger.info("=" * 80)
10081060
logger.info("🎉 INTEGRATION TEST PASSED!")
10091061
logger.info("=" * 80)
1062+
logger.info(f"⏱️ TIMING BREAKDOWN:")
1063+
logger.info(f" 📋 Environment Setup: {phase_times['env_setup']:>6.2f}s")
1064+
logger.info(f" 🐳 Service Startup: {phase_times['service_startup']:>6.2f}s")
1065+
logger.info(f" ⏳ Service Readiness: {phase_times['service_readiness']:>6.2f}s")
1066+
logger.info(f" 🔑 Authentication: {phase_times['authentication']:>6.2f}s")
1067+
logger.info(f" 📤 Audio Upload: {phase_times['audio_upload']:>6.2f}s")
1068+
logger.info(f" 🎤 Transcription: {phase_times['transcription_processing']:>6.2f}s")
1069+
logger.info(f" 🧠 Memory Extraction: {phase_times['memory_extraction']:>6.2f}s")
1070+
logger.info(f" {'─' * 35}")
1071+
logger.info(f" 🏁 TOTAL TEST TIME: {total_test_time:>6.2f}s ({total_test_time/60:.1f}m)")
1072+
logger.info("")
10101073
logger.info(f"📊 Test Results:")
10111074
logger.info(f" ✅ Audio file processed successfully")
10121075
logger.info(f" ✅ Transcription generated: {len(transcription)} characters")

0 commit comments

Comments
 (0)